Advertisement
Guest User

SocketHive

a guest
Aug 29th, 2014
298
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.61 KB | None | 0 0
  1. package nx.comm
  2.  
  3. import java.net.{InetSocketAddress, StandardSocketOptions}
  4. import java.nio.ByteBuffer
  5. import java.nio.channels.{ServerSocketChannel, SelectionKey, Selector, SocketChannel}
  6.  
  7. import nx.{Util, Asynch}
  8.  
  9. import scala.collection.mutable.ArrayBuffer
  10.  
  /**
   * A selector-driven group ("hive") of non-blocking socket channels ("swarmlings").
   *
   * One `Selector` (`hiveQueen`) multiplexes every channel. Connected channels are tracked in
   * `swarm`, which never grows beyond `populationCap`. The selector loop is handed to
   * `addActivity` and started by the `run` call at the end of the constructor; both come from
   * the mixed-in traits, which are not visible in this file.
   */
  class SocketHive extends Asynch with Util
  {
    /** Capacity in bytes of the buffer lazily attached to each swarmling's selection key. */
    private val memoryCapacity = 32 * 1024

    /** Set by `depart`, cleared by `settle`. */
    var abandonHive: Boolean = false
    /** The selector every channel of this hive is registered with; replaced by `settle`. */
    var hiveQueen: Selector = Selector.open()
    /** Maximum number of channels allowed in `swarm`. */
    var populationCap: Int = 1
    /** The channels that are connected and registered for read/write readiness. */
    val swarm: ArrayBuffer[SocketChannel] = ArrayBuffer[SocketChannel]()

    /** The first channel of the swarm, or `null` when the swarm is empty. */
    def primarySwarmling: SocketChannel = swarm.headOption.orNull

    /**
     * Resets the hive (see `migrate`) and starts a non-blocking connect to the given endpoint.
     *
     * @param _host remote host name or address
     * @param _port remote port
     * @return `true` if the connection was established immediately, `false` if it is still
     *         pending and will be completed by the selector loop
     */
    def spawn(_host: String, _port: Int): Boolean =
    {
      migrate
      val swarmling = SocketChannel.open()
      swarmling.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
      swarmling.configureBlocking(false)
      // register(...) returns the SelectionKey, not the channel, so keep using `swarmling` itself.
      swarmling.register(hiveQueen, SelectionKey.OP_CONNECT)
      val connectedAtOnce = swarmling.connect(new InetSocketAddress(_host, _port))
      // An immediate connect (possible for local peers) is enlisted right away instead of
      // waiting for a connect-readiness event.
      if (connectedAtOnce)
        hatch(swarmling)
      connectedAtOnce
    }

    /**
     * Admits a channel into the hive, or closes it when the swarm is already at `populationCap`.
     *
     *  - A connected channel (or one whose pending connect can be finished now) is registered for
     *    read/write readiness and added to `swarm`.
     *  - A channel whose connect is still pending is left alone until the next readiness event.
     *  - A fresh, unconnected channel starts connecting to the primary swarmling's remote address.
     *
     * @param _swarmling the channel to admit
     * @throws IllegalStateException if a fresh channel is given while the swarm is empty, so
     *                               there is no remote address to connect to (the channel is
     *                               closed first)
     */
    def hatch(_swarmling: SocketChannel): Unit =
    {
      if (swarm.length >= populationCap)
        tryDo(_swarmling.close)
      else
      {
        _swarmling.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
        _swarmling.configureBlocking(false)
        // A pending non-blocking connect only counts as connected once finishConnect succeeds.
        if (_swarmling.isConnected || (_swarmling.isConnectionPending && _swarmling.finishConnect()))
          enlist(_swarmling)
        else if (!_swarmling.isConnectionPending)
          Option(primarySwarmling) match
          {
            case Some(primary) =>
              _swarmling.register(hiveQueen, SelectionKey.OP_CONNECT)
              if (_swarmling.connect(primary.getRemoteAddress))
                enlist(_swarmling)
            case None =>
              tryDo(_swarmling.close)
              throw new IllegalStateException(
                "cannot hatch a new swarmling: the hive has no primary swarmling to follow")
          }
      }
    }

    /**
     * Opens up to `_spawnlingCount` new channels and hatches each one, stopping early for any
     * iteration in which the swarm has already reached `populationCap`.
     *
     * @param _spawnlingCount number of channels to try to open
     */
    def hatch(_spawnlingCount: Int): Unit =
      for (i <- 0 until _spawnlingCount)
        if (swarm.length < populationCap)
          hatch(SocketChannel.open())

    /**
     * Deregisters the channel from the selector (if it is registered), closes it and removes it
     * from `swarm`.
     *
     * @param _swarmling the channel to drop
     */
    def killSwarmling(_swarmling: SocketChannel) =
    {
      // keyFor returns null for a channel that was never registered with this selector.
      Option(_swarmling.keyFor(hiveQueen)).foreach(_.cancel())
      tryDo(_swarmling.close)
      synchronized{swarm -= _swarmling}
    }

    /** Non-blocking server channel used to accept incoming swarmlings. */
    // NOTE(review): no bind(...) call is visible in this file; confirm the channel is bound
    // elsewhere, otherwise no connection can ever be accepted.
    val spawningPool: ServerSocketChannel = ServerSocketChannel.open()
    spawningPool.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
    spawningPool.configureBlocking(false)

    /**
     * Raises the population cap to `_spawnlingCount + 1`, starts listening for accept readiness
     * on `spawningPool` and sends a `spawn|<count>` command through the swarm.
     *
     * @param _spawnlingCount number of additional swarmlings to allow and request
     */
    def assimilate(_spawnlingCount: Int): Unit =
    {
      populationCap = _spawnlingCount + 1
      spawningPool.register(hiveQueen, SelectionKey.OP_ACCEPT)
      command(s"spawn|${_spawnlingCount}": SendableString)
    }

    /**
     * Splits the command's bytes into at most `swarm.length` chunks and writes chunk `i` to
     * swarmling `i`, framed as: index byte, chunk bytes, `eom`.
     *
     * Nothing is sent when the swarm is empty. A failed write triggers `depart`, after which the
     * remaining chunks are skipped.
     *
     * @param _cmd the payload to distribute
     */
    def command(_cmd: Sendable[_]): Unit =
    {
      // Work on a snapshot: `depart` may clear `swarm` while chunks are still being sent.
      val carriers = swarm.toVector
      if (carriers.nonEmpty)
      {
        val payload = _cmd.bytes
        // Chunk size is ceil(payload / carriers), so there are never more chunks than carriers.
        val chunkSize = math.max(1, (payload.length + carriers.length - 1) / carriers.length)
        val byteGroups = payload.grouped(chunkSize)
        var i = 0
        while (byteGroups.hasNext && !abandonHive)
        {
          val frame = (i.toByte +: byteGroups.next) ++ (eom: Array[Byte])
          // NOTE(review): the channels are non-blocking, so a single write may send only part
          // of the frame; the unsent remainder is not retried here.
          tryDo(carriers(i).write(ByteBuffer.wrap(frame)), _e => depart)
          i += 1
        }
      }
    }

    /** Registers a connected channel for read/write readiness and adds it to `swarm` once. */
    private def enlist(_swarmling: SocketChannel): Unit =
    {
      val duties = SelectionKey.OP_READ | SelectionKey.OP_WRITE
      // Re-registering after cancelling the key throws CancelledKeyException, so an existing
      // key has its interest set switched instead.
      Option(_swarmling.keyFor(hiveQueen)) match
      {
        case Some(key) => key.interestOps(duties)
        case None => _swarmling.register(hiveQueen, duties)
      }
      synchronized{if (!swarm.contains(_swarmling)) swarm += _swarmling}
    }

    /** Returns the buffer attached to the key, attaching a fresh one when none is present. */
    private def memoryOf(_key: SelectionKey): ByteBuffer =
      _key.attachment match
      {
        case memory: ByteBuffer => memory
        case _ =>
          val fresh = ByteBuffer.allocate(memoryCapacity)
          _key.attach(fresh)
          fresh
      }

    /** Handles the readiness events of one selected key. */
    private def tend(_key: SelectionKey): Unit =
      _key.channel match
      {
        case pool: ServerSocketChannel =>
          // accept returns null when no connection is actually pending.
          if (_key.isAcceptable)
            Option(pool.accept).foreach(newcomer => hatch(newcomer))

        case swarmling: SocketChannel =>
          if (_key.isConnectable)
            hatch(swarmling)
          else
          {
            // Read straight into the attached buffer, which bounds the read to the free space.
            // A negative count means the peer closed the connection.
            if (_key.isReadable && swarmling.read(memoryOf(_key)) < 0)
              killSwarmling(swarmling)
            if (_key.isValid && _key.isWritable)
            {
              val memory = memoryOf(_key)
              memory.flip
              if (memory.hasRemaining)
                swarmling.write(memory)
              // Keep only the unsent bytes and return the buffer to fill mode.
              memory.compact
            }
          }

        // Only the two channel types above are ever registered with the selector.
        case _ => ()
      }

    /** Drops the channel behind a key whose handling failed with an I/O error. */
    private def forsake(_key: SelectionKey): Unit =
      _key.channel match
      {
        case swarmling: SocketChannel => killSwarmling(swarmling)
        case _ => _key.cancel()
      }

    // Selector loop: dispatches readiness events for as long as select reports ready keys.
    // NOTE(review): `depart` closes the selector, which makes a later select throw
    // ClosedSelectorException; confirm how Asynch treats an activity that throws.
    addActivity(
      while (hiveQueen.select() > 0)
      {
        val hive = hiveQueen.selectedKeys.iterator
        while (hive.hasNext)
        {
          val swarmling = hive.next
          // The selector never clears the selected-key set itself; without this removal the
          // same key would be handled again on every later pass.
          hive.remove()
          if (swarmling.isValid)
          {
            try { tend(swarmling) }
            catch { case _: java.io.IOException => forsake(swarmling) }
          }
        }
      })

    /** Closes every swarm channel and the selector, empties `swarm` and sets `abandonHive`. */
    def depart: Unit = synchronized
    {
      // Close the channels before forgetting them so no socket is leaked.
      swarm.foreach(swarmling => tryDo(swarmling.close))
      hiveQueen.close()
      swarm.clear()
      abandonHive = true
    }

    /** Clears `abandonHive`, opens a fresh selector and resets `populationCap` to 1. */
    def settle: Unit = synchronized
    {
      abandonHive = false
      hiveQueen = Selector.open()
      populationCap = 1
    }

    /** Tears the hive down and sets it up again: `depart` followed by `settle`. */
    def migrate: Unit =
    {
      depart
      settle
    }

    run
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement