Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- From 8ee993cf53df7f22c95ca0ba42384d1d62e17f8c Mon Sep 17 00:00:00 2001
- From: Your Name <you@example.com>
- Date: Fri, 22 Sep 2017 10:10:42 +0800
- Subject: [PATCH 1/1] test workqueue
- Signed-off-by: Your Name <you@example.com>
- ---
- src/common/tq.h | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++
- src/core/HDCSCore.cpp | 5 ++-
- src/core/HDCSCore.h | 2 ++
- 3 files changed, 96 insertions(+), 1 deletion(-)
- create mode 100644 src/common/tq.h
- diff --git a/src/common/tq.h b/src/common/tq.h
- new file mode 100644
- index 0000000..8e33a7a
- --- /dev/null
- +++ b/src/common/tq.h
- @@ -0,0 +1,90 @@
- +// Copyright [2017] <Intel>
- +#ifndef WORK_QUEUE_HPP_
- +#define WORK_QUEUE_HPP_
- +
- +#include <thread>
- +#include <mutex>
- +#include <vector>
- +#include <deque>
- +#include <list>
- +#include <future>
- +#include <functional>
- +#include <condition_variable>
- +
- +namespace hdcs {
- +class TWorkQueue {
- + typedef std::function<void(void)> fp_t;
- +
- + bool stop;
- + std::deque<fp_t> job_queue;
- + std::vector<std::thread> threads;
- + std::mutex q_lock;
- + std::condition_variable cond;
- +
- + void add_worker() {
- + std::thread t([this]() {
- + while(1) {
- + std::function<void()> task;
- + {
- + std::unique_lock<std::mutex> lock(q_lock);
- + cond.wait(lock, [this]{ return this->stop || !this->job_queue.empty(); });
- +
- + if(this->stop && this->job_queue.empty()) {
- + return;
- + }
- +
- + task = std::move(job_queue.front());
- + job_queue.pop_front();
- + }
- + task();
- + }
- + });
- + threads.push_back(std::move(t));
- + }
- +
- + public:
- + TWorkQueue(size_t thd_cnt = 1) : stop(false) {
- + for ( unsigned i = 0; i < thd_cnt; ++i )
- + add_worker();
- + }
- + ~TWorkQueue() {
- + {
- + std::unique_lock<std::mutex> lock(q_lock);
- + stop = true;
- + }
- + cond.notify_all();
- + for (auto &t : threads)
- + t.join();
- + }
- +
- + template<class F, class... Args>
- + auto add_task(F&& f, Args&&... args)
- + -> std::future<typename std::result_of<F(Args...)>::type> {
- + using return_type = typename std::result_of<F(Args...)>::type;
- +
- + auto task = std::make_shared< std::packaged_task<return_type()> >(
- + std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- + );
- +
- + std::future<return_type> ret = task->get_future();
- + {
- + std::unique_lock<std::mutex> lock(q_lock);
- +
- + if(stop) {
- + throw std::runtime_error("add_task on stopped WorkQueue");
- + }
- +
- + job_queue.push_back([task](){ (*task)(); });
- + }
- +
- + cond.notify_one();
- +
- + return ret;
- + }
- +
- +
- +
- +};
- +} // namespace dslab
- +
- +#endif // WORK_QUEUE_HPP_
- diff --git a/src/core/HDCSCore.cpp b/src/core/HDCSCore.cpp
- index 34bd3a5..d2fa40d 100644
- --- a/src/core/HDCSCore.cpp
- +++ b/src/core/HDCSCore.cpp
- @@ -23,6 +23,7 @@ HDCSCore::HDCSCore() {
- int hdcs_thread_max = stoi(config->configValues["cacheservice_threads_num"]);
- hdcs_op_threads = new ThreadPool( hdcs_thread_max );
- + tq = new TWorkQueue(hdcs_thread_max);
- uint64_t total_size = stoull(config->configValues["total_size"]);
- uint64_t block_size = stoull(config->configValues["cache_min_alloc_size"]);
- uint64_t cache_size = stoull(config->configValues["cache_total_size"]);
- @@ -44,6 +45,7 @@ HDCSCore::HDCSCore() {
- HDCSCore::~HDCSCore() {
- go = false;
- delete hdcs_op_threads;
- + delete tq;
- delete policy;
- delete block_guard;
- //delete main_thread;
- @@ -96,7 +98,8 @@ void HDCSCore::map_block(BlockRequest &&block_request) {
- }
- block->block_mutex.unlock();
- if (do_process) {
- - hdcs_op_threads->schedule(std::bind(&BlockOp::send, block_ops_head));
- + tq->add_task(std::bind(&BlockOp::send, block_ops_head));
- + //hdcs_op_threads->schedule(std::bind(&BlockOp::send, block_ops_head));
- //block_ops_head->send();
- }
- }
- diff --git a/src/core/HDCSCore.h b/src/core/HDCSCore.h
- index 8d45396..6a7c58c 100644
- --- a/src/core/HDCSCore.h
- +++ b/src/core/HDCSCore.h
- @@ -3,6 +3,7 @@
- #define HDCS_CORE_H
- #include "common/WorkQueue.h"
- +#include "common/tq.h"
- #include "common/Request.h"
- #include "common/Config.h"
- #include "common/ThreadPool.h"
- @@ -23,6 +24,7 @@ namespace core {
- void aio_write (const char* data, uint64_t offset, uint64_t length, void* c);
- private:
- ThreadPool *hdcs_op_threads;
- + TWorkQueue* tq;
- ThreadPool *main_thread;
- bool go;
- Config *config;
- --
- 1.9.1
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement