Guest User

Untitled

a guest
Aug 14th, 2014
249
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.03 KB | None | 0 0
  1. package misc
  2.  
  3. import java.net.InetSocketAddress
  4.  
  5. import akka.actor.{ActorSystem, Props, Actor}
  6. import akka.io.{IO, Tcp}
  7. import akka.util.ByteString
  8.  
  9. class Server extends Actor {
  10.  
  11.   import Tcp._
  12.   import context.system
  13.  
  14.   IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 4321))
  15.  
  16.   def receive = {
  17.     case b @ Bound(localAddress) =>
  18.     // do some logging or setup ...
  19.  
  20.     case CommandFailed(_: Bind) => context stop self
  21.  
  22.     case c @ Connected(remote, local) =>
  23.       val handler = context.actorOf(Props[SimplisticHandler])
  24.       val connection = sender()
  25.       connection ! Register(handler)
  26.   }
  27.  
  28. }
  29.  
  30. object SimplisticHandler {
  31.   object PacketByteString{
  32.     /* метод возвращает Some(пакет, остаток), если есть пакет и None, если нет пакета */
  33.     /* http://danielwestheide.com/blog/2012/11/21/the-neophytes-guide-to-scala-part-1-extractors.html */
  34.     def unapply(bs: ByteString): Option[(ByteString, ByteString)] = {
  35.       val (left, right) = (bs.takeWhile(_ != '_'), bs.dropWhile(_ != '_'))
  36.       right match {
  37.         case ByteString.empty           => None
  38.         case _                          => Some(left, right.tail)
  39.       }
  40.     }
  41.   }
  42.  
  43.   /* неявное преобразование ByteString в PacketByteString, чтобы можно было юзать метод наш unapply и паттерн матчить ByteString, отделяя пакеты */
  44.   implicit class PacketByteString(b: ByteString) {
  45.     def apply(b: ByteString) = new PacketByteString(b)
  46.   }
  47. }
  48.  
  49. class SimplisticHandler extends Actor {
  50.   import Tcp._
  51.  
  52.   import SimplisticHandler._
  53.  
  54.   def receive = receiveBs(ByteString.empty) // инициируем пустой остаток
  55.  
  56.   def receiveBs(bs: ByteString): Receive = {
  57.  
  58.     case Received(data) => // получили данные
  59.       /* функция рекурсивно отделяет пакеты и отпавляет их клиенту */
  60.       def process: PartialFunction[ByteString, Unit] = {
  61.           // юзаем наш метод unapply
  62.           case PacketByteString(packet, rest) => // получилось отделить пакет
  63.             sender() ! Write(packet) // отправляем его
  64.             process(rest) // пробуем отделить еще один
  65.           case rest => // не получилось отделить пакет
  66.             context.become(receiveBs(rest)) // сохраняем остаток в области видимости новой функции receive и меняем поведение актора
  67.             /* про context.become http://cs.nyu.edu/wies/teaching/ppc-14/material/lecture10.pdf */
  68.       }
  69.  
  70.       process(bs.concat(data)) // склеиваем остаток от предыдущих пакетов и новые данные
  71.  
  72.     case PeerClosed     =>
  73.       context stop self
  74.   }
  75. }
  76.  
  77. object Run extends App {
  78.  
  79.   val actorSystem = ActorSystem("TcpServer")
  80.   actorSystem.actorOf(Props[Server])
  81. }
Advertisement
Add Comment
Please, Sign In to add comment