Guest User

Untitled

a guest
Jul 18th, 2018
100
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.26 KB | None | 0 0
  1. import scala.actors.{Actor, TIMEOUT, Exit}
  2. import scala.actors.Actor.State.{New, Terminated}
  3. import scala.util.logging.{Logged, ConsoleLogger}
  4. import scala.util.control.Exception.allCatch
  5. import scala.collection.JavaConversions._
  6. import scala.util.Random
  7.  
  8. import java.net.InetSocketAddress
  9. import java.nio.ByteBuffer
  10. import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
  11. import java.nio.charset.Charset
  12. import java.io.IOException
  13.  
  14. object EchoServerRev1 {
  15. def main (args:Array[String]) {
  16. val supervisor = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
  17. supervisor.start
  18. Thread.sleep(60000)
  19. supervisor.stop
  20. }
  21. }
  22.  
  23. sealed abstract class SupervisorMessage
  24. case class Link(childActor: Actor) extends SupervisorMessage
  25. case object Stop extends SupervisorMessage
  26.  
  27. class EchoServerSupervisor(
  28. port: Int = 10000
  29. ) extends Actor with EchoServerLoggerFactory {
  30. trapExit = true
  31.  
  32. val logger = makeLogger()
  33. val acceptor = new EchoServerAcceptor(this, logger, port)
  34.  
  35. def act() {
  36. startChildren()
  37. loop {
  38. react {
  39. case Link(child: Actor) =>
  40. link(child)
  41. case Exit(child: Actor, 'normal) if child == acceptor =>
  42. exit("stop")
  43. case Exit(child: Actor, 'normal) =>
  44. case Exit(child: Actor, reason: Exception) =>
  45. logger.write("receive Exit: %s" format reason.getMessage)
  46. restartChild(child)
  47. case Exit(child: Actor, reason) =>
  48. logger.write("receive Exit: %s" format reason)
  49. restartChild(child)
  50. case Stop =>
  51. acceptor.stop()
  52. case unknown =>
  53. logger.write("unknown message [%s], ignoring" format unknown)
  54. }
  55. }
  56. }
  57.  
  58. def startChildren() {
  59. Seq(logger, acceptor) foreach { child =>
  60. child.getState match {
  61. case New => startChild(child)
  62. case Terminated => exit("Could not restart server.")
  63. case _ =>
  64. }
  65. }
  66. }
  67.  
  68. def startChild(child: Actor) {
  69. link(child)
  70. child.start
  71. }
  72.  
  73. def restartChild(child: Actor) {
  74. link(child)
  75. child.restart
  76. }
  77.  
  78. def stop = this ! Stop
  79. }
  80.  
  81. sealed abstract class LoggerMessage
  82. case class Log(message: String) extends LoggerMessage
  83.  
  84. class EchoServerLogger extends Actor with Logged {
  85. def act = loop {
  86. react {
  87. case Log(message) => log(message)
  88. case unknown => log("unknown message [%s], ignoring" format unknown)
  89. }
  90. }
  91.  
  92. def write(message: String) {
  93. if (this.mailboxSize < 100) {
  94. this ! Log(message)
  95. }
  96. }
  97. }
  98.  
  99. trait EchoServerLoggerFactory {
  100. def makeLogger(): EchoServerLogger =
  101. new EchoServerLogger()
  102. }
  103.  
  104. trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
  105. override def makeLogger(): EchoServerLogger =
  106. new EchoServerLogger() with ConsoleLogger
  107. }
  108.  
  109. case class ChangeRequest(socket:SocketChannel, pos: Int)
  110. case class CloseChannel(socket:SocketChannel)
  111.  
  112. class EchoServerAcceptor(
  113. supervisor: EchoServerSupervisor,
  114. logger: EchoServerLogger,
  115. port: Int
  116. ) extends Actor {
  117. val selector = Selector.open()
  118.  
  119. val serverChannel = {
  120. val channel = ServerSocketChannel.open()
  121. channel.configureBlocking(false)
  122.  
  123. val socket = channel.socket
  124. socket.setReuseAddress(true)
  125. socket.bind(new InetSocketAddress(port))
  126.  
  127. channel
  128. }
  129.  
  130. val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
  131.  
  132. val random = Random
  133.  
  134. logger.write("Start echo server. listen port is %d" format port)
  135.  
  136. def act {
  137. while (true) {
  138. selector.select()
  139. handleKeys()
  140. receiveStop()
  141. if (random.nextInt(500000) == 0) {
  142. throw new Exception("acceptor exception test")
  143. }
  144. }
  145. }
  146.  
  147. def receiveStop() {
  148. receiveWithin(0) {
  149. case Stop =>
  150. serverChannel.close()
  151. selector.close()
  152. logger.write("Stop echo server.")
  153. exit()
  154. case ChangeRequest(sc, ops) =>
  155. sc.keyFor(selector).interestOps(ops)
  156. case CloseChannel(sc) =>
  157. sc.keyFor(selector).cancel
  158. sc.close
  159. case TIMEOUT =>
  160. case unknown =>
  161. logger.write("unknown message [%s], ignoring" format unknown)
  162. }
  163. }
  164.  
  165. def stop() = {
  166. this ! Stop
  167. selector.wakeup
  168. }
  169.  
  170. def handleKeys() {
  171. selector.selectedKeys foreach { key =>
  172. if (key.isValid) handleKey(key)
  173. }
  174. selector.selectedKeys.clear()
  175. }
  176.  
  177. def handleKey(key: SelectionKey) {
  178. if (serverKey == key && key.isAcceptable) {
  179. accept()
  180. } else {
  181. val handler = key.attachment.asInstanceOf[EchoServerHandler]
  182. if (key.isReadable) {
  183. handler.sendMessage(Read)
  184. key.interestOps(0)
  185. }
  186. if (key.isWritable) {
  187. handler.sendMessage(Write)
  188. key.interestOps(0)
  189. }
  190. }
  191. }
  192.  
  193. def accept() {
  194. serverChannel.accept() match {
  195. case channel: SocketChannel =>
  196. val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  197. logger.write("connect from [%s]" format remoteAddress)
  198.  
  199. channel.configureBlocking(false)
  200.  
  201. val handler = new EchoServerHandler(this, logger, channel);
  202. supervisor ! Link(handler)
  203. handler.start
  204.  
  205. channel.register(
  206. selector,
  207. SelectionKey.OP_READ,
  208. handler
  209. )
  210.  
  211. handler ! Ack
  212. case _ =>
  213. }
  214. }
  215. }
  216.  
  217. sealed abstract class HandlerMessage
  218. case object Read extends HandlerMessage
  219. case object Write extends HandlerMessage
  220. case object Ack extends HandlerMessage
  221.  
  222. class EchoServerHandler(
  223. acceptor: EchoServerAcceptor,
  224. logger: EchoServerLogger,
  225. channel: SocketChannel
  226. ) extends Actor {
  227. type State = PartialFunction[Any, Unit]
  228.  
  229. val buffer = ByteBuffer.allocate(1024)
  230. val decoder = Charset.forName("UTF-8")
  231. val remoteAddress = channel.socket.getRemoteSocketAddress.toString
  232. val MessageLine = """^(.*)[\r\n]{0,2}$""".r
  233.  
  234. def sendMessage(message: HandlerMessage) {
  235. if (this.mailboxSize < 5000) {
  236. this ! message
  237. }
  238. }
  239.  
  240. def act = {
  241. buffer.clear()
  242. reactWithin(100)(ack)
  243. }
  244.  
  245. def ack: State = {
  246. case Ack => react(doAck())
  247. case TIMEOUT => close()
  248. }
  249.  
  250. def read: State = {
  251. case Read => reactWithin(100)(doRead())
  252. case Write => react(read)
  253. case TIMEOUT => react(read)
  254. case unknown =>
  255. logger.write("unknown message [%s], ignoring" format unknown)
  256. react(read)
  257. }
  258.  
  259. def write: State = {
  260. case Write => react(doWrite())
  261. case TIMEOUT => react(read)
  262. }
  263.  
  264. def doAck(): State = {
  265. "hello\r\n".map(_.hashCode.toByte).foreach(buffer.put)
  266. buffer.flip()
  267. acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
  268. acceptor.selector.wakeup
  269. write
  270. }
  271.  
  272. def doRead(): State = {
  273. allCatch opt channel.read(buffer) match {
  274. case Some(0) =>
  275. acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
  276. acceptor.selector.wakeup
  277. read
  278. case Some(-1) => close()
  279. case Some(_) =>
  280. buffer.flip()
  281. handleMessage()
  282. case None =>
  283. logger.write("read error")
  284. close()
  285. }
  286. }
  287.  
  288. def close(): Nothing = {
  289. acceptor ! CloseChannel(channel)
  290. acceptor.selector.wakeup
  291. logger.write("disconnect from [%s]" format remoteAddress)
  292. exit()
  293. }
  294.  
  295. def handleMessage(): State = {
  296. val message = getMessage
  297. printLog("read", message)
  298. message match {
  299. case MessageLine("exit") => close()
  300. case MessageLine("test") => throw new Exception("handler exception test")
  301. case _ =>
  302. acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
  303. acceptor.selector.wakeup
  304. write
  305. }
  306. }
  307.  
  308. def doWrite(): State = {
  309. printLog("write", getMessage)
  310. writeBuffer()
  311. }
  312.  
  313. def getMessage: String = {
  314. val message = decoder.decode(buffer).toString
  315. buffer.flip()
  316. message
  317. }
  318.  
  319. def printLog(state: String, message: String) {
  320. logger.write("%s-Actor[%s] %s %s: %s".format(
  321. Thread.currentThread,
  322. this,
  323. state,
  324. remoteAddress,
  325. message
  326. ))
  327. }
  328.  
  329. def writeBuffer(): State = {
  330. // TODO: There is no guarantee that channel.write(buffer)
  331. // writes all data at one time trial.
  332. allCatch opt channel.write(buffer) match {
  333. case Some(_) =>
  334. buffer.clear()
  335. acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
  336. acceptor.selector.wakeup
  337. read
  338. case None =>
  339. logger.write("write error")
  340. close()
  341. }
  342. }
  343. }
Add Comment
Please, Sign In to add comment