Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <utility>
- #include <tuple>
- #include <future>
- #include <mutex>
- #include <thread>
- #include <iostream>
- // This code is used to unpack tuple elements into function arguments.
- // apply(f, tuple<int,bool,char>) would be unpacked as f(int,bool,char)
- // This implementation has been specialized for tuples of futures.
- template<size_t N>
- struct Apply {
- template<typename F, typename T, typename... A>
- static inline auto apply(F & f, T & t, A &... a) {
- return Apply<N - 1>::apply(f, t, a...,
- ::std::get<std::tuple_size<std::decay_t<T>>::value - N >(t)
- );
- }
- };
- template<>
- struct Apply<0> {
- template<typename F, typename T, typename... A>
- static inline auto apply(F & f, T &, A &... a) {
- // This invokes the predicate with the result of the .get() method on each future in the tuple.
- return f((a.get())...);
- }
- };
- template<typename F, typename T>
- inline auto apply(F & f, T & t) {
- return Apply< ::std::tuple_size< ::std::decay_t<T>
- >::value>::apply(f, t);
- }
- // integer_sequence is used to generate a sequence of consecutive compile time integers
- // The integers are used for iterating over elements of a tuple
- template<int... Ints>
- struct integer_sequence{};
- template<int N, int... Ints>
- struct generate_integer_sequence : generate_integer_sequence<N-1, N-1, Ints...>{};
- template <int... Ints>
- struct generate_integer_sequence<0, Ints...> : integer_sequence<Ints...>{};
- template<typename T, typename F, int... Ints>
- void for_each_tuple_element_impl(T&& t, F f, integer_sequence<Ints...>){
- [](...){}((f(::std::get<Ints>(t)),0)...);
- }
- template<typename... Ts, typename F>
- void for_each_tuple_element(::std::tuple<Ts...> &t, F f){
- for_each_tuple_element_impl(t, f, generate_integer_sequence<sizeof...(Ts)>());
- }
- // this is used as a work around since task can't have 2 different parameter packs
- // the make_task function wraps the predicate in this type to package type information with it
- // the wrapped predicate is then passed to the task where it aids with type computations
- template<typename ReturnType, typename... Args>
- struct predicate_wrapper{
- using result_t = ReturnType;
- using predicate_t = ReturnType(*)(Args...);
- using packaged_task_t = std::packaged_task<result_t(Args...)>;
- predicate_wrapper(predicate_t pred) : predicate(pred) {}
- predicate_t predicate;
- };
- // This function is used to deduce the type parameters for the predicate_wrapper struct.
- template<typename ReturnType, typename... Args>
- auto make_predicate_wrapper(ReturnType(*pred)(Args...)){
- return predicate_wrapper<ReturnType, Args...>(pred);
- }
- class semaphore{
- public:
- semaphore(int count = 0)
- : m_thread_count(count)
- {}
- semaphore& operator=(const semaphore&) = delete;
- semaphore& operator=(semaphore&&) = delete;
- semaphore(const semaphore&) = delete;
- semaphore(semaphore&&) = delete;
- inline void wait() {
- std::unique_lock<decltype(m_mutex)> lock(m_mutex);
- m_cv.wait(lock, [this]{ return m_thread_count > 0; });
- --m_thread_count;
- }
- inline void notify() {
- std::unique_lock<decltype(m_mutex)> lock(m_mutex);
- m_thread_count++;
- m_cv.notify_one();
- }
- private:
- std::mutex m_mutex;
- std::condition_variable m_cv;
- int m_thread_count{0};
- };
- template<typename Predicate, typename... Dependencies>
- struct task{
- using predicate_t = typename Predicate::predicate_t;
- using result_t = typename Predicate::result_t;
- using task_t = typename Predicate::packaged_task_t;
- using dependencies_t = std::tuple<Dependencies&...>;
- using parameters_t = std::tuple<std::future<typename std::decay_t<Dependencies>::result_t>...>;
- task(Predicate Task, Dependencies&... dependencies)
- : m_params(std::move(dependencies.get_future())...)
- , m_dependencies(std::forward<Dependencies>(dependencies)...)
- , m_task(std::move(Task.predicate))
- {}
- void start(int threads = 0){
- if(threads){
- m_semaphore = new semaphore(threads);
- }
- // recursively start each sub-task on another thread.
- for_each_tuple_element(m_dependencies, [this](auto& f){
- std::async([this, &f]{ f.start_child(m_semaphore); });
- });
- // wait until all dependent tasks are done executing
- for_each_tuple_element(m_params, [](auto& f){
- f.wait();
- });
- // at this point all of the sub-tasks should've been completed
- // it should be safe to execute this task now,
- // but you need to make sure it's okay for another thread to execute it's work
- m_semaphore->wait();
- apply(m_task, m_params);
- m_semaphore->notify();
- if(threads){
- delete m_semaphore;
- }
- }
- std::future<result_t>&& get_future(){
- return std::move(m_task.get_future());
- }
- void start_child(semaphore* sem){
- m_semaphore = sem;
- start();
- }
- semaphore* m_semaphore;
- parameters_t m_params;
- dependencies_t m_dependencies;
- task_t m_task;
- };
- namespace detail {
- // This inner make_task is used so that the first call can wrap the predicate in a wrapper class.
- // The wrapper class exposes some information about the predicate so it is easier to work with.
- template<typename Predicate, typename... Dependencies>
- auto make_task_impl(Predicate pred, Dependencies&&... dependencies){
- return task<Predicate, Dependencies...>(pred, std::forward<Dependencies>(dependencies)...);
- }
- }
- template<typename Predicate, typename... Dependencies>
- auto make_task(Predicate pred, Dependencies&&... dependencies){
- return detail::make_task_impl(make_predicate_wrapper(pred), std::forward<Dependencies>(dependencies)...);
- }
- // Example usage is below
- struct int1{
- int value{1};
- };
- struct int2{
- int value{2};
- };
- struct int3{
- int value{3};
- };
- struct int4{
- int value{4};
- };
- int1 foo1() { std::cout << "Executing 1.\n"; return int1{};}
- int2 foo2() { std::cout << "Executing 2.\n"; return int2{};}
- int3 foo3() { std::cout << "Executing 3.\n"; return int3{};}
- int4 foo4() { std::cout << "Executing 4.\n"; return int4{};}
- int bar(int1 i1, int2 i2, int3 i3, int4 i4){ std::cout << "Executing bar.\n"; return i1.value + i2.value + i3.value + i4.value; }
- int main(){
- auto t1 = make_task(foo1);
- auto t2 = make_task(foo2);
- auto t3 = make_task(foo3);
- auto t4 = make_task(foo4);
- auto t5 = make_task(bar, t1, t2, t3, t4);
- auto f = t5.get_future();
- const int num_threads{ 2 };
- t5.start( num_threads );
- std::cout << f.get() << std::endl;
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement