daily pastebin goal
62%
SHARE
TWEET

Untitled

a guest May 17th, 2018 97 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package psync.runtime
  2.  
  3. import psync._
  4. import dzufferey.utils.Logger
  5. import dzufferey.utils.LogLevel._
  6. import io.netty.buffer.ByteBuf
  7. import io.netty.channel.socket._
  8. import java.util.concurrent.ArrayBlockingQueue
  9. import java.util.concurrent.TimeUnit
  10. import scala.collection.mutable.HashMap
  11.  
  12.  
  13. //TODO what should be the interface ?
  14. //- val lock = new java.util.concurrent.locks.ReentrantLock
  15. //- @volatile var roundStart: Long
  16. //- var roundDuration: Long //TO is roundStart+roundDuration
  17. //- def newPacket(dp: DatagramPacket): Unit
  18. //- def interrupt(inst: Short): Unit or stop(inst: Short)
  19. //TODO break it into smaller parts
  20. //-for learning TO/roundDuration
  21. //  - used a discounted sum / geometric serie: coeff, window, expected RTT
  22. //  - step increment/decrement
  23. //  - fixed
  24. trait InstHandler {
  25.  
  26.   /** Handle packets received from this instance */
  27.   def newPacket(dp: DatagramPacket): Unit
  28.  
  29.   /** This instance should stop.
  30.    *  Since there might be multiple threads working. It might take
  31.    *  some time after this call returns until the instance actually
  32.    *  finishes. */
  33.   def interrupt(inst: Int): Unit
  34.  
  35. }
  36.  
  37. class InstanceHandler[IO,P <: Process[IO]](proc: P,
  38.                           rt: psync.runtime.Runtime[IO,P],
  39.                           pktServ: PacketServer,
  40.                           dispatcher: InstanceDispatcher,
  41.                           defaultHandler: DatagramPacket => Unit,
  42.                           options: RuntimeOptions) extends Runnable with InstHandler {
  43.  
  44.   protected val buffer = new ArrayBlockingQueue[DatagramPacket](options.bufferSize)
  45.   protected val mbox = new HashMap[Int, ArrayBlockingQueue[DatagramPacket] ]()
  46.   // protected val mbox = new HashMap[Int, HashMap[Int, ByteBuf] ]()
  47.   protected var timeout = options.timeout
  48.   protected val earlyMoving = options.earlyMoving
  49.   protected val adaptative = options.adaptative
  50.   protected val sendWhenCatchingUp = options.sendWhenCatchingUp
  51.   protected val delayFirstSend = options.delayFirstSend
  52.  
  53.   protected var didTimeOut = 0
  54.  
  55.   protected var instance: Short = 0
  56.   protected var grp: Group = null
  57.  
  58.   protected var n = 0
  59.   protected var currentRound = 0
  60.   protected var next_r = 0
  61.  
  62.   protected var from: Array[Boolean] = null
  63.   protected var roundHasEnoughMessages = false
  64.  
  65.  
  66.   /** A new packet is received and should be processed */
  67.   def newPacket(dp: DatagramPacket) = {
  68.     if (!buffer.offer(dp)) {
  69.       Logger("InstanceHandler", Warning, "too many packets")
  70.       dp.release
  71.     }
  72.   }
  73.  
  74.   /** Forward the packet to the defaultHandler */
  75.   protected def default(pkt: DatagramPacket) {
  76.     rt.submitTask(new Runnable { def run = defaultHandler(pkt) })
  77.   }
  78.  
  79.   /** Prepare the handler for a execution.
  80.    *  call this just before giving it to the executor */
  81.   def prepare(io: IO, g: Group, inst: Short, msgs: Set[Message]) {
  82.     // clear the buffer
  83.     freeRemainingMessages
  84.  
  85.     // init the process
  86.     proc.setGroup(g)
  87.     proc.init(io)
  88.  
  89.     // init this
  90.     instance = inst
  91.     grp = g
  92.     n = g.size
  93.     currentRound = 0
  94.  
  95.     // checkResources
  96.     if (from == null || from.size != n) {
  97.       from = Array.ofDim[Boolean](n)
  98.       for (i <- 0 until n) from(i) = false
  99.     }
  100.  
  101.     // enqueue pending messages
  102.     msgs.foreach(p => newPacket(p.packet))
  103.  
  104.     // register
  105.     dispatcher.add(inst, this)
  106.   }
  107.  
  108.   protected def freeRemainingMessages {
  109.     var pkt = buffer.poll
  110.     while(pkt != null) {
  111.       pkt.release
  112.       pkt = buffer.poll
  113.     }
  114.     var i = 0
  115.     while (i < n) {
  116.       from(i) = false
  117.       i += 1
  118.     }
  119.   }
  120.  
  121.   protected def stop {
  122.     Logger("InstanceHandler", Info, "stopping instance " + instance)
  123.     dispatcher.remove(instance)
  124.     freeRemainingMessages
  125.     rt.recycle(this)
  126.   }
  127.  
  128.   protected var again = true
  129.  
  130.   def interrupt(inst: Int) {
  131.     if (instance == inst)
  132.       again = false
  133.   }
  134.  
  135.   @inline
  136.   private def adaptTimeout {
  137.     if (adaptative) {
  138.       //TODO something amortized to avoid oscillations
  139.       if (didTimeOut > 5) {
  140.         didTimeOut = 0
  141.         timeout += 10
  142.       } else if (didTimeOut < -50) {
  143.         didTimeOut = 0
  144.         timeout -= 10
  145.       }
  146.     }
  147.   }
  148.  
  149.  
  150.   @inline private final def more = again && !Thread.interrupted
  151.   @inline private final def rndDiff(rnd: Int) = rnd - currentRound
  152.   @inline private final def enoughMessages = roundHasEnoughMessages || !earlyMoving
  153.  
  154.   def current_round(roundNbr: Int){
  155.     send
  156.   }
  157.  
  158.   def run {
  159.     // println("<inside run")
  160.     // println(grp.self)
  161.     // println("Number: "+ n)
  162.     Logger("InstanceHandler", Info, "starting instance " + instance)
  163.     println(instance)
  164.     again = true
  165.     var msg: DatagramPacket = null
  166.     var msgRound = 0
  167.     try {
  168.       if (delayFirstSend > 0) {
  169.         Thread.sleep(delayFirstSend)
  170.       }
  171.       // one round
  172.       while(more) {
  173.         //Send message
  174.         if(msg == null || sendWhenCatchingUp) {
  175.           val msgs_to_send = current_round(currentRound)
  176.           println("CHECK HERE: " + msgs_to_send)
  177.         }
  178.  
  179.          while ( mbox(next_r).size() < 2*n/3)                        // terminate
  180.          {
  181.             msg = mbox(currentRound).poll()
  182.  
  183.  
  184.             if(msg != null) {
  185.               msgRound = Message.getTag(msg.content).roundNbr
  186.               if(msgRound < currentRound){
  187.                 msg.release
  188.                 msg = null
  189.               } else {
  190.                 val sender = grp.inetToId(msg.sender)
  191.                 mbox(msgRound) += ()  /// HOW TO HANDLE THIS?
  192.                 // storePacket(sender, msg.content, msgRound)
  193.                 if(mbox(msgRound).size() > 2*n/3){
  194.                   next_r = msgRound
  195.                 }
  196.                 msg = null
  197.                 // mbox[msgRound] +=
  198.               }
  199.  
  200.             }
  201.  
  202.             if(mbox(next_r).size() >= 2*n/3){
  203.               mbox(currentRound).update
  204.               currentRound += 1
  205.               next_r = math.max(currentRound,next_r)
  206.             }
  207.  
  208.  
  209.  
  210.            
  211.          }
  212.       }
  213.     } catch {
  214.       case _: java.lang.InterruptedException => ()
  215.       case t: Throwable =>
  216.         Logger("InstanceHandler", Error, "got an error " + t + " terminating instance: " + instance + "\n  " + t.getStackTrace.mkString("\n  "))
  217.     } finally {
  218.       if (msg != null) {
  219.         msg.release
  220.       }
  221.       stop
  222.     }
  223.   }
  224.  
  225.   ///////////////////
  226.   // current round //
  227.   ///////////////////
  228.  
  229.  
  230.   // responsible for freeing the msg if returns false
  231.   protected def checkInstanceAndTag(msg: DatagramPacket): Boolean = {
  232.     val tag = Message.getTag(msg.content)
  233.     if (instance != tag.instanceNbr) { // wrong instance
  234.       msg.release
  235.       false
  236.     } else if (tag.flag == Flags.normal) {
  237.       // nothing to do we are fine
  238.       true
  239.     } else if (tag.flag == Flags.dummy) {
  240.       Logger("InstanceHandler", Debug, grp.self.id + ", " + instance + "dummy flag (ignoring)")
  241.       msg.release
  242.       false
  243.     } else {
  244.       if (tag.flag == Flags.error) {
  245.         Logger("InstanceHandler", Warning, "error flag (pushing to user)")
  246.       }
  247.       default(msg)
  248.       false
  249.     }
  250.   }
  251.   //mbox , create another temprory mbox for storing temprory msg for comparision.
  252.  
  253.   // protected def mbox(sender: ProcessID, )
  254.   protected def storePacket(sender: ProcessID, buf: ByteBuf) {
  255.     val id = sender.id
  256.     if (!from(id)) {
  257.       from(id) = true
  258.       assert(Message.getTag(buf).roundNbr == currentRound, Message.getTag(buf).roundNbr + " vs " + currentRound)
  259.       println("PRINTING GETTAG: " + Message.getTag(buf).roundNbr)
  260.       roundHasEnoughMessages = proc.receive(sender, buf)
  261.     } else {
  262.       // duplicate packet
  263.       buf.release
  264.     }
  265.   }
  266.  
  267.  
  268.   protected def update = {
  269.     Logger("InstanceHandler", Debug, grp.self.id + ", " + instance + " delivering for round " + currentRound)
  270.     // clean
  271.     roundHasEnoughMessages = false
  272.     for (i <- 0 until n) {
  273.       from(i) = false
  274.     }
  275.     // update
  276.     proc.update
  277.   }
  278.  
  279.   protected def send {
  280.     val tag = Tag(instance, currentRound)
  281.     var sent = 0
  282.     def sending(pid: ProcessID, payload: ByteBuf) {
  283.       payload.setLong(0, tag.underlying)
  284.       if (pid == grp.self) {
  285.         storePacket(pid, payload)
  286.       } else {
  287.         pktServ.send(pid, payload)
  288.       }
  289.       sent += 1
  290.     }
  291.     proc.send(sending)
  292.     Logger("InstanceHandler", Debug,
  293.       grp.self.id + ", " + instance + " sending for round " + currentRound + " -> " + sent + "\n")
  294.   }
  295.  
  296. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top