Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
- using namespace std;
- using namespace chrono;
- using time_value = chrono::microseconds;
// FIFO queue whose every operation runs under a single mutex.
//
// Each member function takes the lock only for its own duration, so a
// front() followed by pop() is NOT one atomic step; a caller that relies on
// that pair must be the only consumer of the queue.
template<typename T>
class ThreadSaveQueue {
public:
    // Appends `value` at the back of the queue.
    void push(T value) {
        const std::lock_guard<std::mutex> guard(_lock);
        _items.push(std::move(value));
    }

    // Returns a copy of the oldest element. The queue must not be empty.
    T front() {
        const std::lock_guard<std::mutex> guard(_lock);
        return _items.front();
    }

    // Discards the oldest element. The queue must not be empty.
    void pop() {
        const std::lock_guard<std::mutex> guard(_lock);
        _items.pop();
    }

    // Reports whether the queue currently holds no elements.
    bool empty() {
        const std::lock_guard<std::mutex> guard(_lock);
        const bool nothingQueued = _items.empty();
        return nothingQueued;
    }

private:
    std::queue<T> _items;  // underlying storage, guarded by _lock
    std::mutex _lock;      // serialises every access to _items
};
// A unit of work travelling through the conveyor: a payload plus an integer
// id and a flag marking the final task of a run.
template<typename T>
class Task {
public:
    // _value: payload (moved into the task)
    // _id:    identifier assigned by the producer
    // _last:  true when this is the final task; defaults to false
    Task(T _value, int _id, bool _last = false) : value(std::move(_value)), id(_id), last(_last) {}

    // Mutable access to the payload, so handlers can transform it in place.
    T &get() {
        return value;
    }

    // Read-only access to the payload for const tasks.
    const T &get() const {
        return value;
    }

    // True when this task was constructed with the "last" flag set.
    bool isLast() const {
        return last;
    }

