Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- private AsyncMessageFilterCallback notificationWaiter = new SlowAsyncMessageFilterCallback() {
/**
 * Processes one message handed to this callback and advances the block transfer.
 *
 * Order of handling, as implemented below:
 *  1. DMT.sendAborted    - abort _prb, flag senderAborted, complete with the sender's reason; return.
 *  2. DMT.packetTransmit - store the packet in _prb (unless it is a detected duplicate).
 *  3. DMT.allSent        - remember that allSent was seen.
 *  4. If _prb.allReceived(): send an allReceived message to the sender, complete with the block; return.
 *  5. Otherwise call waitNotification(truncateTimeout) to wait for the next message.
 *
 * @param m1 the message to process. Every branch null-checks it, so null is tolerated and
 *           falls straight through to steps 4/5. NOTE(review): what a null message signifies
 *           (presumably a timeout) is not visible in this block - confirm against the caller.
 */
- @Override
- public void onMatched(Message m1) {
- if(logMINOR)
- Logger.minor(this, "Received "+m1);
- if ((m1 != null) && m1.getSpec().equals(DMT.sendAborted)) {
- String desc=m1.getString(DMT.DESCRIPTION); // NOTE(review): assumes DESCRIPTION is never null; indexOf below would NPE otherwise - confirm
- if (desc.indexOf("Upstream")<0) // only add the prefix if the description does not already contain "Upstream"
- desc="Upstream transmit error: "+desc;
- _prb.abort(m1.getInt(DMT.REASON), desc, false);
- synchronized(BlockReceiver.this) {
- senderAborted = true;
- }
- complete(m1.getInt(DMT.REASON), desc);
- return;
- }
- boolean truncateTimeout = false; // set for duplicate packets and repeated allSent; passed to waitNotification at the end
- if ((m1 != null) && (m1.getSpec().equals(DMT.packetTransmit))) {
- // packetTransmit received: unpack the packet number, the sender's "sent" bitmap and the payload
- int packetNo = m1.getInt(DMT.PACKET_NO);
- BitArray sent = (BitArray) m1.getObject(DMT.SENT);
- Buffer data = (Buffer) m1.getObject(DMT.DATA);
- int missing = 0; // only used for the minor-level log message below
- try {
- synchronized(BlockReceiver.this) {
- if(completed) return; // already completed: ignore the packet and return without calling waitNotification
- }
- if(CHECK_DUPES && _prb.isReceived(packetNo)) {
- // Transmitter sent the same packet twice?!?!? The packet is not added again.
- Logger.error(this, "Already received the packet - DoS??? on "+this+" uid "+_uid+" from "+_sender);
- // Does not extend timeouts.
- truncateTimeout = true;
- } else {
- _prb.addPacket(packetNo, data);
- if(logMINOR) {
- synchronized(BlockReceiver.this) { // NOTE(review): presumably timeStartedWaiting is guarded by this lock - confirm
- long interval = System.currentTimeMillis() - timeStartedWaiting;
- Logger.minor(this, "Packet interval: "+interval+" = "+TimeUtil.formatTime(interval, 2, true)+" from "+_sender);
- }
- }
- // Check that we have what the sender thinks we have: count packets flagged in "sent" that _prb has not received.
- for (int x = 0; x < sent.getSize(); x++) {
- if (sent.bitAt(x) && !_prb.isReceived(x)) {
- missing++;
- }
- }
- if(logMINOR && missing != 0)
- Logger.minor(this, "Packets which the sender says it has sent but we have not received: "+missing);
- }
- } catch (AbortedException e) {
- // We didn't cause it?! Log as an error and complete the transfer with an UNKNOWN failure.
- Logger.error(this, "Caught in receive - probably a bug as receive sets it: "+e, e);
- complete(RetrievalException.UNKNOWN, "Aborted?");
- return;
- }
- } else if (m1 != null && m1.getSpec().equals(DMT.allSent)) {
- synchronized(BlockReceiver.this) {
- if(completed) return; // already completed: return without calling waitNotification
- if(gotAllSent)
- // Multiple allSent's don't extend the timeouts. (Only the next statement is conditional.)
- truncateTimeout = true;
- gotAllSent = true; // always executed, regardless of the if above
- }
- }
- try {
- if(_prb.allReceived()) {
- try {
- Message m = DMT.createAllReceived(_uid);
- if(completeAfterAckedAllReceived) {
- try {
- // FIXME layer violation (casts _sender to PeerNode)
- // FIXME make asynchronous (sendSync presumably blocks this callback until sent - confirm)
- ((PeerNode)_sender).sendSync(m, _ctr, _realTime);
- } catch (SyncSendWaitedTooLongException e) {
- // Complete anyway: the exception is deliberately ignored.
- }
- } else {
- _usm.send(_sender, m, _ctr);
- }
- discardEndTime=System.currentTimeMillis()+CLEANUP_TIMEOUT; // these three statements are skipped if the send threw NotConnectedException
- discardFilter=relevantMessages(CLEANUP_TIMEOUT);
- maybeResetDiscardFilter(); // NOTE(review): presumably arms a filter that swallows late messages for CLEANUP_TIMEOUT - confirm
- } catch (NotConnectedException e1) {
- // Ignore, we've got it: the data is complete, so failing to send allReceived is not fatal.
- if(logMINOR) Logger.minor(this, "Got data but can't send allReceived to "+_sender+" as is disconnected");
- }
- long endTime = System.currentTimeMillis();
- long transferTime = (endTime - startTime);
- if(logMINOR) { // NOTE(review): avgTimeTaken is only updated when minor logging is enabled - confirm this is intended
- synchronized(avgTimeTaken) {
- avgTimeTaken.report(transferTime);
- Logger.minor(this, "Block transfer took "+transferTime+"ms - average is "+avgTimeTaken);
- }
- }
- complete(_prb.getBlock());
- return;
- }
- } catch (AbortedException e1) {
- // We didn't cause it?! Same handling as above: log as an error and complete with an UNKNOWN failure.
- Logger.error(this, "Caught in receive - probably a bug as receive sets it: "+e1, e1);
- complete(RetrievalException.UNKNOWN, "Aborted?");
- return;
- }
- try {
- // Not finished yet: wait for the next message.
- // Even if timeout <= 0, we still add the filter, because we want to receive any messages that are already buffered before we timeout.
- waitNotification(truncateTimeout);
- } catch (DisconnectedException e) {
- onDisconnect(null); // hand off to the disconnect handler
- return;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment