Guest User

Kotlin coroutine chat

a guest
May 22nd, 2021
66
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Kotlin 3.51 KB | None | 0 0
  1. package com.example
  2.  
  3. import kotlinx.coroutines.*
  4. import io.ktor.network.selector.*
  5. import io.ktor.network.sockets.*
  6. import io.ktor.utils.io.*
  7. import kotlinx.coroutines.channels.BroadcastChannel
  8. import kotlinx.coroutines.channels.ClosedReceiveChannelException
  9. import kotlinx.coroutines.channels.ConflatedBroadcastChannel
  10. import kotlinx.coroutines.channels.ReceiveChannel
  11. import java.io.IOException
  12. import java.lang.StringBuilder
  13. import java.nio.ByteBuffer
  14.  
  15. suspend fun ByteReadChannel.readString(): String {
  16.     val result = StringBuilder()
  17.     val decoder = Charsets.US_ASCII.newDecoder()
  18.     val buffer = ByteBuffer.allocate(1)
  19.     while (!isClosedForRead) {
  20.         val byte = readByte()
  21.         if (byte > 127 || byte < 0) {
  22.             continue
  23.         }
  24.         val c = decoder.decode(buffer.also {
  25.             it.put(byte)
  26.             it.rewind()
  27.         })[0]
  28.         result.append(c)
  29.         if (c == '\n') {
  30.             return result.toString().trim('\r', '\n')
  31.         }
  32.         buffer.rewind()
  33.         decoder.reset()
  34.     }
  35.     return ""
  36. }
  37.  
  38. suspend fun ByteWriteChannel.println(text: String) {
  39.     writeStringUtf8(text)
  40.     writeStringUtf8("\r\n")
  41. }
  42.  
  43. class Client(private val clientSocket: Socket, private val room: BroadcastChannel<String>) {
  44.     private val output = clientSocket.openWriteChannel(autoFlush = true)
  45.     private val input = clientSocket.openReadChannel()
  46.     var nick: String? = null
  47.         private set
  48.  
  49.     suspend fun start() = coroutineScope {
  50.         input.discard(input.availableForRead.toLong())
  51.  
  52.         output.writeStringUtf8("Welcome! And your name: ")
  53.         val nick = input.readString()
  54.         room.send("$nick is here")
  55.         output.println("Welcome $nick")
  56.         this@Client.nick = nick
  57.         val roomSubscription = room.openSubscription()
  58.         launch {
  59.             for (message in roomSubscription) {
  60.                 output.println(message)
  61.             }
  62.         }
  63.         launch {
  64.             processUserInput(nick)
  65.         }.join()
  66.         roomSubscription.cancel()
  67.     }
  68.  
  69.     private suspend fun processUserInput(nick: String) {
  70.         while (!clientSocket.isClosed) {
  71.             val text = input.readString()
  72.             room.send("$nick: $text")
  73.             if (text == "bye") {
  74.                 room.send("$nick left")
  75.                 return
  76.             }
  77.         }
  78.     }
  79. }
  80.  
  81.  
  82. suspend fun stdoutRoomProcessor(input: ReceiveChannel<String>) {
  83.     for (message in input) {
  84.         println(message)
  85.     }
  86. }
  87.  
  88. suspend fun server(port: Int) = coroutineScope {
  89.     val serverSocket = aSocket(ActorSelectorManager(coroutineContext)).tcp().bind(port = port)
  90.     val room = ConflatedBroadcastChannel<String>()
  91.     launch {
  92.         stdoutRoomProcessor(room.openSubscription())
  93.     }
  94.     while (coroutineContext.isActive) {
  95.         val clientSocket = serverSocket.accept()
  96.         room.send("Client connected ${clientSocket.remoteAddress}")
  97.         launch {
  98.             val client = Client(clientSocket, room)
  99.             try {
  100.                 client.start()
  101.                 clientSocket.dispose()
  102.             } catch (q: ClosedReceiveChannelException) {
  103.                 //Fuck coco for not having multicatch
  104.             } catch (e: IOException) {
  105.             }
  106.             room.send("User died: ${(client.nick ?: clientSocket.remoteAddress)}")
  107.         }
  108.     }
  109. }
  110.  
  111. const val port = 1234
  112. fun main(): Unit = runBlocking(Dispatchers.Unconfined) {
  113.     launch {
  114.         server(port)
  115.     }
  116.     println("App started at port $port")
  117. }
  118.  
Add Comment
Please, Sign In to add comment