Advertisement
Guest User

Untitled

a guest
Jul 27th, 2017
55
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 8.44 KB | None | 0 0
  1. ////////////////////////////////////////////
  2. // Author: Reggie Meisler
  3. // Brief: Simple task-based threading system.
  4. //        Highly efficient, lockless and
  5. //        easy to use.
  6. //
  7. //        Inherit from ITask to create a basic
  8. //        parallel operation, or Future<...>
  9. //        in order to store data on completion.
  10. ////////////////////////////////////////////
  11. // Example use:
  12. //
  13. // // This template parameter represents the type
  14. // // of data that your tasks will return!
  15. // TaskList<int> tasks(&scheduler);
  16. //
  17. // // Push some tasks onto the list
  18. // // Each task is scheduled the moment it is added
  19. // // There are no gaurentees on order of completion
  20. // tasks.Add(new PrintTask("I"));
  21. // tasks.Add(new PrintTask(" love"));
  22. // tasks.Add(new PrintTask(" pie!"));
  23. //
  24. // // Can either Join or signal on completion
  25. // tasks.OnComplete += TaskList<int>::CompletionDelegate(CombineResults);
  26. //
  27. // tasks.Join();
  28. //
  29. ////////////////////////////////////////////
  30.  
  31. #pragma once
  32.  
  33. #include "Event.hpp"
  34.  
  35. #define WIN32_LEAN_AND_MEAN
  36. #include <windows.h>
  37.  
  38. #include <process.h>
  39. #include <typeinfo>
  40.  
  41. namespace MF
  42. {
  43.  
  44. // Basic task structure you may inherit from
  45. class ITask
  46. {
  47.   public:
  48.    
  49.     virtual ~ITask() {}
  50.  
  51.     virtual void Execute() = 0;
  52.     virtual bool IsFuture() const { return false; }
  53.  
  54.     // This should only be called by Scheduler
  55.     void Complete()
  56.     {
  57.       InterlockedDecrement(mSiblingCount);
  58.      
  59.       // Note: the final sibling is the completion job
  60.       if( *mSiblingCount == 1 )
  61.       {
  62.         SetEvent(mOSCompletionEvent);
  63.       }
  64.     }
  65.    
  66.   private:
  67.    
  68.     template <typename T>
  69.     friend class TaskList;
  70.    
  71.     volatile long* mSiblingCount;
  72.     HANDLE mOSCompletionEvent;
  73.    
  74. };
  75.  
  76.  
  77. class Scheduler
  78. {
  79.   private:
  80.    
  81.     // We're going to avoid using the intrusive slist here in favor of a simple,
  82.     // lockless slist, crafted especially for the tasklist (Similar to Win32 SLIST)
  83.     struct AsyncNode
  84.     {
  85.       ITask* task;
  86.       AsyncNode* next;
  87.     };
  88.    
  89.     volatile AsyncNode* mProcessList;
  90.    
  91.     HANDLE mTasksReady;
  92.     HANDLE* mThreadHnd;
  93.     unsigned char mThreadNum;
  94.  
  95.     volatile long mLastTaskCount;
  96.    
  97.     bool mRunning;
  98.    
  99.     // Time to wait for threads to exit
  100.     static const long WAIT_TIME_BEFORE_EXIT = 3000;
  101.    
  102.   public:
  103.    
  104.     Scheduler()
  105.       : mProcessList(NULL),
  106.         mRunning(true)
  107.     {
  108.       SYSTEM_INFO sysinfo;
  109.       GetSystemInfo( &sysinfo );
  110.      
  111.       // Number of threads to spawn based on number of available processors!
  112.       // - Note: Includes main thread, we will spawn a secondary thread that has the same affinity
  113.       mThreadNum = sysinfo.dwNumberOfProcessors;
  114.       mThreadHnd = new HANDLE[mThreadNum];
  115.      
  116.       // Create semaphore
  117.       // - Note: Make sure we have plenty of counts for the semaphore (0xFFFF),
  118.       //   otherwise tasks will just not get processed!
  119.       mTasksReady = CreateSemaphore(0, 0, 0xFFFF, 0);
  120.       mLastTaskCount = 0;
  121.      
  122.       // Spawn threads with unique affinity (For each core)
  123.       for( unsigned i = 0; i < mThreadNum; ++i )
  124.       {
  125.         mThreadHnd[i] = (HANDLE)_beginthread(ThreadFnc, 0, this);
  126.        
  127.         SetThreadAffinityMask(mThreadHnd[i], (DWORD_PTR)1 << i);
  128.       }
  129.     }
  130.    
  131.     ~Scheduler()
  132.     {
  133.       // Set exiting flag
  134.       mRunning = false;
  135.      
  136.       // Let thread handles get cleaned up by OS after program closes
  137.       delete [] mThreadHnd;
  138.      
  139.       CloseHandle(mTasksReady);
  140.     }
  141.    
  142.   private:
  143.    
  144.     template <typename T>
  145.     friend class TaskList;
  146.    
  147.     void Process(ITask* task)
  148.     {
  149.       // Push task onto list
  150.       AsyncNode* newNode = new AsyncNode();
  151.       newNode->task = task;
  152.      
  153.       // Link up with front node and swap head pointer with new node
  154.       volatile AsyncNode* originalHead = NULL;
  155.      
  156.       // Cannot fail a push! Keep trying till this works!
  157.       do
  158.       {
  159.         originalHead = mProcessList;
  160.         newNode->next = (AsyncNode*)originalHead;
  161.       }
  162.       while( InterlockedCompareExchangePointer((volatile PVOID*)&mProcessList, newNode, (PVOID)originalHead) != originalHead );
  163.      
  164.       // Wake up a thread
  165.       ReleaseSemaphore(mTasksReady, 1, (LPLONG)&mLastTaskCount);
  166.     }
  167.    
  168.     AsyncNode* Pop()
  169.     {
  170.       // Unlink front node
  171.       volatile AsyncNode* top = mProcessList;
  172.       AsyncNode* next = top->next;
  173.      
  174.       do
  175.       {
  176.         top = mProcessList;
  177.         next = top->next;
  178.       }
  179.       while( InterlockedCompareExchangePointer((volatile PVOID*)&mProcessList, next, (PVOID)top) != top );
  180.      
  181.       return (AsyncNode*)top;
  182.     }
  183.    
  184.     friend void ThreadFnc(void* data)
  185.     {
  186.       Scheduler* scheduler = (Scheduler*)data;
  187.      
  188.       while( scheduler->mRunning )
  189.       {
  190.         // Sleep until we have tasks ready
  191.         WaitForSingleObject(scheduler->mTasksReady, INFINITE);
  192.        
  193.         // Grab next task from slist
  194.         AsyncNode* node = scheduler->Pop();
  195.        
  196.         // Do it!
  197.         node->task->Execute();
  198.         node->task->Complete();
  199.        
  200.         // Simple check for if this task is a future
  201.         // (Don't cleanup memory here in this case)
  202.         if( !node->task->IsFuture() )
  203.         {
  204.           delete node->task;
  205.           delete node;
  206.         }
  207.       }
  208.     }
  209.    
  210. };
  211.  
  212.  
  213. // Inherit from this class instead of ITask if you want to contain resulting data
  214. template <typename ResultType>
  215. class Future : public ITask
  216. {
  217.   public:
  218.    
  219.     virtual bool IsFuture() const { return true; }
  220.  
  221.     ResultType Result;
  222.     SLink< Future<ResultType> > link;
  223.    
  224. };
  225.  
  226.  
  227. // Specialize void to have no result data
  228. // (This should honestly never be inherited from, we just needs a special case for
  229. //  template resolution on TaskList<void>)
  230. template <>
  231. class Future<void> : public ITask
  232. {
  233.   public:
  234.    
  235.     SLink< Future<void> > link;
  236.    
  237. };
  238.  
  239.  
  240. // Basic task used by the TaskList that oversees the completion of all tasks, and invokes the OnComplete event
  241. template <typename ResultType>
  242. class CompletionTask : public ITask
  243. {
  244.   public:
  245.    
  246.     CompletionTask(TaskList<ResultType>* taskList)
  247.       : mTaskList(taskList)
  248.     {
  249.      
  250.     }
  251.    
  252.     virtual void Execute()
  253.     {
  254.       // Wait for event to fire
  255.       WaitForSingleObject(mTaskList->mEvent, INFINITE);
  256.      
  257.       // Signal event
  258.       mTaskList->OnComplete(mTaskList->mFutureList);
  259.  
  260.       // Cleanup memory
  261.       Future<ResultType>* deleteThis = NULL;
  262.      
  263.       for( Future<ResultType>* itr = mTaskList->mFutureList.GetFirst();
  264.            itr != mTaskList->mFutureList.GetLast(); )
  265.       {
  266.         deleteThis = itr;
  267.         mTaskList->mFutureList.Iterate(itr);
  268.         delete deleteThis;
  269.       }
  270.      
  271.       // Reset event
  272.       ResetEvent(mTaskList->mEvent);
  273.      
  274.       // Reduce count to 0
  275.       InterlockedDecrement(&mTaskList->mCount);
  276.      
  277.       // Add another Completion task
  278.       mTaskList->Add(new CompletionTask<ResultType>(mTaskList));
  279.     }
  280.    
  281.   private:
  282.    
  283.     TaskList<ResultType>* mTaskList;
  284.    
  285. };
  286.  
  287.  
  288. // Simple list of tasks, manages processing with the Scheduler
  289. template <typename ResultType = void>
  290. class TaskList
  291. {
  292.   public:
  293.    
  294.     typedef IntrusiveSList<Future<ResultType>, &Future<ResultType>::link> FutureList;
  295.     typedef Delegate<void (FutureList&)> CompletionDelegate;
  296.    
  297.     Event<void (FutureList&)> OnComplete;
  298.    
  299.   private:
  300.    
  301.     friend class CompletionTask<ResultType>;
  302.    
  303.     FutureList mFutureList;
  304.     Scheduler* mScheduler;
  305.     HANDLE mEvent;
  306.     volatile long mCount;
  307.    
  308.   public:
  309.    
  310.     TaskList(Scheduler* scheduler)
  311.       : mScheduler(scheduler),
  312.         mEvent(CreateEvent(0,1,0,0)),
  313.         mCount(0)
  314.     {
  315.       // Throw task on another thread which waits for
  316.       // osCompletion event to fire, and invokes OnComplete event
  317.       Add(new CompletionTask<ResultType>(this));
  318.     }
  319.    
  320.     ~TaskList()
  321.     {
  322.       CloseHandle(mEvent);
  323.     }
  324.    
  325.     void Add(Future<ResultType>* task)
  326.     {
  327.       mFutureList.PushFront(task);
  328.      
  329.       Add((ITask*)task);
  330.     }
  331.    
  332.     void Add(ITask* task)
  333.     {
  334.       InterlockedIncrement(&mCount);
  335.      
  336.       task->mSiblingCount = &mCount;
  337.       task->mOSCompletionEvent = mEvent;
  338.      
  339.       mScheduler->Process(task);
  340.     }
  341.    
  342.     void Join()
  343.     {
  344.       // Wait for event to fire
  345.       WaitForSingleObject(mEvent, INFINITE);
  346.     }
  347.    
  348. };
  349.  
  350. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement