Advertisement
Stormtalons

SocketHive

Aug 29th, 2014
4,387
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package nx.comm
  2.  
  3. import java.net.{InetSocketAddress, StandardSocketOptions}
  4. import java.nio.ByteBuffer
  5. import java.nio.channels.{ServerSocketChannel, SelectionKey, Selector, SocketChannel}
  6.  
  7. import nx.{Util, Asynch}
  8.  
  9. import scala.collection.mutable.ArrayBuffer
  10.  
  11. class SocketHive extends Asynch with Util
  12. {
  13.     var abandonHive = false
  14.     var hiveQueen = Selector.open
  15.     var populationCap = 1
  16.     val swarm = ArrayBuffer[SocketChannel]()
  17.     def primarySwarmling = if (swarm.length > 0) swarm(0) else null
  18.     def spawn(_host: String, _port: Int) =
  19.     {
  20.         migrate
  21.         SocketChannel.open
  22.             .setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
  23.             .configureBlocking(false)
  24.             .register(hiveQueen, SelectionKey.OP_CONNECT)
  25.             .asInstanceOf[SocketChannel]
  26.             .connect(new InetSocketAddress(_host, _port))
  27.     }
  28.     def hatch(_swarmling: SocketChannel): Unit =
  29.     {
  30.         if (swarm.length >= populationCap)
  31.             tryDo(_swarmling.close)
  32.         else
  33.         {
  34.             _swarmling
  35.                 .setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
  36.                 .configureBlocking(false)
  37.             if (!_swarmling.isConnected)
  38.                 _swarmling
  39.                     .register(hiveQueen, SelectionKey.OP_CONNECT)
  40.                     .asInstanceOf[SocketChannel]
  41.                     .connect(primarySwarmling.getRemoteAddress)
  42.             else
  43.             {
  44.                 _swarmling.keyFor(hiveQueen).cancel
  45.                 _swarmling.register(hiveQueen, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
  46.                 synchronized{swarm += _swarmling}
  47.             }
  48.         }
  49.     }
  50.     def hatch(_spawnlingCount: Int): Unit =
  51.         for (i <- 0 until _spawnlingCount)
  52.             if (swarm.length < populationCap)
  53.                 hatch(SocketChannel.open)
  54.     def killSwarmling(_swarmling: SocketChannel) =
  55.     {
  56.         _swarmling.keyFor(hiveQueen).cancel
  57.         _swarmling.close
  58.         synchronized{swarm -= _swarmling}
  59.     }
  60.  
  61.     val spawningPool = ServerSocketChannel.open
  62.     spawningPool.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
  63.     spawningPool.configureBlocking(false)
  64.     def assimilate(_spawnlingCount: Int) =
  65.     {
  66.         populationCap = _spawnlingCount + 1
  67.         spawningPool.register(hiveQueen, SelectionKey.OP_ACCEPT)
  68.         command(s"spawn|${_spawnlingCount}": SendableString)
  69.     }
  70.  
  71.     def command(_cmd: Sendable[_]) =
  72.     {
  73.         val byteGroups = _cmd.bytes.grouped(swarm.length)
  74.         var i = 0
  75.         while (byteGroups.hasNext)
  76.         {
  77.             tryDo(swarm(i).write(ByteBuffer.wrap(i.toByte +: byteGroups.next union (eom: Array[Byte]))), _e => depart)
  78.             i += 1
  79.         }
  80.     }
  81.  
  82.     addActivity(
  83.         while (hiveQueen.select > 0)
  84.         {
  85.             val hive = hiveQueen.selectedKeys.iterator
  86.             while (hive.hasNext)
  87.                 hive.next match
  88.                 {
  89.                     case swarmling if swarmling.isAcceptable => hatch(swarmling.channel.asInstanceOf[ServerSocketChannel].accept)
  90.  
  91.                     case swarmling if swarmling.isConnectable => hatch(swarmling.channel.asInstanceOf[SocketChannel])
  92.  
  93.                     case swarmling if swarmling.isReadable =>
  94.                         val swarmlingMemory = swarmling.attachment.asInstanceOf[ByteBuffer]
  95.                         val toMemorize = ByteBuffer.allocate(32 * 1024)
  96.                         swarmling.channel.asInstanceOf[SocketChannel].read(toMemorize)
  97.                         toMemorize.flip
  98.                         swarmlingMemory.limit(swarmlingMemory.capacity)
  99.                         swarmlingMemory.put(toMemorize)
  100.  
  101.                     case swarmling if swarmling.isWritable =>
  102.                         val swarmlingMemory = swarmling.attachment.asInstanceOf[ByteBuffer]
  103.                         swarmlingMemory.flip
  104.                         while (swarmlingMemory.hasRemaining)
  105.                             swarmling.channel.asInstanceOf[SocketChannel].write(swarmlingMemory)
  106.                 }
  107.         })
  108.  
  109.     def depart = synchronized
  110.     {
  111.         hiveQueen.close
  112.         swarm.clear
  113.         abandonHive = true
  114.     }
  115.     def infest = synchronized
  116.     {
  117.         abandonHive = false
  118.         hiveQueen = Selector.open
  119.         populationCap = 1
  120.     }
  121.     def migrate =
  122.     {
  123.         depart
  124.         infest
  125.     }
  126.  
  127.     run
  128. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement