Guest User

ProducerConsumer

a guest
Sep 12th, 2013
682
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #include <stdio.h>  /* printf, NULL */
  2. #include <windows.h>    /* Win API */
  3. #include <queue>    /* std::queue */
  4. #include <stdlib.h>     /* srand, rand */
  5. #include <time.h>       /* time */
  6.  
  7. /* Slightly modified Request class from the task to track request number */
  8. class Request
  9. {
  10. static int idToGiveNext;
  11. public:
  12.     int id;
  13.     Request():
  14.         id(idToGiveNext++)
  15.         {/*empty body*/};
  16. };
  17.  
  18. int Request::idToGiveNext = 0;
  19.  
  20.  
  21. /* Base class to protect some object with WinAPI CRITICAL_SECTION */
  22. class InterthreadObject
  23. {
  24. protected:
  25.     CRITICAL_SECTION mCriticalSection;
  26. public:
  27.     InterthreadObject()
  28.     {
  29.         InitializeCriticalSection(&mCriticalSection);
  30.     }
  31.  
  32. virtual ~InterthreadObject()
  33.     {
  34.         DeleteCriticalSection(&mCriticalSection);
  35.     }
  36. };
  37.  
  38.  
  39. /* Just a boolean to be checked protected by CRITICAL_SECTION */
  40. class StopperCondition: public InterthreadObject
  41. {
  42. private:
  43.     bool mShouldStop;
  44. public:
  45.     StopperCondition():
  46.         mShouldStop(false)
  47.         {/*empty*/};
  48.    
  49.     void set()
  50.     {
  51.         EnterCriticalSection(&mCriticalSection);
  52.         {
  53.             mShouldStop = true;
  54.         }
  55.         LeaveCriticalSection(&mCriticalSection);
  56.     }
  57.  
  58.     bool check()
  59.     {
  60.         bool val;
  61.         EnterCriticalSection(&mCriticalSection);
  62.         {
  63.             val = mShouldStop;
  64.         }
  65.         LeaveCriticalSection(&mCriticalSection);
  66.         return val;
  67.     }
  68. };
  69.  
  70. /* A Stopper class from the task, just a wrapper over a previous class.
  71.     I did so to implement GetRequest and ProcessRequest exactly in the way that specification required*/
  72. class Stopper
  73. {
  74. public:
  75.     StopperCondition* const mC;
  76.     Stopper(StopperCondition* c):
  77.         mC(c)
  78.         {/*empty*/}
  79.     Stopper(const Stopper &objToCopy):
  80.         mC(objToCopy.mC)
  81.         {/*empty*/}
  82.     void set()
  83.     {
  84.         mC->set();
  85.     }
  86.     bool check()
  87.     {
  88.         return mC->check();
  89.     }
  90. };
  91.  
  92. /* Function from the task specification*/
  93. Request* GetRequest(Stopper stopSignal) throw()
  94. {
  95.     const int MAX_WAIT_MS = 100;
  96.     int waitMs = rand() % MAX_WAIT_MS + 1;
  97.     Sleep(waitMs); // emulate long-time operations
  98.  
  99.     return stopSignal.check() ? NULL : (new Request());
  100. };
  101.  
  102.  
  103. /* Function from the task specification*/
  104. void ProcessRequest(Request* request, Stopper stopSignal) throw()
  105. {
  106.     if(stopSignal.check())
  107.         return;
  108.  
  109.     /* some processig there */
  110.     const int MAX_WAIT_MS = 1000;
  111.     int waitMs = rand() % MAX_WAIT_MS + 1;
  112.     Sleep(waitMs); // emulate long-time operations
  113. };
  114.  
  115.  
  116. /* std::queue to hold incoming requests protected by CRITICAL_SECTION*/
  117. class InterthreadRequestQueue : public InterthreadObject
  118. {
  119. private:
  120.     std::queue<Request*> mQueue;
  121. public:
  122.  
  123.     void push(Request* &val)
  124.     {
  125.         EnterCriticalSection(&mCriticalSection);
  126.         {
  127.             mQueue.push(val);
  128.         }
  129.         LeaveCriticalSection(&mCriticalSection);
  130.     }
  131.  
  132.     Request* pop()
  133.     {
  134.         Request* val = NULL;
  135.         EnterCriticalSection(&mCriticalSection);
  136.         {
  137.             if(!mQueue.empty())
  138.             {
  139.                 val = mQueue.front();
  140.                 mQueue.pop();
  141.             }
  142.         }
  143.         LeaveCriticalSection(&mCriticalSection);
  144.         return val;
  145.     }
  146. };
  147.  
  148.  
  149. /* All interprocess objects in one place. We can easily pass this object to CreateProcess(...) now. */
  150. class SharedObjects
  151. {
  152. private:
  153.     StopperCondition stopCondition;
  154. public:
  155.     InterthreadRequestQueue waitingRequests;
  156.     InterthreadRequestQueue readyRequests;
  157.     Stopper stopper;
  158.  
  159.     SharedObjects():
  160.         stopper(&stopCondition)
  161.         {/*empty*/};   
  162. };
  163.  
  164.  
  165. int threadId = 0;
  166. DWORD WINAPI consumerThread( LPVOID lpParam )
  167. {
  168.     int id = threadId++;
  169.     SharedObjects* s = (SharedObjects*)lpParam;
  170.     while(!s->stopper.check())
  171.     {
  172.         Request* r = s->waitingRequests.pop();
  173.         if(!r)
  174.             continue;
  175.    
  176.         printf("Request %d processed by thread %d\n", r->id, id);
  177.         ProcessRequest(r, s->stopper);
  178.         printf("Request %d is ready\n", r->id);
  179.         s->readyRequests.push(r);
  180.     }
  181.  
  182.     return 0;
  183. };
  184.  
  185. DWORD WINAPI producerThread( LPVOID lpParam )
  186. {
  187.     int id = threadId++;
  188.     SharedObjects* s = (SharedObjects*)lpParam;
  189.     while(!s->stopper.check())
  190.     {
  191.         Request* r = GetRequest(s->stopper);
  192.         if(!r)
  193.             continue;
  194.  
  195.         printf("Request %d is waiting\n", r->id);
  196.         s->waitingRequests.push(r);
  197.     }
  198.  
  199.     return 0;
  200. };
  201.  
  202. int main()
  203. {
  204.     const int CONSUMER_THREAD_COUNT = 2;
  205.     const int TOTAL_THREAD_COUNT = CONSUMER_THREAD_COUNT + 1;  //consumers + producer
  206.     const int SECONDS_TO_WORK = 30;
  207.  
  208.     srand(time(NULL));
  209.  
  210.     SharedObjects s;
  211.     HANDLE hProducerThread;
  212.     HANDLE hSomeConsumerThread;
  213.  
  214.     HANDLE* workerThreads;
  215.     workerThreads = new HANDLE[TOTAL_THREAD_COUNT];
  216.  
  217.     hProducerThread = CreateThread( NULL, 0, producerThread, &s, 0, NULL);
  218.     if (NULL == hProducerThread)
  219.     {
  220.         printf("CreateThread failed (%d)\n", GetLastError());
  221.         return 1;
  222.     }
  223.     workerThreads[0] = hProducerThread;
  224.  
  225.     for(int i = 1; i <= CONSUMER_THREAD_COUNT; i++)
  226.     {
  227.         hSomeConsumerThread = CreateThread( NULL, 0, consumerThread, &s, 0, NULL);
  228.         if (NULL == hProducerThread)
  229.         {
  230.             printf("CreateThread failed (%d)\n", GetLastError());
  231.             return 1;
  232.         }
  233.         workerThreads[i] = hSomeConsumerThread;
  234.     }
  235.  
  236.  
  237.     HANDLE hTimer = NULL;
  238.     LARGE_INTEGER liDueTime;
  239.  
  240.     liDueTime.QuadPart = -10000000LL * SECONDS_TO_WORK; // LL is in 100ns intervals, negative = relative to call moment
  241.  
  242.     // Create an unnamed waitable timer.
  243.     hTimer = CreateWaitableTimer(NULL, TRUE, NULL);
  244.     if (NULL == hTimer)
  245.     {
  246.         printf("CreateWaitableTimer failed (%d)\n", GetLastError());
  247.         return 1;
  248.     }
  249.  
  250.     printf("Waiting for %d seconds...\n", SECONDS_TO_WORK);
  251.  
  252.     if (!SetWaitableTimer(hTimer, &liDueTime, 0, NULL, NULL, 0))
  253.     {
  254.         printf("SetWaitableTimer failed (%d)\n", GetLastError());
  255.         return 2;
  256.     }
  257.  
  258.     if (WaitForSingleObject(hTimer, INFINITE) != WAIT_OBJECT_0)
  259.         printf("WaitForSingleObject failed (%d)\n", GetLastError());
  260.     else
  261.     {
  262.         printf("Timer was signaled.\n");
  263.         s.stopper.set();
  264.     }
  265.  
  266.     WaitForMultipleObjects( TOTAL_THREAD_COUNT,
  267.         workerThreads, TRUE, INFINITE);
  268.  
  269.     printf("Since All threads executed"
  270.            " lets close their handles \n");
  271.  
  272.     for(int i = 0; i < TOTAL_THREAD_COUNT; i++)
  273.     {
  274.         CloseHandle(workerThreads[i]);
  275.     }
  276.     delete[] workerThreads;
  277.  
  278.     Request* r;
  279.     while(r = s.readyRequests.pop(), r)
  280.     {
  281.         int id = r->id;
  282.         delete r;
  283.         printf("Request %d deleted from readyRequests queue\n", id);
  284.     }
  285.  
  286.     while(r = s.waitingRequests.pop(), r)
  287.     {
  288.         int id = r->id;
  289.         delete r;
  290.         printf("Request %d deleted from waitingRequests queue\n", id);
  291.     }
  292.  
  293.     printf("All done. Press enter to exit...\n");
  294.     getc(stdin);
  295.  
  296.     return 0;
  297. }
RAW Paste Data