Advertisement
Guest User

Untitled

a guest
Mar 20th, 2014
116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.47 KB | None | 0 0
  1. #include <iostream>
  2. #include <fstream>
  3. #include <queue>
  4. #include <functional>
  5. #include <thread>
  6. #include <mutex>
  7. #include <memory>
  8. #include <deque>
  9. #include <stack>
  10. #include <list>
  11. #include <forward_list>
  12. #include <boost/lockfree/spsc_queue.hpp>
  13.  
  14.  
  15.  
  16. std::mutex mm;
  17.  
  18. using std::cout ; using std::cin; using std::endl;
  19. using std::string;
  20.  
  21.  
  22. template <typename T> using lockfreequeue = boost::lockfree::spsc_queue<T,boost::lockfree::capacity<100000>>;
  23.  
  24.  
  template <typename T> class Processable;

  /**
   * Base class for a pipeline stage that runs on its own thread.
   *
   * The constructor starts `processLoop`, whose entry point is myWrapper();
   * myWrapper() in turn makes a virtual call to processLoopFunction(), which
   * concrete stages must implement.
   *
   * The destructor is virtual so that a stage can be destroyed safely through
   * a pointer to this base class.
   *
   * NOTE(review): the worker thread is launched from the base-class
   * constructor, i.e. before any derived-class part of the object has been
   * constructed. The virtual call made on that thread can therefore race with
   * construction of the derived object. Removing the race needs an explicit
   * "start" step, which would change how callers use this class, so it is only
   * flagged here.
   *
   * NOTE(review): `processLoop` must have been joined or detached before this
   * object is destroyed; destroying a joinable std::thread calls
   * std::terminate().
   */
  template <typename T> class Processable {
  public:
      /// Worker thread; started by the constructor.
      std::thread processLoop;

      /// Thread entry point: forwards to the given stage's processLoopFunction().
      void myWrapper(Processable<T>*);

      /// Body of the stage, executed on `processLoop`.
      virtual void processLoopFunction() = 0;

      /// Launches the worker thread on myWrapper(this).
      Processable() : processLoop(&Processable<T>::myWrapper, this, this) {}

      virtual ~Processable() {}
  };
  44.  
  45.  
  46. template <typename T> class Source ;
  47.  
  48. template <typename T> class Sink : public virtual Processable<T>{
  49.  
  50.  
  51. // virtual void processChunk(void) = 0;
  52. // virtual void isChunkReady(void) = 0;
  53. protected:
  54.  
  55. public:
  56. Source<T>* mySource;
  57. Sink():mySource(nullptr){};
  58.  
  59.  
  60. };
  61.  
  62.  
  63.  
  64. template <typename T> class Source : public virtual Processable<T>{
  65.  
  66. protected:
  67.  
  68.  
  69. public:
  70. lockfreequeue<T> myOutputDataBuffer;
  71.  
  72.  
  73. void setSink(Sink<T>*);
  74. virtual void processLoopFunction() = 0 ;
  75. };
  76.  
  77. template <typename T> class Pipe : public virtual Source<T>, public virtual Sink<T>{
  78.  
  79. public:
  80. virtual void processLoopFunction() = 0 ;
  81.  
  82. };
  83.  
  84. template <typename T> class PipeDouble: public Pipe<T>{
  85.  
  86. public:
  87. virtual void processLoopFunction() ;
  88.  
  89. };
  90.  
  91.  
  92.  
  93.  
  94.  
  95. template <typename T> class OutputSink : public Sink<T>{
  96. public:
  97.  
  98. virtual void processLoopFunction() ;
  99.  
  100. };
  101.  
  102.  
  103. template <typename T> void Source<T>::setSink(Sink<T>* pp){
  104.  
  105. pp->mySource = this ;
  106. };
  107.  
  108.  
  109. template <typename T> void Processable<T>::myWrapper(Processable<T>* pp){
  110. pp->processLoopFunction();
  111. };
  112.  
  113.  
  114.  
  115. template <typename T> class FileSource : Source<T>{
  116. std::ifstream myInputFile;
  117. public:
  118. FileSource(string filename):myInputFile(filename){};
  119. virtual void processLoopFunction() ;
  120. };
  121.  
  122. template <typename T> class NumberSource : public Source<T>{
  123.  
  124. public:
  125. NumberSource(){};
  126. virtual void processLoopFunction() ;
  127.  
  128.  
  129.  
  130. };
  131.  
  132.  
  133.  
  134. template <typename T> void OutputSink<T>::processLoopFunction(){
  135.  
  136.  
  137. T tmp;
  138. while(true){
  139. if(this->mySource!=nullptr){
  140. this->mySource->myOutputDataBuffer.pop(tmp);
  141. cout << tmp << endl;
  142. }
  143. }
  144.  
  145.  
  146.  
  147. }
  148.  
  149.  
  150. template <typename T> void FileSource<T>::processLoopFunction(){
  151.  
  152.  
  153.  
  154. while(true){
  155.  
  156.  
  157. }
  158.  
  159.  
  160.  
  161. }
  162.  
  163.  
  164.  
  165.  
  166. template <typename T> void NumberSource<T>::processLoopFunction(){
  167.  
  168. T ll = 0 ;
  169.  
  170. while(true){
  171. this->myOutputDataBuffer.push(ll);
  172. ll++;
  173. ll++;
  174.  
  175. }
  176. }
  177.  
  178.  
  179. template <typename T> void PipeDouble<T>::processLoopFunction(){
  180. T tmp;
  181.  
  182. while(true){
  183. if(this->mySource!=nullptr){
  184. this->mySource->myOutputDataBuffer.pop(tmp);
  185. tmp = tmp + tmp;
  186. this->myOutputDataBuffer.push(tmp);
  187. }
  188. }
  189.  
  190. }
  191.  
  192.  
  193.  
  /**
   * std::deque<T> whose operations are serialized by a single mutex.
   *
   * Each member function locks the mutex for the duration of the call. Only
   * individual calls are atomic: a sequence such as empty() / front() /
   * pop_front() can be interleaved with calls from other threads. Use
   * try_pop_front() when the check and the removal must happen together.
   *
   * Changes from the earlier version: the second mutex (`back_mutex`), which
   * was never locked, is gone; the read-only operations are const-qualified;
   * push_back() takes its argument by const reference; the size_t -> int
   * conversion in size() is explicit.
   */
  template <typename T> class SyncDeque {
      std::deque<T> myDeque;
      // mutable so that const observers can lock it.
      mutable std::mutex front_mutex;

  public:
      /// Returns a copy of the first element. The deque must not be empty.
      T front() const;

      /// Removes the first element. The deque must not be empty.
      void pop_front();

      /// Moves the first element into `out` and removes it, under one lock.
      /// @return false (leaving `out` untouched) if the deque was empty.
      bool try_pop_front(T& out);

      /// Appends a copy of the given value.
      void push_back(const T&);

      /// @return true if the deque holds no elements.
      bool empty() const;

      /// @return the number of elements, converted to int.
      int size() const;
  };

  template <typename T>
  T SyncDeque<T>::front() const
  {
      std::lock_guard<std::mutex> lock(front_mutex);
      return myDeque.front();
  }

  template <typename T>
  void SyncDeque<T>::push_back(const T& tt)
  {
      std::lock_guard<std::mutex> lock(front_mutex);
      myDeque.push_back(tt);
  }

  template <typename T>
  void SyncDeque<T>::pop_front()
  {
      std::lock_guard<std::mutex> lock(front_mutex);
      myDeque.pop_front();
  }

  template <typename T>
  bool SyncDeque<T>::try_pop_front(T& out)
  {
      std::lock_guard<std::mutex> lock(front_mutex);
      if (myDeque.empty()) {
          return false;
      }
      out = static_cast<T&&>(myDeque.front());
      myDeque.pop_front();
      return true;
  }

  template <typename T>
  bool SyncDeque<T>::empty() const
  {
      std::lock_guard<std::mutex> lock(front_mutex);
      return myDeque.empty();
  }

  template <typename T>
  int SyncDeque<T>::size() const
  {
      std::lock_guard<std::mutex> lock(front_mutex);
      return static_cast<int>(myDeque.size());
  }
  277.  
  278.  
  279. void demoFunction1( lockfreequeue<int> &tmpSyncDeque ){
  280.  
  281. int kk =0 ;
  282. while(true){
  283. tmpSyncDeque.push(kk);
  284. kk++;
  285. }
  286. }
  287.  
  288.  
  289. void demoFunction2( lockfreequeue<int> &tmpSyncDeque ){
  290.  
  291. int value;
  292. while(true){
  293. if(!tmpSyncDeque.empty()){
  294. tmpSyncDeque.pop(value) ;
  295. cout << value << endl;
  296. }
  297. }
  298. }
  299.  
  300.  
  301. void print( SyncDeque<int> &tmpSyncDeque ){
  302. std::lock_guard<std::mutex> guard(mm);
  303. // for(auto ii = tmpSyncDeque.begin() ; ii!=tmpSyncDeque.end() ;ii++ ){
  304.  
  305. // cout << *ii ;
  306.  
  307. // }
  308. cout << endl;
  309.  
  310. }
  311.  
  312.  
  313.  
  314.  
  315.  
  316.  
  317. int main(int argc, char** argv){
  318.  
  319. // std::vector<int> myInputDataBufer;
  320. // myInputDataBufer.push_back();
  321. // SyncDeque<int> myDeck;
  322. // lockfreequeue<int> myspcp;
  323.  
  324.  
  325. NumberSource<long> localSource;
  326. PipeDouble<long> localPipe;
  327. localSource.setSink(&localPipe);
  328. OutputSink<long> localSink;
  329.  
  330. localPipe.setSink(&localSink);
  331.  
  332. // localSource.setSink(&localSink);
  333. localSource.processLoop.detach();
  334. localSink.processLoop.detach();
  335. localPipe.processLoop.detach();
  336.  
  337. // std::thread t1(demoFunction1,std::ref(myspcp));
  338. // std::thread t2(demoFunction2,std::ref(myspcp));
  339. // t1.detach();
  340. // t2.detach();
  341. std::this_thread::sleep_for(std::chrono::seconds(10000));
  342. return 0 ;
  343.  
  344. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement