Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.example
- import kotlinx.coroutines.*
- import io.ktor.network.selector.*
- import io.ktor.network.sockets.*
- import io.ktor.utils.io.*
- import kotlinx.coroutines.channels.BroadcastChannel
- import kotlinx.coroutines.channels.ClosedReceiveChannelException
- import kotlinx.coroutines.channels.ConflatedBroadcastChannel
- import kotlinx.coroutines.channels.ReceiveChannel
- import java.io.IOException
- import java.lang.StringBuilder
- import java.nio.ByteBuffer
- suspend fun ByteReadChannel.readString(): String {
- val result = StringBuilder()
- val decoder = Charsets.US_ASCII.newDecoder()
- val buffer = ByteBuffer.allocate(1)
- while (!isClosedForRead) {
- val byte = readByte()
- if (byte > 127 || byte < 0) {
- continue
- }
- val c = decoder.decode(buffer.also {
- it.put(byte)
- it.rewind()
- })[0]
- result.append(c)
- if (c == '\n') {
- return result.toString().trim('\r', '\n')
- }
- buffer.rewind()
- decoder.reset()
- }
- return ""
- }
- suspend fun ByteWriteChannel.println(text: String) {
- writeStringUtf8(text)
- writeStringUtf8("\r\n")
- }
- class Client(private val clientSocket: Socket, private val room: BroadcastChannel<String>) {
- private val output = clientSocket.openWriteChannel(autoFlush = true)
- private val input = clientSocket.openReadChannel()
- var nick: String? = null
- private set
- suspend fun start() = coroutineScope {
- input.discard(input.availableForRead.toLong())
- output.writeStringUtf8("Welcome! And your name: ")
- val nick = input.readString()
- room.send("$nick is here")
- output.println("Welcome $nick")
- this@Client.nick = nick
- val roomSubscription = room.openSubscription()
- launch {
- for (message in roomSubscription) {
- output.println(message)
- }
- }
- launch {
- processUserInput(nick)
- }.join()
- roomSubscription.cancel()
- }
- private suspend fun processUserInput(nick: String) {
- while (!clientSocket.isClosed) {
- val text = input.readString()
- room.send("$nick: $text")
- if (text == "bye") {
- room.send("$nick left")
- return
- }
- }
- }
- }
- suspend fun stdoutRoomProcessor(input: ReceiveChannel<String>) {
- for (message in input) {
- println(message)
- }
- }
- suspend fun server(port: Int) = coroutineScope {
- val serverSocket = aSocket(ActorSelectorManager(coroutineContext)).tcp().bind(port = port)
- val room = ConflatedBroadcastChannel<String>()
- launch {
- stdoutRoomProcessor(room.openSubscription())
- }
- while (coroutineContext.isActive) {
- val clientSocket = serverSocket.accept()
- room.send("Client connected ${clientSocket.remoteAddress}")
- launch {
- val client = Client(clientSocket, room)
- try {
- client.start()
- clientSocket.dispose()
- } catch (q: ClosedReceiveChannelException) {
- //Fuck coco for not having multicatch
- } catch (e: IOException) {
- }
- room.send("User died: ${(client.nick ?: clientSocket.remoteAddress)}")
- }
- }
- }
- const val port = 1234
- fun main(): Unit = runBlocking(Dispatchers.Unconfined) {
- launch {
- server(port)
- }
- println("App started at port $port")
- }
Add Comment
Please, Sign In to add comment