Advertisement
Guest User

Untitled

a guest
May 17th, 2018
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.38 KB | None | 0 0
  1. package psync.runtime
  2.  
  3. import psync._
  4. import dzufferey.utils.Logger
  5. import dzufferey.utils.LogLevel._
  6. import io.netty.buffer.ByteBuf
  7. import io.netty.channel.socket._
  8. import java.util.concurrent.ArrayBlockingQueue
  9. import java.util.concurrent.TimeUnit
  10. import scala.collection.mutable.HashMap
  11.  
  12.  
  13. //TODO what should be the interface ?
  14. //- val lock = new java.util.concurrent.locks.ReentrantLock
  15. //- @volatile var roundStart: Long
  16. //- var roundDuration: Long //TO is roundStart+roundDuration
  17. //- def newPacket(dp: DatagramPacket): Unit
  18. //- def interrupt(inst: Short): Unit or stop(inst: Short)
  19. //TODO break it into smaller parts
  20. //-for learning TO/roundDuration
  21. // - use a discounted sum / geometric series: coeff, window, expected RTT
  22. // - step increment/decrement
  23. // - fixed
  /** Contract between the packet dispatching layer and a running instance. */
  trait InstHandler {

    /** Asks the instance to stop.
     *
     *  Several threads may be working on the instance, so it can take some
     *  time after this call has returned until the instance really finishes.
     *
     *  @param inst number of the instance that should stop
     */
    def interrupt(inst: Int): Unit

    /** Hands over a packet that was received for this instance.
     *
     *  @param dp the received datagram
     */
    def newPacket(dp: DatagramPacket): Unit

  }
  36.  
  37. class InstanceHandler[IO,P <: Process[IO]](proc: P,
  38. rt: psync.runtime.Runtime[IO,P],
  39. pktServ: PacketServer,
  40. dispatcher: InstanceDispatcher,
  41. defaultHandler: DatagramPacket => Unit,
  42. options: RuntimeOptions) extends Runnable with InstHandler {
  43.  
  // ---- incoming packets ----

  /** Packets handed over through `newPacket`; bounded by `options.bufferSize`. */
  protected val buffer = new ArrayBlockingQueue[DatagramPacket](options.bufferSize)

  /** Mailboxes indexed with a round number in `run` (`mbox(next_r)`, `mbox(msgRound)`, ...).
   *  NOTE(review): nothing visible in this class ever adds a key to this map -- confirm
   *  where the per-round queues are supposed to be created. */
  protected val mbox = HashMap.empty[Int, ArrayBlockingQueue[DatagramPacket]]
  // protected val mbox = new HashMap[Int, HashMap[Int, ByteBuf] ]()

  // ---- settings copied from the runtime options ----

  /** Starts at `options.timeout`; moved in steps of 10 by `adaptTimeout`. */
  protected var timeout = options.timeout
  protected val earlyMoving = options.earlyMoving
  protected val adaptative = options.adaptative
  protected val sendWhenCatchingUp = options.sendWhenCatchingUp
  protected val delayFirstSend = options.delayFirstSend

  /** Counter inspected (and reset to 0) by `adaptTimeout`. */
  protected var didTimeOut = 0

  // ---- identity of the running instance, assigned in `prepare` ----

  protected var instance: Short = 0
  protected var grp: Group = null

  /** Group size, `g.size` of the group given to `prepare`. */
  protected var n = 0

  // ---- round bookkeeping ----

  protected var currentRound = 0
  protected var next_r = 0

  /** `from(id)` is true once a packet from process `id` was stored for the
   *  current round (set in `storePacket`, cleared in `update`). */
  protected var from: Array[Boolean] = null

  /** Last value returned by `proc.receive`; cleared in `update`. */
  protected var roundHasEnoughMessages = false
  64.  
  65.  
  /** Queues a freshly received packet so that it can be processed later.
   *
   *  The packet is offered to the bounded `buffer`. When the buffer has no
   *  room left the packet is dropped: a warning is logged and the packet is
   *  released.
   *
   *  @param dp the received datagram
   */
  def newPacket(dp: DatagramPacket): Unit = {
    val accepted = buffer.offer(dp)
    if (!accepted) {
      Logger("InstanceHandler", Warning, "too many packets")
      dp.release
    }
  }
  73.  
  /** Passes `pkt` on to the `defaultHandler`.
   *
   *  The handler is not invoked directly: the call is wrapped in a `Runnable`
   *  that is given to `rt.submitTask`.
   *
   *  @param pkt the packet to forward
   */
  protected def default(pkt: DatagramPacket): Unit = {
    val forward = new Runnable {
      def run = defaultHandler(pkt)
    }
    rt.submitTask(forward)
  }
  78.  
  /** Prepares the handler for an execution.
   *  Call this just before giving the handler to the executor.
   *
   *  The handler is handed back through `rt.recycle` when an instance stops,
   *  so the same object can be prepared again: every piece of per-instance
   *  state is therefore reset here.
   *
   *  @param io   value forwarded to `proc.init`
   *  @param g    group of the instance; `g.size` becomes the number of processes `n`
   *  @param inst number under which this handler is registered with the dispatcher
   *  @param msgs messages that are already pending for the instance; the packet of
   *              each of them is enqueued through `newPacket`
   */
  def prepare(io: IO, g: Group, inst: Short, msgs: Set[Message]): Unit = {
    // clear the buffer (this still runs with the `n` and `from` of the previous execution)
    freeRemainingMessages

    // init the process
    proc.setGroup(g)
    proc.init(io)

    // init this
    instance = inst
    grp = g
    n = g.size
    // a new instance starts from scratch: reset all the round bookkeeping,
    // otherwise `next_r` and `roundHasEnoughMessages` would keep the values
    // left behind by the previous instance
    currentRound = 0
    next_r = 0
    roundHasEnoughMessages = false

    // checkResources: reallocate the per-sender flags only when the group size
    // changed; a new Array[Boolean] is already filled with `false`, and a reused
    // one was cleared by `freeRemainingMessages` above
    if (from == null || from.size != n) {
      from = Array.ofDim[Boolean](n)
    }

    // enqueue pending messages
    msgs.foreach(p => newPacket(p.packet))

    // register
    dispatcher.add(inst, this)
  }
  107.  
  /** Drops everything that is still pending for the current execution.
   *
   *  Every packet left in `buffer` is released, then the first `n` per-sender
   *  flags in `from` are cleared.
   */
  protected def freeRemainingMessages: Unit = {
    // `poll()` returns null once the queue is empty
    Iterator.continually(buffer.poll()).takeWhile(_ != null).foreach(_.release)
    for (idx <- 0 until n) {
      from(idx) = false
    }
  }
  120.  
  /** Shuts the current instance down.
   *
   *  The order matters: the handler is first removed from the dispatcher,
   *  then the pending packets are released, and only then is the handler
   *  given back to the runtime with `rt.recycle`.
   */
  protected def stop: Unit = {
    Logger("InstanceHandler", Info, s"stopping instance $instance")
    dispatcher.remove(instance)
    freeRemainingMessages
    rt.recycle(this)
  }
  127.  
  /** Loop flag of `run`: set to true when `run` starts, read through `more`.
   *
   *  It is cleared by `interrupt`, which may be called from another thread
   *  than the one executing `run`, hence `@volatile`: without it the running
   *  thread is not guaranteed to ever observe the write.
   */
  @volatile protected var again = true

  /** Asks this handler to stop.
   *
   *  The request is ignored when `inst` is not the instance currently run by
   *  this handler. Otherwise `again` is cleared and the loop in `run` ends
   *  the next time it evaluates `more`.
   *
   *  @param inst number of the instance that should stop
   */
  def interrupt(inst: Int): Unit = {
    if (instance == inst) {
      again = false
    }
  }
  134.  
  /** Adjusts `timeout` from the `didTimeOut` counter; does nothing unless
   *  `adaptative` is set.
   *
   *  A counter above 5 adds 10 to the timeout, a counter below -50 removes 10
   *  from it; in both cases the counter restarts from 0.
   */
  @inline
  private def adaptTimeout: Unit = {
    if (adaptative) {
      //TODO something amortized to avoid oscillations
      val tooManyTimeouts = didTimeOut > 5
      val hardlyAnyTimeout = didTimeOut < -50
      if (tooManyTimeouts || hardlyAnyTimeout) {
        didTimeOut = 0
        if (tooManyTimeouts) timeout += 10 else timeout -= 10
      }
    }
  }
  148.  
  149.  
  /** True while `run` should keep looping. `Thread.interrupted()` clears the
   *  interrupt flag of the thread, so it is only consulted when `again` is set. */
  @inline private final def more: Boolean =
    if (again) !Thread.interrupted() else false

  /** Distance from `currentRound` to `rnd` (positive when `rnd` is ahead). */
  @inline private final def rndDiff(rnd: Int): Int =
    rnd - currentRound

  /** Always true when `earlyMoving` is off, otherwise `roundHasEnoughMessages`. */
  @inline private final def enoughMessages: Boolean =
    !earlyMoving || roundHasEnoughMessages
  153.  
  /** Runs the send step.
   *
   *  @param roundNbr not used: `send` works on the `currentRound` field
   */
  def current_round(roundNbr: Int): Unit = send
  157.  
  /** Entry point of the instance (this class is a `Runnable`).
   *
   *  Logs the start, sets `again`, optionally sleeps for `delayFirstSend`, then
   *  loops while `more` holds. Whatever happens, the `finally` clause releases
   *  the packet still held in `msg` (if any) and calls `stop`.
   *
   *  NOTE(review): the body of the loop looks like work in progress; see the
   *  notes below. As written, two statements do not type-check unless some
   *  enrichment that is not visible here is in scope.
   */
  158. def run {
  159. // println("<inside run")
  160. // println(grp.self)
  161. // println("Number: "+ n)
  162. Logger("InstanceHandler", Info, "starting instance " + instance)
  // NOTE(review): prints to stdout on every start; looks like a debugging leftover.
  163. println(instance)
  164. again = true
  165. var msg: DatagramPacket = null
  166. var msgRound = 0
  167. try {
  168. if (delayFirstSend > 0) {
  169. Thread.sleep(delayFirstSend)
  170. }
  171. // one round
  172. while(more) {
  173. //Send message
  174. if(msg == null || sendWhenCatchingUp) {
  // `current_round` is declared with procedure syntax, so `msgs_to_send` is
  // the unit value and the line below prints "CHECK HERE: ()".
  175. val msgs_to_send = current_round(currentRound)
  176. println("CHECK HERE: " + msgs_to_send)
  177. }
  178.  
  // NOTE(review): `mbox(k)` is `HashMap.apply`, which throws
  // NoSuchElementException for an absent key, and nothing visible in this
  // class adds keys to `mbox`. This inner loop also never checks `more`, so
  // `interrupt` cannot end it.
  179. while ( mbox(next_r).size() < 2*n/3) // terminate
  180. {
  // Non-blocking poll of the mailbox of the current round.
  // NOTE(review): `newPacket` stores incoming packets in `buffer`, which this
  // loop never reads -- presumably the packets should be taken from there; confirm.
  181. msg = mbox(currentRound).poll()
  182.  
  183.  
  184. if(msg != null) {
  185. msgRound = Message.getTag(msg.content).roundNbr
  186. if(msgRound < currentRound){
  // packet of a round that is already over: drop it
  187. msg.release
  188. msg = null
  189. } else {
  // `sender` is only needed by the commented-out `storePacket` call below
  // (which passes three arguments while `storePacket` takes two).
  190. val sender = grp.inetToId(msg.sender)
  // NOTE(review): unresolved placeholder (see the author's own remark);
  // `ArrayBlockingQueue` has no `+=` member.
  191. mbox(msgRound) += () /// HOW TO HANDLE THIS?
  192. // storePacket(sender, msg.content, msgRound)
  // NOTE(review): strict `>` here, while the tests at lines 179 and 202 use
  // `<` and `>=` on the same bound -- confirm which threshold is intended.
  193. if(mbox(msgRound).size() > 2*n/3){
  194. next_r = msgRound
  195. }
  196. msg = null
  197. // mbox[msgRound] +=
  198. }
  199.  
  200. }
  201.  
  202. if(mbox(next_r).size() >= 2*n/3){
  // NOTE(review): `ArrayBlockingQueue` has no `update` member; presumably the
  // `update` method of this handler was meant -- confirm.
  203. mbox(currentRound).update
  204. currentRound += 1
  205. next_r = math.max(currentRound,next_r)
  206. }
  207.  
  208.  
  209.  
  210.  
  211. }
  212. }
  213. } catch {
  // an interruption ends the instance silently; anything else is logged
  // together with its stack trace
  214. case _: java.lang.InterruptedException => ()
  215. case t: Throwable =>
  216. Logger("InstanceHandler", Error, "got an error " + t + " terminating instance: " + instance + "\n " + t.getStackTrace.mkString("\n "))
  217. } finally {
  // release the packet that was being processed, then shut the instance down
  218. if (msg != null) {
  219. msg.release
  220. }
  221. stop
  222. }
  223. }
  224.  
  225. ///////////////////
  226. // current round //
  227. ///////////////////
  228.  
  229.  
  /** Tells whether `msg` is a normal message of the instance run by this handler.
   *
   *  When the result is false the packet has already been taken care of here
   *  and the caller must not use it any more:
   *   - wrong instance number: the packet is released;
   *   - `Flags.dummy`: the packet is logged at debug level and released;
   *   - any other flag: the packet is given to `default` (with a warning when
   *     the flag is `Flags.error`).
   *
   *  @param msg the packet to inspect
   *  @return true only for a packet of this instance carrying `Flags.normal`
   */
  protected def checkInstanceAndTag(msg: DatagramPacket): Boolean = {
    val tag = Message.getTag(msg.content)
    val sameInstance = instance == tag.instanceNbr
    if (sameInstance && tag.flag == Flags.normal) {
      // nothing to do we are fine
      true
    } else {
      if (!sameInstance) {
        // wrong instance
        msg.release
      } else if (tag.flag == Flags.dummy) {
        Logger("InstanceHandler", Debug, s"${grp.self.id}, ${instance}dummy flag (ignoring)")
        msg.release
      } else {
        if (tag.flag == Flags.error) {
          Logger("InstanceHandler", Warning, "error flag (pushing to user)")
        }
        default(msg)
      }
      false
    }
  }
  251. // mbox: create another temporary mbox for storing temporary msgs for comparison.
  252.  
  253. // protected def mbox(sender: ProcessID, )
  /** Delivers the payload `buf` sent by `sender` to the process.
   *
   *  Only the first packet of a sender is delivered: `from` remembers which
   *  senders were already seen, and a later packet from the same sender is
   *  released without being delivered. For a delivered packet the result of
   *  `proc.receive` is kept in `roundHasEnoughMessages`.
   *
   *  @param sender process the payload comes from
   *  @param buf    payload; its tag is asserted to carry `currentRound`
   */
  protected def storePacket(sender: ProcessID, buf: ByteBuf): Unit = {
    val senderIdx = sender.id
    if (from(senderIdx)) {
      // duplicate packet
      buf.release
    } else {
      from(senderIdx) = true
      assert(Message.getTag(buf).roundNbr == currentRound, Message.getTag(buf).roundNbr + " vs " + currentRound)
      println("PRINTING GETTAG: " + Message.getTag(buf).roundNbr)
      roundHasEnoughMessages = proc.receive(sender, buf)
    }
  }
  266.  
  267.  
  /** Closes the current round: logs the delivery, forgets which senders were
   *  seen and whether there were enough messages, then runs `proc.update`.
   *
   *  @return whatever `proc.update` returns
   */
  protected def update = {
    Logger("InstanceHandler", Debug, s"${grp.self.id}, $instance delivering for round $currentRound")
    // clean
    roundHasEnoughMessages = false
    var idx = 0
    while (idx < n) {
      from(idx) = false
      idx += 1
    }
    // update
    proc.update
  }
  278.  
  /** Sends the messages of the current round.
   *
   *  `proc.send` is given a callback that it invokes for each (destination,
   *  payload) pair. The callback writes the tag of (`instance`,
   *  `currentRound`) into the first 8 bytes of the payload; a payload
   *  addressed to this process itself is delivered locally with
   *  `storePacket`, every other one goes out through `pktServ`. The number of
   *  callback invocations is logged at debug level.
   */
  protected def send: Unit = {
    val tag = Tag(instance, currentRound)
    var sent = 0
    def sending(pid: ProcessID, payload: ByteBuf): Unit = {
      payload.setLong(0, tag.underlying)
      if (pid != grp.self) {
        pktServ.send(pid, payload)
      } else {
        storePacket(pid, payload)
      }
      sent += 1
    }
    proc.send(sending)
    Logger("InstanceHandler", Debug,
      s"${grp.self.id}, $instance sending for round $currentRound -> $sent\n")
  }
  295.  
  296. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement