Advertisement
froleyks

main_concurrent_queue.cpp

Jan 7th, 2019
192
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 2.75 KB | None | 0 0
  1. #include "../implementation/concurrent_queue.hpp"
  2. #include "parameter_processor.hpp"
  3. #include <functional>
  4. #include <iostream>
  5. #include <thread>
  6.  
  7. #define CHECK
  8.  
  9. #ifdef CHECK
  10. #include <mutex>
  11. std::mutex mtx;
  12. std::vector<bool> check;
  13. #endif // CHECK
  14. // std::atomic<size_t> num_inserted = 0;
  15.  
  16. size_t Q_SIZE;
  17. size_t NUM_PRODUCERS;
  18. size_t NUM_CONSUMERS;
  19. size_t NUM_ELEMENTS_FOR_EACH;
  20.  
  21. void produce(uint id, ConcurrentQueue<uint> &q) {
  22.   const size_t begin = id * NUM_ELEMENTS_FOR_EACH;
  23.   const size_t end   = begin + NUM_ELEMENTS_FOR_EACH;
  24.   for (size_t i = begin; i < end; ++i) {
  25.     // num_inserted++;
  26. #ifdef CHECK
  27.     mtx.lock();
  28.     check[i] = true;
  29.     mtx.unlock();
  30. #endif // CHECK
  31.  
  32.     q.push_back(id, i);
  33.   }
  34.   // std::cout << "done pushing: " << id << std::endl;
  35. }
  36.  
  37. void consume(uint id, ConcurrentQueue<uint> &q) {
  38.   for (size_t i = 0; i < NUM_ELEMENTS_FOR_EACH; ++i) {
  39.     uint r = q.pop_front(id);
  40. #ifdef CHECK
  41.     mtx.lock();
  42.     if (check[r]) {
  43.       check[r] = false;
  44.     } else {
  45.       std::cerr << "Error: Element" << r
  46.                 << " was extracted without being pushed" << std::endl;
  47.     }
  48.     mtx.unlock();
  49. #endif // CHECK
  50.   }
  51.   // std::cout << "done consuming: " << id << std::endl;
  52. }
  53.  
  54. int main(int argc, char *argv[]) {
  55.   ParameterProcessor param(argc, argv);
  56.   Q_SIZE                = param.getInt("s", 25000000);
  57.   NUM_PRODUCERS         = param.getInt("p", 8);
  58.   NUM_CONSUMERS         = param.getInt("c", 8);
  59.   NUM_ELEMENTS_FOR_EACH = param.getInt("e", 10000000);
  60.  
  61. #ifdef CHECK
  62.   check.resize(NUM_PRODUCERS * NUM_ELEMENTS_FOR_EACH, false);
  63. #endif // CHECK
  64.  
  65.   ConcurrentQueue<uint> q(Q_SIZE, NUM_PRODUCERS + NUM_CONSUMERS);
  66.  
  67.   std::vector<std::thread> producers;
  68.   producers.reserve(NUM_PRODUCERS);
  69.  
  70.   std::vector<std::thread> consumers;
  71.   consumers.reserve(NUM_CONSUMERS);
  72.  
  73.   for (size_t i = 0; i < NUM_PRODUCERS; ++i) {
  74.     producers.push_back(std::thread(produce, i, std::ref(q)));
  75.   }
  76.  
  77.   // for (size_t i = 0; i < NUM_PRODUCERS; ++i) {
  78.   //   producers[i].join();
  79.   // }
  80.   // for (auto e : check) {
  81.   //   if (!e) {
  82.   //     std::cout << "Element was not inserted" << std::endl;
  83.   //   }
  84.   // }
  85.  
  86.   for (size_t i = 0; i < NUM_CONSUMERS; ++i) {
  87.     consumers.push_back(std::thread(consume, NUM_PRODUCERS + i, std::ref(q)));
  88.   }
  89.  
  90.   for (size_t i = 0; i < NUM_PRODUCERS; ++i) {
  91.     producers[i].join();
  92.   }
  93.  
  94.   for (size_t i = 0; i < NUM_CONSUMERS; ++i) {
  95.     consumers[i].join();
  96.   }
  97.  
  98. #ifdef CHECK
  99.   for (size_t i = 0; i < check.size(); ++i) {
  100.     if (check[i]) {
  101.       std::cerr << "Error: Element: " << i << " was not extracted" << std::endl;
  102.     }
  103.   }
  104. #endif // CHECK
  105.  
  106.   // if (num_inserted != NUM_PRODUCERS * NUM_ELEMENTS_FOR_EACH) {
  107.   //   std::cerr << "Error: " << num_inserted << " inserted not "
  108.   //             << NUM_PRODUCERS * NUM_ELEMENTS_FOR_EACH << std::endl;
  109.   // }
  110.  
  111.   return 0;
  112. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement