View difference between Paste ID: WP6qDbnX and kpujahsU
SHOW: | | - or go back to the newest paste.
1
package nx.comm
2
3
import java.net.{InetSocketAddress, StandardSocketOptions}
4
import java.nio.ByteBuffer
5
import java.nio.channels.{ServerSocketChannel, SelectionKey, Selector, SocketChannel}
6
7
import nx.{Util, Asynch}
8
9
import scala.collection.mutable.ArrayBuffer
10
11
class SocketHive extends Asynch with Util
12
{
13
	var abandonHive = false
14
	var hiveQueen = Selector.open
15
	var populationCap = 1
16
	val swarm = ArrayBuffer[SocketChannel]()
17
	def primarySwarmling = if (swarm.length > 0) swarm(0) else null
18
	def spawn(_host: String, _port: Int) =
19
	{
20
		migrate
21
		SocketChannel.open
22
			.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
23
			.configureBlocking(false)
24
			.register(hiveQueen, SelectionKey.OP_CONNECT)
25
			.asInstanceOf[SocketChannel]
26
			.connect(new InetSocketAddress(_host, _port))
27
	}
28
	def hatch(_swarmling: SocketChannel): Unit =
29
	{
30
		if (swarm.length >= populationCap)
31
			tryDo(_swarmling.close)
32
		else
33
		{
34
			_swarmling
35
				.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
36
				.configureBlocking(false)
37
			if (!_swarmling.isConnected)
38
				_swarmling
39
					.register(hiveQueen, SelectionKey.OP_CONNECT)
40
					.asInstanceOf[SocketChannel]
41
					.connect(primarySwarmling.getRemoteAddress)
42
			else
43
			{
44
				_swarmling.keyFor(hiveQueen).cancel
45
				_swarmling.register(hiveQueen, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
46
				synchronized{swarm += _swarmling}
47
			}
48
		}
49
	}
50
	def hatch(_spawnlingCount: Int): Unit =
51
		for (i <- 0 until _spawnlingCount)
52
			if (swarm.length < populationCap)
53
				hatch(SocketChannel.open)
54
	def killSwarmling(_swarmling: SocketChannel) =
55
	{
56
		_swarmling.keyFor(hiveQueen).cancel
57
		_swarmling.close
58
		synchronized{swarm -= _swarmling}
59
	}
60
61
	val spawningPool = ServerSocketChannel.open
62
	spawningPool.setOption[java.lang.Boolean](StandardSocketOptions.SO_REUSEADDR, true)
63
	spawningPool.configureBlocking(false)
64
	def assimilate(_spawnlingCount: Int) =
65
	{
66
		populationCap = _spawnlingCount + 1
67
		spawningPool.register(hiveQueen, SelectionKey.OP_ACCEPT)
68
		command(s"spawn|${_spawnlingCount}": SendableString)
69
	}
70
71
	def command(_cmd: Sendable[_]) =
72
	{
73
		val byteGroups = _cmd.bytes.grouped(swarm.length)
74
		var i = 0
75
		while (byteGroups.hasNext)
76
		{
77
			tryDo(swarm(i).write(ByteBuffer.wrap(i.toByte +: byteGroups.next union (eom: Array[Byte]))), _e => depart)
78
			i += 1
79
		}
80
	}
81
82
	addActivity(
83
		while (hiveQueen.select > 0)
84
		{
85
			val hive = hiveQueen.selectedKeys.iterator
86
			while (hive.hasNext)
87
				hive.next match
88
				{
89
					case swarmling if swarmling.isAcceptable => hatch(swarmling.channel.asInstanceOf[ServerSocketChannel].accept)
90
91
					case swarmling if swarmling.isConnectable => hatch(swarmling.channel.asInstanceOf[SocketChannel])
92
93
					case swarmling if swarmling.isReadable =>
94
						val swarmlingMemory = swarmling.attachment.asInstanceOf[ByteBuffer]
95
						val toMemorize = ByteBuffer.allocate(32 * 1024)
96
						swarmling.channel.asInstanceOf[SocketChannel].read(toMemorize)
97
						toMemorize.flip
98
						swarmlingMemory.limit(swarmlingMemory.capacity)
99
						swarmlingMemory.put(toMemorize)
100
101
					case swarmling if swarmling.isWritable =>
102
						val swarmlingMemory = swarmling.attachment.asInstanceOf[ByteBuffer]
103
						swarmlingMemory.flip
104
						while (swarmlingMemory.hasRemaining)
105
							swarmling.channel.asInstanceOf[SocketChannel].write(swarmlingMemory)
106
				}
107
		})
108
109
	def depart = synchronized
110
	{
111
		hiveQueen.close
112
		swarm.clear
113
		abandonHive = true
114
	}
115-
	def settle = synchronized
115+
	def infest = synchronized
116
	{
117
		abandonHive = false
118
		hiveQueen = Selector.open
119
		populationCap = 1
120
	}
121
	def migrate =
122
	{
123
		depart
124-
		settle
124+
		infest
125
	}
126
127
	run
128
}