Advertisement
Guest User

Untitled

a guest
Jul 27th, 2017
60
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 7.84 KB | None | 0 0
  1. ////////////////////////////////////////////
  2. // Author: Reggie Meisler
  3. // Brief: Simple task-based threading system.
  4. //        Highly efficient, lockless and
  5. //        easy to use.
  6. //
  7. //        Inherit from ITask to create a basic
  8. //        parallel operation, or Future<...>
  9. //        in order to store data on completion.
  10. ////////////////////////////////////////////
  11. // Example use:
  12. //
  13. // // This template parameter represents the type
  14. // // of data that your tasks will return!
  15. // TaskList<int> tasks(&scheduler);
  16. //
  17. // // Push some tasks onto the list
  18. // // Each task is scheduled the moment it is added
  19. // // There are no guarantees on order of completion
  20. // tasks.Add(new PrintTask("I "));
  21. // tasks.Add(new PrintTask(" love"));
  22. // tasks.Add(new PrintTask(" pie!"));
  23. //
  24. // // Can either Join or signal on completion
  25. // tasks.OnComplete += TaskList<int>::CompletionDelegate(CombineResults);
  26. //
  27. // tasks.Join();
  28. //
  29. ////////////////////////////////////////////
  30.  
  31. #pragma once
  32.  
  33. #include "Event.hpp"
  34.  
  35. #define WIN32_LEAN_AND_MEAN
  36. #include <windows.h>
  37.  
  38. #include <process.h>
  39.  
  40. namespace MF
  41. {
  42.  
  43. // Basic task structure you may inherit from
  44. class ITask
  45. {
  46.   public:
  47.    
  48.     virtual ~ITask() {}
  49.  
  50.     virtual void Execute() = 0;
  51.     virtual bool IsFuture() const { return false; }
  52.  
  53.     // This should only be called by Scheduler
  54.     void Complete()
  55.     {
  56.       InterlockedDecrement(mSiblingCount);
  57.      
  58.       // Note: the final sibling is the completion job
  59.       if( *mSiblingCount == 1 )
  60.       {
  61.         SetEvent(mOSCompletionEvent);
  62.       }
  63.     }
  64.    
  65.   private:
  66.    
  67.     template <typename T>
  68.     friend class TaskList;
  69.    
  70.     volatile long* mSiblingCount;
  71.     HANDLE mOSCompletionEvent;
  72.    
  73. };
  74.  
  75.  
  76. class Scheduler
  77. {
  78.   private:
  79.    
  80.     // We're going to avoid using the intrusive slist here in favor of a simple,
  81.     // lockless slist, crafted especially for the tasklist (Similar to Win32 SLIST)
  82.     struct AsyncNode
  83.     {
  84.       ITask* task;
  85.       AsyncNode* next;
  86.     };
  87.    
  88.     AsyncNode* mProcessList;
  89.    
  90.     HANDLE mTasksReady;
  91.     HANDLE* mThreadHnd;
  92.     unsigned char mThreadNum;
  93.  
  94.     long mLastTaskCount;
  95.    
  96.     bool mRunning;
  97.    
  98.     // Time to wait for threads to exit
  99.     static const long WAIT_TIME_BEFORE_EXIT = 1000;
  100.    
  101.   public:
  102.    
  103.     Scheduler()
  104.       : mProcessList(NULL),
  105.         mRunning(true)
  106.     {
  107.       SYSTEM_INFO sysinfo;
  108.       GetSystemInfo( &sysinfo );
  109.      
  110.       // Number of threads to spawn based on number of available processors!
  111.       // - Note: Includes main thread, we will spawn a secondary thread that has the same affinity
  112.       mThreadNum = sysinfo.dwNumberOfProcessors;
  113.       mThreadHnd = new HANDLE[mThreadNum];
  114.      
  115.       // Create semaphore
  116.       mTasksReady = CreateSemaphore(0, 0, mThreadNum, 0);
  117.       mLastTaskCount = 0;
  118.      
  119.       // Spawn threads with unique affinity (For each core)
  120.       for( unsigned i = 0; i < mThreadNum; ++i )
  121.       {
  122.         mThreadHnd[i] = (HANDLE)_beginthread(ThreadFnc, 0, this);
  123.        
  124.         SetThreadAffinityMask(mThreadHnd[i], (DWORD_PTR)1 << i);
  125.       }
  126.     }
  127.    
  128.     ~Scheduler()
  129.     {
  130.       // Set exiting flag
  131.       mRunning = false;
  132.      
  133.       // Wake up thread
  134.       long lastCount = 0;
  135.       ReleaseSemaphore(mTasksReady, 1, &lastCount);
  136.      
  137.       // Close handles
  138.       for( unsigned i = 0; i < mThreadNum; ++i )
  139.       {
  140.         WaitForSingleObject(mThreadHnd[i], WAIT_TIME_BEFORE_EXIT);
  141.         CloseHandle(mThreadHnd[i]);
  142.       }
  143.      
  144.       delete [] mThreadHnd;
  145.      
  146.       CloseHandle(mTasksReady);
  147.     }
  148.    
  149.   private:
  150.    
  151.     template <typename T>
  152.     friend class TaskList;
  153.    
  154.     void Process(ITask* task)
  155.     {
  156.       // Push task onto list
  157.       AsyncNode* newNode = new AsyncNode();
  158.       newNode->task = task;
  159.      
  160.       // Link up with front node and swap head pointer with new node
  161.       newNode->next = mProcessList;
  162.      
  163.       InterlockedExchangePointer(&mProcessList, newNode);
  164.      
  165.       // Wake up a thread
  166.       ReleaseSemaphore(mTasksReady, 1, &mLastTaskCount);
  167.     }
  168.    
  169.     AsyncNode* Pop()
  170.     {
  171.       // Unlink front node
  172.       AsyncNode* top = mProcessList;
  173.       AsyncNode* next = top->next;
  174.      
  175.       InterlockedExchangePointer(&mProcessList, next);
  176.      
  177.       return top;
  178.     }
  179.    
  180.     friend void ThreadFnc(void* data)
  181.     {
  182.       Scheduler* scheduler = (Scheduler*)data;
  183.      
  184.       while( scheduler->mRunning )
  185.       {
  186.         // Sleep until we have tasks ready
  187.         WaitForSingleObject(scheduler->mTasksReady, INFINITE);
  188.        
  189.         // Grab next task from slist
  190.         AsyncNode* node = scheduler->Pop();
  191.        
  192.         // Do it!
  193.         node->task->Execute();
  194.         node->task->Complete();
  195.        
  196.         // Simple check for if this task is a future
  197.         // (Don't cleanup memory here in this case)
  198.         if( !node->task->IsFuture() )
  199.         {
  200.           delete node->task;
  201.           delete node;
  202.         }
  203.       }
  204.     }
  205.    
  206. };
  207.  
  208.  
  209. // Inherit from this class instead of ITask if you want to contain resulting data
  210. template <typename ResultType>
  211. class Future : public ITask
  212. {
  213.   public:
  214.    
  215.     virtual bool IsFuture() const { return true; }
  216.  
  217.     ResultType Result;
  218.     SLink< Future<ResultType> > link;
  219.    
  220. };
  221.  
  222.  
  223. // Specialize void to have no result data
  224. template <>
  225. class Future<void> : public ITask
  226. {
  227.   public:
  228.    
  229.     SLink< Future<void> > link;
  230.    
  231. };
  232.  
  233.  
  234. // Basic task used by the TaskList that is fired upon the completion of all tasks
  235. template <typename ResultType>
  236. class CompletionTask : public ITask
  237. {
  238.   public:
  239.    
  240.     typedef IntrusiveSList<Future<ResultType>, &Future<ResultType>::link> FutureList;
  241.    
  242.     CompletionTask(FutureList& futureList, HANDLE osEvent, Event<void (FutureList&)>& event)
  243.       : mFutureList(futureList),
  244.         mOSEvent(osEvent),
  245.         mEvent(event)
  246.     {
  247.      
  248.     }
  249.    
  250.     virtual void Execute()
  251.     {
  252.       // Wait for event to fire
  253.       WaitForSingleObject(mOSEvent, INFINITE);
  254.      
  255.       // Signal event
  256.       mEvent(mFutureList);
  257.      
  258.       // Cleanup memory
  259.       Future<ResultType>* deleteThis = NULL;
  260.      
  261.       for( Future<ResultType>* itr = mFutureList.GetFirst();
  262.            itr != mFutureList.GetLast(); )
  263.       {
  264.         deleteThis = itr;
  265.         mFutureList.Iterate(itr);
  266.         delete deleteThis;
  267.       }
  268.     }
  269.    
  270.   private:
  271.    
  272.     FutureList& mFutureList;
  273.     HANDLE mOSEvent;
  274.     Event<void (FutureList&)>& mEvent;
  275.    
  276. };
  277.  
  278.  
  279. // Simple list of tasks, manages processing with the Scheduler
  280. template <typename ResultType = void>
  281. class TaskList
  282. {
  283.   public:
  284.    
  285.     typedef IntrusiveSList<Future<ResultType>, &Future<ResultType>::link> FutureList;
  286.     typedef Delegate<void (FutureList&)> CompletionDelegate;
  287.    
  288.     Event<void (FutureList&)> OnComplete;
  289.    
  290.   private:
  291.    
  292.     FutureList mFutureList;
  293.     Scheduler* mScheduler;
  294.     HANDLE mEvent;
  295.     volatile long mCount;
  296.    
  297.   public:
  298.    
  299.     TaskList(Scheduler* scheduler)
  300.       : mScheduler(scheduler),
  301.         mEvent(CreateEvent(0,0,0,0)),
  302.         mCount(0)
  303.     {
  304.       // Throw task on another thread which waits for
  305.       // osCompletion event to fire, and invokes OnComplete event
  306.       Add(new CompletionTask<ResultType>(mFutureList, mEvent, OnComplete));
  307.     }
  308.    
  309.     ~TaskList()
  310.     {
  311.       CloseHandle(mEvent);
  312.     }
  313.    
  314.     void Add(Future<ResultType>* task)
  315.     {
  316.       mFutureList.PushFront(task);
  317.      
  318.       Add((ITask*)task);
  319.     }
  320.    
  321.     void Add(ITask* task)
  322.     {
  323.       InterlockedIncrement(&mCount);
  324.      
  325.       task->mSiblingCount = &mCount;
  326.       task->mOSCompletionEvent = mEvent;
  327.      
  328.       mScheduler->Process(task);
  329.     }
  330.    
  331.     void Join()
  332.     {
  333.       // Wait for event to fire
  334.       WaitForSingleObject(mEvent, INFINITE);
  335.     }
  336.    
  337. };
  338.  
  339. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement