Advertisement
Guest User

Untitled

a guest
Jul 27th, 2017
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 8.00 KB | None | 0 0
  1. ////////////////////////////////////////////
  2. // Author: Reggie Meisler
  3. // Brief: Simple task-based threading system.
  4. //        Highly efficient, lockless and
  5. //        easy to use.
  6. //
  7. //        Inherit from ITask to create a basic
  8. //        parallel operation, or Future<...>
  9. //        in order to store data on completion.
  10. ////////////////////////////////////////////
  11. // Example use:
  12. //
  13. // // This template parameter represents the type
  14. // // of data that your tasks will return!
  15. // TaskList<int> tasks(&scheduler);
  16. //
  17. // // Push some tasks onto the list
  18. // // Each task is scheduled the moment it is added
  19. // // There are no gaurentees on order of completion
  20. // tasks.Add(new PrintTask("I "));
  21. // tasks.Add(new PrintTask(" love"));
  22. // tasks.Add(new PrintTask(" pie!"));
  23. //
  24. // // Can either Join or signal on completion
  25. // tasks.OnComplete += TaskList<int>::CompletionDelegate(CombineResults);
  26. //
  27. // tasks.Join();
  28. //
  29. ////////////////////////////////////////////
  30.  
  31. #pragma once
  32.  
  33. #include "Event.hpp"
  34.  
  35. #define WIN32_LEAN_AND_MEAN
  36. #include <windows.h>
  37.  
  38. #include <process.h>
  39.  
  40. namespace MF
  41. {
  42.  
  43. // Basic task structure you may inherit from
  44. class ITask
  45. {
  46.   public:
  47.    
  48.     virtual ~ITask() {}
  49.  
  50.     virtual void Execute() = 0;
  51.     virtual bool IsFuture() const { return false; }
  52.  
  53.     // This should only be called by Scheduler
  54.     void Complete()
  55.     {
  56.       InterlockedDecrement(mSiblingCount);
  57.      
  58.       // Note: the final sibling is the completion job
  59.       if( *mSiblingCount == 1 )
  60.       {
  61.         SetEvent(mOSCompletionEvent);
  62.       }
  63.     }
  64.    
  65.   private:
  66.    
  67.     template <typename T>
  68.     friend class TaskList;
  69.    
  70.     volatile long* mSiblingCount;
  71.     HANDLE mOSCompletionEvent;
  72.    
  73. };
  74.  
  75.  
  76. class Scheduler
  77. {
  78.   private:
  79.    
  80.     // We're going to avoid using the intrusive slist here in favor of a simple,
  81.     // lockless slist, crafted especially for the tasklist (Similar to Win32 SLIST)
  82.     struct AsyncNode
  83.     {
  84.       ITask* task;
  85.       AsyncNode* next;
  86.     };
  87.    
  88.     AsyncNode* mProcessList;
  89.    
  90.     HANDLE mTasksReady;
  91.     HANDLE* mThreadHnd;
  92.     unsigned char mThreadNum;
  93.  
  94.     long mLastTaskCount;
  95.    
  96.     bool mRunning;
  97.    
  98.     // Time to wait for threads to exit
  99.     static const long WAIT_TIME_BEFORE_EXIT = 1000;
  100.    
  101.   public:
  102.    
  103.     Scheduler()
  104.       : mProcessList(NULL),
  105.         mRunning(true)
  106.     {
  107.       SYSTEM_INFO sysinfo;
  108.       GetSystemInfo( &sysinfo );
  109.      
  110.       // Number of threads to spawn based on number of available processors!
  111.       // - Note: Includes main thread, we will spawn a secondary thread that has the same affinity
  112.       mThreadNum = sysinfo.dwNumberOfProcessors;
  113.       mThreadHnd = new HANDLE[mThreadNum];
  114.      
  115.       // Create semaphore
  116.       mTasksReady = CreateSemaphore(0, 0, mThreadNum, 0);
  117.       mLastTaskCount = 0;
  118.      
  119.       // Spawn threads with unique affinity (For each core)
  120.       for( unsigned i = 0; i < mThreadNum; ++i )
  121.       {
  122.         mThreadHnd[i] = (HANDLE)_beginthread(ThreadFnc, 0, this);
  123.        
  124.         SetThreadAffinityMask(mThreadHnd[i], (DWORD_PTR)1 << i);
  125.       }
  126.     }
  127.    
  128.     ~Scheduler()
  129.     {
  130.       // Set exiting flag
  131.       mRunning = false;
  132.      
  133.       // Wake up thread
  134.       long lastCount = 0;
  135.       ReleaseSemaphore(mTasksReady, 1, &lastCount);
  136.      
  137.       // Close handles
  138.       for( unsigned i = 0; i < mThreadNum; ++i )
  139.       {
  140.         WaitForSingleObject(mThreadHnd[i], WAIT_TIME_BEFORE_EXIT);
  141.         CloseHandle(mThreadHnd[i]);
  142.       }
  143.      
  144.       delete [] mThreadHnd;
  145.      
  146.       CloseHandle(mTasksReady);
  147.     }
  148.    
  149.   private:
  150.    
  151.     template <typename T>
  152.     friend class TaskList;
  153.    
  154.     void Process(ITask* task)
  155.     {
  156.       // Push task onto list
  157.       AsyncNode* newNode = new AsyncNode();
  158.       newNode->task = task;
  159.      
  160.       // Link up with front node and swap head pointer with new node
  161.       newNode->next = mProcessList;
  162.      
  163.       InterlockedExchangePointer(&mProcessList, newNode);
  164.      
  165.       // Wake up a thread
  166.       ReleaseSemaphore(mTasksReady, 1, &mLastTaskCount);
  167.     }
  168.    
  169.     AsyncNode* Pop()
  170.     {
  171.       // Unlink front node
  172.       AsyncNode* top = mProcessList;
  173.       AsyncNode* next = top->next;
  174.      
  175.       InterlockedExchangePointer(&mProcessList, next);
  176.      
  177.       return top;
  178.     }
  179.    
  180.     friend void ThreadFnc(void* data)
  181.     {
  182.       Scheduler* scheduler = (Scheduler*)data;
  183.      
  184.       while( scheduler->mRunning )
  185.       {
  186.         // Sleep until we have tasks ready
  187.         WaitForSingleObject(scheduler->mTasksReady, INFINITE);
  188.        
  189.         // Grab next task from slist
  190.         AsyncNode* node = scheduler->Pop();
  191.        
  192.         // Do it!
  193.         node->task->Execute();
  194.         node->task->Complete();
  195.        
  196.         // Simple check for if this task is a future
  197.         // (Don't cleanup memory here in this case)
  198.         if( !node->task->IsFuture() )
  199.         {
  200.           delete node->task;
  201.           delete node;
  202.         }
  203.       }
  204.     }
  205.    
  206. };
  207.  
  208.  
  209. // Inherit from this class instead of ITask if you want to contain resulting data
  210. template <typename ResultType>
  211. class Future : public ITask
  212. {
  213.   public:
  214.    
  215.     virtual bool IsFuture() const { return true; }
  216.  
  217.     ResultType Result;
  218.     SLink< Future<ResultType> > link;
  219.    
  220. };
  221.  
  222.  
  223. // Specialize void to have no result data
  224. // (This should honestly never be inherited from, we just needs a special case for
  225. //  template resolution on TaskList<void>)
  226. template <>
  227. class Future<void> : public ITask
  228. {
  229.   public:
  230.    
  231.     SLink< Future<void> > link;
  232.    
  233. };
  234.  
  235.  
  236. // Basic task used by the TaskList that oversees the completion of all tasks, and invokes the OnComplete event
  237. template <typename ResultType>
  238. class CompletionTask : public ITask
  239. {
  240.   public:
  241.    
  242.     typedef IntrusiveSList<Future<ResultType>, &Future<ResultType>::link> FutureList;
  243.    
  244.     CompletionTask(FutureList& futureList, HANDLE osEvent, Event<void (FutureList&)>& event)
  245.       : mFutureList(futureList),
  246.         mOSEvent(osEvent),
  247.         mEvent(event)
  248.     {
  249.      
  250.     }
  251.    
  252.     virtual void Execute()
  253.     {
  254.       // Wait for event to fire
  255.       WaitForSingleObject(mOSEvent, INFINITE);
  256.      
  257.       // Signal event
  258.       mEvent(mFutureList);
  259.      
  260.       // Cleanup memory
  261.       Future<ResultType>* deleteThis = NULL;
  262.      
  263.       for( Future<ResultType>* itr = mFutureList.GetFirst();
  264.            itr != mFutureList.GetLast(); )
  265.       {
  266.         deleteThis = itr;
  267.         mFutureList.Iterate(itr);
  268.         delete deleteThis;
  269.       }
  270.     }
  271.    
  272.   private:
  273.    
  274.     FutureList& mFutureList;
  275.     HANDLE mOSEvent;
  276.     Event<void (FutureList&)>& mEvent;
  277.    
  278. };
  279.  
  280.  
  281. // Simple list of tasks, manages processing with the Scheduler
  282. template <typename ResultType = void>
  283. class TaskList
  284. {
  285.   public:
  286.    
  287.     typedef IntrusiveSList<Future<ResultType>, &Future<ResultType>::link> FutureList;
  288.     typedef Delegate<void (FutureList&)> CompletionDelegate;
  289.    
  290.     Event<void (FutureList&)> OnComplete;
  291.    
  292.   private:
  293.    
  294.     FutureList mFutureList;
  295.     Scheduler* mScheduler;
  296.     HANDLE mEvent;
  297.     volatile long mCount;
  298.    
  299.   public:
  300.    
  301.     TaskList(Scheduler* scheduler)
  302.       : mScheduler(scheduler),
  303.         mEvent(CreateEvent(0,0,0,0)),
  304.         mCount(0)
  305.     {
  306.       // Throw task on another thread which waits for
  307.       // osCompletion event to fire, and invokes OnComplete event
  308.       Add(new CompletionTask<ResultType>(mFutureList, mEvent, OnComplete));
  309.     }
  310.    
  311.     ~TaskList()
  312.     {
  313.       CloseHandle(mEvent);
  314.     }
  315.    
  316.     void Add(Future<ResultType>* task)
  317.     {
  318.       mFutureList.PushFront(task);
  319.      
  320.       Add((ITask*)task);
  321.     }
  322.    
  323.     void Add(ITask* task)
  324.     {
  325.       InterlockedIncrement(&mCount);
  326.      
  327.       task->mSiblingCount = &mCount;
  328.       task->mOSCompletionEvent = mEvent;
  329.      
  330.       mScheduler->Process(task);
  331.     }
  332.    
  333.     void Join()
  334.     {
  335.       // Wait for event to fire
  336.       WaitForSingleObject(mEvent, INFINITE);
  337.     }
  338.    
  339. };
  340.  
  341. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement