Advertisement
Guest User

Untitled

a guest
Sep 21st, 2017
416
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.30 KB | None | 0 0
  1. From 8ee993cf53df7f22c95ca0ba42384d1d62e17f8c Mon Sep 17 00:00:00 2001
  2. From: Your Name <you@example.com>
  3. Date: Fri, 22 Sep 2017 10:10:42 +0800
  4. Subject: [PATCH 1/1] test workqueue
  5.  
  6. Signed-off-by: Your Name <you@example.com>
  7. ---
  8. src/common/tq.h | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++
  9. src/core/HDCSCore.cpp | 5 ++-
  10. src/core/HDCSCore.h | 2 ++
  11. 3 files changed, 96 insertions(+), 1 deletion(-)
  12. create mode 100644 src/common/tq.h
  13.  
  14. diff --git a/src/common/tq.h b/src/common/tq.h
  15. new file mode 100644
  16. index 0000000..8e33a7a
  17. --- /dev/null
  18. +++ b/src/common/tq.h
  19. @@ -0,0 +1,90 @@
  20. +// Copyright [2017] <Intel>
  21. +#ifndef WORK_QUEUE_HPP_
  22. +#define WORK_QUEUE_HPP_
  23. +
  24. +#include <thread>
  25. +#include <mutex>
  26. +#include <vector>
  27. +#include <deque>
  28. +#include <list>
  29. +#include <future>
  30. +#include <functional>
  31. +#include <condition_variable>
  32. +
  33. +namespace hdcs {
  34. +class TWorkQueue {
  35. + typedef std::function<void(void)> fp_t;
  36. +
  37. + bool stop;
  38. + std::deque<fp_t> job_queue;
  39. + std::vector<std::thread> threads;
  40. + std::mutex q_lock;
  41. + std::condition_variable cond;
  42. +
  43. + void add_worker() {
  44. + std::thread t([this]() {
  45. + while(1) {
  46. + std::function<void()> task;
  47. + {
  48. + std::unique_lock<std::mutex> lock(q_lock);
  49. + cond.wait(lock, [this]{ return this->stop || !this->job_queue.empty(); });
  50. +
  51. + if(this->stop && this->job_queue.empty()) {
  52. + return;
  53. + }
  54. +
  55. + task = std::move(job_queue.front());
  56. + job_queue.pop_front();
  57. + }
  58. + task();
  59. + }
  60. + });
  61. + threads.push_back(std::move(t));
  62. + }
  63. +
  64. + public:
  65. + TWorkQueue(size_t thd_cnt = 1) : stop(false) {
  66. + for ( unsigned i = 0; i < thd_cnt; ++i )
  67. + add_worker();
  68. + }
  69. + ~TWorkQueue() {
  70. + {
  71. + std::unique_lock<std::mutex> lock(q_lock);
  72. + stop = true;
  73. + }
  74. + cond.notify_all();
  75. + for (auto &t : threads)
  76. + t.join();
  77. + }
  78. +
  79. + template<class F, class... Args>
  80. + auto add_task(F&& f, Args&&... args)
  81. + -> std::future<typename std::result_of<F(Args...)>::type> {
  82. + using return_type = typename std::result_of<F(Args...)>::type;
  83. +
  84. + auto task = std::make_shared< std::packaged_task<return_type()> >(
  85. + std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  86. + );
  87. +
  88. + std::future<return_type> ret = task->get_future();
  89. + {
  90. + std::unique_lock<std::mutex> lock(q_lock);
  91. +
  92. + if(stop) {
  93. + throw std::runtime_error("add_task on stopped WorkQueue");
  94. + }
  95. +
  96. + job_queue.push_back([task](){ (*task)(); });
  97. + }
  98. +
  99. + cond.notify_one();
  100. +
  101. + return ret;
  102. + }
  103. +
  104. +
  105. +
  106. +};
  107. +} // namespace dslab
  108. +
  109. +#endif // WORK_QUEUE_HPP_
  110. diff --git a/src/core/HDCSCore.cpp b/src/core/HDCSCore.cpp
  111. index 34bd3a5..d2fa40d 100644
  112. --- a/src/core/HDCSCore.cpp
  113. +++ b/src/core/HDCSCore.cpp
  114. @@ -23,6 +23,7 @@ HDCSCore::HDCSCore() {
  115.  
  116. int hdcs_thread_max = stoi(config->configValues["cacheservice_threads_num"]);
  117. hdcs_op_threads = new ThreadPool( hdcs_thread_max );
  118. + tq = new TWorkQueue(hdcs_thread_max);
  119. uint64_t total_size = stoull(config->configValues["total_size"]);
  120. uint64_t block_size = stoull(config->configValues["cache_min_alloc_size"]);
  121. uint64_t cache_size = stoull(config->configValues["cache_total_size"]);
  122. @@ -44,6 +45,7 @@ HDCSCore::HDCSCore() {
  123. HDCSCore::~HDCSCore() {
  124. go = false;
  125. delete hdcs_op_threads;
  126. + delete tq;
  127. delete policy;
  128. delete block_guard;
  129. //delete main_thread;
  130. @@ -96,7 +98,8 @@ void HDCSCore::map_block(BlockRequest &&block_request) {
  131. }
  132. block->block_mutex.unlock();
  133. if (do_process) {
  134. - hdcs_op_threads->schedule(std::bind(&BlockOp::send, block_ops_head));
  135. + tq->add_task(std::bind(&BlockOp::send, block_ops_head));
  136. + //hdcs_op_threads->schedule(std::bind(&BlockOp::send, block_ops_head));
  137. //block_ops_head->send();
  138. }
  139. }
  140. diff --git a/src/core/HDCSCore.h b/src/core/HDCSCore.h
  141. index 8d45396..6a7c58c 100644
  142. --- a/src/core/HDCSCore.h
  143. +++ b/src/core/HDCSCore.h
  144. @@ -3,6 +3,7 @@
  145. #define HDCS_CORE_H
  146.  
  147. #include "common/WorkQueue.h"
  148. +#include "common/tq.h"
  149. #include "common/Request.h"
  150. #include "common/Config.h"
  151. #include "common/ThreadPool.h"
  152. @@ -23,6 +24,7 @@ namespace core {
  153. void aio_write (const char* data, uint64_t offset, uint64_t length, void* c);
  154. private:
  155. ThreadPool *hdcs_op_threads;
  156. + TWorkQueue* tq;
  157. ThreadPool *main_thread;
  158. bool go;
  159. Config *config;
  160. --
  161. 1.9.1
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement