Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- using std::cin;
- using std::cout;
- using std::cerr;
- using std::endl;
- #include <cmath>
- #include <cstdlib>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <queue>
- #include <atomic>
- #include <chrono>
- namespace realtime
- {
- // realtime::communicator is a single-typed data communication channel
- // from non-rt thread to rt thread
- // single reader, single writer
- // reader side is rt-safe
- // writer side is rt-UNSAFE
- template <class T, int Capacity=32>
- struct ringbuffer
- {
- using value_type = T;
- static constexpr auto capacity() { return Capacity; }
- using size_type = std::atomic<size_t>;
- private:
- size_type used{0};
- value_type values[capacity()];
- int head{0};
- int tail{0};
- public:
- auto size() const { return used.load(); }
- auto full() const { return (size() == capacity()); }
- auto empty() const { return !size(); }
- operator bool () const { return !full(); }
- auto push(value_type x)
- {
- values[tail++] = std::move(x);
- if (tail == capacity()) tail = 0;
- ++used;
- }
- auto pop()
- {
- value_type x = std::move(values[head++]);
- if (head == capacity()) head = 0;
- used--;
- return std::move(x);
- }
- };
- template <class T, int Capacity=32>
- struct communicator
- {
- using value_type = T;
- static constexpr auto capacity() { return Capacity; }
- using mutex = std::mutex;
- using ulock = std::unique_lock<mutex>;
- using condv = std::condition_variable;
- using thread = std::thread;
- using tank = std::queue<value_type>;
- using buffer = ringbuffer<T, capacity()>;
- private:
- tank tk;
- buffer buf;
- std::atomic<typename tank::size_type> tk_size;
- thread scheduler;
- mutex m;
- condv cv;
- std::atomic_bool cont{true};
- public:
- communicator()
- {
- scheduler = thread{[this] {
- while (true) {
- value_type x;
- {
- ulock lck(m);
- cv.wait(lck, [this]{ return (!cont || (tank_size() && buf)); });
- if (cont) cout << "wake up: " << tank_size() << " -> " << size() << endl;
- else cout << "wake up: exit" << endl;
- if (!cont) break;
- x = tk.front();
- tk.pop();
- tk_size = tk.size();
- }
- buf.push(std::move(x));
- }
- }};
- }
- ~communicator()
- {
- cont = false;
- if (scheduler.joinable()) {
- {
- ulock _(m);
- cv.notify_one();
- }
- scheduler.join();
- }
- }
- auto tank_size() const { return tk.size(); }
- auto tank_size__rt_safe() const { return tk_size.load(); }
- auto size() const { return buf.size(); }
- auto empty() const { return buf.empty(); }
- operator bool () const { return !empty(); }
- // for non-rt thread
- void push(value_type x)
- {
- ulock _(m);
- tk.push(std::move(x));
- tk_size = tk.size();
- cv.notify_one();
- }
- // for rt_thread
- auto pop()
- {
- auto x = buf.pop();
- cv.notify_one();
- return std::move(x);
- }
- };
- }
- void sleep(int ms)
- {
- using namespace std::chrono;
- std::this_thread::sleep_for(milliseconds{ms});
- }
- int main()
- {
- realtime::communicator<int, 8> comm;
- std::atomic_bool cont{true};
- std::thread writer{[&cont, &comm] {
- int i = 0;
- while (cont) {
- comm.push(i++);
- sleep(std::rand() % 300);
- }
- }};
- std::thread reader{[&cont, &comm] {
- while (cont) {
- int t = 100+1000 * std::pow(0.8, comm.size() + comm.tank_size__rt_safe());
- cout << "sleep for " << t << "ms" << endl;
- sleep(t);
- if (comm) {
- auto i = comm.pop();
- cout << "got " << i << endl;
- }
- }
- }};
- cout << "started" << endl;
- std::string whatever;
- while (cin >> whatever) {}
- cout << "exiting..." << endl;
- cont = false;
- if (writer.joinable()) writer.join();
- if (reader.joinable()) reader.join();
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement