Advertisement
Guest User

forwader-kite.cpp

a guest
Jan 17th, 2019
97
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 26.56 KB | None | 0 0
  1. /**
  2.  *
  3.  * TODO
  4.  *
  5.  * - Integrate remaining time in refreshTraceToAnchor (as specified in the
  6.  *   paper)... Why do they do this ? Note that we use NFD schedule, and not NS
  7.  *   one, so things might differ to get the remaining time.
  8.  *
  9.  *   UPDATE: remaining time is implemented in an equivalent way in onFaceAdded,
  10.  *   since we don't update the timer for mobility events
  11.  *
  12.  *
  13.  * NOTES
  14.  *
  15.  * - ??? How do we agree on a trace name ? shall it be specified per-prefix.
  16.  *   the client might just need to know the prefix under which the producer is
  17.  *   publishing.
  18.  * - XXX A traced interest (setting a trace) should not tap into the cache, but should be hooked
  19.  * beforehand.
  20.  * - Note that we cannot use forwarding strategies in that implementation
  21.  * - tracing interest = leaves a trace in the PIT, inRecords hold where to go
  22.  *   (?) don't we erase such entries in the PIT when an interest comes in ?
  23.  *   (?) how does the producer know which interests to send ???
  24.  *   (!) note that we have a trace name which is a circuit identifying a
  25.  *   producer/a content. Both consumer and producer need to agree on a content
  26.  *   (!) KITE == tunnel in PIT ! What is the difference between KITE and
  27.  *   ForwardingHint ???
  28.  * - tracing interest: interest that should follow a trace. Note that non
  29.  *   conformant traced interests are dropped, let's be sure that KITE does not
  30.  *   say "might" instead of "should".
  31.  * - We have a tracedPitEntry left which might be invalid and is never cleaned
  32.  *   up
  33.  * - XXX we forward traced interests using the forwarding strategy... broken for
  34.  *   LB !
  35.  *
  36.  * - Multiple pending traces
  37.  *   One solution is to use the flag KITE_FLAG_FOLLOW_TRACE_AND_FIB so that at
  38.  *   least one packet reaches the Anchor, and has the latest road to the
  39.  *   producer...
  40.  *
  41.  *
  42.  * IMPLEMENTATION SPECIFICS
  43.  *
  44.  * - In our tests with KITE to support real-time traffic, we found several problems
  45.  *   and fixed them in order to make a fair comparison with MapMe. In particular,
  46.  *   they include: 1) traced interests are also sent immediately after each handover,
  47.  *   in addition to being sent periodically; 2) tracing interests follow both FIB and
  48.  *   traces to avoid being guided only to outdated producer locations; 3) acknowledgement-based
  49.  *   retransmission of traced interests is enabled, to avoid large handoff latency
  50.  *   in case of traced interest packet loss.
  51.  *
  52.  * - The trace name in KITE paper should be specified in the application. In our
  53.  *   code, we instead hook the interest transmission from an application
  54.  *   (equivalent), and construct the trace name in a deterministic way (instead
  55.  *   of accepting random trace name, again equivalent).
  56.  * - We assume we have a default route in the FIB that we only use to get the
  57.  *   traceName, since we prefix all consumer interests with KITE_BASE_TRACENAME.
  58.  *   This means we need at least one GlobalRouting update after the producer
  59.  *   connects, just like for MapMe, which is done by the ConnectivityManager.
  60.  *
  61.  *
  62.  * CHANGELOG
  63.  *
  64.  * v1.0 - Initial release
  65.  *
  66.  *
  67.  * DEPENDENCIES
  68.  *
  69.  * - ndn-cxx patch: KITE
  70.  * - NFD/ndnSIM extension : CONF_FILE
  71.  *
  72.  *
  73.  * TESTS
  74.  *
  75.  * - NFD
  76.  *
  77.  * - ndnSIM
  78.  *     2.23
  79.  *
  80.  *
  81.  * LAST MERGED COMMIT (TO DEL AFTER UNIFICATION OF CODEBASES)
  82.  
  83.  * commit 368811137b0f9e775213777863cc1842b67d879a
  84.  * Author: zeng xuan <xuan.zeng@irt-systemx.fr>
  85.  * Date:   Wed May 4 12:33:52 2016 +0200
  86.  *
  87.  *  fix to kite
  88.  *
  89.  */
  90.  
  91. #ifdef KITE
  92.  
  93. #include "core/logger.hpp"
  94. #include "face/null-face.hpp"
  95. #include "forwarder.hpp"
  96. #include "strategy.hpp"
  97.  
  98. #include "table/pit-tnt-entry.hpp"
  99. #include "forwarder-kite.hpp"
  100.  
  101. #include <vector>
  102. #include <boost/algorithm/string/predicate.hpp>
  103.  
  104. #define MS2NS(x) time::nanoseconds(x * 1000000)
  105. #define IS_TRACED(interest) ((interest.getTraceForwardingFlag() != 255) && (interest.getTraceForwardingFlag() & 1))
  106. #define SEQ_NA 0
  107. #define TTL_NA 255
  108. #define TTL_INIT 0
  109. #define RETX_NA 255
  110. #define RETX_INIT 0
  111.  
  112. namespace nfd {
  113.  
  114. NFD_LOG_INIT("ForwarderKite");
  115.  
  116. using fw::Strategy;
  117.  
  118. /*------------------------------------------------------------------------------
  119.  * ForwarderKite : forwarding pipeline - Constructor & accessors
  120.  *----------------------------------------------------------------------------*/
  121.  
  122. ForwarderKite::ForwarderKite()
  123.   : Forwarder()
  124. #ifdef WITH_RELIABLE_RETRANSMISSION
  125.   , m_ack_seq(0)
  126. #endif
  127.   , m_isPull(false)
  128.  
  129. #ifdef NDNSIM
  130.   , m_onSpecialInterest(ns3::MakeNullCallback<void, std::string, uint64_t, uint64_t, uint64_t>())
  131.   , m_onProcessing(ns3::MakeNullCallback<void, std::string, uint64_t, uint64_t, uint64_t>())
  132. #endif // NDNSIM
  133.  
  134. {
  135.   getFaceTable().onAdd.connect(bind(&ForwarderKite::onFaceAdded, this, _1));
  136.   getFaceTable().onRemove.connect(bind(&ForwarderKite::onFaceRemoved, this, _1));
  137.  
  138.   m_fib.onAdd.connect(bind(&ForwarderKite::onFibEntryChanged, this, _1));
  139.   m_fib.onUpdate.connect(bind(&ForwarderKite::onFibEntryChanged, this, _1));
  140.   m_fib.onRemove.connect(bind(&ForwarderKite::onFibEntryChanged, this, _1));
  141. }
  142.  
  143.  
  144.  
  145. /*------------------------------------------------------------------------------
  146.  * Event handling
  147.  *----------------------------------------------------------------------------*/
  148.  
  149. void
  150. ForwarderKite::updateTraceToAnchor(weak_ptr<Face> wpFace)
  151. {
  152.   /* ON MOBILITY EVENT
  153.    * update trace, without changing timers.
  154.    *
  155.    * in this case, prefix is empty and we need to reannounce all prefixes.
  156.    *
  157.    * NOTE: This is equivalent to implementing remaining_time !
  158.    * XXX but lifetime of updated packet has to be modified.
  159.    */
  160.   for (auto & it : m_pendingUpdates) {
  161.     shared_ptr<Interest> interest = make_shared<Interest>();
  162.     interest->setName(Name(it.first));
  163.     interest->setInterestLifetime(time::milliseconds(KITE_DEFAULT_RETX));
  164.     interest->setTraceForwardingFlag(KITE_FLAG_TRACEABLE);
  165.    
  166.     // NOTE:this is trick to avoid traced interest to be satisfied by any data packets,
  167.     //equivalent to put traced interests into a separate TFT rather than simply in PIT.
  168.     interest->setMinSuffixComponents(100);
  169.  
  170.     shared_ptr<Face> face = wpFace.lock();
  171.     NFD_LOG_INFO("Update trace to anchor for it.first" << it.first << " on face=" << face->getDescription());
  172.    
  173.     if(!static_cast<bool>(face)) //check the validity of the face before sending
  174.       return;
  175. #ifdef WITH_RELIABLE_RETRANSMISSION
  176.     this->sendReliableTracedInterest(wpFace, it.first, interest);
  177. #else
  178.     face->sendInterest(*interest);
  179. #endif
  180.     auto it_seq = m_seq.find(it.first);
  181.     uint64_t seq = (it_seq != m_seq.end()) ? it_seq->second : -1;
  182.     m_seq[it.first] = ++seq;
  183.     m_retx[it.first] = RETX_INIT;
  184.     m_onSpecialInterest(it.first, seq, TTL_INIT, RETX_INIT);
  185.   }
  186. }
  187.  
  188. #ifdef WITH_RELIABLE_RETRANSMISSION
  189. void
  190. ForwarderKite::sendReliableTracedInterest(weak_ptr<Face> wpFace, std::string prefix, shared_ptr<Interest> interest)
  191. {
  192.   scheduler::cancel(m_pendingRetx[prefix]);
  193.   shared_ptr<Face> face = wpFace.lock();
  194.   if(!static_cast<bool>(face)) //check the validity of the face before sending
  195.     return;
  196. //    std::cout<<time::steady_clock::now()<<", send reliable traced interest, for prefix="<<prefix<<", at="<<(void*)this<<"\n";
  197.   interest->refreshNonce();
  198.   face->sendInterest(*interest);
  199.  
  200.   // logging traced interest transmission at producer
  201.   // LOG: creation of traced interest
  202.   // Let's set seq number here
  203.  
  204.   m_pendingRetx[prefix] = scheduler::schedule(MS2NS(KITE_RELIABLE_RETX),
  205.       bind(&ForwarderKite::sendReliableTracedInterest, this,wpFace, prefix, interest));
  206. }
  207. #endif
  208.  
  209. void
  210. ForwarderKite::refreshTraceToAnchor(std::string prefix)
  211. {  
  212.   shared_ptr<Interest> interest = make_shared<Interest>();
  213.   interest->setName( Name(prefix));
  214.   interest->setInterestLifetime(time::milliseconds(KITE_DEFAULT_RETX));
  215.   interest->setTraceForwardingFlag(KITE_FLAG_TRACEABLE);
  216.  
  217.   interest->setMinSuffixComponents(100);
  218.  
  219.  
  220.  
  221.   // ON TICKS : Loop over all non local faces...
  222.   // NOTE: initially, it is possible that there is no face if the producer is
  223.   // not connected...
  224.   for (auto face : getFaceTable()) {
  225.     if (face->isLocal())
  226.       continue;
  227.     NFD_LOG_INFO("Setup trace to anchor for prefix " << prefix << " on face=" << face->getDescription());
  228.  
  229. #ifdef WITH_RELIABLE_RETRANSMISSION
  230.     this->sendReliableTracedInterest(face,prefix,interest);
  231. #else
  232.     face->sendInterest(*interest);
  233. #endif
  234.   auto it_seq = m_seq.find(prefix);
  235.   uint64_t seq = (it_seq != m_seq.end()) ? it_seq->second : 0;
  236. // Here we don't increase seq number since we did not move... it is just a
  237. // refresh = overhead for maintaining the current position.
  238. //  m_seq[prefix] = ++seq;
  239.   auto it_retx = m_retx.find(prefix);
  240.   uint64_t retx = (it_retx != m_retx.end()) ? it_retx->second : 0;
  241.   m_retx[prefix] = ++retx;
  242.   m_onSpecialInterest(prefix, seq, TTL_INIT, retx);
  243.   }
  244.  
  245.   // ..., clear any pending update  ...
  246.   auto it = m_pendingUpdates.find(prefix);
  247.   if (it != m_pendingUpdates.end()) {
  248.     scheduler::cancel(it->second);
  249.     // XXX m_pendingUpdates.erase(it);
  250.   }
  251.  
  252.   // ... and schedule next one
  253.   m_pendingUpdates[prefix] = scheduler::schedule(MS2NS(KITE_DEFAULT_RETX / KITE_DEFAULT_RETX_FACTOR),
  254.       bind(&ForwarderKite::refreshTraceToAnchor, this, prefix));
  255.  
  256. }
  257.  
  258. void
  259. ForwarderKite::onFaceAdded(shared_ptr<Face> face)
  260. {
  261.   /* Trigger re-sending the traced interests immediately after a mobility event,
  262.    * so that the trace gets past the mobile node to the base station, then up to
  263.    * the anchor.
  264.    *
  265.    * NOTE:
  266.    * - Previous traces are still present in the network, hence the need to use
  267.    *   the KITE_FLAG_FOLLOW_TRACE_AND_FIB which duplicates the interest on both
  268.    *   the trace and towards the anchor...
  269.    */
  270.  
  271.   // Ignore local faces
  272.   if (face->isLocal())
  273.     return;
  274.  
  275.   NFD_LOG_INFO("*** onFaceAdded, mobility event, resending all prefixes - face=" << face->getDescription());
  276.  
  277.   scheduler::schedule(MS2NS(0), bind(&ForwarderKite::updateTraceToAnchor, this,
  278.        face));
  279. }
  280.  
  281. void
  282. ForwarderKite::onFaceRemoved(shared_ptr<Face> face)
  283. {
  284. //  std::cout << "TODO" << std::endl;
  285. }
  286.  
  287. // maybe we could hook the FIB instead XXX XXX much better XXX XXX
  288. void
  289. ForwarderKite::onFibEntryChanged(shared_ptr<fib::Entry> fibEntry)
  290. {
  291.   for (auto & it : fibEntry->getNextHops()) {
  292.     if (it.getFace()->getLocalUri() == FaceUri("appFace://")) {
  293.       std::string prefix = fibEntry->getPrefix().toUri();
  294.       //NOTE: we need to skip the anchor application, which is not a real producer application:
  295.  
  296.       /* The node is a producer for prefix fibEntry->getPrefix() */
  297.       if (m_pendingUpdates.find(prefix) == m_pendingUpdates.end()) {
  298.           /* We discovered a new prefix for which we are producer */
  299.           NFD_LOG_INFO("We discovered a new prefix for which we are producer =" << prefix << ". sending trace");
  300.           refreshTraceToAnchor(prefix);
  301.           break;
  302.       }
  303.     }
  304.   }
  305. }
  306.  
  307. /*-----------------------------------------------------------------------------
  308.  * Overloaded functions
  309.  *----------------------------------------------------------------------------*/
  310.  
  311. void
  312. ForwarderKite::onContentStoreMiss(const Face& inFace, shared_ptr<pit::Entry>
  313.     pitEntry, const Interest& interest)
  314. {
  315.   shared_ptr<Face> face = const_pointer_cast<Face>(inFace.shared_from_this());
  316.   // insert InRecord
  317.   // XXX For a traced interest, this will make up the trace !!
  318.   pitEntry->insertOrUpdateInRecord(face, interest);
  319.  
  320.   // set PIT unsatisfy timer
  321.   this->setUnsatisfyTimer(pitEntry);
  322.  
  323.   bool isForwardedAsTracing = false;
  324.   std::vector<shared_ptr<Face> > traceForwardingOutFaces;
  325.  
  326.   // We should have : tracename, traceonly and traceable flags...
  327.   // TFT: all traced interests with traceable flag
  328.   //   tracing interests forwarded according to the TFT
  329.   // TNT
  330.   //   traced interests will pull the corresponding tracing interests from the
  331.   //   TNT
  332.   // pull = get pending interests after movement
  333.  
  334.  
  335.   /* HOOK : TRACING INTERESTS = consumer interests following a trace
  336.    * traceName should be set in the header
  337.    *
  338.    * we create a temp interest to create a pit entry just to check whether it
  339.    * was existing before = to check for the presence of a trace.
  340.    * if trace follow trace otherwise follow FIB ... based on what...
  341.    *
  342.    * If the PIT entry was already existing, this means someone left a trace
  343.    */
  344.   if (!interest.getTraceName().empty()) {
  345.     NFD_LOG_INFO("1) processed as tracing interest follow traceName=" << interest.getTraceName());
  346.     shared_ptr<Interest> TmpInterest = make_shared<Interest>(interest.getTraceName());
  347.  
  348.     TmpInterest->setMinSuffixComponents(100);
  349.  
  350.  
  351.     std::pair<shared_ptr<pit::Entry>, bool> tfEntryElement = m_pit.insert(*TmpInterest);
  352.     shared_ptr<pit::Entry> tracedPitEntry = tfEntryElement.first;
  353.  
  354.     // We search for the trace in the PIT !
  355.  
  356.     if (tfEntryElement.second) { // isNew
  357.       // The second element of the pair is a boolean indicating if the insertion
  358.       // happened (True) or if the call returned an existing PIT entry (False)
  359.       //
  360.       // No trace, we won't be able to forward this interest. Let's wait for a
  361.       // new trace eventually.
  362.       //
  363.       // Note that even if we had forwarded it, it could be an old trace. That's
  364.       // also why we forward on all of them when we find more than one.
  365.       NFD_LOG_TRACE("traced pit entry not found, tracename="<< interest.getTraceName());
  366.     } else {
  367.       NFD_LOG_TRACE("traced pit entry is found" << ", traced pitEntry=" << (void*)(tracedPitEntry.get()) );
  368.       const pit::InRecordCollection& inRecords = tracedPitEntry->getInRecords();
  369.  
  370.       NFD_LOG_TRACE("traced pitEntry's inrecord size=" << inRecords.size());
  371.       //for(pit::InRecordCollection::const_iterator it=inRecords.begin(); it!=inRecords.end(); ++it) {
  372.       for (auto & it : inRecords) {
  373.         if (!IS_TRACED(it.getInterest())) {
  374.           NFD_LOG_TRACE("meet traced interest, inFace=" << it.getFace()->getId() <<", not traceable");
  375.  
  376.           continue; /*NOTE: instead of dropping the interest, we continue the processing of interest*/
  377.         }
  378.  
  379.         shared_ptr<Face> incomingFace = it.getFace();
  380.         NFD_LOG_TRACE("meet traced interest, inFace=" << incomingFace->getId() <<", traceable");
  381.  
  382.         time::steady_clock::TimePoint now = time::steady_clock::now();
  383.         if (it.getExpiry() < now) {
  384.           NFD_LOG_TRACE("expried");
  385.         } else {
  386.           NFD_LOG_TRACE("not expried");
  387.           if (&inFace == incomingFace.get())
  388.             NFD_LOG_TRACE("its inFace=" << incomingFace->getId() <<", equal to inface of tracing interest, not forwardable to");
  389.           else if(incomingFace->getId() == -1)
  390.             NFD_LOG_TRACE("its inFace=" << incomingFace->getId() <<", now it is down, not forwardable to this face");
  391.           else {
  392.             if (pitEntry->canForwardTo(*incomingFace)) { //if the tracing interest is forwardable to this incoming face
  393.               NFD_LOG_TRACE("tracing interest is forwardable to its inface=" << incomingFace->getId() << ", to be forwarded ...");
  394.               isForwardedAsTracing = true;
  395. ///////////
  396. // My changes.
  397.             }
  398.           }
  399.         }
  400.       }
  401.     }
  402. //////////
  403.  
  404.     //NOTE: check if pullbit is set
  405.     if(m_isPull) {
  406.       NFD_LOG_INFO("insert tracing interest into the TNT(Trace Name Table) entry" << interest.getName());
  407.  
  408.       //get the TNT entry:
  409.       shared_ptr<pit::TntEntry> pitTntEntry = tracedPitEntry->getStrategyInfo<pit::TntEntry>();
  410.       if (!static_cast<bool>(pitTntEntry)) {
  411.         NFD_LOG_TRACE("TNT entry not found , create it");
  412.         pitTntEntry=make_shared<pit::TntEntry>(tracedPitEntry->getName());
  413.         tracedPitEntry->setStrategyInfo(pitTntEntry);
  414.       }
  415.       NFD_LOG_TRACE("add tracing record");
  416.       // add tracing record:
  417.       pitTntEntry->addTracingRecord(weak_ptr<const Interest>(interest.shared_from_this()),
  418.       weak_ptr<Face>(face), weak_ptr<pit::Entry>(pitEntry));
  419.     } // end m_isPull
  420.  
  421.     //    NFD_LOG_INFO("add tracing record, successful");
  422.   } // END HOOK : TRACING INTEREST
  423.  
  424.   /* HOOK : TRACED INTEREST
  425.    *
  426.    * This will leave a trace in the PIT ! it is addressed towards the prefix,
  427.    * advertized by the anchor...
  428.    *
  429.    * Composed of all trace names.
  430.    * TFT contains the trace
  431.    * TNT pending interests
  432.    */
  433.   if (IS_TRACED(interest)) {
  434.     m_onProcessing(interest.getName().toUri(), SEQ_NA, TTL_NA, RETX_NA);
  435.     NFD_LOG_INFO("2) ForwarderKite processing traced interest, creating trace, name=" << interest.getName() << ", traceable" <<", pitEntry=" <<(void*)(pitEntry.get()) << " - inFace=" << inFace.getDescription() );
  436.  
  437. //  std::cout<<"receiving traced interest="<<interest.getName()<<", at="<<(void*)this<<"\n";
  438.     //NOTE: check if pull bit is set
  439.     if(m_isPull){
  440.       //get TNT (trace name table) entry
  441.       // XXX it might be that we store everything in the same place... -- jordan
  442.       //
  443.       shared_ptr<pit::TntEntry> pitTntEntry = pitEntry->getStrategyInfo<pit::TntEntry>();
  444.       if (static_cast<bool>(pitTntEntry)) {
  445.         NFD_LOG_TRACE("TNT entry (tracing interests) are found");
  446.  
  447.         // TODO for (auto & it : pitTntEntry->getTracingInterests()) {
  448.         const pit::TntEntry::TracingInterestList& tracingInterestRecords = pitTntEntry->getTracingInterests();
  449.         pit::TntEntry::TracingInterestList::const_iterator it;
  450.         for (it=tracingInterestRecords.begin(); it != tracingInterestRecords.end(); ++it) {
  451.  
  452.           const pit::TntRecord& TracingRecord = it->second;
  453.           if (!TracingRecord.isValid()) {
  454.             NFD_LOG_TRACE("SHOULD NOT HAPPEN ! meet tracing record. It's not valid, remove it");
  455.             pitTntEntry->removeTracingRecord(it);
  456.             continue;
  457.           }
  458.  
  459.           NFD_LOG_TRACE("meet tracing record. It's valid");
  460.           //valid check guarantees below are valid share_pt:
  461.           shared_ptr<pit::Entry> tracingPitEntry = TracingRecord.getPitEntry().lock();
  462.           // XXX a single interest by record ??
  463.           shared_ptr<const Interest> tracingInterest = TracingRecord.getInterest().lock();
  464.  
  465.           if ((TracingRecord.getInFace()).lock().get() == &inFace ||
  466.               ! tracingPitEntry->canForwardTo(inFace)) {
  467.             // XXX How could this happen, since we update
  468.             NFD_LOG_TRACE("SHOULD NOT HAPPEN ! tracing interest, not forwardable to inFace="<< inFace.getId() );
  469.             continue;
  470.           }
  471.  
  472.           NFD_LOG_TRACE("tracing interest is forwardable to inFace=" << inFace.getId() );
  473.           //send the current interest to that incoming face:
  474.           // insert OutRecord
  475.           tracingPitEntry->insertOrUpdateOutRecord(face, *tracingInterest);
  476.           // send Interest
  477.           face->sendInterest(*tracingInterest);
  478.           ++m_counters.getNOutInterests();
  479.           NFD_LOG_INFO("tracing interest=" << tracingInterest->getName()
  480.               << ", pulled by interest="<< interest.getName()
  481.               <<", to face=" << inFace.getId());
  482.         }
  483.       }
  484.     } //end if m_isPull
  485.   }  /* END HOOK : TRACED INTEREST */
  486.  
  487.   if (isForwardedAsTracing && interest.getTraceForwardingFlag() != 255 &&
  488.       ((interest.getTraceForwardingFlag() >> 1) & 1)) {
  489.     NFD_LOG_INFO("fib lookup is to be skipped");
  490.   } else {
  491.     //NFD_LOG_INFO("will continue processing to the fib");
  492.  
  493.     // FIB lookup
  494.     shared_ptr<fib::Entry> fibEntry = m_fib.findLongestPrefixMatch(*pitEntry);
  495.  
  496.     //if this is a traced interest(interest that updates routing)
  497.     if (IS_TRACED(interest)) {
  498.       // don't leave the forwarding operation to strategy layer, instead send it
  499.       // to all nexthops directly to avoid some problems related to
  500.       // loop/supression of traced interest(routing update)
  501.       const fib::NextHopList& nexthops = fibEntry->getNextHops();
  502.  
  503.       NFD_LOG_INFO("Broadcasting traced interest based on FIB entries" << fibEntry->getPrefix());
  504.       //for(fib::NextHopList::const_iterator it=nexthops.begin(); it!=nexthops.end(); it++)
  505.       if(nexthops.size()!=0) {
  506.         for (auto it : nexthops) {
  507.           shared_ptr<Face> outFace = it.getFace();
  508.             if(outFace.get()==&inFace) //avoid loops
  509.               continue;
  510.           //std::cout << "  . outFace " << outFace->getDescription() << std::endl;
  511.           outFace->sendInterest(interest);
  512.           ++m_counters.getNOutInterests();
  513.           pitEntry->insertOrUpdateOutRecord(outFace, interest);
  514.  
  515.           //logging traced interest transmission at each router
  516.           // LOG: forwarding of Traced interest
  517.           // XXX there is no seq info available, let's put 0 in the mean time. See comment
  518.           // in other places in the code.
  519.           m_onSpecialInterest(interest.getName().toUri(), SEQ_NA, TTL_NA, RETX_NA);
  520.         }
  521.       }
  522. #ifdef WITH_RELIABLE_RETRANSMISSION
  523.       else {
  524.         // THIS IS TO DETECT WE HAVE REACHED THE ANCHOR... it has no FIB entry
  525.           //automatically send an ack interest back:
  526.         std::ostringstream nstream;
  527.         //NOTE: here it is assumed that prefix has only one name componet like /prefix0 ,
  528.         //we add a sequence number in the ack name to avoid the ack to be pending somewhere
  529.           //xxx, should be generalized for longer prefix
  530.         nstream<<KITE_TRACE_ACK_PREFIX<<"/"<<m_ack_seq++<<interest.getName().toUri();
  531.         // std::cout <<"***************send back ack interest="<<nstream.str()<<"\n";
  532.         Name ackInterestName(nstream.str());
  533.         shared_ptr<Interest> ackInterest = make_shared<Interest>(ackInterestName);
  534.  
  535.         ackInterest->setTraceForwardingFlag(2); // interest shall be forwarded using trace only
  536.         ackInterest->setTraceName(interest.getName());
  537.         face->sendInterest(*ackInterest);
  538.         ++m_counters.getNOutInterests();
  539.         pitEntry->insertOrUpdateOutRecord(face, *ackInterest);
  540.  
  541.         //Ack loggging is dsiabled
  542.         // m_onSpecialInterest(ackInterest->getName().toUri(), SEQ_NA, TTL_NA, RETX_NA);
  543.       }
  544. #endif
  545.     } else {
  546.         // dispatch to strategy
  547.         //NFD_LOG_INFO("Sending interest based on FW strategy and FIB entries");
  548.  
  549.         //NOTE: the following check is to guarantee that the pit entry created by following trace forwarding will not
  550.         //be removed later by the attempt of FIB forwarding, which makes trace forwarding fail.
  551.         //actually the straggler time of 100ms can prevent us from seeing this problem in most cases.
  552.         if(isForwardedAsTracing) {
  553.           const fib::NextHopList& nexthops = fibEntry->getNextHops();
  554.           if (nexthops.size()!=0 && !(nexthops.size()==1 && nexthops.begin()->getFace().get() == &inFace)) { //if it is neither the case that there's no matching FIB Nor the case that the only output face in FIB is the same as inFace of this interest(loop forwarding)
  555.               this->dispatchToStrategy(pitEntry, bind(&Strategy::afterReceiveInterest, _1,
  556.                   cref(inFace), cref(interest), fibEntry, pitEntry));
  557.           }
  558.         } else {
  559.           this->dispatchToStrategy(pitEntry, bind(&Strategy::afterReceiveInterest, _1,
  560.               cref(inFace), cref(interest), fibEntry, pitEntry));
  561.         }
  562.     }
  563.   }
  564.  
  565.   if (isForwardedAsTracing) {
  566.     //for(std::vector<shared_ptr<Face> >::iterator it=traceForwardingOutFaces.begin();it!=traceForwardingOutFaces.end();++it)
  567.     NFD_LOG_INFO("insert outrecord for traceforwarding OutFaces");
  568.     for (auto it : traceForwardingOutFaces) {
  569.       NFD_LOG_INFO(" - insert out record");
  570.       pitEntry->insertOrUpdateOutRecord(it, interest);
  571.     }
  572.   }
  573.  
  574. }
  575.  
  576.  
  577. /* Hooks for special interest processing */
  578. void
  579. ForwarderKite::onIncomingInterest(Face& inFace, const Interest& interest)
  580. {
  581.   Interest & interest_ = const_cast<Interest &>(interest);
  582.  
  583. #ifdef WITH_RELIABLE_RETRANSMISSION
  584.   if( boost::starts_with(interest.getName().toUri(), KITE_TRACE_ACK_PREFIX)) // an ack interest is received
  585.   {
  586.       std::string prefix=interest.getName().getSubName(2, 3).toUri();
  587. //        std::cout<<"recieved ack for "<<prefix<<", at="<<(void*)this<<", from="<<inFace.getId()<<",prefix="<<prefix<<"\n";
  588.       if(!m_pendingUpdates.empty() && m_pendingUpdates.find(prefix)!=m_pendingUpdates.end())
  589.       {
  590.             scheduler::cancel(m_pendingRetx[prefix]);
  591. //          std::cout<<"cancel retx for "<<prefix<<", at="<<(void*)this<<", from="<<inFace.getId()<<"\n";
  592.         //then drop the ack
  593.       }
  594.       else
  595.       {
  596.        Forwarder::onIncomingInterest(inFace, interest_);/* Add trace for regular consumer interests */
  597.       }
  598.  
  599.       return;
  600.   }
  601. #endif
  602.  
  603.   /* Tag all interests incoming from a local face with KITE flags */
  604.   if (inFace.isLocal() && !boost::starts_with(interest.getName().toUri(), "/localhost")) {
  605.     /* We assume we have a default route in the FIB that we only use to get
  606.      * the traceName, since we prefix all consumer interests with
  607.      * KITE_BASE_TRACENAME
  608.      *
  609.      */
  610.     Name traceName(interest.getName().getSubName(0, 1).toUri());
  611.     NFD_LOG_INFO("Hooked consumer interest with traceName=" << traceName);
  612.     interest_.setTraceName(traceName);
  613.     interest_.setTraceForwardingFlag(KITE_FLAG_FOLLOW_TRACE_AND_FIB);
  614.   }
  615.  
  616.  
  617.   //XXX duplicate some codes from forwarder::OnIncomingInterest(), to be fixed later
  618. //here is to avoid content sore lookup for traced interest
  619.  
  620.    if(IS_TRACED(interest) && interest.getTraceName().empty())// is traced and not tracing interest
  621.    {
  622.      NFD_LOG_DEBUG("onIncomingInterest face=" << inFace.getId() <<
  623.                 " interest=" << interest.getName());
  624.      const_cast<Interest&>(interest).setIncomingFaceId(inFace.getId());
  625.      ++m_counters.getNInInterests();
  626.  
  627.      // /localhost scope control
  628.      bool isViolatingLocalhost = !inFace.isLocal() &&
  629.                               LOCALHOST_NAME.isPrefixOf(interest.getName());
  630.      if (isViolatingLocalhost) {
  631.         NFD_LOG_DEBUG("onIncomingInterest face=" << inFace.getId() <<
  632.                       " interest=" << interest.getName() << " violates /localhost");
  633.          // (drop)
  634.         return;
  635.      }
  636.  
  637.      // PIT insert
  638.      shared_ptr<pit::Entry> pitEntry = m_pit.insert(interest).first;
  639.  
  640.      // detect duplicate Nonce
  641.      int dnw = pitEntry->findNonce(interest.getNonce(), inFace);
  642.      bool hasDuplicateNonce = (dnw != pit::DUPLICATE_NONCE_NONE) ||
  643.                            m_deadNonceList.has(interest.getName(), interest.getNonce());
  644.      if (hasDuplicateNonce) {
  645.        // goto Interest loop pipeline
  646.        this->onInterestLoop(inFace, interest_, pitEntry);
  647.        return;
  648.      }
  649.  
  650.      // cancel unsatisfy & straggler timer
  651.      this->cancelUnsatisfyAndStragglerTimer(pitEntry);
  652.  
  653.      this->onContentStoreMiss(inFace, pitEntry, interest);
  654.      return;
  655.    }
  656.  
  657.   /* Add trace for regular consumer interests */
  658.   Forwarder::onIncomingInterest(inFace, interest_);
  659.  
  660.  
  661.  
  662. }
  663.  
  664. } // namespace nfd
  665.  
  666. #endif // KITE
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement