Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import akka.actor.{Actor, ActorRef, Address}
import akka.pattern.ask
import akka.remote.DisassociatedEvent
import akka.util.Timeout
import scalafx.collections.{ObservableHashSet, ObservableSet}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal
/**
 * Server actor that tracks the connected users and relays session events.
 *
 * The actor has two behaviours:
 *  - `receive` (initial): accepts `Join` requests and waits for `Start`.
 *  - `started`: entered on `Start`; serves `Pass` requests.
 *
 * In both behaviours a remote disassociation removes the users that lived on
 * the lost address, and the session is closed when the host is gone.
 */
class Server extends Actor {
  import Server._
  import Client._

  /** Users currently connected. Every change is broadcast to the remaining users. */
  val clients: ObservableSet[User] = new ObservableHashSet()

  /** Round-robin cursor over `clients`; created lazily by the first `Pass`. */
  var circularIterator: Option[Iterator[User]] = None

  /** Address of the current ball owner. Nothing in this class assigns it yet. */
  var ballOwner: Option[Address] = None

  /** Not read or written by this class. */
  var role: Option[String] = None

  // Push the full user list to everybody whenever the membership changes.
  clients.onChange((_, _) => {
    val snapshot = clients.toList
    snapshot.foreach(_.ref ! UserList(snapshot))
  })

  /**
   * The user that joined with the "Host" role, if still connected.
   *
   * Derived from `clients` on every call so it can never go stale; a value
   * computed once at construction time would always see an empty set.
   */
  def host: Option[User] = clients.find(_.role == "Host")

  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self, classOf[akka.remote.DisassociatedEvent])
  }

  override def receive: Receive = {
    case DisassociatedEvent(_, remoteAddress, _) =>
      handleDisassociation(remoteAddress)

    case Join(x) =>
      // The first user to join becomes the host, everyone after is a client.
      val assignedRole = if (clients.isEmpty) "Host" else "Client"
      clients += User(x, sender(), assignedRole)
      sender() ! Joined(assignedRole)

    case Start =>
      implicit val timeout: Timeout = Timeout(20.seconds)
      context.become(started)
      // Ask every client to begin; each ask completes when that client replies.
      val futures = clients.toList.map(_.ref ? Begin)
      // Number of clients that acknowledged. A failed or timed-out ask counts
      // as 0 instead of failing the whole aggregate.
      val totalf: Future[Int] =
        Future
          .traverse(futures)(_.map(_ => 1).recover { case NonFatal(_) => 0 })
          .map(_.sum)
      // NOTE: the follow-up that hands the first ball to a client once all
      // acknowledgements arrived was disabled in the original code and is
      // still disabled; `totalf` is kept ready for it.

    case _ => println("unknown message")
  }

  /** Behaviour used after `Start` has been received. */
  def started: Receive = {
    case DisassociatedEvent(_, remoteAddress, _) =>
      handleDisassociation(remoteAddress)

    case Pass =>
      // Hand the ball to the next connected client, if there is one.
      nextClient().foreach(_.ref ! Take)
      sender() ! "OK"

    case _ => println("unknown message")
  }

  /**
   * Drops every user located at `remoteAddress`. If users remain but none of
   * them is the host, the session is over: the set is cleared and the display
   * actor is told to close the chat exactly once.
   */
  private def handleDisassociation(remoteAddress: Address): Unit = {
    clients.removeIf(x => x.ref.path.address == remoteAddress)
    if (clients.nonEmpty && host.isEmpty) {
      clients.clear()
      MyApp.displayActor ! CloseChat
    }
  }

  /**
   * Advances the round-robin cursor and returns the next connected user, or
   * `None` when nobody is connected.
   */
  private def nextClient(): Option[User] =
    if (clients.isEmpty) None // also keeps the endless iterator below from spinning
    else {
      val ring = circularIterator.getOrElse {
        // `clients.toList` is re-evaluated on every lap, so each lap sees the
        // current membership.
        val fresh = Iterator.continually(clients.toList).flatten
        circularIterator = Some(fresh)
        fresh
      }
      // Skip users that disconnected since the current lap was snapshotted.
      Iterator.continually(ring.next()).find(clients.contains)
    }
}
/** Messages understood or emitted by the [[Server]] actor. */
object Server {

  /** Request to join the session; `text` is passed on as the first field of the created user. */
  final case class Join(text: String)

  /** Switches the server from its initial behaviour to the started behaviour. */
  case object Start

  /** Sent to the display actor when the host is no longer connected. */
  case object CloseChat
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement