Advertisement
dmkozyrev

task-concurrency-mult-optimize.cpp

Oct 11th, 2018
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 7.16 KB | None | 0 0
  1. /*
  2.     File: task-concurrency-mult-optimize.cpp
  3.    
  4.     g++ -pthread -march=native -O2 -std=c++14 -Wall -Wextra -Wshadow -Wconversion -fmax-errors=2 task-concurrency-mult-optimize.cpp -o task-concurrency-mult-optimize.o
  5. */
  6.  
  7. #pragma GCC target("sse2")
  8. #pragma GCC target("ssse3")
  9. #pragma GCC target("sse4.1")
  10. #pragma GCC target("avx")
  11.  
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <random>
#include <thread>
#include <utility>
#include <vector>

#include <immintrin.h>
  23.  
  24. double vecDotProd(const double* a, const double* b, const int N) {
  25.     double res = 0;
  26.     for (int i = 0; i + 4 <= N; i += 4) {
  27.         alignas(32) double temp[4];
  28.         auto pa = _mm256_set_pd(a[i], a[i+1], a[i+2], a[i+3]);
  29.         auto pb = _mm256_set_pd(b[i], b[i+1], b[i+2], b[i+3]);
  30.         _mm256_store_pd(temp, _mm256_mul_pd(pa, pb));
  31.         res += temp[0] + temp[1] + temp[2] + temp[3];
  32.     }
  33.     for (int i = N / 4 * 4; i < N; ++i) {
  34.         res += a[i] * b[i];
  35.     }
  36.     return res;
  37. }
  38.  
  39. void matMultVec(double *result, double *mat, unsigned rows, unsigned cols, double *vec) {
  40.      for (unsigned i = 0; i < rows; ++i)
  41.      {
  42.          result[i] += vecDotProd(mat+i*cols, vec, cols);
  43.      }
  44. }
  45.  
  46. class Timer {
  47.  
  48.     std::chrono::time_point<std::chrono::system_clock> time_point;
  49.    
  50.     size_t value;
  51.    
  52. public:
  53.    
  54.     void start() {
  55.         time_point = std::chrono::system_clock::now();
  56.     }
  57.  
  58.     void finish() {
  59.         auto curr = std::chrono::system_clock::now();    
  60.         auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(curr - time_point);
  61.         value = elapsed.count();
  62.     }
  63.    
  64.     size_t get() const {
  65.         return value;
  66.     }    
  67. };
  68.  
  69. class Producer {
  70.     const unsigned totalJobs;
  71.     unsigned jobsIssued;
  72.     unsigned jobsReady;
  73.  
  74.     const unsigned jobWidth;
  75.     const unsigned jobHeight;
  76.  
  77. public:
  78.     Producer(): totalJobs(100), jobsIssued(0), jobsReady(0), jobWidth(1000), jobHeight(2000) {}
  79.     unsigned getTotalJobs() { return totalJobs; }
  80.     std::function<double(void)> getJob()
  81.     {
  82.         assert(jobsIssued < totalJobs);
  83.         if (!jobsReady) {
  84.             std::chrono::milliseconds generationTime(100);
  85.             std::this_thread::sleep_for(generationTime);
  86.             unsigned newJobs = 2;
  87.             if (newJobs + jobsIssued > totalJobs)
  88.             newJobs = totalJobs - jobsIssued;
  89.             jobsReady += newJobs;
  90.             std::clog << "Added " << newJobs << " new jobs." << std::endl;
  91.         }
  92.         jobsReady--;
  93.         jobsIssued++;
  94.         unsigned jobInit = jobsIssued;
  95.         return [jobInit, this] () {
  96.             Timer timer;
  97.             size_t totalMultTime = 0, totalGenTime = 0;
  98.             double sum = 0;
  99.             for (unsigned iter = 0; iter != 10; iter++)
  100.             {
  101.                 // Prepare matrix and vector:
  102.                 timer.start();
  103.                 std::vector<double> matrix(jobWidth*jobHeight);
  104.                 for (unsigned i = 0; i != jobWidth*jobHeight; i++)
  105.                 {
  106.                     matrix[i] = (0.00001*((jobInit + 10)*(jobInit - iter)) + i*0.02 + 0.1*(jobInit%4))/100000;
  107.                 }
  108.                 std::vector<double> vec(jobWidth);
  109.                 for (unsigned i = 0; i != jobWidth; i++)
  110.                 {
  111.                     vec[i] = -(0.0001*((jobInit + 10) - iter) - i*0.01)/1000;
  112.                 }
  113.                 std::vector<double> result(jobHeight, 0);
  114.                 timer.finish();
  115.                 totalGenTime += timer.get();
  116.                 // Call multiplication:
  117.                 timer.start();
  118.                 matMultVec(&result[0], &matrix[0], jobHeight, jobWidth, &vec[0]);
  119.                 timer.finish();
  120.                 totalMultTime += timer.get();
  121.                 // Update answer:
  122.                 sum += std::accumulate(result.begin(), result.end(), 0)/1000.0;
  123.             }
  124.             std::clog << "Task " << jobInit << " produced " << sum;
  125.             std::clog << ", totalMultTime is " << totalMultTime << " ms";
  126.             std::clog << ", totalGenTime is " << totalGenTime << " ms" << std::endl;
  127.             return sum;
  128.         };
  129.     }
  130. };
  131.  
  132. class JobQueue {
  133.     std::mutex mtx;
  134.     std::queue<std::function<double(void)>> queue;
  135. public:
  136.     bool isEmpty() { return queue.empty(); }
  137.     void lock() { mtx.lock(); }
  138.     void unlock() { mtx.unlock(); }
  139.     void addJob(std::function<double(void)> func) { queue.push(func); }
  140.     std::function<double(void)> getJob() {
  141.         assert(!queue.empty());
  142.         auto ret = queue.front();
  143.         queue.pop();
  144.         return ret;
  145.     }
  146. };
  147.  
  148. class Status {
  149.     double result;
  150.     bool generated;
  151.     std::mutex mutex;
  152. public:
  153.     Status() : result(0.0), generated(false) { }
  154.     void lock() { mutex.lock(); }
  155.     void unlock() { mutex.unlock(); }
  156.     void markAsGenerated() { generated = true; }
  157.     bool isGenerated() const { return generated; }
  158.     void addToResult(double val) { result += val; }
  159.     double getResult() const { return result; }
  160. };
  161.  
  162. class Helper {
  163.     std::mutex mutex;
  164.     std::condition_variable condvar;
  165. public:
  166.     void waitForJob(JobQueue& queue, Status& status) {
  167.         std::unique_lock<std::mutex> ulock(mutex);
  168.         condvar.wait(ulock, [&](){return !queue.isEmpty() || status.isGenerated();});
  169.     }
  170.     void checkNewJob() {
  171.         std::unique_lock<std::mutex> ulock(mutex);
  172.         condvar.notify_one();
  173.     }
  174. };
  175.  
  176. void thread(Helper& helper, JobQueue& queue, Status& status) {
  177.     bool wait = !status.isGenerated();
  178.     while (true) {
  179.         if (wait) {
  180.             helper.waitForJob(queue, status);
  181.         }
  182.         status.lock();
  183.         bool generated = status.isGenerated();
  184.         if (generated) {
  185.             wait = false;
  186.         }
  187.         status.unlock();
  188.         queue.lock();
  189.         auto job = queue.getJob();
  190.         queue.unlock();
  191.         double res = job();
  192.         status.lock();
  193.         status.addToResult(res);
  194.         status.unlock();
  195.         queue.lock();
  196.         bool finished = generated && queue.isEmpty();
  197.         queue.unlock();
  198.         if (finished) {
  199.             return;
  200.         }
  201.     }
  202. }
  203.  
  204. int main()
  205. {
  206.     Timer timer;
  207.     timer.start();
  208.    
  209.     JobQueue queue;
  210.     Status status;
  211.     Helper helper1, helper2;
  212.     std::thread thread1(thread, std::ref(helper1), std::ref(queue), std::ref(status));
  213.     std::thread thread2(thread, std::ref(helper2), std::ref(queue), std::ref(status));
  214.    
  215.     Producer jobSource;
  216.     unsigned jobsLeft = jobSource.getTotalJobs();
  217.     while (jobsLeft--) {
  218.         auto job = jobSource.getJob();
  219.         queue.lock();
  220.         queue.addJob(job);
  221.         queue.unlock();
  222.         helper1.checkNewJob();
  223.         helper2.checkNewJob();
  224.     }
  225.    
  226.     status.lock();
  227.     status.markAsGenerated();
  228.     helper1.checkNewJob();
  229.     helper2.checkNewJob();
  230.     status.unlock();
  231.    
  232.     Helper helper3;
  233.     thread(helper3, queue, status);
  234.     thread1.join();
  235.     thread2.join();
  236.    
  237.     timer.finish();
  238.    
  239.     std::clog << "Done. Result is " << status.getResult() << ", working time is " << timer.get() << " ms" << std::endl;
  240. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement