Advertisement
Guest User

Untitled

a guest
Nov 24th, 2017
84
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.53 KB | None | 0 0
  1. import akka.actor.{Actor, ActorRef, Address}
  2.  
  3. import scalafx.collections.{ObservableHashSet, ObservableSet}
  4. import akka.pattern.ask
  5. import akka.remote.DisassociatedEvent
  6.  
  7. import scala.concurrent.ExecutionContext.Implicits.global
  8. import akka.util.Timeout
  9.  
  10. import scala.concurrent.Future
  11. import scala.concurrent.duration._
  12.  
  13. class Server extends Actor {
  14.  
  15. import Server._
  16. import Client._
  17.  
  18. val clients: ObservableSet[User] = new ObservableHashSet()
  19. var circularIterator: Option[Iterator[User]] = None
  20. var ballOwner: Option[Address] = None
  21. var role: Option[String] = None
  22. //var address: String
  23. //var port: String
  24.  
  25. clients.onChange((_, _) => {
  26. for (client <- clients) {
  27. client.ref ! UserList(clients.toList)
  28. }
  29. })
  30. var host: User = null
  31. for(client <- clients){
  32. if(client.role == "Host"){
  33. host = client
  34. }
  35. }
  36.  
  37. override def preStart(): Unit = {
  38. context.system.eventStream.subscribe(self, classOf[akka.remote.DisassociatedEvent])
  39. //context.system.eventStream.subscribe(self, classOf[akka.remote.AssociatedEvent])
  40. }
  41.  
  42. override def receive: Receive = {
  43. case DisassociatedEvent(localAddress, remoteAddress, _) =>
  44. //clients.removeIf(x => x.ref.path.address == remoteAddress)
  45. clients.remove()
  46. for(client <- clients){
  47. if(clients.contains(host) == false){
  48. clients.clear()
  49. MyApp.displayActor ! CloseChat
  50. }
  51. }
  52. case Join(x) =>
  53. if (clients.size() == 0){
  54. clients += User(x, sender(), "Host")
  55. sender() ! Joined("Host")
  56. //sender() ! TellRole("Host")
  57. }
  58. else{
  59. clients += User(x, sender(), "Client")
  60. sender() ! Joined("Client")
  61. // sender() ! TellRole("Client")
  62. }
  63. //case HostAddress(s,p) =>
  64. // address = s
  65. //port = p
  66.  
  67. case Start =>
  68. implicit val timeout: Timeout = Timeout(20 second)
  69. context.become(started)
  70. val futures = for (client <- clients) yield {
  71. client.ref ? Begin
  72. }
  73. val totalf = Future {
  74. var count = 0
  75. for (f <- futures) {
  76. for (v <- f) {
  77. count = count + 1
  78. }
  79. }
  80. count
  81. }
  82. //taking prcessing
  83. // totalf foreach { x =>
  84. // //take a ball from the collection
  85. // circularIterator = Some(Iterator.continually(clients.toList).flatten)
  86. // //take 1 client circular iterator
  87. // val clients1 = circularIterator.get.take(1).toList
  88. // for (client <- clients1) {
  89. // client ! Take
  90. // ballOwner = Some(client.path.address)
  91. // }
  92. // }
  93.  
  94. case _ => println("unknown message")
  95. }
  96.  
  97. def started: Receive = {
  98. case DisassociatedEvent(localAddress, remoteAddress, _) =>
  99. clients.removeIf(x => x.ref.path.address == remoteAddress)
  100. for(client <- clients){
  101. if(clients.contains(host) == false){
  102. clients.clear()
  103. MyApp.displayActor ! CloseChat
  104. }
  105. }
  106. // implicit val timeout = Timeout(20 second)
  107. // if (remoteAddress == ballOwner.get) {
  108. // self ? Pass
  109. // }
  110. case Pass =>
  111. //take 1 client circular iterator
  112. val clients1 = circularIterator.get.take(1).toList
  113. for (client <- clients1) {
  114. client.ref ! Take
  115. // ballOwner = Some(client.path.address)
  116. }
  117. sender() ! "OK"
  118. case _ => println("unknown message")
  119. }
  120. }
  121.  
  122. object Server {
  123.  
  124. case class Join(text: String)
  125.  
  126. case object Start
  127.  
  128. case object CloseChat
  129.  
  130. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement