Advertisement
Guest User

Untitled

a guest
Dec 20th, 2014
162
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.55 KB | None | 0 0
  1. #include <iostream>
  2. using std::cin;
  3. using std::cout;
  4. using std::cerr;
  5. using std::endl;
  6.  
  7.  
  8.  
  9. #include <cmath>
  10. #include <cstdlib>
  11. #include <thread>
  12. #include <mutex>
  13. #include <condition_variable>
  14. #include <queue>
  15. #include <atomic>
  16. #include <chrono>
  17.  
  18.  
  19. namespace realtime
  20. {
  21. // realtime::communicator is a single-typed data communication channel
  22. // from non-rt thread to rt thread
  23. // single reader, single writer
  24. // reader side is rt-safe
  25. // writer side is rt-UNSAFE
  26.  
  // Fixed-capacity FIFO ring buffer for exactly one pushing thread and one
  // popping thread.
  //
  // `tail` is only touched by push() and `head` only by pop(); the element
  // count `used` is the single piece of state both sides share, which is why
  // it is atomic while the two indices are plain ints.
  //
  // Neither push() nor pop() checks bounds: the caller must test full() /
  // empty() (or the bool conversion) first. Pushing into a full buffer
  // overwrites the oldest unread slot; popping from an empty one returns a
  // stale or moved-from element and underflows the count.
  //
  // T must be default-constructible (the storage is a plain array of T) and
  // move-assignable.
  template <class T, int Capacity=32>
  struct ringbuffer
  {
      static_assert(Capacity > 0, "ringbuffer needs at least one slot");

      using value_type = T;

      // Number of slots, as the (signed) template argument.
      static constexpr auto capacity() { return Capacity; }

      using size_type = std::atomic<size_t>;

  private:
      size_type used{0};            // number of occupied slots
      value_type values[Capacity];  // slot storage
      int head{0};                  // next slot to pop  (pop() side only)
      int tail{0};                  // next slot to fill (push() side only)

  public:
      // Current number of stored elements.
      auto size() const { return used.load(); }

      // True when every slot is occupied.
      auto full() const { return size() == static_cast<size_t>(capacity()); }

      // True when no slot is occupied.
      auto empty() const { return !size(); }

      // Converts to true while there is room for at least one more push().
      operator bool () const { return !full(); }

      // Stores `x` in the next free slot. Precondition: !full().
      auto push(value_type x)
      {
          values[tail] = std::move(x);
          if (++tail == capacity()) tail = 0;
          // Publish the element only after the slot has been written.
          ++used;
      }

      // Removes and returns the oldest element. Precondition: !empty().
      auto pop()
      {
          value_type x = std::move(values[head]);
          if (++head == capacity()) head = 0;
          // Hand the slot back to the pushing side only after it was read.
          --used;
          return x;  // plain return: lets NRVO / implicit move apply
      }
  };
  62.  
  63. template <class T, int Capacity=32>
  64. struct communicator
  65. {
  66. using value_type = T;
  67. static constexpr auto capacity() { return Capacity; }
  68.  
  69. using mutex = std::mutex;
  70. using ulock = std::unique_lock<mutex>;
  71. using condv = std::condition_variable;
  72. using thread = std::thread;
  73.  
  74. using tank = std::queue<value_type>;
  75. using buffer = ringbuffer<T, capacity()>;
  76.  
  77. private:
  78. tank tk;
  79. buffer buf;
  80. std::atomic<typename tank::size_type> tk_size;
  81.  
  82. thread scheduler;
  83. mutex m;
  84. condv cv;
  85. std::atomic_bool cont{true};
  86.  
  87. public:
  88. communicator()
  89. {
  90. scheduler = thread{[this] {
  91. while (true) {
  92. value_type x;
  93. {
  94. ulock lck(m);
  95. cv.wait(lck, [this]{ return (!cont || (tank_size() && buf)); });
  96. if (cont) cout << "wake up: " << tank_size() << " -> " << size() << endl;
  97. else cout << "wake up: exit" << endl;
  98. if (!cont) break;
  99. x = tk.front();
  100. tk.pop();
  101. tk_size = tk.size();
  102. }
  103. buf.push(std::move(x));
  104. }
  105. }};
  106. }
  107.  
  108. ~communicator()
  109. {
  110. cont = false;
  111. if (scheduler.joinable()) {
  112. {
  113. ulock _(m);
  114. cv.notify_one();
  115. }
  116. scheduler.join();
  117. }
  118. }
  119.  
  120. auto tank_size() const { return tk.size(); }
  121. auto tank_size__rt_safe() const { return tk_size.load(); }
  122. auto size() const { return buf.size(); }
  123. auto empty() const { return buf.empty(); }
  124. operator bool () const { return !empty(); }
  125.  
  126. // for non-rt thread
  127. void push(value_type x)
  128. {
  129. ulock _(m);
  130. tk.push(std::move(x));
  131. tk_size = tk.size();
  132. cv.notify_one();
  133. }
  134.  
  135. // for rt_thread
  136. auto pop()
  137. {
  138. auto x = buf.pop();
  139. cv.notify_one();
  140. return std::move(x);
  141. }
  142. };
  143. }
  144.  
  // Blocks the calling thread for `ms` milliseconds.
  void sleep(int ms)
  {
      const std::chrono::milliseconds pause{ms};
      std::this_thread::sleep_for(pause);
  }
  150.  
  151. int main()
  152. {
  153. realtime::communicator<int, 8> comm;
  154. std::atomic_bool cont{true};
  155.  
  156. std::thread writer{[&cont, &comm] {
  157. int i = 0;
  158. while (cont) {
  159. comm.push(i++);
  160. sleep(std::rand() % 300);
  161. }
  162. }};
  163.  
  164. std::thread reader{[&cont, &comm] {
  165. while (cont) {
  166. int t = 100+1000 * std::pow(0.8, comm.size() + comm.tank_size__rt_safe());
  167. cout << "sleep for " << t << "ms" << endl;
  168. sleep(t);
  169. if (comm) {
  170. auto i = comm.pop();
  171. cout << "got " << i << endl;
  172. }
  173. }
  174. }};
  175.  
  176. cout << "started" << endl;
  177. std::string whatever;
  178. while (cin >> whatever) {}
  179. cout << "exiting..." << endl;
  180. cont = false;
  181. if (writer.joinable()) writer.join();
  182. if (reader.joinable()) reader.join();
  183. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement