Advertisement
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- --- a/src/osd/OSD.cc
- +++ b/src/osd/OSD.cc
- @@ -906,7 +906,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
- stat_lock("OSD::stat_lock"),
- finished_lock("OSD::finished_lock"),
- op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
- - peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
- + peering_wq(this, g_conf->osd_op_thread_timeout, &recovery_tp, 200),
- map_lock("OSD::map_lock"),
- peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
- debug_drop_pg_create_probability(g_conf->osd_debug_drop_pg_create_probability),
- @@ -6422,6 +6422,57 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
- op_wq.queue(make_pair(PGRef(pg), op));
- }
- +void OSD::OpWQ::_calculate_pg_cost(PGRef pg) {
- + assert(qlock.is_locked());
- + int cost = 0;
- + if (pg == NULL) return;
- + bool recovery_prio = false;
- + if (pg_for_processing.count(&*pg)) {
- + for (list<OpRequestRef>::iterator i = pg_for_processing[&*pg].begin(); i != pg_for_processing[&*pg].end(); i++) {
- + OpRequestRef op = *i;
- + if (op->request->get_type() == CEPH_MSG_OSD_OP ||
- + op->request->get_type() == CEPH_MSG_OSD_OPREPLY ||
- + op->request->get_type() == MSG_OSD_SUBOP ||
- + op->request->get_type() == MSG_OSD_SUBOPREPLY) {
- + recovery_prio = true;
- + }
- + }
- + for (list<OpRequestRef>::iterator i = pg_for_processing[&*pg].begin(); i != pg_for_processing[&*pg].end(); i++) {
- + OpRequestRef op = *i;
- + if (op->request->get_type() == MSG_OSD_PG_BACKFILL ||
- + op->request->get_type() == MSG_OSD_PG_SCAN
- + /* || op->request->op == MOSDPGBackfill::OP_BACKFILL_PROGRESS
- + || op->request->op == MOSDPGBackfill::OP_BACKFILL_FINISH */) {
- + if (recovery_prio) {
- + cost += 1000;
- + } else {
- + cost += 1;
- + }
- + } else {
- + cost += 10 * (op->request->get_priority() - 1) + 1;
- + }
- + }
- + }
- + pg_for_processing_costs[&*pg] = cost;
- +}
- +
- +double OSD::OpWQ::_get_pg_cost(PGRef pg) {
- + assert(qlock.is_locked());
- + int sum = pg_for_processing_costs[&*pg];
- + if (sum > 0 && pg_for_processing.count(&*pg)) {
- + return (double)pg_for_processing_costs[&*pg] / pg_for_processing[&*pg].size();
- + }
- + return 0.0;
- +}
- +
- +double OSD::pg_cost(PG *pg)
- +{
- + {
- + Mutex::Locker l(op_wq.qlock);
- + return op_wq.pg_for_processing_costs[pg];
- + }
- +}
- +
- void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
- {
- unsigned priority = item.second->request->get_priority();
- @@ -6433,6 +6484,10 @@ void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
- else
- pqueue.enqueue(item.second->request->get_source_inst(),
- priority, cost, item);
- + {
- + Mutex::Locker l(qlock);
- + _calculate_pg_cost(&*(item.first));
- + }
- osd->logger->set(l_osd_opq, pqueue.length());
- }
- @@ -6455,6 +6510,10 @@ void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
- else
- pqueue.enqueue_front(item.second->request->get_source_inst(),
- priority, cost, item);
- + {
- + Mutex::Locker l(qlock);
- + _calculate_pg_cost(&*(item.first));
- + }
- osd->logger->set(l_osd_opq, pqueue.length());
- }
- @@ -6552,6 +6611,14 @@ struct C_CompleteSplits : public Context {
- }
- };
- +struct comparer {
- + OSD *osd;
- + comparer(OSD *osd) : osd(osd) {}
- + bool operator()(PG *a, PG *b) {
- + return osd->pg_cost(a) > osd->pg_cost(b);
- + }
- +};
- +
- void OSD::process_peering_events(
- const list<PG*> &pgs,
- ThreadPool::TPHandle &handle
- @@ -6561,8 +6628,10 @@ void OSD::process_peering_events(
- epoch_t same_interval_since = 0;
- OSDMapRef curmap = service.get_osdmap();
- PG::RecoveryCtx rctx = create_context();
- - for (list<PG*>::const_iterator i = pgs.begin();
- - i != pgs.end();
- + list<PG*> pglist(pgs.begin(), pgs.end());
- + pglist.sort(comparer(this));
- + for (list<PG*>::const_reverse_iterator i = pglist.rbegin();
- + i != pglist.rend();
- ++i) {
- set<boost::intrusive_ptr<PG> > split_pgs;
- PG *pg = *i;
- @@ -6600,6 +6669,7 @@ void OSD::process_peering_events(
- dispatch_context(rctx, 0, curmap);
- service.send_pg_temp();
- + //send_pg_stats(ceph_clock_now(g_ceph_context));
- }
- // --------------------------------
- --- a/src/osd/OSD.h
- +++ b/src/osd/OSD.h
- @@ -482,6 +482,7 @@ public:
- virtual const char** get_tracked_conf_keys() const;
- virtual void handle_conf_change(const struct md_config_t *conf,
- const std::set <std::string> &changed);
- + double pg_cost(PG* pg);
- protected:
- Mutex osd_lock; // global lock
- @@ -735,6 +736,7 @@ private:
- PGRef > {
- Mutex qlock;
- map<PG*, list<OpRequestRef> > pg_for_processing;
- + map<PG*, double> pg_for_processing_costs;
- OSD *osd;
- PrioritizedQueue<pair<PGRef, OpRequestRef>, entity_inst_t > pqueue;
- OpWQ(OSD *o, time_t ti, ThreadPool *tp)
- @@ -751,6 +753,9 @@ private:
- pqueue.dump(f);
- }
- + void _calculate_pg_cost(PGRef pg);
- + double _get_pg_cost(PGRef pg);
- + void _push_pg(PGRef pg);
- void _enqueue_front(pair<PGRef, OpRequestRef> item);
- void _enqueue(pair<PGRef, OpRequestRef> item);
- PGRef _dequeue();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement