Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // --- SocketServerActor.scala
- import akka.actor.{ActorLogging, Actor, Props}
- import akka.io.{IO, Tcp}
- import java.net.InetSocketAddress
/**
 * TCP acceptor actor.
 *
 * On construction it asks the Akka TCP manager to bind `serverPort`. For every
 * accepted connection it spawns a [[SocketClientActor]] child and registers that
 * child as the connection's handler.
 *
 * @param serverPort local port to listen on
 */
class SocketServerActor(serverPort: Int) extends Actor with ActorLogging {

  import Tcp._
  import context.system

  log.info("SocketServerActor created\nTrying to bind socket.")
  IO(Tcp) ! Bind(self, new InetSocketAddress(serverPort))

  def receive = {
    case Bound(localAddress) =>
      log.info(s"Server listening on $localAddress")

    case CommandFailed(bind: Bind) =>
      // Without a log line a bind failure (e.g. port already in use) would make
      // the server vanish silently.
      log.error(s"Failed to bind socket on ${bind.localAddress}, stopping SocketServerActor.")
      context stop self

    case Connected(remote, local) =>
      // The child name encodes both endpoints, e.g. "tcp-<remote>x<port>-<local>x<port>".
      val actorName = endpointLabel("tcp-", remote) + endpointLabel("-", local)
      val connection = sender()
      val handler = context.actorOf(Props(classOf[SocketClientActor], connection), actorName)
      connection ! Register(handler)

    case any =>
      log.info(s">>> Unhandled message '$any'")
  }

  /**
   * Renders an endpoint as `<prefix><b0>_<b1>_..._<bn>x<port>`, where `b*` are the
   * raw (signed) address bytes in their decimal form.
   */
  private def endpointLabel(prefix: String, endpoint: InetSocketAddress): String =
    endpoint.getAddress.getAddress.mkString(prefix, "_", "x") + endpoint.getPort
}
- // --- SocketClientActor.scala
- import akka.actor.{Props, ActorLogging, Actor, ActorRef}
- import akka.io.Tcp
- import ProtoMessages.MessageRequestBase.MessageRequest
- import ProtoMessages.MessageResponseBase.MessageResponse
- import net.orionlab.brr.utils.CoreSystem
- import net.orionlab.brr.CommunicationMessage.BinarySerializer
- import akka.util.ByteString
/** A decoded request, forwarded by [[SocketClientActor]] to the registered user actor. */
case class IncomingMessage(message: MessageRequest)

/** A response that [[SocketClientActor]] serializes and writes to its TCP connection. */
case class OutgoingMessage(message: MessageResponse)

/**
 * Handler for a single TCP connection.
 *
 * Raw bytes received from the connection are passed to a child
 * [[FrameBuilderActor]]; each `CompleteMessage` it sends back is deserialized and
 * forwarded to the user actor as an [[IncomingMessage]]. [[OutgoingMessage]]s are
 * serialized and written to the connection.
 *
 * The user actor is whoever answers the `ClientConnected` announcement (sent to
 * `CoreSystem.supervisor` on construction) with a matching `ClientConnected`.
 *
 * @param connection the Akka I/O connection actor this handler writes to
 */
class SocketClientActor(connection: ActorRef) extends Actor with ActorLogging {

  import Tcp._

  /** Identifier used to correlate the `ClientConnected` reply with this instance. */
  val actorId = System.nanoTime()

  /** Child that reassembles length-prefixed frames from the byte stream. */
  val frameBuilder = context.actorOf(Props(classOf[FrameBuilderActor], 256))

  /** Recipient of decoded requests; unset until a matching `ClientConnected` arrives. */
  var userActor: Option[ActorRef] = None

  CoreSystem.supervisor.foreach(_ ! ClientConnected(self, actorId))

  def receive = {
    case msg: ClientConnected =>
      // Only accept a reply that echoes our own reference and id.
      if (msg.actor == self && msg.actorId == actorId) {
        log.info(s"ClientConnected '${sender()}'")
        userActor = Some(sender())
      }

    case Received(data) =>
      frameBuilder ! BuildFrame(data)

    case CompleteMessage(data) =>
      userActor match {
        case None => log.error(s"Cant send CompleteMessage to empty UserActor.")
        case Some(actor) =>
          BinarySerializer.Deserialize(data.toArray) match {
            case None => log.info(s"Cant send empty message to UserActor.")
            case Some(message) => actor ! IncomingMessage(message)
          }
      }

    case OutgoingMessage(message) =>
      connection ! Write(ByteString(BinarySerializer.Serialize(message)))

    case closed: ConnectionClosed =>
      // Every close variant (PeerClosed, Closed, Aborted, ConfirmedClosed,
      // ErrorClosed) ends the connection, so all of them must notify the
      // supervisor and stop this handler; handling PeerClosed alone would leave
      // the actor alive after any other kind of close.
      if (closed.isErrorClosed) {
        log.warning(s"Connection closed with error: ${closed.getErrorCause}")
      }
      CoreSystem.supervisor.foreach(_ ! ClientDisconnected(self, actorId))
      context.stop(self)

    case any =>
      log.info(s"Unhandled event $any")
  }
}
/** A chunk of raw bytes to feed into a [[FrameBuilderActor]]. */
case class BuildFrame(data: ByteString)

/** The payload of one fully reassembled frame (header stripped). */
case class CompleteMessage(data: ByteString)

/**
 * Reassembles length-prefixed frames from an arbitrary sequence of byte chunks.
 *
 * Wire format: a 4-byte little-endian length header followed by that many payload
 * bytes. Chunk boundaries need not coincide with frame boundaries: a chunk may
 * contain several frames, and both the header and the payload of a frame may be
 * spread over several chunks. Each completed payload is sent to the parent actor
 * as a [[CompleteMessage]]. Frames that declare a length of zero produce no message.
 *
 * @param bufferSize maximum number of payload bytes appended to the message buffer
 *                   in one parsing step; must be positive
 */
class FrameBuilderActor(bufferSize: Int) extends Actor with ActorLogging {

  // A non-positive step size could never consume input and would loop forever.
  require(bufferSize > 0, s"bufferSize must be positive, got $bufferSize")

  private val headerSize = 4

  /** Header bytes collected so far while the header is still incomplete. */
  private var headerBuffer = ByteString.empty

  /** Declared payload length of the frame in progress; 0 means "waiting for a header". */
  private var messageLength = 0

  /** Payload bytes collected so far for the frame in progress. */
  private var messageBuffer = ByteString.empty

  def receive = {
    case BuildFrame(data) => parse(data)
  }

  /**
   * Consumes `data`, emitting a [[CompleteMessage]] for every frame it completes.
   * Partial header/payload state is kept across calls, so running out of input
   * simply pauses parsing until the next chunk arrives.
   */
  @scala.annotation.tailrec
  private def parse(data: ByteString): Unit =
    if (data.nonEmpty) {
      if (messageLength == 0) {
        // Header phase: take only as many bytes as the header still needs.
        val (headerPart, rest) = data.splitAt(headerSize - headerBuffer.length)
        headerBuffer ++= headerPart
        if (headerBuffer.length == headerSize) {
          val declared =
            headerBuffer.iterator.getLongPart(headerSize)(java.nio.ByteOrder.LITTLE_ENDIAN).toInt
          headerBuffer = ByteString.empty
          messageBuffer = ByteString.empty
          if (declared < 0) {
            // The frame boundary can no longer be determined, so the rest of
            // this chunk is dropped and parsing restarts with the next chunk.
            log.warning(s"Discarding frame with negative declared length $declared")
          } else {
            messageLength = declared
            parse(rest)
          }
        }
      } else {
        // Payload phase: never take more than the frame still needs, nor more
        // than bufferSize bytes per step.
        val missing = messageLength - messageBuffer.length
        val takeLen = math.min(missing, math.min(data.length, bufferSize))
        messageBuffer ++= data.take(takeLen)
        if (messageBuffer.length == messageLength) {
          context.parent ! CompleteMessage(messageBuffer)
          messageLength = 0
          messageBuffer = ByteString.empty
        }
        parse(data.drop(takeLen))
      }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement