Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import scala.actors.{Actor, TIMEOUT, Exit}
- import scala.actors.Actor.State.{New, Terminated}
- import scala.util.logging.{Logged, ConsoleLogger}
- import scala.util.control.Exception.allCatch
- import scala.collection.JavaConversions._
- import scala.util.Random
- import java.net.InetSocketAddress
- import java.nio.ByteBuffer
- import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
- import java.nio.charset.Charset
- import java.io.IOException
- object EchoServerRev1 {
- def main (args:Array[String]) {
- val supervisor = new EchoServerSupervisor() with EchoServerConsoleLoggerFactory
- supervisor.start
- Thread.sleep(60000)
- supervisor.stop
- }
- }
- sealed abstract class SupervisorMessage
- case class Link(childActor: Actor) extends SupervisorMessage
- case object Stop extends SupervisorMessage
- class EchoServerSupervisor(
- port: Int = 10000
- ) extends Actor with EchoServerLoggerFactory {
- trapExit = true
- val logger = makeLogger()
- val acceptor = new EchoServerAcceptor(this, logger, port)
- def act() {
- startChildren()
- loop {
- react {
- case Link(child: Actor) =>
- link(child)
- case Exit(child: Actor, 'normal) if child == acceptor =>
- exit("stop")
- case Exit(child: Actor, 'normal) =>
- case Exit(child: Actor, reason: Exception) =>
- logger.write("receive Exit: %s" format reason.getMessage)
- restartChild(child)
- case Exit(child: Actor, reason) =>
- logger.write("receive Exit: %s" format reason)
- restartChild(child)
- case Stop =>
- acceptor.stop()
- case unknown =>
- logger.write("unknown message [%s], ignoring" format unknown)
- }
- }
- }
- def startChildren() {
- Seq(logger, acceptor) foreach { child =>
- child.getState match {
- case New => startChild(child)
- case Terminated => exit("Could not restart server.")
- case _ =>
- }
- }
- }
- def startChild(child: Actor) {
- link(child)
- child.start
- }
- def restartChild(child: Actor) {
- link(child)
- child.restart
- }
- def stop = this ! Stop
- }
- sealed abstract class LoggerMessage
- case class Log(message: String) extends LoggerMessage
- class EchoServerLogger extends Actor with Logged {
- def act = loop {
- react {
- case Log(message) => log(message)
- case unknown => log("unknown message [%s], ignoring" format unknown)
- }
- }
- def write(message: String) {
- if (this.mailboxSize < 100) {
- this ! Log(message)
- }
- }
- }
- trait EchoServerLoggerFactory {
- def makeLogger(): EchoServerLogger =
- new EchoServerLogger()
- }
- trait EchoServerConsoleLoggerFactory extends EchoServerLoggerFactory {
- override def makeLogger(): EchoServerLogger =
- new EchoServerLogger() with ConsoleLogger
- }
- case class ChangeRequest(socket:SocketChannel, pos: Int)
- case class CloseChannel(socket:SocketChannel)
- class EchoServerAcceptor(
- supervisor: EchoServerSupervisor,
- logger: EchoServerLogger,
- port: Int
- ) extends Actor {
- val selector = Selector.open()
- val serverChannel = {
- val channel = ServerSocketChannel.open()
- channel.configureBlocking(false)
- val socket = channel.socket
- socket.setReuseAddress(true)
- socket.bind(new InetSocketAddress(port))
- channel
- }
- val serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT)
- val random = Random
- logger.write("Start echo server. listen port is %d" format port)
- def act {
- while (true) {
- selector.select()
- handleKeys()
- receiveStop()
- if (random.nextInt(500000) == 0) {
- throw new Exception("acceptor exception test")
- }
- }
- }
- def receiveStop() {
- receiveWithin(0) {
- case Stop =>
- serverChannel.close()
- selector.close()
- logger.write("Stop echo server.")
- exit()
- case ChangeRequest(sc, ops) =>
- sc.keyFor(selector).interestOps(ops)
- case CloseChannel(sc) =>
- sc.keyFor(selector).cancel
- sc.close
- case TIMEOUT =>
- case unknown =>
- logger.write("unknown message [%s], ignoring" format unknown)
- }
- }
- def stop() = {
- this ! Stop
- selector.wakeup
- }
- def handleKeys() {
- selector.selectedKeys foreach { key =>
- if (key.isValid) handleKey(key)
- }
- selector.selectedKeys.clear()
- }
- def handleKey(key: SelectionKey) {
- if (serverKey == key && key.isAcceptable) {
- accept()
- } else {
- val handler = key.attachment.asInstanceOf[EchoServerHandler]
- if (key.isReadable) {
- handler.sendMessage(Read)
- key.interestOps(0)
- }
- if (key.isWritable) {
- handler.sendMessage(Write)
- key.interestOps(0)
- }
- }
- }
- def accept() {
- serverChannel.accept() match {
- case channel: SocketChannel =>
- val remoteAddress = channel.socket.getRemoteSocketAddress.toString
- logger.write("connect from [%s]" format remoteAddress)
- channel.configureBlocking(false)
- val handler = new EchoServerHandler(this, logger, channel);
- supervisor ! Link(handler)
- handler.start
- channel.register(
- selector,
- SelectionKey.OP_READ,
- handler
- )
- handler ! Ack
- case _ =>
- }
- }
- }
- sealed abstract class HandlerMessage
- case object Read extends HandlerMessage
- case object Write extends HandlerMessage
- case object Ack extends HandlerMessage
- class EchoServerHandler(
- acceptor: EchoServerAcceptor,
- logger: EchoServerLogger,
- channel: SocketChannel
- ) extends Actor {
- type State = PartialFunction[Any, Unit]
- val buffer = ByteBuffer.allocate(1024)
- val decoder = Charset.forName("UTF-8")
- val remoteAddress = channel.socket.getRemoteSocketAddress.toString
- val MessageLine = """^(.*)[\r\n]{0,2}$""".r
- def sendMessage(message: HandlerMessage) {
- if (this.mailboxSize < 5000) {
- this ! message
- }
- }
- def act = {
- buffer.clear()
- reactWithin(100)(ack)
- }
- def ack: State = {
- case Ack => react(doAck())
- case TIMEOUT => close()
- }
- def read: State = {
- case Read => reactWithin(100)(doRead())
- case Write => react(read)
- case TIMEOUT => react(read)
- case unknown =>
- logger.write("unknown message [%s], ignoring" format unknown)
- react(read)
- }
- def write: State = {
- case Write => react(doWrite())
- case TIMEOUT => react(read)
- }
- def doAck(): State = {
- "hello\r\n".map(_.hashCode.toByte).foreach(buffer.put)
- buffer.flip()
- acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
- acceptor.selector.wakeup
- write
- }
- def doRead(): State = {
- allCatch opt channel.read(buffer) match {
- case Some(0) =>
- acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
- acceptor.selector.wakeup
- read
- case Some(-1) => close()
- case Some(_) =>
- buffer.flip()
- handleMessage()
- case None =>
- logger.write("read error")
- close()
- }
- }
- def close(): Nothing = {
- acceptor ! CloseChannel(channel)
- acceptor.selector.wakeup
- logger.write("disconnect from [%s]" format remoteAddress)
- exit()
- }
- def handleMessage(): State = {
- val message = getMessage
- printLog("read", message)
- message match {
- case MessageLine("exit") => close()
- case MessageLine("test") => throw new Exception("handler exception test")
- case _ =>
- acceptor ! ChangeRequest(channel, SelectionKey.OP_WRITE)
- acceptor.selector.wakeup
- write
- }
- }
- def doWrite(): State = {
- printLog("write", getMessage)
- writeBuffer()
- }
- def getMessage: String = {
- val message = decoder.decode(buffer).toString
- buffer.flip()
- message
- }
- def printLog(state: String, message: String) {
- logger.write("%s-Actor[%s] %s %s: %s".format(
- Thread.currentThread,
- this,
- state,
- remoteAddress,
- message
- ))
- }
- def writeBuffer(): State = {
- // TODO: There is no guarantee that channel.write(buffer)
- // writes all data at one time trial.
- allCatch opt channel.write(buffer) match {
- case Some(_) =>
- buffer.clear()
- acceptor ! ChangeRequest(channel, SelectionKey.OP_READ)
- acceptor.selector.wakeup
- read
- case None =>
- logger.write("write error")
- close()
- }
- }
- }
Add Comment
Please, Sign In to add comment