Advertisement
dmkozyrev

task-concurrency-mult-gen-optimize.cpp

Oct 13th, 2018
216
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 10.24 KB | None | 0 0
  1. /*
  2.     File: task-concurrency-mult-gen-optimize.cpp
  3.    
  4.     g++ -pthread -march=native -O2 -std=c++14 -Wall -Wextra -Wshadow -Wconversion -fmax-errors=2 task-concurrency-mult-gen-optimize.cpp -o task-concurrency-mult-gen-optimize.o
  5. */
  6.  
  7. #pragma GCC target("sse2")
  8. #pragma GCC target("ssse3")
  9. #pragma GCC target("sse4.1")
  10. #pragma GCC target("avx")
  11.  
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <random>
#include <thread>
#include <utility>
#include <vector>

#include <immintrin.h>
  23.  
  24. std::mutex logMutex;
  25.  
  26. double vecDotProd(const double* a, const double* b, const int N) {
  27.     double res = 0;
  28.     for (int i = 0; i + 4 <= N; i += 4) {
  29.         alignas(32) double temp[4];
  30.         auto pa = _mm256_set_pd(a[i], a[i+1], a[i+2], a[i+3]);
  31.         auto pb = _mm256_set_pd(b[i], b[i+1], b[i+2], b[i+3]);
  32.         _mm256_store_pd(temp, _mm256_mul_pd(pa, pb));
  33.         res += temp[0] + temp[1] + temp[2] + temp[3];
  34.     }
  35.     for (int i = N / 4 * 4; i < N; ++i) {
  36.         res += a[i] * b[i];
  37.     }
  38.     return res;
  39. }
  40.  
  41. void matMultVec(double *result, const double *mat, const unsigned rows, const unsigned cols, const double *vec) {
  42.      for (unsigned i = 0; i < rows; ++i)
  43.      {
  44.          result[i] += vecDotProd(mat+i*cols, vec, cols);
  45.      }
  46. }
  47.  
  48. class Timer {
  49.     std::chrono::steady_clock::time_point timePoint;
  50.     size_t value;
  51. public:
  52.     void start() { timePoint = std::chrono::steady_clock::now(); }
  53.     void finish() {
  54.         auto curr = std::chrono::steady_clock::now();    
  55.         auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(curr - timePoint);
  56.         value = elapsed.count();
  57.     }
  58.     size_t get() const { return value; }
  59. };
  60.  
  61. class Producer {
  62.     const unsigned totalJobs;
  63.     unsigned jobsIssued;
  64.     unsigned jobsReady;
  65.  
  66.     const unsigned jobWidth;
  67.     const unsigned jobHeight;
  68.  
  69. public:
  70.     Producer(): totalJobs(100), jobsIssued(0), jobsReady(0), jobWidth(1000), jobHeight(2000) {}
  71.     unsigned getTotalJobs() { return totalJobs; }
  72.     std::function<double(void)> getJob()
  73.     {
  74.         assert(jobsIssued < totalJobs);
  75.         if (!jobsReady) {
  76.             std::chrono::milliseconds generationTime(100);
  77.             std::this_thread::sleep_for(generationTime);
  78.             unsigned newJobs = 2;
  79.             if (newJobs + jobsIssued > totalJobs)
  80.             newJobs = totalJobs - jobsIssued;
  81.             jobsReady += newJobs;
  82.             logMutex.lock();
  83.             std::clog << "Added " << newJobs << " new jobs." << std::endl;
  84.             logMutex.unlock();
  85.         }
  86.         jobsReady--;
  87.         jobsIssued++;
  88.         unsigned jobInit = jobsIssued;
  89.         return [jobInit, this] () {
  90.             Timer timer;
  91.             size_t totalMultTime = 0, totalGenTime = 0;
  92.             double sum = 0;
  93.             std::vector<double> matrix(jobWidth*jobHeight), vec(jobWidth), result(jobHeight, 0);
  94.             for (unsigned iter = 0; iter != 10; iter++)
  95.             {
  96.                 // Prepare matrix and vector:
  97.                 timer.start();
  98.                 {
  99.                     const auto Y = (0.00001*((jobInit + 10)*(jobInit - iter))+0.1*(jobInit%4))/100000;
  100.                     for (unsigned i = 0; i != jobWidth*jobHeight; i++)
  101.                     {
  102.                         // matrix[i] = (0.00001*((jobInit + 10)*(jobInit - iter)) + i*0.02 + 0.1*(jobInit%4))/100000;
  103.                         matrix[i] = Y + i*(0.02/100000);
  104.                     }
  105.                 }
  106.                 for (unsigned i = 0; i != jobWidth; i++)
  107.                 {
  108.                     vec[i] = -(0.0001*((jobInit + 10) - iter) - i*0.01)/1000;
  109.                 }
  110.                 std::fill(result.begin(), result.end(), 0);
  111.                 timer.finish();
  112.                 totalGenTime += timer.get();
  113.                 // Call multiplication:
  114.                 timer.start();
  115.                 matMultVec(&result[0], &matrix[0], jobHeight, jobWidth, &vec[0]);
  116.                 timer.finish();
  117.                 totalMultTime += timer.get();
  118.                 // Update answer:
  119.                 sum += std::accumulate(result.begin(), result.end(), 0)/1000.0;
  120.             }
  121.             logMutex.lock();
  122.             std::clog << "Task " << jobInit << " produced " << sum
  123.                 << ", totalMultTime is " << totalMultTime << " ms"
  124.                 << ", totalGenTime is " << totalGenTime << " ms" << std::endl;
  125.             logMutex.unlock();
  126.             return sum;
  127.         };
  128.     }
  129. };
  130.  
  131. // lockable adapter class for external locking
  132. template <typename Lockable>
  133. class basic_lockable_adapter {
  134. public:
  135.     typedef Lockable mutex_type;
  136. private:
  137.     mutable mutex_type lockable_;
  138. protected:
  139.     mutex_type& lockable() const { return lockable_; }
  140. public:
  141.     basic_lockable_adapter(const basic_lockable_adapter& other) = delete;
  142.     basic_lockable_adapter& operator=(const basic_lockable_adapter& other) = delete;
  143.  
  144.     basic_lockable_adapter() {}
  145.     void     lock() { lockable().lock(); }
  146.     void   unlock() { lockable().unlock(); }
  147.     bool try_lock() { return lockable().try_lock(); }    
  148. };
  149.  
  150. // JobQueue class, which contains pull of jobs and jobsGenerated state
  151. // we need to know that all jobs has been generated for no wait no more
  152. // class allows only external locking
  153. class JobQueue : public basic_lockable_adapter<std::mutex> {
  154.     std::queue<std::function<double(void)>> queue;
  155.     bool jobsGenerated;
  156.    
  157. public:
  158.     JobQueue() : jobsGenerated(false) { }
  159.    
  160.     bool isEmpty(std::unique_lock<JobQueue>& guard) const {
  161.         if (this != guard.mutex()) { throw "JobQueue Locking Error: Wrong Object Locked"; }
  162.         return queue.empty();
  163.     }
  164.    
  165.     bool isJobsGenerated(std::unique_lock<JobQueue>& guard) const {
  166.         if (this != guard.mutex()) { throw "JobQueue Locking Error: Wrong Object Locked"; }
  167.         return jobsGenerated;
  168.     }
  169.    
  170.     void markAsJobsGenerated(std::unique_lock<JobQueue>& guard) {
  171.         if (this != guard.mutex()) { throw "JobQueue Locking Error: Wrong Object Locked"; }
  172.         jobsGenerated = true;
  173.     }
  174.    
  175.     void addJob(std::function<double(void)> func, std::unique_lock<JobQueue>& guard) {
  176.         if (this != guard.mutex()) { throw "JobQueue Locking Error: Wrong Object Locked"; }
  177.         queue.push(func);
  178.     }
  179.    
  180.     std::function<double(void)> getJob(std::unique_lock<JobQueue>& guard) {
  181.         if (this != guard.mutex()) { throw "JobQueue Locking Error: Wrong Object Locked"; }
  182.         if (queue.empty()) { throw "JobQueue is empty"; }
  183.         auto ret = queue.front();
  184.         queue.pop();
  185.         return ret;
  186.     }
  187. };
  188.  
  189. // Result class, contains temporary answer for already completed jobs
  190. // allows only external locking
  191. // we separate the queue and the result, because when we need to update the result, we can know nothing about the queue and vice versa
  192. class Result : public basic_lockable_adapter<std::mutex> {
  193.     double value;
  194.  
  195. public:
  196.     Result() : value(0.0) { }
  197.    
  198.     void increase(double extra, std::unique_lock<Result>& guard) {
  199.         if (this != guard.mutex()) { throw "Result Locking Error: Wrong Object Locked"; }
  200.         value += extra;
  201.     }
  202.    
  203.     double get(std::unique_lock<Result>& guard) const {
  204.         if (this != guard.mutex()) { throw "Result Locking Error: Wrong Object Locked"; }
  205.         return value;
  206.     }
  207. };
  208.  
  209. class Helper {
  210.     std::condition_variable_any condvar;
  211. public:
  212.     void waitForJob(JobQueue& queue) {
  213.         std::unique_lock<JobQueue> ulock(queue);
  214.         condvar.wait(ulock, [&](){return !queue.isEmpty(ulock) || queue.isJobsGenerated(ulock);});
  215.     }
  216.     void checkNewJob() { condvar.notify_one(); }
  217. };
  218.  
  219. void thread(Helper& helper, JobQueue& queue, Result& result) {
  220.     bool wait = true;
  221.     {
  222.         std::unique_lock<JobQueue> ulock(queue);
  223.         wait = !queue.isJobsGenerated(ulock);
  224.     }
  225.     while (true) {
  226.         // check on the need to wait for jobs:
  227.         if (wait) {
  228.             helper.waitForJob(queue);
  229.         }
  230.         // getting new job:
  231.         std::function<double(void)> job;
  232.         {
  233.             std::unique_lock<JobQueue> ulock(queue);
  234.             wait = !queue.isJobsGenerated(ulock);
  235.             job = queue.isEmpty(ulock) ? [](){return 0.0;} : queue.getJob(ulock);
  236.         }
  237.         // complete the job and increase answer if needed
  238.         auto res = job();
  239.         if (res != 0.0) {
  240.             std::unique_lock<Result> ulock(result);
  241.             result.increase(res, ulock);
  242.         }
  243.        
  244.         {   // check on cycle ending condition
  245.             std::unique_lock<JobQueue> ulock(queue);
  246.             if (!wait && queue.isEmpty(ulock)) {
  247.                 return;
  248.             }
  249.         }
  250.     }
  251. }
  252.  
  253. int main()
  254. {
  255.     // Run timer:
  256.     Timer timer;
  257.     timer.start();
  258.     // Init queue for jobs and result variable:
  259.     JobQueue queue;
  260.     Result result;
  261.     // Create 2 helpers and run 2 threads:
  262.     Helper helper1, helper2;
  263.     std::thread thread1(thread, std::ref(helper1), std::ref(queue), std::ref(result));
  264.     std::thread thread2(thread, std::ref(helper2), std::ref(queue), std::ref(result));
  265.    
  266.     // Init producer and run cycle for jobs creating / executing
  267.     Producer jobSource;
  268.     unsigned jobsLeft = jobSource.getTotalJobs();
  269.     while (jobsLeft--) {
  270.         auto job = jobSource.getJob();
  271.         {
  272.             std::unique_lock<JobQueue> ulock(queue);
  273.             queue.addJob(job, ulock);
  274.         }
  275.         helper1.checkNewJob();
  276.         helper2.checkNewJob();
  277.     }
  278.     // Mark that all jobs has been generated and notify helpers so that they do not wait for anything else:
  279.     {
  280.         std::unique_lock<JobQueue> ulock(queue);
  281.         queue.markAsJobsGenerated(ulock);
  282.     }
  283.     helper1.checkNewJob();
  284.     helper2.checkNewJob();
  285.    
  286.     // Start helping with the jobs too:
  287.     Helper helper3;
  288.     thread(helper3, queue, result);
  289.     // Wait for helpers:
  290.     thread1.join();
  291.     thread2.join();
  292.     // Stop timer and get the result:
  293.     timer.finish();
  294.     double res = 0.0;
  295.     {
  296.         std::unique_lock<Result> ulock(result);
  297.         res = result.get(ulock);
  298.     }
  299.     logMutex.lock();
  300.     std::clog << "Done. Result is " << res << ", execution time is " << timer.get() << " ms" << std::endl;
  301.     logMutex.unlock();
  302. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement