--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -906,7 +906,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   stat_lock("OSD::stat_lock"),
   finished_lock("OSD::finished_lock"),
   op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
-  peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
+  peering_wq(this, g_conf->osd_op_thread_timeout, &recovery_tp, 200),
   map_lock("OSD::map_lock"),
   peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
   debug_drop_pg_create_probability(g_conf->osd_debug_drop_pg_create_probability),
@@ -6422,6 +6422,57 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
   op_wq.queue(make_pair(PGRef(pg), op));
 }
 
+void OSD::OpWQ::_calculate_pg_cost(PGRef pg) {
+    assert(qlock.is_locked());
+    int cost = 0;
+    if (pg == NULL) return;
+    bool recovery_prio = false;
+    if (pg_for_processing.count(&*pg)) {
+        for (list<OpRequestRef>::iterator i = pg_for_processing[&*pg].begin(); i != pg_for_processing[&*pg].end(); i++) {
+            OpRequestRef op = *i;
+            if (op->request->get_type() == CEPH_MSG_OSD_OP ||
+                    op->request->get_type() == CEPH_MSG_OSD_OPREPLY ||
+                    op->request->get_type() == MSG_OSD_SUBOP ||
+                    op->request->get_type() == MSG_OSD_SUBOPREPLY) {
+                recovery_prio = true;
+            }
+        }
+        for (list<OpRequestRef>::iterator i = pg_for_processing[&*pg].begin(); i != pg_for_processing[&*pg].end(); i++) {
+            OpRequestRef op = *i;
+            if (op->request->get_type() == MSG_OSD_PG_BACKFILL ||
+                    op->request->get_type() == MSG_OSD_PG_SCAN
+                    /* || op->request->op == MOSDPGBackfill::OP_BACKFILL_PROGRESS
+                    || op->request->op == MOSDPGBackfill::OP_BACKFILL_FINISH */) {
+                if (recovery_prio) {
+                    cost += 1000;
+                } else {
+                    cost += 1;
+                }
+            } else {
+                cost += 10 * (op->request->get_priority() - 1) + 1;
+            }
+        }
+    }
+    pg_for_processing_costs[&*pg] = cost;
+}
+
+double OSD::OpWQ::_get_pg_cost(PGRef pg) {
+    assert(qlock.is_locked());
+    int sum = pg_for_processing_costs[&*pg];
+    if (sum > 0 && pg_for_processing.count(&*pg)) {
+        return (double)pg_for_processing_costs[&*pg] / pg_for_processing[&*pg].size();
+    }
+    return 0.0;
+}
+
+double OSD::pg_cost(PG *pg)
+{
+    {
+        Mutex::Locker l(op_wq.qlock);
+        return op_wq.pg_for_processing_costs[pg];
+    }
+}
+
 void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
 {
   unsigned priority = item.second->request->get_priority();
@@ -6433,6 +6484,10 @@ void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
   else
     pqueue.enqueue(item.second->request->get_source_inst(),
       priority, cost, item);
+  {
+      Mutex::Locker l(qlock);
+      _calculate_pg_cost(&*(item.first));
+  }
   osd->logger->set(l_osd_opq, pqueue.length());
 }
 
@@ -6455,6 +6510,10 @@ void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
   else
     pqueue.enqueue_front(item.second->request->get_source_inst(),
      priority, cost, item);
+  {
+      Mutex::Locker l(qlock);
+      _calculate_pg_cost(&*(item.first));
+  }
   osd->logger->set(l_osd_opq, pqueue.length());
 }
 
@@ -6552,6 +6611,14 @@ struct C_CompleteSplits : public Context {
   }
 };
 
+struct comparer {
+    OSD *osd;
+    comparer(OSD *osd) : osd(osd) {}
+    bool operator()(PG *a, PG *b) {
+        return osd->pg_cost(a) > osd->pg_cost(b);
+    }
+};
+
 void OSD::process_peering_events(
   const list<PG*> &pgs,
   ThreadPool::TPHandle &handle
@@ -6561,8 +6628,10 @@ void OSD::process_peering_events(
   epoch_t same_interval_since = 0;
   OSDMapRef curmap = service.get_osdmap();
   PG::RecoveryCtx rctx = create_context();
-  for (list<PG*>::const_iterator i = pgs.begin();
-       i != pgs.end();
+  list<PG*> pglist(pgs.begin(), pgs.end());
+  pglist.sort(comparer(this));
+  for (list<PG*>::const_reverse_iterator i = pglist.rbegin();
+       i != pglist.rend();
        ++i) {
     set<boost::intrusive_ptr<PG> > split_pgs;
     PG *pg = *i;
@@ -6600,6 +6669,7 @@ void OSD::process_peering_events(
   dispatch_context(rctx, 0, curmap);
 
   service.send_pg_temp();
+  //send_pg_stats(ceph_clock_now(g_ceph_context));
 }
 
 // --------------------------------
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -482,6 +482,7 @@ public:
   virtual const char** get_tracked_conf_keys() const;
   virtual void handle_conf_change(const struct md_config_t *conf,
                  const std::set <std::string> &changed);
+  double pg_cost(PG* pg);
 
 protected:
   Mutex osd_lock;          // global lock
@@ -735,6 +736,7 @@ private:
                           PGRef > {
     Mutex qlock;
     map<PG*, list<OpRequestRef> > pg_for_processing;
+    map<PG*, double> pg_for_processing_costs;
     OSD *osd;
     PrioritizedQueue<pair<PGRef, OpRequestRef>, entity_inst_t > pqueue;
     OpWQ(OSD *o, time_t ti, ThreadPool *tp)
@@ -751,6 +753,9 @@ private:
       pqueue.dump(f);
     }
 
+    void _calculate_pg_cost(PGRef pg);
+    double _get_pg_cost(PGRef pg);
+    void _push_pg(PGRef pg);
     void _enqueue_front(pair<PGRef, OpRequestRef> item);
     void _enqueue(pair<PGRef, OpRequestRef> item);
     PGRef _dequeue();
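
Note (not part of the patch): the hunks above attach a per-PG cost estimate to OSD::OpWQ. _calculate_pg_cost() is recomputed under qlock whenever an op is enqueued, and process_peering_events() copies its PG list, sorts it by that cost in descending order via the comparer functor, then walks it in reverse, so PGs with the cheapest queued work are handled first. A backfill or scan message costs 1 unless client I/O (CEPH_MSG_OSD_OP/OPREPLY, MSG_OSD_SUBOP/SUBOPREPLY) is also queued on the same PG, in which case it costs 1000; every other op costs 10 * (priority - 1) + 1. The standalone C++ sketch below restates that heuristic outside the Ceph tree; the Op struct and its client_io/backfill flags are simplified stand-ins for the real message types and OpRequestRef, not Ceph APIs.

// Standalone sketch (not part of the patch) of the cost heuristic that
// OSD::OpWQ::_calculate_pg_cost() implements above.
#include <iostream>
#include <list>

struct Op {
  bool client_io;   // stand-in for CEPH_MSG_OSD_OP / OPREPLY / SUBOP / SUBOPREPLY
  bool backfill;    // stand-in for MSG_OSD_PG_BACKFILL / MSG_OSD_PG_SCAN
  int priority;     // stand-in for op->request->get_priority()
};

// Backfill/scan ops are cheap (1) unless client I/O is also queued on the PG,
// in which case they are penalized heavily (1000); everything else scales
// linearly with its priority.
static int pg_cost(const std::list<Op>& pending) {
  bool client_io_queued = false;
  for (const Op& op : pending)
    if (op.client_io)
      client_io_queued = true;

  int cost = 0;
  for (const Op& op : pending) {
    if (op.backfill)
      cost += client_io_queued ? 1000 : 1;
    else
      cost += 10 * (op.priority - 1) + 1;
  }
  return cost;
}

int main() {
  std::list<Op> pending = {
    {true,  false, 63},   // a client write, priority 63
    {false, true,  10},   // a backfill/scan message
  };
  std::cout << "pg cost: " << pg_cost(pending) << "\n";  // 621 + 1000 = 1621
  return 0;
}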