Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- File: task-concurrency-mult-gen-optimize.cpp
- g++ -pthread -march=native -O2 -std=c++14 -Wall -Wextra -Wshadow -Wconversion -fmax-errors=2 task-concurrency-mult-gen-optimize.cpp -o task-concurrency-mult-gen-optimize.o
- */
- #pragma GCC target("sse2")
- #pragma GCC target("ssse3")
- #pragma GCC target("sse4.1")
- #pragma GCC target("avx")
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <random>
#include <thread>
#include <utility>
#include <vector>
#include <immintrin.h>
- std::mutex logMutex; // Held around every multi-part write to std::clog made by Producer::getJob and the jobs it returns.
/**
 * Computes the dot product of two arrays of doubles with AVX.
 *
 * Four products per step are accumulated in a 256-bit register; the four
 * lanes are summed once after the loop and the remaining (N % 4) elements are
 * handled by a scalar tail. The additions are therefore not performed in
 * strict left-to-right order, so the result can differ in the last bits from
 * a plain scalar loop.
 *
 * @param a  first operand, at least N readable doubles (no alignment required)
 * @param b  second operand, at least N readable doubles (no alignment required)
 * @param N  number of elements; N <= 0 yields 0
 * @return   sum of a[i] * b[i] for i in [0, N)
 */
// The attribute makes the AVX requirement explicit on the function itself, so
// it does not rely on a file-level target pragma or on -march/-mavx flags.
__attribute__((target("avx")))
double vecDotProd(const double* a, const double* b, const int N) {
    __m256d acc = _mm256_setzero_pd();
    int i = 0;
    // "N - i >= 4" instead of "i + 4 <= N": cannot overflow for large N.
    for (; N - i >= 4; i += 4) {
        const __m256d pa = _mm256_loadu_pd(a + i);
        const __m256d pb = _mm256_loadu_pd(b + i);
        acc = _mm256_add_pd(acc, _mm256_mul_pd(pa, pb));
    }

    // Horizontal reduction of the four partial sums, done once.
    alignas(32) double lanes[4];
    _mm256_store_pd(lanes, acc);
    double res = (lanes[0] + lanes[1]) + (lanes[2] + lanes[3]);

    // Scalar tail for the elements that do not fill a whole vector.
    for (; i < N; ++i) {
        res += a[i] * b[i];
    }
    return res;
}
- void matMultVec(double *result, double *mat, unsigned rows, unsigned cols, double *vec) {
- for (unsigned i = 0; i < rows; ++i)
- {
- result[i] += vecDotProd(mat+i*cols, vec, cols);
- }
- }
/**
 * Minimal stopwatch on std::chrono::steady_clock.
 *
 * start() records the current time, finish() stores the whole milliseconds
 * elapsed since the last start(), and get() returns the stored value
 * (0 until finish() has been called).
 */
class Timer {
    using Clock = std::chrono::steady_clock;

    Clock::time_point timePoint{};
    // Initialised so that get() before finish() is well defined.
    std::size_t value = 0;
public:
    /// Records the starting instant.
    void start() { timePoint = Clock::now(); }

    /// Stores the time elapsed since start(), truncated to milliseconds.
    void finish() {
        const auto elapsed =
            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - timePoint);
        value = static_cast<std::size_t>(elapsed.count());
    }

    /// @return milliseconds measured by the last finish() call.
    std::size_t get() const { return value; }
};
- class Producer {
- const unsigned totalJobs;
- unsigned jobsIssued;
- unsigned jobsReady;
- const unsigned jobWidth;
- const unsigned jobHeight;
- public:
- Producer(): totalJobs(100), jobsIssued(0), jobsReady(0), jobWidth(1000), jobHeight(2000) {}
- unsigned getTotalJobs() { return totalJobs; }
- std::function<double(void)> getJob()
- {
- assert(jobsIssued < totalJobs);
- if (!jobsReady) {
- std::chrono::milliseconds generationTime(100);
- std::this_thread::sleep_for(generationTime);
- unsigned newJobs = 2;
- if (newJobs + jobsIssued > totalJobs)
- newJobs = totalJobs - jobsIssued;
- jobsReady += newJobs;
- logMutex.lock();
- std::clog << "Added " << newJobs << " new jobs." << std::endl;
- logMutex.unlock();
- }
- jobsReady--;
- jobsIssued++;
- unsigned jobInit = jobsIssued;
- return [jobInit, this] () {
- Timer timer;
- size_t totalMultTime = 0, totalGenTime = 0;
- double sum = 0;
- std::vector<double> matrix(jobWidth*jobHeight), vec(jobWidth), result(jobHeight, 0);
- for (unsigned iter = 0; iter != 10; iter++)
- {
- // Prepare matrix and vector:
- timer.start();
- {
- const auto Y = (0.00001*((jobInit + 10)*(jobInit - iter))+0.1*(jobInit%4))/100000;
- for (unsigned i = 0; i != jobWidth*jobHeight; i++)
- {
- // matrix[i] = (0.00001*((jobInit + 10)*(jobInit - iter)) + i*0.02 + 0.1*(jobInit%4))/100000;
- matrix[i] = Y + i*(0.02/100000);
- }
- }
- for (unsigned i = 0; i != jobWidth; i++)
- {
- vec[i] = -(0.0001*((jobInit + 10) - iter) - i*0.01)/1000;
- }
- std::fill(result.begin(), result.end(), 0);
- timer.finish();
- totalGenTime += timer.get();
- // Call multiplication:
- timer.start();
- matMultVec(&result[0], &matrix[0], jobHeight, jobWidth, &vec[0]);
- timer.finish();
- totalMultTime += timer.get();
- // Update answer:
- sum += std::accumulate(result.begin(), result.end(), 0)/1000.0;
- }
- logMutex.lock();
- std::clog << "Task " << jobInit << " produced " << sum
- << ", totalMultTime is " << totalMultTime << " ms"
- << ", totalGenTime is " << totalGenTime << " ms" << std::endl;
- logMutex.unlock();
- return sum;
- };
- }
- };
/**
 * FIFO queue of jobs shared between the producing and the working threads.
 *
 * Every member function takes the internal mutex itself, so a lone call such
 * as isEmpty() is safe from any thread. The mutex is recursive: callers that
 * need several operations to be atomic as a group (for example "check for
 * emptiness, then pop") bracket them with lock()/unlock(), or with
 * std::lock_guard<JobQueue>, and may call the other members while holding it.
 */
class JobQueue {
    std::recursive_mutex mutex;
    std::queue<std::function<double(void)>> queue;
public:
    /// @return true when no job is queued.
    bool isEmpty() {
        std::lock_guard<std::recursive_mutex> guard(mutex);
        return queue.empty();
    }

    /// Acquires the queue mutex (re-entrant for the owning thread).
    void lock() { mutex.lock(); }

    /// Releases one level of ownership of the queue mutex.
    void unlock() { mutex.unlock(); }

    /// Appends a job to the back of the queue.
    void addJob(std::function<double(void)> func) {
        std::lock_guard<std::recursive_mutex> guard(mutex);
        queue.push(std::move(func));
    }

    /**
     * Removes and returns the job at the front of the queue.
     * Precondition (asserted): the queue is not empty.
     */
    std::function<double(void)> getJob() {
        std::lock_guard<std::recursive_mutex> guard(mutex);
        assert(!queue.empty());
        // Move out of the front slot instead of copying the callable.
        auto ret = std::move(queue.front());
        queue.pop();
        return ret;
    }
};
/**
 * Shared run state: the accumulated result of all finished jobs and a flag
 * saying that job generation is over.
 *
 * The flag is atomic, so markAsGenerated()/isGenerated() may be called with
 * or without the lock held. The result is a plain double: callers must hold
 * the lock (lock()/unlock()) around addToResult()/getResult() whenever other
 * threads may still be updating it.
 */
class Status {
    double result;
    std::atomic<bool> generated;
    std::mutex mutex;
public:
    Status() : result(0.0), generated(false) { }

    /// Acquires the status mutex.
    void lock() { mutex.lock(); }

    /// Releases the status mutex.
    void unlock() { mutex.unlock(); }

    /// Records that no further jobs will be generated.
    void markAsGenerated() { generated.store(true); }

    /// @return true once markAsGenerated() has been called.
    bool isGenerated() const { return generated.load(); }

    /// Adds val to the accumulated result.
    void addToResult(double val) { result += val; }

    /// @return the accumulated result.
    double getResult() const { return result; }
};
- class Helper {
- std::mutex mutex;
- std::condition_variable condvar;
- public:
- void waitForJob(JobQueue& queue, Status& status) {
- std::unique_lock<std::mutex> ulock(mutex);
- condvar.wait(ulock, [&](){return !queue.isEmpty() || status.isGenerated();});
- }
- void checkNewJob() {
- std::unique_lock<std::mutex> ulock(mutex);
- condvar.notify_one();
- }
- };
- void thread(Helper& helper, JobQueue& queue, Status& status) {
- bool wait = !status.isGenerated();
- while (true) {
- if (wait) {
- helper.waitForJob(queue, status);
- }
- status.lock();
- bool generated = status.isGenerated();
- if (generated) {
- wait = false;
- }
- status.unlock();
- while (true) {
- queue.lock();
- if (queue.isEmpty()) {
- queue.unlock();
- break;
- }
- auto job = queue.getJob();
- queue.unlock();
- double res = job();
- status.lock();
- status.addToResult(res);
- status.unlock();
- }
- queue.lock();
- bool finished = generated && queue.isEmpty();
- queue.unlock();
- if (finished) {
- return;
- }
- }
- }
- int main()
- {
- Timer timer;
- timer.start();
- JobQueue queue;
- Status status;
- Helper helper1, helper2;
- std::thread thread1(thread, std::ref(helper1), std::ref(queue), std::ref(status));
- std::thread thread2(thread, std::ref(helper2), std::ref(queue), std::ref(status));
- Producer jobSource;
- unsigned jobsLeft = jobSource.getTotalJobs();
- while (jobsLeft--) {
- auto job = jobSource.getJob();
- queue.lock();
- queue.addJob(job);
- queue.unlock();
- helper1.checkNewJob();
- helper2.checkNewJob();
- }
- status.lock();
- status.markAsGenerated();
- helper1.checkNewJob();
- helper2.checkNewJob();
- status.unlock();
- Helper helper3;
- thread(helper3, queue, status);
- thread1.join();
- thread2.join();
- timer.finish();
- std::clog << "Done. Result is " << status.getResult() << ", execution time is " << timer.get() << " ms" << std::endl;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement