Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ////////////////////////////////////////////
- // Author: Reggie Meisler
- // Brief: Simple task-based threading system.
- // Highly efficient, lockless and
- // easy to use.
- //
- // Inherit from ITask to create a basic
- // parallel operation, or Future<...>
- // in order to store data on completion.
- ////////////////////////////////////////////
- // Example use:
- //
- // // This template parameter represents the type
- // // of data that your tasks will return!
- // TaskList<int> tasks(&scheduler);
- //
- // // Push some tasks onto the list
- // // Each task is scheduled the moment it is added
- // // There are no gaurentees on order of completion
- // tasks.Add(new PrintTask("I "));
- // tasks.Add(new PrintTask(" love"));
- // tasks.Add(new PrintTask(" pie!"));
- //
- // // Can either Join or signal on completion
- // tasks.OnComplete += TaskList<int>::CompletionDelegate(CombineResults);
- //
- // tasks.Join();
- //
- ////////////////////////////////////////////
- #pragma once
- #include "Event.hpp"
- #define WIN32_LEAN_AND_MEAN
- #include <windows.h>
- #include <process.h>
- namespace MF
- {
- // Basic task structure you may inherit from
- class ITask
- {
- public:
- virtual ~ITask() {}
- virtual void Execute() = 0;
- virtual bool IsFuture() const { return false; }
- // This should only be called by Scheduler
- void Complete()
- {
- InterlockedDecrement(mSiblingCount);
- // Note: the final sibling is the completion job
- if( *mSiblingCount == 1 )
- {
- SetEvent(mOSCompletionEvent);
- }
- }
- private:
- template <typename T>
- friend class TaskList;
- volatile long* mSiblingCount;
- HANDLE mOSCompletionEvent;
- };
- class Scheduler
- {
- private:
- // We're going to avoid using the intrusive slist here in favor of a simple,
- // lockless slist, crafted especially for the tasklist (Similar to Win32 SLIST)
- struct AsyncNode
- {
- ITask* task;
- AsyncNode* next;
- };
- AsyncNode* mProcessList;
- HANDLE mTasksReady;
- HANDLE* mThreadHnd;
- unsigned char mThreadNum;
- long mLastTaskCount;
- bool mRunning;
- // Time to wait for threads to exit
- static const long WAIT_TIME_BEFORE_EXIT = 1000;
- public:
- Scheduler()
- : mProcessList(NULL),
- mRunning(true)
- {
- SYSTEM_INFO sysinfo;
- GetSystemInfo( &sysinfo );
- // Number of threads to spawn based on number of available processors!
- // - Note: Includes main thread, we will spawn a secondary thread that has the same affinity
- mThreadNum = sysinfo.dwNumberOfProcessors;
- mThreadHnd = new HANDLE[mThreadNum];
- // Create semaphore
- mTasksReady = CreateSemaphore(0, 0, mThreadNum, 0);
- mLastTaskCount = 0;
- // Spawn threads with unique affinity (For each core)
- for( unsigned i = 0; i < mThreadNum; ++i )
- {
- mThreadHnd[i] = (HANDLE)_beginthread(ThreadFnc, 0, this);
- SetThreadAffinityMask(mThreadHnd[i], (DWORD_PTR)1 << i);
- }
- }
- ~Scheduler()
- {
- // Set exiting flag
- mRunning = false;
- // Wake up thread
- long lastCount = 0;
- ReleaseSemaphore(mTasksReady, 1, &lastCount);
- // Close handles
- for( unsigned i = 0; i < mThreadNum; ++i )
- {
- WaitForSingleObject(mThreadHnd[i], WAIT_TIME_BEFORE_EXIT);
- CloseHandle(mThreadHnd[i]);
- }
- delete [] mThreadHnd;
- CloseHandle(mTasksReady);
- }
- private:
- template <typename T>
- friend class TaskList;
- void Process(ITask* task)
- {
- // Push task onto list
- AsyncNode* newNode = new AsyncNode();
- newNode->task = task;
- // Link up with front node and swap head pointer with new node
- newNode->next = mProcessList;
- InterlockedExchangePointer(&mProcessList, newNode);
- // Wake up a thread
- ReleaseSemaphore(mTasksReady, 1, &mLastTaskCount);
- }
- AsyncNode* Pop()
- {
- // Unlink front node
- AsyncNode* top = mProcessList;
- AsyncNode* next = top->next;
- InterlockedExchangePointer(&mProcessList, next);
- return top;
- }
- friend void ThreadFnc(void* data)
- {
- Scheduler* scheduler = (Scheduler*)data;
- while( scheduler->mRunning )
- {
- // Sleep until we have tasks ready
- WaitForSingleObject(scheduler->mTasksReady, INFINITE);
- // Grab next task from slist
- AsyncNode* node = scheduler->Pop();
- // Do it!
- node->task->Execute();
- node->task->Complete();
- // Simple check for if this task is a future
- // (Don't cleanup memory here in this case)
- if( !node->task->IsFuture() )
- {
- delete node->task;
- delete node;
- }
- }
- }
- };
- // Inherit from this class instead of ITask if you want to contain resulting data
- template <typename ResultType>
- class Future : public ITask
- {
- public:
- virtual bool IsFuture() const { return true; }
- ResultType Result;
- SLink< Future<ResultType> > link;
- };
- // Specialize void to have no result data
- template <>
- class Future<void> : public ITask
- {
- public:
- SLink< Future<void> > link;
- };
- // Basic task used by the TaskList that is fired upon the completion of all tasks
- template <typename ResultType>
- class CompletionTask : public ITask
- {
- public:
- typedef IntrusiveSList<Future<ResultType>, &Future<ResultType>::link> FutureList;
- CompletionTask(FutureList& futureList, HANDLE osEvent, Event<void (FutureList&)>& event)
- : mFutureList(futureList),
- mOSEvent(osEvent),
- mEvent(event)
- {
- }
- virtual void Execute()
- {
- // Wait for event to fire
- WaitForSingleObject(mOSEvent, INFINITE);
- // Signal event
- mEvent(mFutureList);
- // Cleanup memory
- Future<ResultType>* deleteThis = NULL;
- for( Future<ResultType>* itr = mFutureList.GetFirst();
- itr != mFutureList.GetLast(); )
- {
- deleteThis = itr;
- mFutureList.Iterate(itr);
- delete deleteThis;
- }
- }
- private:
- FutureList& mFutureList;
- HANDLE mOSEvent;
- Event<void (FutureList&)>& mEvent;
- };
- // Simple list of tasks, manages processing with the Scheduler
- template <typename ResultType = void>
- class TaskList
- {
- public:
- typedef IntrusiveSList<Future<ResultType>, &Future<ResultType>::link> FutureList;
- typedef Delegate<void (FutureList&)> CompletionDelegate;
- Event<void (FutureList&)> OnComplete;
- private:
- FutureList mFutureList;
- Scheduler* mScheduler;
- HANDLE mEvent;
- volatile long mCount;
- public:
- TaskList(Scheduler* scheduler)
- : mScheduler(scheduler),
- mEvent(CreateEvent(0,0,0,0)),
- mCount(0)
- {
- // Throw task on another thread which waits for
- // osCompletion event to fire, and invokes OnComplete event
- Add(new CompletionTask<ResultType>(mFutureList, mEvent, OnComplete));
- }
- ~TaskList()
- {
- CloseHandle(mEvent);
- }
- void Add(Future<ResultType>* task)
- {
- mFutureList.PushFront(task);
- Add((ITask*)task);
- }
- void Add(ITask* task)
- {
- InterlockedIncrement(&mCount);
- task->mSiblingCount = &mCount;
- task->mOSCompletionEvent = mEvent;
- mScheduler->Process(task);
- }
- void Join()
- {
- // Wait for event to fire
- WaitForSingleObject(mEvent, INFINITE);
- }
- };
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement