Advertisement
Cool_Dalek

Flames event loop experiments

Aug 19th, 2022 (edited)
923
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.46 KB | None | 0 0
  1. package flames.concurrent.execution.events
  2.  
  3. import scala.concurrent.ExecutionContext
  4. import flames.concurrent.execution.{Cancellable, Shutdown}
  5.  
  6. import java.net.{InetSocketAddress, SocketAddress}
  7. import java.nio.*
  8. import java.nio.channels.*
  9. import scala.collection.mutable
  10. import scala.concurrent.duration.*
  11.  
  12. class SingleThreadedScheduler(
  13.                                nioLoop: NIOEventLoop = NIOEventLoop(),
  14.                                yieldCount: Int = 8,
  15.                                yieldTime: Duration = 1 minute span,
  16.                              ) extends ExecutionContext with Runnable with Shutdown {
  17.  
  18.   export nioLoop.listenChannel
  19.  
  20.   private val tasks = mutable.Queue.empty[Runnable]
  21.  
  22.   override def execute(runnable: Runnable): Unit =
  23.     tasks.enqueue(runnable)
  24.  
  25.   override def reportFailure(cause: Throwable): Unit =
  26.     cause.printStackTrace()
  27.  
  28.   private var stop: Boolean = false
  29.  
  30.   override def shutdown(): Unit = {
  31.     nioLoop.shutdown()
  32.     stop = true
  33.   }
  34.  
  35.   override def isShutdown: Boolean = stop
  36.  
  37.   override def run(): Unit =
  38.     while (!stop) {
  39.       nioLoop.poll().drain { x =>
  40.         tasks.enqueue(x)
  41.       }
  42.       var count = 0
  43.       val deadline = System.nanoTime() + yieldTime.toNanos
  44.       while {
  45.         count < yieldCount && deadline > System.nanoTime() && tasks.nonEmpty
  46.       } do {
  47.         tasks.dequeue().run()
  48.         count += 1
  49.       }
  50.       if (!nioLoop.haveWork && tasks.isEmpty) stop = true
  51.     }
  52.   end run
  53.  
  54. }
  55. object SingleThreadedScheduler {
  56.  
  57.   val serverAddress = InetSocketAddress(
  58.     "localhost",
  59.     9532,
  60.   )
  61.  
  62.   val scheduler = SingleThreadedScheduler()
  63.   given ExecutionContext = scheduler
  64.  
  65.   @main def server(): Unit =
  66.     val serverChannel = ServerSocketChannel.open()
  67.     serverChannel.bind(serverAddress)
  68.     scheduler.listenChannel(serverChannel) { key => () =>
  69.       if(key.isAcceptable) {
  70.         val client = serverChannel.accept()
  71.         val buffer = ByteBuffer.allocateDirect(1024)
  72.         var write = false
  73.         scheduler.listenChannel(client) { key => () =>
  74.           if(client.isConnected) {
  75.             if (key.isReadable) {
  76.               println("Read")
  77.               val bytes = client.read(buffer)
  78.               if(bytes > 0) {
  79.                 write = true
  80.                 buffer.flip()
  81.               } else scheduler.shutdown()
  82.             }
  83.             if (write && key.isWritable) {
  84.               println("Echoing")
  85.               client.write(buffer)
  86.               write = false
  87.               buffer.clear()
  88.             }
  89.           } else scheduler.shutdown()
  90.         }
  91.       }
  92.     }
  93.     scheduler.run()
  94.   end server
  95.  
  96.   @main def client(): Unit =
  97.     val channel = SocketChannel.open()
  98.     channel.connect(serverAddress)
  99.     val buffer = ByteBuffer.allocateDirect(1024)
  100.     var write = true
  101.     scheduler.listenChannel(channel) { key => () =>
  102.       if(key.isWritable && write) {
  103.         println("Write")
  104.         buffer.put("Hello world".getBytes())
  105.         buffer.flip()
  106.         channel.write(buffer)
  107.         buffer.clear()
  108.         write = false
  109.       }
  110.       if (key.isReadable) {
  111.         println("Read")
  112.         channel.read(buffer)
  113.         buffer.flip()
  114.         printBuffer(buffer)
  115.         buffer.rewind()
  116.         scheduler.shutdown()
  117.       }
  118.     }
  119.     scheduler.run()
  120.   end client
  121.  
  122.   def printBuffer(buffer: ByteBuffer): Unit =
  123.     while (buffer.hasRemaining) {
  124.       print(buffer.get().toChar)
  125.     }
  126.     println
  127.   end printBuffer
  128.  
  129. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement