Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class TCPMessageSender(listener: ActorRef) extends BaseActor {
- final val MESSAGE_DELIMITER = "\n"
- val buffer = new ListBuffer[Any]
- IO(Tcp) ! Connect(new InetSocketAddress(configuration.data.tcp.host, configuration.data.tcp.port))
- override def receive = {
- case msg @ (_: UserMessage | _: GroupMessage | _: RequestMessage) =>
- logger.warn(s"Received message ($msg) before connected. Buffering...")
- buffer += msg
- case CommandFailed(_: Connect) =>
- logger.warn("Can't connect. All messages will be ignored")
- listener ! Terminate
- context stop self
- case c @ Connected(remote, local) =>
- logger.info("Connected to " + c.remoteAddress)
- val connection = sender
- connection ! Register(self)
- logger.info("Sending previous received messages: " + buffer.size)
- buffer.foreach(msg => {
- val msgString: String = JsonHelper.toJson(Map[String, Any]("message_type" -> msg.getClass.getSimpleName, "message" -> msg))
- connection ! Write(ByteString(msgString + MESSAGE_DELIMITER))
- })
- buffer.clear
- logger.info("Sent")
- context become {
- case msg @ (_: UserMessage | _: GroupMessage | _: RequestMessage) =>
- val msgString: String = JsonHelper.toJson(Map[String, Any]("message_type" -> msg.getClass.getSimpleName, "message" -> msg))
- logger.trace(s"Sending message: $msgString")
- connection ! Write(ByteString(msgString + MESSAGE_DELIMITER))
- case data: ByteString =>
- connection ! Write(data)
- case CommandFailed(w: Write) =>
- // O/S buffer was full
- case Received(data) =>
- logger.warn(s"I am not supposed to receive this data: $data")
- case "close" =>
- connection ! Close
- case _: ConnectionClosed =>
- context stop self
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement