Advertisement
Guest User

Untitled

a guest
Feb 25th, 2017
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.26 KB | None | 0 0
#include <condition_variable>
#include <cstddef>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
  7.  
  8. // This code is used to unpack tuple elements into function arguments.
  9. // apply(f, tuple<int,bool,char>) would be unpacked as f(int,bool,char)
  10. // This implementation has been specialized for tuples of futures.
  11. template<size_t N>
  12. struct Apply {
  13. template<typename F, typename T, typename... A>
  14. static inline auto apply(F & f, T & t, A &... a) {
  15. return Apply<N - 1>::apply(f, t, a...,
  16. ::std::get<std::tuple_size<std::decay_t<T>>::value - N >(t)
  17. );
  18. }
  19. };
  20. template<>
  21. struct Apply<0> {
  22. template<typename F, typename T, typename... A>
  23. static inline auto apply(F & f, T &, A &... a) {
  24. // This invokes the predicate with the result of the .get() method on each future in the tuple.
  25. return f((a.get())...);
  26. }
  27. };
  28.  
  29. template<typename F, typename T>
  30. inline auto apply(F & f, T & t) {
  31. return Apply< ::std::tuple_size< ::std::decay_t<T>
  32. >::value>::apply(f, t);
  33. }
  34.  
  35. // integer_sequence is used to generate a sequence of consecutive compile time integers
  36. // The integers are used for iterating over elements of a tuple
  37. template<int... Ints>
  38. struct integer_sequence{};
  39.  
  40. template<int N, int... Ints>
  41. struct generate_integer_sequence : generate_integer_sequence<N-1, N-1, Ints...>{};
  42.  
  43. template <int... Ints>
  44. struct generate_integer_sequence<0, Ints...> : integer_sequence<Ints...>{};
  45.  
  46. template<typename T, typename F, int... Ints>
  47. void for_each_tuple_element_impl(T&& t, F f, integer_sequence<Ints...>){
  48. [](...){}((f(::std::get<Ints>(t)),0)...);
  49. }
  50.  
  51. template<typename... Ts, typename F>
  52. void for_each_tuple_element(::std::tuple<Ts...> &t, F f){
  53. for_each_tuple_element_impl(t, f, generate_integer_sequence<sizeof...(Ts)>());
  54. }
  55.  
  56. // this is used as a work around since task can't have 2 different parameter packs
  57. // the make_task function wraps the predicate in this type to package type information with it
  58. // the wrapped predicate is then passed to the task where it aids with type computations
  59. template<typename ReturnType, typename... Args>
  60. struct predicate_wrapper{
  61. using result_t = ReturnType;
  62. using predicate_t = ReturnType(*)(Args...);
  63. using packaged_task_t = std::packaged_task<result_t(Args...)>;
  64. predicate_wrapper(predicate_t pred) : predicate(pred) {}
  65. predicate_t predicate;
  66. };
  67.  
  68. // This function is used to deduce the type parameters for the predicate_wrapper struct.
  69. template<typename ReturnType, typename... Args>
  70. auto make_predicate_wrapper(ReturnType(*pred)(Args...)){
  71. return predicate_wrapper<ReturnType, Args...>(pred);
  72. }
  73.  
  74. class semaphore{
  75. public:
  76. semaphore(int count = 0)
  77. : m_thread_count(count)
  78. {}
  79.  
  80. semaphore& operator=(const semaphore&) = delete;
  81. semaphore& operator=(semaphore&&) = delete;
  82. semaphore(const semaphore&) = delete;
  83. semaphore(semaphore&&) = delete;
  84.  
  85. inline void wait() {
  86. std::unique_lock<decltype(m_mutex)> lock(m_mutex);
  87. m_cv.wait(lock, [this]{ return m_thread_count > 0; });
  88. --m_thread_count;
  89. }
  90.  
  91. inline void notify() {
  92. std::unique_lock<decltype(m_mutex)> lock(m_mutex);
  93. m_thread_count++;
  94. m_cv.notify_one();
  95. }
  96.  
  97. private:
  98. std::mutex m_mutex;
  99. std::condition_variable m_cv;
  100. int m_thread_count{0};
  101. };
  102.  
  103. template<typename Predicate, typename... Dependencies>
  104. struct task{
  105. using predicate_t = typename Predicate::predicate_t;
  106. using result_t = typename Predicate::result_t;
  107. using task_t = typename Predicate::packaged_task_t;
  108. using dependencies_t = std::tuple<Dependencies&...>;
  109. using parameters_t = std::tuple<std::future<typename std::decay_t<Dependencies>::result_t>...>;
  110.  
  111. task(Predicate Task, Dependencies&... dependencies)
  112. : m_params(std::move(dependencies.get_future())...)
  113. , m_dependencies(std::forward<Dependencies>(dependencies)...)
  114. , m_task(std::move(Task.predicate))
  115. {}
  116.  
  117. void start(int threads = 0){
  118. if(threads){
  119. m_semaphore = new semaphore(threads);
  120. }
  121.  
  122. // recursively start each sub-task on another thread.
  123. for_each_tuple_element(m_dependencies, [this](auto& f){
  124. std::async([this, &f]{ f.start_child(m_semaphore); });
  125. });
  126.  
  127. // wait until all dependent tasks are done executing
  128. for_each_tuple_element(m_params, [](auto& f){
  129. f.wait();
  130. });
  131.  
  132. // at this point all of the sub-tasks should've been completed
  133. // it should be safe to execute this task now,
  134. // but you need to make sure it's okay for another thread to execute it's work
  135. m_semaphore->wait();
  136. apply(m_task, m_params);
  137. m_semaphore->notify();
  138.  
  139. if(threads){
  140. delete m_semaphore;
  141. }
  142. }
  143.  
  144. std::future<result_t>&& get_future(){
  145. return std::move(m_task.get_future());
  146. }
  147.  
  148. void start_child(semaphore* sem){
  149. m_semaphore = sem;
  150. start();
  151. }
  152.  
  153. semaphore* m_semaphore;
  154. parameters_t m_params;
  155. dependencies_t m_dependencies;
  156. task_t m_task;
  157. };
  158.  
  159. namespace detail {
  160. // This inner make_task is used so that the first call can wrap the predicate in a wrapper class.
  161. // The wrapper class exposes some information about the predicate so it is easier to work with.
  162. template<typename Predicate, typename... Dependencies>
  163. auto make_task_impl(Predicate pred, Dependencies&&... dependencies){
  164. return task<Predicate, Dependencies...>(pred, std::forward<Dependencies>(dependencies)...);
  165. }
  166. }
  167. template<typename Predicate, typename... Dependencies>
  168. auto make_task(Predicate pred, Dependencies&&... dependencies){
  169. return detail::make_task_impl(make_predicate_wrapper(pred), std::forward<Dependencies>(dependencies)...);
  170. }
  171.  
  172.  
  173. // Example usage is below
  174. struct int1{
  175. int value{1};
  176. };
  177. struct int2{
  178. int value{2};
  179. };
  180. struct int3{
  181. int value{3};
  182. };
  183. struct int4{
  184. int value{4};
  185. };
  186. int1 foo1() { std::cout << "Executing 1.\n"; return int1{};}
  187. int2 foo2() { std::cout << "Executing 2.\n"; return int2{};}
  188. int3 foo3() { std::cout << "Executing 3.\n"; return int3{};}
  189. int4 foo4() { std::cout << "Executing 4.\n"; return int4{};}
  190. int bar(int1 i1, int2 i2, int3 i3, int4 i4){ std::cout << "Executing bar.\n"; return i1.value + i2.value + i3.value + i4.value; }
  191.  
  192. int main(){
  193. auto t1 = make_task(foo1);
  194. auto t2 = make_task(foo2);
  195. auto t3 = make_task(foo3);
  196. auto t4 = make_task(foo4);
  197. auto t5 = make_task(bar, t1, t2, t3, t4);
  198.  
  199. auto f = t5.get_future();
  200. const int num_threads{ 2 };
  201. t5.start( num_threads );
  202.  
  203. std::cout << f.get() << std::endl;
  204. return 0;
  205. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement