SHOW:
|
|
- or go back to the newest paste.
1 | package nx.comm | |
2 | ||
3 | import java.net.{InetSocketAddress, StandardSocketOptions} | |
4 | import java.nio.ByteBuffer | |
5 | import java.nio.channels.{ServerSocketChannel, SelectionKey, Selector, SocketChannel} | |
6 | ||
7 | import nx.{Util, Asynch} | |
8 | ||
9 | import scala.collection.mutable.ArrayBuffer | |
10 | ||
11 | class SocketHive extends Asynch with Util | |
12 | { | |
13 | var abandonHive = false | |
14 | var hiveQueen = Selector.open | |
15 | var populationCap = 1 | |
16 | val swarm = ArrayBuffer[SocketChannel]() | |
17 | def primarySwarmling = if (swarm.length > 0) swarm(0) else null | |
18 | def spawn(_host: String, _port: Int) = | |
19 | { | |
20 | migrate | |
21 | SocketChannel.open | |
22 | .setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true) | |
23 | .configureBlocking(false) | |
24 | .register(hiveQueen, SelectionKey.OP_CONNECT) | |
25 | .asInstanceOf[SocketChannel] | |
26 | .connect(new InetSocketAddress(_host, _port)) | |
27 | } | |
28 | def hatch(_swarmling: SocketChannel): Unit = | |
29 | { | |
30 | if (swarm.length >= populationCap) | |
31 | tryDo(_swarmling.close) | |
32 | else | |
33 | { | |
34 | _swarmling | |
35 | .setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true) | |
36 | .configureBlocking(false) | |
37 | if (!_swarmling.isConnected) | |
38 | _swarmling | |
39 | .register(hiveQueen, SelectionKey.OP_CONNECT) | |
40 | .asInstanceOf[SocketChannel] | |
41 | .connect(primarySwarmling.getRemoteAddress) | |
42 | else | |
43 | { | |
44 | _swarmling.keyFor(hiveQueen).cancel | |
45 | _swarmling.register(hiveQueen, SelectionKey.OP_READ | SelectionKey.OP_WRITE) | |
46 | synchronized{swarm += _swarmling} | |
47 | } | |
48 | } | |
49 | } | |
50 | def hatch(_spawnlingCount: Int): Unit = | |
51 | for (i <- 0 until _spawnlingCount) | |
52 | if (swarm.length < populationCap) | |
53 | hatch(SocketChannel.open) | |
54 | def killSwarmling(_swarmling: SocketChannel) = | |
55 | { | |
56 | _swarmling.keyFor(hiveQueen).cancel | |
57 | _swarmling.close | |
58 | synchronized{swarm -= _swarmling} | |
59 | } | |
60 | ||
61 | val spawningPool = ServerSocketChannel.open | |
62 | spawningPool.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true) | |
63 | spawningPool.configureBlocking(false) | |
64 | def assimilate(_spawnlingCount: Int) = | |
65 | { | |
66 | populationCap = _spawnlingCount + 1 | |
67 | spawningPool.register(hiveQueen, SelectionKey.OP_ACCEPT) | |
68 | command(s"spawn|${_spawnlingCount}": SendableString) | |
69 | } | |
70 | ||
71 | def command(_cmd: Sendable[_]) = | |
72 | { | |
73 | val byteGroups = _cmd.bytes.grouped(swarm.length) | |
74 | var i = 0 | |
75 | while (byteGroups.hasNext) | |
76 | { | |
77 | tryDo(swarm(i).write(ByteBuffer.wrap(i.toByte +: byteGroups.next union (eom: Array[Byte]))), _e => depart) | |
78 | i += 1 | |
79 | } | |
80 | } | |
81 | ||
82 | addActivity( | |
83 | while (hiveQueen.select > 0) | |
84 | { | |
85 | val hive = hiveQueen.selectedKeys.iterator | |
86 | while (hive.hasNext) | |
87 | hive.next match | |
88 | { | |
89 | case swarmling if swarmling.isAcceptable => hatch(swarmling.channel.asInstanceOf[ServerSocketChannel].accept) | |
90 | ||
91 | case swarmling if swarmling.isConnectable => hatch(swarmling.channel.asInstanceOf[SocketChannel]) | |
92 | ||
93 | case swarmling if swarmling.isReadable => | |
94 | val swarmlingMemory = swarmling.attachment.asInstanceOf[ByteBuffer] | |
95 | val toMemorize = ByteBuffer.allocate(32 * 1024) | |
96 | swarmling.channel.asInstanceOf[SocketChannel].read(toMemorize) | |
97 | toMemorize.flip | |
98 | swarmlingMemory.limit(swarmlingMemory.capacity) | |
99 | swarmlingMemory.put(toMemorize) | |
100 | ||
101 | case swarmling if swarmling.isWritable => | |
102 | val swarmlingMemory = swarmling.attachment.asInstanceOf[ByteBuffer] | |
103 | swarmlingMemory.flip | |
104 | while (swarmlingMemory.hasRemaining) | |
105 | swarmling.channel.asInstanceOf[SocketChannel].write(swarmlingMemory) | |
106 | } | |
107 | }) | |
108 | ||
109 | def depart = synchronized | |
110 | { | |
111 | hiveQueen.close | |
112 | swarm.clear | |
113 | abandonHive = true | |
114 | } | |
115 | - | def settle = synchronized |
115 | + | def infest = synchronized |
116 | { | |
117 | abandonHive = false | |
118 | hiveQueen = Selector.open | |
119 | populationCap = 1 | |
120 | } | |
121 | def migrate = | |
122 | { | |
123 | depart | |
124 | - | settle |
124 | + | infest |
125 | } | |
126 | ||
127 | run | |
128 | } |