Advertisement
Cool_Dalek

Flames actors event loop experiments

Aug 19th, 2022 (edited)
960
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 5.11 KB | None | 0 0
  1. package flames.concurrent.execution.events
  2.  
  3. import flames.concurrent.actor.*
  4. import flames.concurrent.actor.mailbox.SystemMessage.*
  5. import flames.concurrent.actor.behavior.Behavior
  6. import flames.concurrent.execution.*
  7. import flames.logging.*
  8. import sourcecode.{Enclosing, Line}
  9.  
  10. import java.net.{InetSocketAddress, SocketAddress}
  11. import java.nio.*
  12. import java.nio.channels.*
  13. import scala.collection.mutable
  14. import scala.concurrent.duration.FiniteDuration
  15.  
  /** Scheduler that executes every submitted task on the thread that calls [[run]].
    *
    * Submitted tasks are held in a plain `mutable.Queue`, which performs no
    * synchronisation of its own.
    *
    * @param nioLoop event loop polled once per iteration of [[run]]; whatever it
    *                yields is appended to the task queue
    */
  class SingleThreadedScheduler(
                                 nioLoop: NIOEventLoop = NIOEventLoop(),
                               ) extends Scheduler with Runnable {

    // Channel registration is forwarded straight to the NIO loop.
    export nioLoop.listenChannel

    private val tasks = mutable.Queue.empty[Runnable]

    // Set by `shutdown()` and also by `run()` once it finds nothing left to do.
    private var stop: Boolean = false

    /** Appends `runnable` to the queue; a later iteration of [[run]] executes it. */
    override def execute(runnable: Runnable): Unit =
      tasks.enqueue(runnable)

    /** Prints the stack trace of `cause`. */
    override def reportFailure(cause: Throwable): Unit =
      cause.printStackTrace()

    /** Shuts the NIO loop down and makes [[run]] leave its loop. */
    override def shutdown(): Unit = {
      nioLoop.shutdown()
      stop = true
    }

    /** True after [[shutdown]] was called or after [[run]] stopped itself for lack of work. */
    override def isShutdown: Boolean = stop

    /** Drives the scheduler until it is stopped.
      *
      * Each iteration moves everything the NIO loop polled into the task queue and
      * then runs at most one queued task. A non-fatal exception thrown by a task is
      * passed to [[reportFailure]] instead of terminating the loop. The loop stops
      * itself as soon as the NIO loop reports no work and the queue is empty.
      */
    override def run(): Unit =
      while (!stop) {
        nioLoop.poll().drain { x =>
          tasks.enqueue(x)
        }
        if (tasks.nonEmpty) {
          val task = tasks.dequeue()
          // One failing task must not take the whole event loop down with it.
          try task.run()
          catch { case scala.util.control.NonFatal(cause) => reportFailure(cause) }
        }
        if (!nioLoop.haveWork && tasks.isEmpty) stop = true
      }
    end run

    /** Queues `action` exactly like [[execute]]; no separate thread is involved. */
    override def blocking(action: Runnable): Unit =
      tasks.enqueue(action)

    /** Not implemented yet: throws `NotImplementedError`. */
    override def schedule[T](delay: FiniteDuration)(action: => T): Cancellable = ???

    /** Not implemented yet: throws `NotImplementedError`. */
    override def schedule[T](delay: FiniteDuration, period: FiniteDuration)(action: => T): Cancellable = ???

    /** Not implemented yet: throws `NotImplementedError`. */
    override def config: SchedulerConfig = ???

  }
  59. object SingleThreadedScheduler {
  60.  
  /** Endpoint the `server` entry point binds to and the `client` entry point connects to. */
  val serverAddress: InetSocketAddress = InetSocketAddress("localhost", 9532)

  /** The single scheduler instance handed to the actor runtime below. */
  val scheduler: SingleThreadedScheduler = new SingleThreadedScheduler()

  /** Default actor runtime at debug log level whose scheduler factory always returns [[scheduler]]. */
  given runtime: ActorRuntime =
    ActorRuntime.default(LogLevel.Debug, makeScheduler = (_, _) => scheduler)
  71.  
  /** Server entry point: opens a listening channel bound to `serverAddress`, spawns a
    * `Server` actor around it and then runs the scheduler loop on the calling thread.
    */
  @main def server(): Unit =
    // `bind` returns the channel it was invoked on, so open and bind can be chained.
    val listener = ServerSocketChannel.open().bind(serverAddress)
    runtime.spawn(Server(listener))
    scheduler.run()
  end server
  78.  
  /** Base for actors that react to readiness events of one NIO channel.
    *
    * On start the channel is registered with the scheduler; every `SelectionKey`
    * delivered by that registration is sent to the actor itself as a message.
    *
    * @param channel the channel to listen on; closed by `shutdown`
    */
  trait NetworkHandler(channel: SelectableChannel) extends Actor[SelectionKey, ExecutionModel.Async]:

    // Handle of the channel registration; empty before `act` ran and after `shutdown`.
    private var token: Option[Cancellable] = None

    /** Registers the channel with the scheduler and continues with [[handle]]. */
    override def act(): Behavior[SelectionKey] =
      token = Option(
        scheduler.listenChannel(channel) { key =>
          self.tell(key)
          // NOTE(review): the callback yields null; presumably "no follow-up task" —
          // confirm against NIOEventLoop.listenChannel.
          null
        }
      )
      handle
    end act

    /** Behavior that processes the selection keys of this handler's channel. */
    protected def handle: Behavior[SelectionKey]

    /** Cancels the channel registration (if any), closes the channel and stops the actor.
      *
      * The registration handle is cleared after cancelling, so a repeated call does not
      * cancel it a second time.
      */
    protected def shutdown: Behavior[SelectionKey] = {
      token.foreach(_.cancel())
      token = None
      channel.close()
      stop
    }

    /** Logs the remaining bytes of `buffer`, decoded as UTF-8, when `logLevel` is enabled.
      *
      * When the level is enabled the buffer's position ends up at its limit, exactly as
      * if the bytes had been read one by one; when it is disabled the buffer is untouched.
      *
      * @param buffer   buffer whose bytes between position and limit are logged
      * @param logLevel level to log at, `LogLevel.Debug` by default
      */
    inline protected def logBuffer(buffer: ByteBuffer, logLevel: LogLevel = LogLevel.Debug)(using Enclosing, Line): Unit =
      if (logger.isEnabled(logLevel)) {
        // Decode through a charset instead of widening each byte with `toChar`, which
        // sign-extends bytes >= 0x80 into unrelated characters.
        val message = java.nio.charset.StandardCharsets.UTF_8.decode(buffer.duplicate()).toString
        // Decoding used a duplicate; mark the original as consumed explicitly.
        buffer.position(buffer.limit())
        logger.log(logLevel, message, null)
      }
    end logBuffer

  end NetworkHandler
  111.  
  /** Actor that accepts connections on a listening channel and spawns one
    * `ClientHandler` per accepted connection.
    *
    * It shuts itself down as soon as it is told that a child has stopped.
    *
    * @param channel the listening channel connections are accepted from
    */
  class Server(channel: ServerSocketChannel)(using ActorEnv) extends NetworkHandler(channel) {

    override def handle: Behavior[SelectionKey] =
      receive { key =>
        if (key.isAcceptable) {
          // On a non-blocking channel `accept()` returns null when no connection is
          // pending, so only spawn a handler when a channel actually came back.
          Option(channel.accept()).foreach { clientChannel =>
            spawnRef(new ClientHandler(clientChannel))
          }
        }
        same
      } and receiveSystem {
        case ChildStopped(_, _) => shutdown
      }

  }
  126.  
  /** Echo handler for one accepted connection: alternates between reading bytes from
    * the channel into a buffer and writing that buffer back to the same channel.
    *
    * @param channel the accepted client connection
    */
  class ClientHandler(channel: SocketChannel)(using ActorEnv) extends NetworkHandler(channel) {

    private val buffer = ByteBuffer.allocateDirect(1024)

    override def handle: Behavior[SelectionKey] = read

    /** Waits for the channel to become readable and reads into the buffer.
      *
      * Data received switches to [[write]]; a read of zero bytes keeps waiting;
      * a negative count (end of stream) shuts the handler down.
      */
    def read: Behavior[SelectionKey] =
      receive { key =>
        if (key.isReadable) {
          val bytesRead = channel.read(buffer)
          if (bytesRead > 0) {
            buffer.flip()
            write
          } else if (bytesRead == 0) {
            // Nothing was available; that is not end of stream, so keep the connection.
            same
          } else shutdown
        } else same
      }.ignoreSystem

    /** Waits for the channel to become writable and writes the buffered bytes back.
      *
      * Stays in this behavior until the buffer is fully drained, then clears it and
      * returns to [[read]].
      */
    def write: Behavior[SelectionKey] =
      receive { key =>
        if (key.isWritable) {
          channel.write(buffer)
          // A write may accept only part of the buffer; clearing now would drop the rest.
          if (buffer.hasRemaining) same
          else {
            buffer.clear()
            read
          }
        } else same
      }.ignoreSystem

  }
  154.  
  /** Client entry point: opens a channel connected to `serverAddress`, spawns a
    * `Client` actor around it and then runs the scheduler loop on the calling thread.
    */
  @main def client(): Unit =
    // `SocketChannel.open(remote)` is the open-then-connect convenience form.
    val connection = SocketChannel.open(serverAddress)
    runtime.spawn(new Client(connection))
    scheduler.run()
  end client
  161.  
  /** Actor that sends "Hello World" over the channel, logs whatever comes back and
    * then shuts down.
    *
    * @param channel the connection to the server
    */
  class Client(channel: SocketChannel)(using ActorEnv) extends NetworkHandler(channel) {

    private val buffer = ByteBuffer.allocateDirect(1024)

    override def handle: Behavior[SelectionKey] = write

    /** Waits for the channel to become writable, then fills the buffer with the
      * greeting (UTF-8) and starts sending it.
      */
    def write: Behavior[SelectionKey] =
      receive { key =>
        if (key.isWritable) {
          buffer.put("Hello World".getBytes(java.nio.charset.StandardCharsets.UTF_8))
          buffer.flip()
          sendPending(drain)
        } else same
      }.ignoreSystem

    /** Keeps sending the rest of an already filled buffer on each writable event. */
    private def drain: Behavior[SelectionKey] =
      receive { key =>
        if (key.isWritable) sendPending(same) else same
      }.ignoreSystem

    /** Writes the buffer once; yields `whenIncomplete` if bytes are left over,
      * otherwise clears the buffer and moves on to [[read]].
      */
    private def sendPending(whenIncomplete: => Behavior[SelectionKey]): Behavior[SelectionKey] = {
      channel.write(buffer)
      // A write may accept only part of the buffer; clearing now would drop the rest.
      if (buffer.hasRemaining) whenIncomplete
      else {
        buffer.clear()
        read
      }
    }

    /** Waits for the channel to become readable, logs the received bytes and shuts down.
      *
      * A read of zero bytes keeps waiting instead of shutting down without a reply.
      */
    def read: Behavior[SelectionKey] =
      receive { key =>
        if (key.isReadable) {
          val bytesRead = channel.read(buffer)
          if (bytesRead == 0) same
          else {
            buffer.flip()
            logBuffer(buffer)
            buffer.rewind()
            shutdown
          }
        } else same
      }.ignoreSystem

  }
  191.  
  192. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement