Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- File: task-concurrency-mult-gen-optimize.cpp
Build: g++ -pthread -march=native -O2 -std=c++14 -Wall -Wextra -Wshadow -Wconversion -fmax-errors=2 task-concurrency-mult-gen-optimize.cpp -o task-concurrency-mult-gen-optimize
- */
- #pragma GCC target("sse2")
- #pragma GCC target("ssse3")
- #pragma GCC target("sse4.1")
- #pragma GCC target("avx")
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <random>
#include <thread>
#include <utility>
#include <vector>

#include <immintrin.h>
// Held around every write to std::clog so that log lines emitted by
// different threads are not interleaved.
std::mutex logMutex{};
// Dot product of the first N elements of `a` and `b`, vectorized with AVX.
//
// Four doubles are processed per iteration with unaligned loads, so the
// pointers need no particular alignment. The products are accumulated in a
// vector register and reduced to a scalar only once, after the loop. The
// remaining N % 4 elements are handled by a scalar tail loop. Returns 0 for
// N <= 0.
//
// Because the partial sums are grouped by lane, the result may differ in the
// last bits from a strictly sequential summation.
//
// target("avx") lets this function use AVX intrinsics even in a translation
// unit built without -mavx or `#pragma GCC target("avx")`. The CPU running it
// must still support AVX.
__attribute__((target("avx")))
double vecDotProd(const double* a, const double* b, const int N) {
    __m256d partial = _mm256_setzero_pd();
    int i = 0;
    // `N - i >= 4` instead of `i + 4 <= N`: cannot overflow for N near INT_MAX.
    for (; N - i >= 4; i += 4) {
        const __m256d pa = _mm256_loadu_pd(a + i);
        const __m256d pb = _mm256_loadu_pd(b + i);
        partial = _mm256_add_pd(partial, _mm256_mul_pd(pa, pb));
    }
    alignas(32) double lanes[4];
    _mm256_store_pd(lanes, partial);
    double res = (lanes[0] + lanes[1]) + (lanes[2] + lanes[3]);
    // Scalar tail for the last N % 4 elements.
    for (; i < N; ++i) {
        res += a[i] * b[i];
    }
    return res;
}
- void matMultVec(double *result, const double *mat, const unsigned rows, const unsigned cols, const double *vec) {
- for (unsigned i = 0; i < rows; ++i)
- {
- result[i] += vecDotProd(mat+i*cols, vec, cols);
- }
- }
// Measures intervals on the monotonic steady_clock with millisecond
// resolution. Usage: start(), then finish(), then get() returns the elapsed
// whole milliseconds. get() returns 0 until finish() has been called at least
// once. finish() without a prior start() measures from construction.
class Timer {
    std::chrono::time_point<std::chrono::steady_clock> timePoint = std::chrono::steady_clock::now();
    size_t value = 0;
public:
    void start() { timePoint = std::chrono::steady_clock::now(); }
    void finish() {
        const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - timePoint);
        // steady_clock never goes backwards, so the count is non-negative.
        value = static_cast<size_t>(elapsed.count());
    }
    size_t get() const { return value; }
};
- class Producer {
- const unsigned totalJobs;
- unsigned jobsIssued;
- unsigned jobsReady;
- const unsigned jobWidth;
- const unsigned jobHeight;
- public:
- Producer(): totalJobs(100), jobsIssued(0), jobsReady(0), jobWidth(1000), jobHeight(2000) {}
- unsigned getTotalJobs() { return totalJobs; }
- std::function<double(void)> getJob()
- {
- assert(jobsIssued < totalJobs);
- if (!jobsReady) {
- std::chrono::milliseconds generationTime(100);
- std::this_thread::sleep_for(generationTime);
- unsigned newJobs = 2;
- if (newJobs + jobsIssued > totalJobs)
- newJobs = totalJobs - jobsIssued;
- jobsReady += newJobs;
- logMutex.lock();
- std::clog << "Added " << newJobs << " new jobs." << std::endl;
- logMutex.unlock();
- }
- jobsReady--;
- jobsIssued++;
- unsigned jobInit = jobsIssued;
- return [jobInit, this] () {
- Timer timer;
- size_t totalMultTime = 0, totalGenTime = 0;
- double sum = 0;
- std::vector<double> matrix(jobWidth*jobHeight), vec(jobWidth), result(jobHeight, 0);
- for (unsigned iter = 0; iter != 10; iter++)
- {
- // Prepare matrix and vector:
- timer.start();
- {
- const auto Y = (0.00001*((jobInit + 10)*(jobInit - iter))+0.1*(jobInit%4))/100000;
- for (unsigned i = 0; i != jobWidth*jobHeight; i++)
- {
- // matrix[i] = (0.00001*((jobInit + 10)*(jobInit - iter)) + i*0.02 + 0.1*(jobInit%4))/100000;
- matrix[i] = Y + i*(0.02/100000);
- }
- }
- for (unsigned i = 0; i != jobWidth; i++)
- {
- vec[i] = -(0.0001*((jobInit + 10) - iter) - i*0.01)/1000;
- }
- std::fill(result.begin(), result.end(), 0);
- timer.finish();
- totalGenTime += timer.get();
- // Call multiplication:
- timer.start();
- matMultVec(&result[0], &matrix[0], jobHeight, jobWidth, &vec[0]);
- timer.finish();
- totalMultTime += timer.get();
- // Update answer:
- sum += std::accumulate(result.begin(), result.end(), 0)/1000.0;
- }
- logMutex.lock();
- std::clog << "Task " << jobInit << " produced " << sum
- << ", totalMultTime is " << totalMultTime << " ms"
- << ", totalGenTime is " << totalGenTime << " ms" << std::endl;
- logMutex.unlock();
- return sum;
- };
- }
- };
// Adapter that makes a class derived from it lockable itself (lock / unlock /
// try_lock forwarded to an owned Lockable, e.g. std::mutex). Callers can then
// lock the derived object externally, e.g. std::unique_lock<Derived>.
// Copying is disabled.
template <typename Lockable>
class basic_lockable_adapter {
public:
    using mutex_type = Lockable;

    basic_lockable_adapter() = default;
    basic_lockable_adapter(const basic_lockable_adapter& other) = delete;
    basic_lockable_adapter& operator=(const basic_lockable_adapter& other) = delete;

    void lock() { lockable().lock(); }
    bool try_lock() { return lockable().try_lock(); }
    void unlock() { lockable().unlock(); }

protected:
    // `mutable` so the lock can be reached from const member functions.
    mutable mutex_type lockable_;
    mutex_type& lockable() const { return lockable_; }
};
- // JobQueue class, which contains pull of jobs and jobsGenerated state
- // we need to know that all jobs has been generated for no wait no more
- // class allows only external locking
- class JobQueue : public basic_lockable_adapter<std::mutex> {
- std::queue<std::function<double(void)>> queue;
- bool jobsGenerated;
- // prevent blocking errors:
- void checkOnLockedState(std::unique_lock<JobQueue>& guard) const {
- if (guard.mutex() != this) {
- throw "JobQueue locking Error: Wrong Object Locked";
- }
- }
- public:
- JobQueue() : jobsGenerated(false) { }
- bool isEmpty(std::unique_lock<JobQueue>& guard) const {
- checkOnLockedState(guard);
- return queue.empty();
- }
- bool isJobsGenerated(std::unique_lock<JobQueue>& guard) const {
- checkOnLockedState(guard);
- return jobsGenerated;
- }
- void markAsJobsGenerated(std::unique_lock<JobQueue>& guard) {
- checkOnLockedState(guard);
- jobsGenerated = true;
- }
- void addJob(std::function<double(void)> func, std::unique_lock<JobQueue>& guard) {
- checkOnLockedState(guard);
- queue.push(func);
- }
- std::function<double(void)> getJob(std::unique_lock<JobQueue>& guard) {
- checkOnLockedState(guard);
- if (queue.empty()) {
- throw "JobQueue is empty";
- }
- auto ret = queue.front();
- queue.pop();
- return ret;
- }
- };
- // Result class, contains temporary answer for already completed jobs
- // allows only external locking
- // we separate the queue and the result, because when we need to update the result, we can know nothing about the queue and vice versa
- class Result : public basic_lockable_adapter<std::mutex> {
- double value;
- // prevent blocking errors:
- void checkOnLockedState(std::unique_lock<Result>& guard) const {
- if (guard.mutex() != this) {
- throw "Result locking Error: Wrong Object Locked";
- }
- }
- public:
- Result() : value(0.0) { }
- void increase(double extra, std::unique_lock<Result>& guard) {
- checkOnLockedState(guard);
- value += extra;
- }
- double get(std::unique_lock<Result>& guard) const {
- checkOnLockedState(guard);
- return value;
- }
- };
- class Helper {
- std::condition_variable condvar;
- public:
- void waitForJob(JobQueue& queue) {
- std::unique_lock<JobQueue> ulock(queue);
- condvar.wait(ulock, [&](){return !queue.isEmpty(ulock) || queue.isJobsGenerated(ulock);});
- }
- void checkNewJob() { condvar.notify_one(); }
- };
- void thread(Helper& helper, JobQueue& queue, Result& result) {
- bool wait = true;
- {
- std::unique_lock<Queue> ulock(queue);
- wait = !queue.isJobsGenerated(ulock);
- }
- while (true) {
- // check on the need to wait for jobs:
- if (wait) {
- helper.waitForJob(queue);
- }
- // getting new job:
- std::function<double(void)> job;
- {
- std::unique_lock<JobQueue> ulock(queue);
- wait = !queue.isJobsGenerated(ulock);
- job = queue.isEmpty(ulock) ? [](){return 0.0;} : queue.getJob(unlock);
- }
- // complete the job and increase answer if needed
- auto res = job();
- if (res != 0.0) {
- std::unique_lock<Result> ulock(result);
- result.increase(temp, ulock);
- }
- { // check on cycle ending condition
- std::unique_lock<JobQueue> ulock(queue);
- if (!wait && queue.isEmpty(ulock)) {
- return;
- }
- }
- }
- }
- int main()
- {
- // Run timer:
- Timer timer;
- timer.start();
- // Init queue for jobs and result variable:
- JobQueue queue;
- Result result;
- // Create 2 helpers and run 2 threads:
- Helper helper1, helper2;
- std::thread thread1(thread, std::ref(helper1), std::ref(queue), std::ref(result));
- std::thread thread2(thread, std::ref(helper2), std::ref(queue), std::ref(result));
- // Init producer and run cycle for jobs creating / executing
- Producer jobSource;
- unsigned jobsLeft = jobSource.getTotalJobs();
- while (jobsLeft--) {
- auto job = jobSource.getJob();
- {
- std::unique_lock<JobQueue> ulock(queue);
- queue.addJob(job, ulock);
- }
- helper1.checkNewJob();
- helper2.checkNewJob();
- }
- // Mark that all jobs has been generated and notify helpers so that they do not wait for anything else:
- {
- std::unique_lock<JobQueue> ulock(queue);
- queue.markAsGenerated(ulock);
- }
- helper1.checkNewJob();
- helper2.checkNewJob();
- // Start helping with the jobs too:
- Helper helper3;
- thread(helper3, queue, result);
- // Wait for helpers:
- thread1.join();
- thread2.join();
- // Srop timer and ger the result:
- timer.finish();
- double res = 0.0;
- {
- std::unique_lock<Result> ulock(result);
- res = result.get();
- }
- mtxLog.lock();
- std::clog << "Done. Result is " << res << ", execution time is " << timer.get() << " ms" << std::endl;
- mtxLog.unlock();
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement