s243a

BlockReciever.notificationWaiter

Jun 20th, 2015
446
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.08 KB | None | 0 0
  1.     private AsyncMessageFilterCallback notificationWaiter = new SlowAsyncMessageFilterCallback() {
  2.  
  3.         @Override
  4.         public void onMatched(Message m1) {
  5.             if(logMINOR)
  6.                 Logger.minor(this, "Received "+m1);
  7.             if ((m1 != null) && m1.getSpec().equals(DMT.sendAborted)) {
  8.                 String desc=m1.getString(DMT.DESCRIPTION);
  9.                 if (desc.indexOf("Upstream")<0)
  10.                     desc="Upstream transmit error: "+desc;
  11.                 _prb.abort(m1.getInt(DMT.REASON), desc, false);
  12.                 synchronized(BlockReceiver.this) {
  13.                     senderAborted = true;
  14.                 }
  15.                 complete(m1.getInt(DMT.REASON), desc);
  16.                 return;
  17.             }
  18.             boolean truncateTimeout = false;
  19.             if ((m1 != null) && (m1.getSpec().equals(DMT.packetTransmit))) {
  20.                 // packetTransmit received
  21.                 int packetNo = m1.getInt(DMT.PACKET_NO);
  22.                 BitArray sent = (BitArray) m1.getObject(DMT.SENT);
  23.                 Buffer data = (Buffer) m1.getObject(DMT.DATA);
  24.                 int missing = 0;
  25.                 try {
  26.                     synchronized(BlockReceiver.this) {
  27.                         if(completed) return;
  28.                     }
  29.                     if(CHECK_DUPES && _prb.isReceived(packetNo)) {
  30.                         // Transmitter sent the same packet twice?!?!?
  31.                         Logger.error(this, "Already received the packet - DoS??? on "+this+" uid "+_uid+" from "+_sender);
  32.                         // Does not extend timeouts.
  33.                         truncateTimeout = true;
  34.                     } else {
  35.                         _prb.addPacket(packetNo, data);
  36.                         if(logMINOR) {
  37.                             synchronized(BlockReceiver.this) {
  38.                                 long interval = System.currentTimeMillis() - timeStartedWaiting;
  39.                                 Logger.minor(this, "Packet interval: "+interval+" = "+TimeUtil.formatTime(interval, 2, true)+" from "+_sender);
  40.                             }
  41.                         }
  42.                         // Check that we have what the sender thinks we have
  43.                         for (int x = 0; x < sent.getSize(); x++) {
  44.                             if (sent.bitAt(x) && !_prb.isReceived(x)) {
  45.                                 missing++;
  46.                             }
  47.                         }
  48.                         if(logMINOR && missing != 0)
  49.                             Logger.minor(this, "Packets which the sender says it has sent but we have not received: "+missing);
  50.                     }
  51.                 } catch (AbortedException e) {
  52.                     // We didn't cause it?!
  53.                     Logger.error(this, "Caught in receive - probably a bug as receive sets it: "+e, e);
  54.                     complete(RetrievalException.UNKNOWN, "Aborted?");
  55.                     return;
  56.                 }
  57.             } else if (m1 != null && m1.getSpec().equals(DMT.allSent)) {
  58.                 synchronized(BlockReceiver.this) {
  59.                     if(completed) return;
  60.                     if(gotAllSent)
  61.                         // Multiple allSent's don't extend the timeouts.
  62.                         truncateTimeout = true;
  63.                     gotAllSent = true;
  64.                 }
  65.             }
  66.             try {
  67.                 if(_prb.allReceived()) {
  68.                     try {
  69.                         Message m = DMT.createAllReceived(_uid);
  70.                         if(completeAfterAckedAllReceived) {
  71.                             try {
  72.                                 // FIXME layer violation
  73.                                 // FIXME make asynchronous
  74.                                 ((PeerNode)_sender).sendSync(m, _ctr, _realTime);
  75.                             } catch (SyncSendWaitedTooLongException e) {
  76.                                 // Complete anyway.
  77.                             }
  78.                         } else {
  79.                             _usm.send(_sender, m, _ctr);
  80.                         }
  81.                         discardEndTime=System.currentTimeMillis()+CLEANUP_TIMEOUT;
  82.                         discardFilter=relevantMessages(CLEANUP_TIMEOUT);
  83.                         maybeResetDiscardFilter();
  84.                     } catch (NotConnectedException e1) {
  85.                         // Ignore, we've got it.
  86.                         if(logMINOR) Logger.minor(this, "Got data but can't send allReceived to "+_sender+" as is disconnected");
  87.                     }
  88.                     long endTime = System.currentTimeMillis();
  89.                     long transferTime = (endTime - startTime);
  90.                     if(logMINOR) {
  91.                         synchronized(avgTimeTaken) {
  92.                             avgTimeTaken.report(transferTime);
  93.                             Logger.minor(this, "Block transfer took "+transferTime+"ms - average is "+avgTimeTaken);
  94.                         }
  95.                     }
  96.                     complete(_prb.getBlock());
  97.                     return;
  98.                 }
  99.             } catch (AbortedException e1) {
  100.                 // We didn't cause it?!
  101.                 Logger.error(this, "Caught in receive - probably a bug as receive sets it: "+e1, e1);
  102.                 complete(RetrievalException.UNKNOWN, "Aborted?");
  103.                 return;
  104.             }
  105.             try {
  106.                 // Even if timeout <= 0, we still add the filter, because we want to receive any messages that are already buffered before we timeout.
  107.                 waitNotification(truncateTimeout);
  108.             } catch (DisconnectedException e) {
  109.                 onDisconnect(null);
  110.                 return;
  111.             }
  112.         }
Advertisement
Add Comment
Please, Sign In to add comment