    // Identifier given at construction.
    int getId() const {
        return id;
    }

private:
    T value;
    int id;
    bool last;
};
- template<typename T>
- using TaskQueue = ThreadSaveQueue<shared_ptr<Task<T>>>;
- template<typename T>
- class Generator {
- public:
- Generator() = default;
- virtual shared_ptr<Task<T>> generate() = 0;
- virtual bool empty() = 0;
- template<typename Container>
- void run(shared_ptr<Container> toInsert) {
- while (!empty())
- toInsert->push(generate());
- }
- };
- class StringGenerator : public Generator<string> {
- public:
- StringGenerator() : _current("a"), _last('a'), _count(4), id(-1) {}
- explicit StringGenerator(size_t count) :
- _current("a"), _last('a'), _count(count) {}
- shared_ptr<Task<string>> generate() override {
- if (_count) {
- if (_last <= 'z')
- _current.back() = _last++;
- else {
- _last = 'a';
- _current += _last++;
- }
- _count--;
- id++;
- }
- return make_shared<Task<string>>(_current, id, !_count);
- }
- bool empty() override {
- return !((bool) _count);
- }
- private:
- string _current;
- char _last;
- size_t _count;
- int id;
- };
- class RanStrGenerator : public Generator<string> {
- public:
- RanStrGenerator() : _count(4), _size(5), _id(0) {}
- RanStrGenerator(size_t count, size_t size) : _count(count), _size(size), _id(0) {}
- shared_ptr<Task<string>> generate() override {
- assert(!empty());
- _id++;
- return make_shared<Task<string>>(_randomString(), _id, _id == _count - 1);
- }
- bool empty() override {
- return _id >= _count;
- }
- private:
- string _randomString() {
- static string letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
- string res;
- res.reserve(_size);
- for (size_t i = 0; i < _size; ++i) {
- res.push_back(letters[random() % letters.size()]);
- }
- return res;
- }
- private:
- size_t _count;
- size_t _size;
- size_t _id;
- };
- template<typename T>
- class Handler {
- public:
- explicit Handler(string name = "Base handler") : _name(std::move(name)) {}
- virtual void handle(shared_ptr<Task<T>> task) = 0;
- virtual string name() {
- return _name;
- }
- protected:
- string _name;
- };
- class StringHasher : public Handler<string> {
- public:
- explicit StringHasher(uint8_t hash = 11) : Handler("String hasher " + to_string(hash)), _hash(hash) {}
- void handle(shared_ptr<Task<string>> task) override {
- for (char &ch : task->get())
- ch ^= _hash;
- }
- private:
- uint8_t _hash;
- };
- template<typename T>
- class Line {
- public:
- explicit Line(shared_ptr<Handler<T>> handler) : _handler(std::move(handler)), _stop(false) {}
- void setInQueue(shared_ptr<ThreadSaveQueue<shared_ptr<Task<T>>>> q) {
- _toHandle = q;
- }
- void setOutQueue(shared_ptr<ThreadSaveQueue<shared_ptr<Task<T>>>> q) {
- _nextQueue = q;
- }
- shared_ptr<ThreadSaveQueue<shared_ptr<Task<T>>>> inQueue() {
- return _toHandle;
- }
- void push(shared_ptr<Task<T>> value) {
- _toHandle->push(value);
- }
- void run(time_point<high_resolution_clock> startTime) {
- idleTime = 0;
- while (!_stop) {
- idleStart = high_resolution_clock::now();
- while (_toHandle->empty()) {
- this_thread::sleep_for(10ns);
- }
- idleTime += duration_cast<time_value>(high_resolution_clock::now() - idleStart).count();
- while (!_toHandle->empty()) {
- shared_ptr<Task<T>> value = _toHandle->front();
- _toHandle->pop();
- _inTimes.push_back(duration_cast<time_value>(high_resolution_clock::now() - startTime).count());
- _handler->handle(value);
- if (value->isLast())
- _stop = true;
- _outTimes.push_back(duration_cast<time_value>(high_resolution_clock::now() - startTime).count());
- _nextQueue->push(value);
- }
- }
- }
- const vector<size_t> &getInTimes() {
- return _inTimes;
- }
- const vector<size_t> &getOutTimes() {
- return _outTimes;
- }
- size_t getIdleTime() {
- return idleTime;
- }
- protected:
- shared_ptr<Handler<T>> _handler;
- shared_ptr<ThreadSaveQueue<shared_ptr<Task<T>>>> _toHandle;
- shared_ptr<ThreadSaveQueue<shared_ptr<Task<T>>>> _nextQueue;
- vector<size_t> _inTimes;
- vector<size_t> _outTimes;
- bool _stop;
- size_t idleTime;
- time_point<high_resolution_clock> idleStart;
- };
- template<typename T>
- class Conveyor {
- public:
- Conveyor(shared_ptr<Generator<T>> generator, vector<shared_ptr<Line<T>>> conveyor)
- : _generator(std::move(generator)), _conveyor(std::move(conveyor)) {
- _conveyor[0]->setInQueue(make_shared<ThreadSaveQueue<shared_ptr<Task<T>>>>());
- for (size_t i = 1; i < _conveyor.size(); ++i) {
- _conveyor[i]->setInQueue(make_shared<ThreadSaveQueue<shared_ptr<Task<T>>>>());
- _conveyor[i - 1]->setOutQueue(_conveyor[i]->inQueue());
- }
- _handled = make_shared<ThreadSaveQueue<shared_ptr<Task<T>>>>();
- _conveyor.back()->setOutQueue(_handled);
- }
- void run() {
- vector<thread> threads;
- // threads.emplace_back(_threadGenerator, _generator, _conveyor.front()->inQueue());
- auto c = high_resolution_clock::now();
- for (const shared_ptr<Line<T>> &line: _conveyor) {
- threads.emplace_back(_threadLine, line, c);
- }
- _generator->run(_conveyor.front()->inQueue());
- for (thread &t : threads)
- t.join();
- _conveyorRunTime = duration_cast<time_value>(high_resolution_clock::now() - c).count();
- }
- void printStats() {
- cout << "Conveyor run time: " << _conveyorRunTime << "ms" << endl;
- string sep = "\t\t";
- for (size_t i = 0; i < _conveyor[0]->getInTimes().size(); ++i) {
- cout << i << sep;
- for (size_t j = 0; j < _conveyor.size(); ++j) {
- cout << _conveyor[j]->getInTimes()[i] << "-" << _conveyor[j]->getOutTimes()[i] << sep;
- }
- cout << endl;
- }
- }
- private:
- static void _threadLine(shared_ptr<Line<T>> line, time_point<high_resolution_clock> startTime) {
- line->run(startTime);
- }
- static void _threadGenerator(shared_ptr<Generator<T>> generator, shared_ptr<TaskQueue<T>> toInsert) {
- generator->run(toInsert);
- }
- private:
- shared_ptr<Generator<T>> _generator;
- vector<shared_ptr<Line<T>>> _conveyor;
- shared_ptr<TaskQueue<T>> _handled;
- size_t _conveyorRunTime;
- };
- void runConveyor() {
- vector<shared_ptr<Line<string>>> lines;
- lines.push_back(make_shared<Line<string>>(make_shared<StringHasher>(StringHasher(0))));
- lines.push_back(make_shared<Line<string>>(make_shared<StringHasher>(StringHasher(1))));
- lines.push_back(make_shared<Line<string>>(make_shared<StringHasher>(StringHasher(2))));
- Conveyor<string> con(make_shared<RanStrGenerator>(10, 5000), lines);
- con.run();
- con.printStats();
- }
- int main() {
- runConveyor();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement