Advertisement
Guest User

SocketServer: receiving data from a socket

a guest
Jan 27th, 2015
257
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 4.81 KB | None | 0 0
  1. // --- SocketServerActor.scala
  2. import akka.actor.{ActorLogging, Actor, Props}
  3. import akka.io.{IO, Tcp}
  4. import java.net.InetSocketAddress
  5.  
  /**
    * TCP acceptor: binds a listening socket on `serverPort` and creates one
    * [[SocketClientActor]] child per accepted connection.
    *
    * The bind request is issued from the constructor, so the actor starts
    * listening as soon as it is created. If the bind fails the actor logs the
    * failure and stops itself.
    *
    * @param serverPort local port to listen on
    */
  class SocketServerActor(serverPort: Int) extends Actor with ActorLogging {

    import Tcp._
    import context.system

    log.info("SocketServerActor created\nTrying to bind socket.")
    IO(Tcp) ! Bind(self, new InetSocketAddress(serverPort))

    def receive: Receive = {
      case Bound(localAddress) => log.info(s"Server listening on $localAddress")
      case CommandFailed(_: Bind) =>
        // Previously the actor stopped silently; record why the server is going away.
        log.error(s"Failed to bind server socket on port $serverPort, stopping.")
        context stop self
      case Connected(remote, local) =>
        // The sender of Connected is the connection actor; the handler must be
        // registered with it before any data is delivered.
        val connection = sender()
        val actorName = s"tcp-${endpointLabel(remote)}-${endpointLabel(local)}"
        val handler = context.actorOf(Props(classOf[SocketClientActor], connection), actorName)
        connection ! Register(handler)
      case any => log.info(s">>> Unhandled message '$any'")
    }

    /**
      * Renders an endpoint as `<octets joined by '_'>x<port>`, e.g. `192_168_0_1x9000`.
      *
      * Address bytes are masked to their unsigned value: a raw `Byte` is signed,
      * so an octet such as 192 would otherwise be rendered as `-64`.
      */
    private def endpointLabel(endpoint: InetSocketAddress): String = {
      val octets = endpoint.getAddress.getAddress.map(_ & 0xff).mkString("_")
      s"${octets}x${endpoint.getPort}"
    }
  }
  26.  
  27.  
  28. // --- SocketClientActor.scala
  29. import akka.actor.{Props, ActorLogging, Actor, ActorRef}
  30. import akka.io.Tcp
  31. import ProtoMessages.MessageRequestBase.MessageRequest
  32. import ProtoMessages.MessageResponseBase.MessageResponse
  33. import net.orionlab.brr.utils.CoreSystem
  34. import net.orionlab.brr.CommunicationMessage.BinarySerializer
  35. import akka.util.ByteString
  36.  
  /**
    * A request decoded from the socket. [[SocketClientActor]] sends this to the
    * user actor after deserializing a complete frame.
    *
    * @param message the deserialized request
    */
  final case class IncomingMessage(message: MessageRequest)
  38.  
  /**
    * A response to be sent to the remote peer. On receipt, [[SocketClientActor]]
    * serializes `message` and writes the bytes to its TCP connection.
    *
    * @param message the response to serialize and send
    */
  final case class OutgoingMessage(message: MessageResponse)
  40.  
  /**
    * Per-connection handler registered with the TCP connection actor.
    *
    * On creation it announces itself to `CoreSystem.supervisor` with
    * `ClientConnected(self, actorId)`. Whoever sends a `ClientConnected` carrying
    * this actor's reference and id back becomes the "user actor" that receives
    * decoded [[IncomingMessage]]s.
    *
    * Data flow:
    *  - `Received` bytes are forwarded to a child [[FrameBuilderActor]];
    *  - each [[CompleteMessage]] it returns is deserialized and passed to the user actor;
    *  - each [[OutgoingMessage]] is serialized and written to the connection.
    *
    * @param connection the connection actor to write outgoing bytes to
    */
  class SocketClientActor(connection: ActorRef) extends Actor with ActorLogging {

    import Tcp._

    val actorId = System.nanoTime()
    val frameBuilder = context.actorOf(Props(classOf[FrameBuilderActor], 256))
    var userActor: Option[ActorRef] = None

    CoreSystem.supervisor.foreach(_ ! ClientConnected(self, actorId))

    def receive: Receive = {
      case msg: ClientConnected =>
        // Only accept the acknowledgement that refers to this very handler instance.
        if (msg.actor == self && msg.actorId == actorId) {
          log.info(s"ClientConnected '${sender()}'")
          userActor = Some(sender())
        }
      case Received(data) => frameBuilder ! BuildFrame(data)
      case CompleteMessage(data) =>
        userActor match {
          case None => log.error(s"Cant send CompleteMessage to empty UserActor.")
          case Some(actor) =>
            BinarySerializer.Deserialize(data.toArray) match {
              case None => log.info(s"Cant send empty message to UserActor.")
              case Some(message) => actor ! IncomingMessage(message)
            }
        }
      case OutgoingMessage(message) =>
        connection ! Write(ByteString(BinarySerializer.Serialize(message)))
      case PeerClosed => disconnect()
      case closed: ConnectionClosed =>
        // Any other close event (error, abort, local close) also ends the
        // connection. Without this case the handler would stay alive forever and
        // the supervisor would never be told that the client is gone.
        log.info(s"Connection closed ($closed)")
        disconnect()
      case any => log.info(s"Unhandled event $any")
    }

    /** Tells the supervisor this client is gone and stops the handler (and its children). */
    private def disconnect(): Unit = {
      CoreSystem.supervisor.foreach(_ ! ClientDisconnected(self, actorId))
      context.stop(self)
    }
  }
  78.  
  /**
    * A chunk of raw bytes read from the socket, sent to [[FrameBuilderActor]]
    * for frame reassembly.
    *
    * @param data the bytes exactly as received; may hold part of a frame or several frames
    */
  final case class BuildFrame(data: ByteString)
  80.  
  /**
    * The payload of one frame that [[FrameBuilderActor]] has finished assembling;
    * sent by the builder to its parent actor.
    *
    * @param data the frame payload, without the length header
    */
  final case class CompleteMessage(data: ByteString)
  82.  
  /**
    * Reassembles length-prefixed frames from the byte chunks delivered in
    * [[BuildFrame]] messages and sends each finished payload to the parent actor
    * as a [[CompleteMessage]].
    *
    * Frame layout as read by this actor: a 4-byte little-endian length header
    * followed by exactly that many payload bytes. A chunk may contain a fraction
    * of a frame, exactly one frame, or several frames; partial headers and
    * partial payloads are kept between chunks.
    *
    * A frame whose header declares length 0 produces no [[CompleteMessage]];
    * the bytes that follow are read as the next header.
    *
    * @param bufferSize maximum number of payload bytes copied into the frame
    *                   buffer per step; must be positive
    */
  class FrameBuilderActor(bufferSize: Int) extends Actor with ActorLogging {
    // A non-positive step size could never make progress on a payload.
    require(bufferSize > 0, "bufferSize must be positive")

    private val headerSize = 4
    // Header bytes collected so far for the frame that is about to start.
    private var headerBuffer = ByteString.empty
    // Payload length of the frame in progress; 0 means "waiting for a header".
    private var messageLength = 0
    // Payload bytes collected so far for the frame in progress.
    private var messageBuffer = ByteString.empty

    def receive: Receive = {
      case BuildFrame(data) => parse(data)
    }

    /**
      * Consumes one chunk. State is deliberately NOT reset when the chunk runs
      * out, so a frame that spans several chunks is completed by later calls.
      */
    private def parse(data: ByteString): Unit = {
      var remaining = data
      while (remaining.nonEmpty) {
        remaining =
          if (messageLength == 0) consumeHeader(remaining)
          else consumePayload(remaining)
      }
    }

    /**
      * Moves header bytes from `data` into the header buffer and, once all
      * `headerSize` bytes are present, decodes the payload length.
      *
      * @return the bytes of `data` that were not consumed
      */
    private def consumeHeader(data: ByteString): ByteString = {
      val needed = headerSize - headerBuffer.length
      headerBuffer ++= data.take(needed)
      val rest = data.drop(needed)

      if (headerBuffer.length < headerSize) {
        // Header is split across chunks; wait for the remaining bytes.
        rest
      } else {
        // getLongPart yields the unsigned 32-bit value, so an oversized length is
        // detected here instead of wrapping to a negative Int.
        val declared = headerBuffer.iterator.getLongPart(headerSize)(java.nio.ByteOrder.LITTLE_ENDIAN)
        headerBuffer = ByteString.empty
        messageBuffer = ByteString.empty
        if (declared > Int.MaxValue) {
          // The stream cannot be re-synchronised from here; drop the rest of the chunk.
          log.error(s"Invalid frame length $declared, discarding ${rest.length} byte(s).")
          ByteString.empty
        } else {
          messageLength = declared.toInt
          rest
        }
      }
    }

    /**
      * Moves payload bytes from `data` into the frame buffer, never more than the
      * frame still needs, and emits the frame when it is complete.
      *
      * @return the bytes of `data` that were not consumed
      */
    private def consumePayload(data: ByteString): ByteString = {
      val missing = messageLength - messageBuffer.length
      // Cap by what is available, by the step size, and by what the frame still needs,
      // so bytes belonging to the next frame are left in place.
      val takeLen = math.min(math.min(data.length, bufferSize), missing)
      messageBuffer ++= data.take(takeLen)
      if (messageBuffer.length == messageLength) {
        context.parent ! CompleteMessage(messageBuffer)
        messageLength = 0
        messageBuffer = ByteString.empty
      }
      data.drop(takeLen)
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement