Advertisement
dmkozyrev

task-concurrency-mult-gen-optimize.cpp

Oct 11th, 2018
137
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 7.61 KB | None | 0 0
  1. /*
  2.     File: task-concurrency-mult-gen-optimize.cpp
  3.    
  4.     g++ -pthread -march=native -O2 -std=c++14 -Wall -Wextra -Wshadow -Wconversion -fmax-errors=2 task-concurrency-mult-gen-optimize.cpp -o task-concurrency-mult-gen-optimize.o
  5. */
  6.  
  7. #pragma GCC target("sse2")
  8. #pragma GCC target("ssse3")
  9. #pragma GCC target("sse4.1")
  10. #pragma GCC target("avx")
  11.  
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <random>
#include <thread>
#include <utility>

#include <immintrin.h>
  23.  
  24. std::mutex logMutex;
  25.  
  26. double vecDotProd(const double* a, const double* b, const int N) {
  27.     double res = 0;
  28.     for (int i = 0; i + 4 <= N; i += 4) {
  29.         alignas(32) double temp[4];
  30.         auto pa = _mm256_set_pd(a[i], a[i+1], a[i+2], a[i+3]);
  31.         auto pb = _mm256_set_pd(b[i], b[i+1], b[i+2], b[i+3]);
  32.         _mm256_store_pd(temp, _mm256_mul_pd(pa, pb));
  33.         res += temp[0] + temp[1] + temp[2] + temp[3];
  34.     }
  35.     for (int i = N / 4 * 4; i < N; ++i) {
  36.         res += a[i] * b[i];
  37.     }
  38.     return res;
  39. }
  40.  
  41. void matMultVec(double *result, double *mat, unsigned rows, unsigned cols, double *vec) {
  42.      for (unsigned i = 0; i < rows; ++i)
  43.      {
  44.          result[i] += vecDotProd(mat+i*cols, vec, cols);
  45.      }
  46. }
  47.  
  48. class Timer {
  49.     std::chrono::time_point<std::chrono::steady_clock> timePoint;
  50.     size_t value;
  51. public:
  52.     void start() { timePoint = std::chrono::steady_clock::now(); }
  53.     void finish() {
  54.         auto curr = std::chrono::steady_clock::now();    
  55.         auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(curr - timePoint);
  56.         value = elapsed.count();
  57.     }
  58.     size_t get() const { return value; }
  59. };
  60.  
  61. class Producer {
  62.     const unsigned totalJobs;
  63.     unsigned jobsIssued;
  64.     unsigned jobsReady;
  65.  
  66.     const unsigned jobWidth;
  67.     const unsigned jobHeight;
  68.  
  69. public:
  70.     Producer(): totalJobs(100), jobsIssued(0), jobsReady(0), jobWidth(1000), jobHeight(2000) {}
  71.     unsigned getTotalJobs() { return totalJobs; }
  72.     std::function<double(void)> getJob()
  73.     {
  74.         assert(jobsIssued < totalJobs);
  75.         if (!jobsReady) {
  76.             std::chrono::milliseconds generationTime(100);
  77.             std::this_thread::sleep_for(generationTime);
  78.             unsigned newJobs = 2;
  79.             if (newJobs + jobsIssued > totalJobs)
  80.             newJobs = totalJobs - jobsIssued;
  81.             jobsReady += newJobs;
  82.             logMutex.lock();
  83.             std::clog << "Added " << newJobs << " new jobs." << std::endl;
  84.             logMutex.unlock();
  85.         }
  86.         jobsReady--;
  87.         jobsIssued++;
  88.         unsigned jobInit = jobsIssued;
  89.         return [jobInit, this] () {
  90.             Timer timer;
  91.             size_t totalMultTime = 0, totalGenTime = 0;
  92.             double sum = 0;
  93.             std::vector<double> matrix(jobWidth*jobHeight), vec(jobWidth), result(jobHeight, 0);
  94.             for (unsigned iter = 0; iter != 10; iter++)
  95.             {
  96.                 // Prepare matrix and vector:
  97.                 timer.start();
  98.                 {
  99.                     const auto Y = (0.00001*((jobInit + 10)*(jobInit - iter))+0.1*(jobInit%4))/100000;
  100.                     for (unsigned i = 0; i != jobWidth*jobHeight; i++)
  101.                     {
  102.                         // matrix[i] = (0.00001*((jobInit + 10)*(jobInit - iter)) + i*0.02 + 0.1*(jobInit%4))/100000;
  103.                         matrix[i] = Y + i*(0.02/100000);
  104.                     }
  105.                 }
  106.                 for (unsigned i = 0; i != jobWidth; i++)
  107.                 {
  108.                     vec[i] = -(0.0001*((jobInit + 10) - iter) - i*0.01)/1000;
  109.                 }
  110.                 std::fill(result.begin(), result.end(), 0);
  111.                 timer.finish();
  112.                 totalGenTime += timer.get();
  113.                 // Call multiplication:
  114.                 timer.start();
  115.                 matMultVec(&result[0], &matrix[0], jobHeight, jobWidth, &vec[0]);
  116.                 timer.finish();
  117.                 totalMultTime += timer.get();
  118.                 // Update answer:
  119.                 sum += std::accumulate(result.begin(), result.end(), 0)/1000.0;
  120.             }
  121.             logMutex.lock();
  122.             std::clog << "Task " << jobInit << " produced " << sum
  123.                 << ", totalMultTime is " << totalMultTime << " ms"
  124.                 << ", totalGenTime is " << totalGenTime << " ms" << std::endl;
  125.             logMutex.unlock();
  126.             return sum;
  127.         };
  128.     }
  129. };
  130.  
  131. class JobQueue {
  132.     std::mutex mutex;
  133.     std::queue<std::function<double(void)>> queue;
  134. public:
  135.     bool isEmpty() { return queue.empty(); }
  136.     void lock() { mutex.lock(); }
  137.     void unlock() { mutex.unlock(); }
  138.     void addJob(std::function<double(void)> func) { queue.push(func); }
  139.     std::function<double(void)> getJob() {
  140.         assert(!queue.empty());
  141.         auto ret = queue.front();
  142.         queue.pop();
  143.         return ret;
  144.     }
  145. };
  146.  
  147. class Status {
  148.     double result;
  149.     bool generated;
  150.     std::mutex mutex;
  151. public:
  152.     Status() : result(0.0), generated(false) { }
  153.     void lock() { mutex.lock(); }
  154.     void unlock() { mutex.unlock(); }
  155.     void markAsGenerated() { generated = true; }
  156.     bool isGenerated() const { return generated; }
  157.     void addToResult(double val) { result += val; }
  158.     double getResult() const { return result; }
  159. };
  160.  
  161. class Helper {
  162.     std::mutex mutex;
  163.     std::condition_variable condvar;
  164. public:
  165.     void waitForJob(JobQueue& queue, Status& status) {
  166.         std::unique_lock<std::mutex> ulock(mutex);
  167.         condvar.wait(ulock, [&](){return !queue.isEmpty() || status.isGenerated();});
  168.     }
  169.     void checkNewJob() {
  170.         std::unique_lock<std::mutex> ulock(mutex);
  171.         condvar.notify_one();
  172.     }
  173. };
  174.  
  175. void thread(Helper& helper, JobQueue& queue, Status& status) {
  176.     bool wait = !status.isGenerated();
  177.     while (true) {
  178.         if (wait) {
  179.             helper.waitForJob(queue, status);
  180.         }
  181.         status.lock();
  182.         bool generated = status.isGenerated();
  183.         if (generated) {
  184.             wait = false;
  185.         }
  186.         status.unlock();
  187.         while (true) {
  188.             queue.lock();
  189.             if (queue.isEmpty()) {
  190.                 queue.unlock();
  191.                 break;
  192.             }
  193.             auto job = queue.getJob();
  194.             queue.unlock();
  195.             double res = job();
  196.             status.lock();
  197.             status.addToResult(res);
  198.             status.unlock();
  199.         }
  200.         queue.lock();
  201.         bool finished = generated && queue.isEmpty();
  202.         queue.unlock();
  203.         if (finished) {
  204.             return;
  205.         }
  206.     }
  207. }
  208.  
  209. int main()
  210. {
  211.     Timer timer;
  212.     timer.start();
  213.    
  214.     JobQueue queue;
  215.     Status status;
  216.     Helper helper1, helper2;
  217.     std::thread thread1(thread, std::ref(helper1), std::ref(queue), std::ref(status));
  218.     std::thread thread2(thread, std::ref(helper2), std::ref(queue), std::ref(status));
  219.    
  220.     Producer jobSource;
  221.     unsigned jobsLeft = jobSource.getTotalJobs();
  222.     while (jobsLeft--) {
  223.         auto job = jobSource.getJob();
  224.         queue.lock();
  225.         queue.addJob(job);
  226.         queue.unlock();
  227.         helper1.checkNewJob();
  228.         helper2.checkNewJob();
  229.     }
  230.    
  231.     status.lock();
  232.     status.markAsGenerated();
  233.     helper1.checkNewJob();
  234.     helper2.checkNewJob();
  235.     status.unlock();
  236.    
  237.     Helper helper3;
  238.     thread(helper3, queue, status);
  239.     thread1.join();
  240.     thread2.join();
  241.    
  242.     timer.finish();
  243.    
  244.     std::clog << "Done. Result is " << status.getResult() << ", execution time is " << timer.get() << " ms" << std::endl;
  245. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement