Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "../implementation/concurrent_queue.hpp"
- #include "parameter_processor.hpp"
- #include <functional>
- #include <iostream>
- #include <thread>
- #define CHECK
- #ifdef CHECK
- #include <mutex>
- std::mutex mtx;
- std::vector<bool> check;
- #endif // CHECK
- // std::atomic<size_t> num_inserted = 0;
- size_t Q_SIZE;
- size_t NUM_PRODUCERS;
- size_t NUM_CONSUMERS;
- size_t NUM_ELEMENTS_FOR_EACH;
- void produce(uint id, ConcurrentQueue<uint> &q) {
- const size_t begin = id * NUM_ELEMENTS_FOR_EACH;
- const size_t end = begin + NUM_ELEMENTS_FOR_EACH;
- for (size_t i = begin; i < end; ++i) {
- // num_inserted++;
- #ifdef CHECK
- mtx.lock();
- check[i] = true;
- mtx.unlock();
- #endif // CHECK
- q.push_back(id, i);
- }
- // std::cout << "done pushing: " << id << std::endl;
- }
- void consume(uint id, ConcurrentQueue<uint> &q) {
- for (size_t i = 0; i < NUM_ELEMENTS_FOR_EACH; ++i) {
- uint r = q.pop_front(id);
- #ifdef CHECK
- mtx.lock();
- if (check[r]) {
- check[r] = false;
- } else {
- std::cerr << "Error: Element" << r
- << " was extracted without being pushed" << std::endl;
- }
- mtx.unlock();
- #endif // CHECK
- }
- // std::cout << "done consuming: " << id << std::endl;
- }
- int main(int argc, char *argv[]) {
- ParameterProcessor param(argc, argv);
- Q_SIZE = param.getInt("s", 25000000);
- NUM_PRODUCERS = param.getInt("p", 8);
- NUM_CONSUMERS = param.getInt("c", 8);
- NUM_ELEMENTS_FOR_EACH = param.getInt("e", 10000000);
- #ifdef CHECK
- check.resize(NUM_PRODUCERS * NUM_ELEMENTS_FOR_EACH, false);
- #endif // CHECK
- ConcurrentQueue<uint> q(Q_SIZE, NUM_PRODUCERS + NUM_CONSUMERS);
- std::vector<std::thread> producers;
- producers.reserve(NUM_PRODUCERS);
- std::vector<std::thread> consumers;
- consumers.reserve(NUM_CONSUMERS);
- for (size_t i = 0; i < NUM_PRODUCERS; ++i) {
- producers.push_back(std::thread(produce, i, std::ref(q)));
- }
- // for (size_t i = 0; i < NUM_PRODUCERS; ++i) {
- // producers[i].join();
- // }
- // for (auto e : check) {
- // if (!e) {
- // std::cout << "Element was not inserted" << std::endl;
- // }
- // }
- for (size_t i = 0; i < NUM_CONSUMERS; ++i) {
- consumers.push_back(std::thread(consume, NUM_PRODUCERS + i, std::ref(q)));
- }
- for (size_t i = 0; i < NUM_PRODUCERS; ++i) {
- producers[i].join();
- }
- for (size_t i = 0; i < NUM_CONSUMERS; ++i) {
- consumers[i].join();
- }
- #ifdef CHECK
- for (size_t i = 0; i < check.size(); ++i) {
- if (check[i]) {
- std::cerr << "Error: Element: " << i << " was not extracted" << std::endl;
- }
- }
- #endif // CHECK
- // if (num_inserted != NUM_PRODUCERS * NUM_ELEMENTS_FOR_EACH) {
- // std::cerr << "Error: " << num_inserted << " inserted not "
- // << NUM_PRODUCERS * NUM_ELEMENTS_FOR_EACH << std::endl;
- // }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement