Advertisement
dmkozyrev

task-concurrency-mult-optimize.cpp

Oct 13th, 2018
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 10.34 KB | None | 0 0
/*
    File: task-concurrency-mult-gen-optimize.cpp

    Build (note: the -o target is the final executable, despite its .o suffix):
    g++ -pthread -march=native -O2 -std=c++14 -Wall -Wextra -Wshadow -Wconversion -fmax-errors=2 task-concurrency-mult-gen-optimize.cpp -o task-concurrency-mult-gen-optimize.o
*/
  6.  
  7. #pragma GCC target("sse2")
  8. #pragma GCC target("ssse3")
  9. #pragma GCC target("sse4.1")
  10. #pragma GCC target("avx")
  11.  
#include <cassert>
#include <climits>
#include <cstddef>

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <random>
#include <thread>
#include <utility>
#include <vector>

#include <immintrin.h>
  23.  
  24. std::mutex logMutex;
  25.  
  26. double vecDotProd(const double* a, const double* b, const int N) {
  27.     double res = 0;
  28.     for (int i = 0; i + 4 <= N; i += 4) {
  29.         alignas(32) double temp[4];
  30.         auto pa = _mm256_set_pd(a[i], a[i+1], a[i+2], a[i+3]);
  31.         auto pb = _mm256_set_pd(b[i], b[i+1], b[i+2], b[i+3]);
  32.         _mm256_store_pd(temp, _mm256_mul_pd(pa, pb));
  33.         res += temp[0] + temp[1] + temp[2] + temp[3];
  34.     }
  35.     for (int i = N / 4 * 4; i < N; ++i) {
  36.         res += a[i] * b[i];
  37.     }
  38.     return res;
  39. }
  40.  
  41. void matMultVec(double *result, const double *mat, const unsigned rows, const unsigned cols, const double *vec) {
  42.      for (unsigned i = 0; i < rows; ++i)
  43.      {
  44.          result[i] += vecDotProd(mat+i*cols, vec, cols);
  45.      }
  46. }
  47.  
  48. class Timer {
  49.     std::chrono::time_point<std::chrono::steady_clock> timePoint;
  50.     size_t value;
  51. public:
  52.     void start() { timePoint = std::chrono::steady_clock::now(); }
  53.     void finish() {
  54.         auto curr = std::chrono::steady_clock::now();    
  55.         auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(curr - timePoint);
  56.         value = elapsed.count();
  57.     }
  58.     size_t get() const { return value; }
  59. };
  60.  
  61. class Producer {
  62.     const unsigned totalJobs;
  63.     unsigned jobsIssued;
  64.     unsigned jobsReady;
  65.  
  66.     const unsigned jobWidth;
  67.     const unsigned jobHeight;
  68.  
  69. public:
  70.     Producer(): totalJobs(100), jobsIssued(0), jobsReady(0), jobWidth(1000), jobHeight(2000) {}
  71.     unsigned getTotalJobs() { return totalJobs; }
  72.     std::function<double(void)> getJob()
  73.     {
  74.         assert(jobsIssued < totalJobs);
  75.         if (!jobsReady) {
  76.             std::chrono::milliseconds generationTime(100);
  77.             std::this_thread::sleep_for(generationTime);
  78.             unsigned newJobs = 2;
  79.             if (newJobs + jobsIssued > totalJobs)
  80.             newJobs = totalJobs - jobsIssued;
  81.             jobsReady += newJobs;
  82.             logMutex.lock();
  83.             std::clog << "Added " << newJobs << " new jobs." << std::endl;
  84.             logMutex.unlock();
  85.         }
  86.         jobsReady--;
  87.         jobsIssued++;
  88.         unsigned jobInit = jobsIssued;
  89.         return [jobInit, this] () {
  90.             Timer timer;
  91.             size_t totalMultTime = 0, totalGenTime = 0;
  92.             double sum = 0;
  93.             std::vector<double> matrix(jobWidth*jobHeight), vec(jobWidth), result(jobHeight, 0);
  94.             for (unsigned iter = 0; iter != 10; iter++)
  95.             {
  96.                 // Prepare matrix and vector:
  97.                 timer.start();
  98.                 {
  99.                     const auto Y = (0.00001*((jobInit + 10)*(jobInit - iter))+0.1*(jobInit%4))/100000;
  100.                     for (unsigned i = 0; i != jobWidth*jobHeight; i++)
  101.                     {
  102.                         // matrix[i] = (0.00001*((jobInit + 10)*(jobInit - iter)) + i*0.02 + 0.1*(jobInit%4))/100000;
  103.                         matrix[i] = Y + i*(0.02/100000);
  104.                     }
  105.                 }
  106.                 for (unsigned i = 0; i != jobWidth; i++)
  107.                 {
  108.                     vec[i] = -(0.0001*((jobInit + 10) - iter) - i*0.01)/1000;
  109.                 }
  110.                 std::fill(result.begin(), result.end(), 0);
  111.                 timer.finish();
  112.                 totalGenTime += timer.get();
  113.                 // Call multiplication:
  114.                 timer.start();
  115.                 matMultVec(&result[0], &matrix[0], jobHeight, jobWidth, &vec[0]);
  116.                 timer.finish();
  117.                 totalMultTime += timer.get();
  118.                 // Update answer:
  119.                 sum += std::accumulate(result.begin(), result.end(), 0)/1000.0;
  120.             }
  121.             logMutex.lock();
  122.             std::clog << "Task " << jobInit << " produced " << sum
  123.                 << ", totalMultTime is " << totalMultTime << " ms"
  124.                 << ", totalGenTime is " << totalGenTime << " ms" << std::endl;
  125.             logMutex.unlock();
  126.             return sum;
  127.         };
  128.     }
  129. };
  130.  
  131. // lockable adapter class for external locking
  132. template <typename Lockable>
  133. class basic_lockable_adapter {
  134. public:
  135.     typedef Lockable mutex_type;
  136. protected:
  137.     mutable mutex_type lockable_;
  138.     mutex_type& lockable() const { return lockable_; }
  139. public:
  140.     basic_lockable_adapter(const basic_lockable_adapter& other) = delete;
  141.     basic_lockable_adapter& operator=(const basic_lockable_adapter& other) = delete;
  142.  
  143.     basic_lockable_adapter() {}
  144.     void     lock() { lockable().lock(); }
  145.     void   unlock() { lockable().unlock(); }
  146.     bool try_lock() { return lockable().try_lock(); }
  147.    
  148. };
  149.  
  150. // JobQueue class, which contains pull of jobs and jobsGenerated state
  151. // we need to know that all jobs has been generated for no wait no more
  152. // class allows only external locking
  153. class JobQueue : public basic_lockable_adapter<std::mutex> {
  154.     std::queue<std::function<double(void)>> queue;
  155.     bool jobsGenerated;
  156.    
  157.     // prevent blocking errors:
  158.     void checkOnLockedState(std::unique_lock<JobQueue>& guard) const {
  159.         if (guard.mutex() != this) {
  160.             throw "JobQueue locking Error: Wrong Object Locked";
  161.         }
  162.     }
  163.  
  164. public:
  165.     JobQueue() : jobsGenerated(false) { }
  166.    
  167.     bool isEmpty(std::unique_lock<JobQueue>& guard) const {
  168.         checkOnLockedState(guard);
  169.         return queue.empty();
  170.     }
  171.    
  172.     bool isJobsGenerated(std::unique_lock<JobQueue>& guard) const {
  173.         checkOnLockedState(guard);
  174.         return jobsGenerated;
  175.     }
  176.    
  177.     void markAsJobsGenerated(std::unique_lock<JobQueue>& guard) {
  178.         checkOnLockedState(guard);
  179.         jobsGenerated = true;
  180.     }
  181.    
  182.     void addJob(std::function<double(void)> func, std::unique_lock<JobQueue>& guard) {
  183.         checkOnLockedState(guard);
  184.         queue.push(func);
  185.     }
  186.    
  187.     std::function<double(void)> getJob(std::unique_lock<JobQueue>& guard) {
  188.         checkOnLockedState(guard);
  189.         if (queue.empty()) {
  190.             throw "JobQueue is empty";
  191.         }
  192.         auto ret = queue.front();
  193.         queue.pop();
  194.         return ret;
  195.     }
  196. };
  197.  
  198. // Result class, contains temporary answer for already completed jobs
  199. // allows only external locking
  200. // we separate the queue and the result, because when we need to update the result, we can know nothing about the queue and vice versa
  201. class Result : public basic_lockable_adapter<std::mutex> {
  202.     double value;
  203.    
  204.     // prevent blocking errors:
  205.     void checkOnLockedState(std::unique_lock<Result>& guard) const {
  206.         if (guard.mutex() != this) {
  207.             throw "Result locking Error: Wrong Object Locked";
  208.         }
  209.     }
  210.    
  211. public:
  212.     Result() : value(0.0) { }
  213.    
  214.     void increase(double extra, std::unique_lock<Result>& guard) {
  215.         checkOnLockedState(guard);
  216.         value += extra;
  217.     }
  218.    
  219.     double get(std::unique_lock<Result>& guard) const {
  220.         checkOnLockedState(guard);
  221.         return value;
  222.     }
  223. };
  224.  
  225. class Helper {
  226.     std::condition_variable_any condvar;
  227. public:
  228.     void waitForJob(JobQueue& queue) {
  229.         std::unique_lock<JobQueue> ulock(queue);
  230.         condvar.wait(ulock, [&](){return !queue.isEmpty(ulock) || queue.isJobsGenerated(ulock);});
  231.     }
  232.     void checkNewJob() { condvar.notify_one(); }
  233. };
  234.  
  235. void thread(Helper& helper, JobQueue& queue, Result& result) {
  236.     bool wait = true;
  237.     {
  238.         std::unique_lock<JobQueue> ulock(queue);
  239.         wait = !queue.isJobsGenerated(ulock);
  240.     }
  241.     while (true) {
  242.         // check on the need to wait for jobs:
  243.         if (wait) {
  244.             helper.waitForJob(queue);
  245.         }
  246.         // getting new job:
  247.         std::function<double(void)> job;
  248.         {
  249.             std::unique_lock<JobQueue> ulock(queue);
  250.             wait = !queue.isJobsGenerated(ulock);
  251.             job = queue.isEmpty(ulock) ? [](){return 0.0;} : queue.getJob(ulock);
  252.         }
  253.         // complete the job and increase answer if needed
  254.         auto res = job();
  255.         if (res != 0.0) {
  256.             std::unique_lock<Result> ulock(result);
  257.             result.increase(res, ulock);
  258.         }
  259.        
  260.         {   // check on cycle ending condition
  261.             std::unique_lock<JobQueue> ulock(queue);
  262.             if (!wait && queue.isEmpty(ulock)) {
  263.                 return;
  264.             }
  265.         }
  266.     }
  267. }
  268.  
  269. int main()
  270. {
  271.     // Run timer:
  272.     Timer timer;
  273.     timer.start();
  274.     // Init queue for jobs and result variable:
  275.     JobQueue queue;
  276.     Result result;
  277.     // Create 2 helpers and run 2 threads:
  278.     Helper helper1, helper2;
  279.     std::thread thread1(thread, std::ref(helper1), std::ref(queue), std::ref(result));
  280.     std::thread thread2(thread, std::ref(helper2), std::ref(queue), std::ref(result));
  281.    
  282.     // Init producer and run cycle for jobs creating / executing
  283.     Producer jobSource;
  284.     unsigned jobsLeft = jobSource.getTotalJobs();
  285.     while (jobsLeft--) {
  286.         auto job = jobSource.getJob();
  287.         {
  288.             std::unique_lock<JobQueue> ulock(queue);
  289.             queue.addJob(job, ulock);
  290.         }
  291.         helper1.checkNewJob();
  292.         helper2.checkNewJob();
  293.     }
  294.     // Mark that all jobs has been generated and notify helpers so that they do not wait for anything else:
  295.     {
  296.         std::unique_lock<JobQueue> ulock(queue);
  297.         queue.markAsJobsGenerated(ulock);
  298.     }
  299.     helper1.checkNewJob();
  300.     helper2.checkNewJob();
  301.    
  302.     // Start helping with the jobs too:
  303.     Helper helper3;
  304.     thread(helper3, queue, result);
  305.     // Wait for helpers:
  306.     thread1.join();
  307.     thread2.join();
  308.     // Srop timer and ger the result:
  309.     timer.finish();
  310.     double res = 0.0;
  311.     {
  312.         std::unique_lock<Result> ulock(result);
  313.         res = result.get(ulock);
  314.     }
  315.     logMutex.lock();
  316.     std::clog << "Done. Result is " << res << ", execution time is " << timer.get() << " ms" << std::endl;
  317.     logMutex.unlock();
  318. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement