Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <chrono>
#include <deque>
#include <forward_list>
#include <fstream>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

#include <boost/lockfree/spsc_queue.hpp>
- std::mutex mm;
- using std::cout ; using std::cin; using std::endl;
- using std::string;
- template <typename T> using lockfreequeue = boost::lockfree::spsc_queue<T,boost::lockfree::capacity<100000>>;
/// Base class of every pipeline stage: owns the worker thread that runs the
/// stage's processLoopFunction().
///
/// NOTE(review): the worker is launched from this base-class constructor, i.e.
/// before any derived sub-object has been constructed. The thread can therefore
/// reach processLoopFunction() while the object is still being built. Closing
/// that window needs an explicit start step, which would change how callers
/// use processLoop, so it is only flagged here.
template <typename T> class Processable {
public:
    /// Worker thread, started by the constructor. It must be joined or
    /// detached before this object is destroyed; destroying a joinable
    /// std::thread calls std::terminate.
    std::thread processLoop;

    /// Thread entry point; forwards to the given stage's processLoopFunction().
    void myWrapper(Processable<T>*);

    /// Body of the worker thread, supplied by each concrete stage.
    virtual void processLoopFunction() = 0;

    Processable() : processLoop(&Processable<T>::myWrapper, this, this) {}

    /// Virtual so that a stage can be destroyed safely through a base pointer.
    virtual ~Processable() {}
};
- template <typename T> class Source ;
- template <typename T> class Sink : public virtual Processable<T>{
- // virtual void processChunk(void) = 0;
- // virtual void isChunkReady(void) = 0;
- protected:
- public:
- Source<T>* mySource;
- Sink():mySource(nullptr){};
- };
- template <typename T> class Source : public virtual Processable<T>{
- protected:
- public:
- lockfreequeue<T> myOutputDataBuffer;
- void setSink(Sink<T>*);
- virtual void processLoopFunction() = 0 ;
- };
- template <typename T> class Pipe : public virtual Source<T>, public virtual Sink<T>{
- public:
- virtual void processLoopFunction() = 0 ;
- };
- template <typename T> class PipeDouble: public Pipe<T>{
- public:
- virtual void processLoopFunction() ;
- };
- template <typename T> class OutputSink : public Sink<T>{
- public:
- virtual void processLoopFunction() ;
- };
- template <typename T> void Source<T>::setSink(Sink<T>* pp){
- pp->mySource = this ;
- };
- template <typename T> void Processable<T>::myWrapper(Processable<T>* pp){
- pp->processLoopFunction();
- };
- template <typename T> class FileSource : Source<T>{
- std::ifstream myInputFile;
- public:
- FileSource(string filename):myInputFile(filename){};
- virtual void processLoopFunction() ;
- };
- template <typename T> class NumberSource : public Source<T>{
- public:
- NumberSource(){};
- virtual void processLoopFunction() ;
- };
- template <typename T> void OutputSink<T>::processLoopFunction(){
- T tmp;
- while(true){
- if(this->mySource!=nullptr){
- this->mySource->myOutputDataBuffer.pop(tmp);
- cout << tmp << endl;
- }
- }
- }
- template <typename T> void FileSource<T>::processLoopFunction(){
- while(true){
- }
- }
- template <typename T> void NumberSource<T>::processLoopFunction(){
- T ll = 0 ;
- while(true){
- this->myOutputDataBuffer.push(ll);
- ll++;
- ll++;
- }
- }
- template <typename T> void PipeDouble<T>::processLoopFunction(){
- T tmp;
- while(true){
- if(this->mySource!=nullptr){
- this->mySource->myOutputDataBuffer.pop(tmp);
- tmp = tmp + tmp;
- this->myOutputDataBuffer.push(tmp);
- }
- }
- }
/// std::deque wrapper in which every member function locks one internal mutex
/// for the duration of the call. Individual calls are serialized; a sequence
/// such as empty() followed by front() is not atomic as a whole.
template <typename T> class SyncDeque {
    std::deque<T> myDeque;
    std::mutex front_mutex;  // the single lock guarding every access to myDeque
public:
    SyncDeque() = default;
    ~SyncDeque() = default;

    /// Returns a copy of the first element.
    T front();
    /// Removes the first element.
    void pop_front();
    /// Appends an element at the end.
    void push_back(T);
    /// Reports whether the deque holds no elements.
    bool empty();
    /// Returns the current number of elements.
    int size();
};
- template <typename T> T SyncDeque<T>::front(){
- std::unique_lock<std::mutex> ul_front(front_mutex);
- return myDeque.front();
- }
- template <typename T> void SyncDeque<T>::push_back(T tt ){
- std::unique_lock<std::mutex> ul_front(front_mutex);
- myDeque.push_back(tt);
- }
- template <typename T> void SyncDeque<T>::pop_front(){
- std::unique_lock<std::mutex> ul_front(front_mutex);
- myDeque.pop_front();
- }
- template <typename T> bool SyncDeque<T>::empty(){
- std::unique_lock<std::mutex> ul_front(front_mutex);
- return myDeque.empty();
- }
- template <typename T> int SyncDeque<T>::size(){
- std::unique_lock<std::mutex> ul_front(front_mutex);
- return myDeque.size();
- }
- void demoFunction1( lockfreequeue<int> &tmpSyncDeque ){
- int kk =0 ;
- while(true){
- tmpSyncDeque.push(kk);
- kk++;
- }
- }
- void demoFunction2( lockfreequeue<int> &tmpSyncDeque ){
- int value;
- while(true){
- if(!tmpSyncDeque.empty()){
- tmpSyncDeque.pop(value) ;
- cout << value << endl;
- }
- }
- }
- void print( SyncDeque<int> &tmpSyncDeque ){
- std::lock_guard<std::mutex> guard(mm);
- // for(auto ii = tmpSyncDeque.begin() ; ii!=tmpSyncDeque.end() ;ii++ ){
- // cout << *ii ;
- // }
- cout << endl;
- }
- int main(int argc, char** argv){
- // std::vector<int> myInputDataBufer;
- // myInputDataBufer.push_back();
- // SyncDeque<int> myDeck;
- // lockfreequeue<int> myspcp;
- NumberSource<long> localSource;
- PipeDouble<long> localPipe;
- localSource.setSink(&localPipe);
- OutputSink<long> localSink;
- localPipe.setSink(&localSink);
- // localSource.setSink(&localSink);
- localSource.processLoop.detach();
- localSink.processLoop.detach();
- localPipe.processLoop.detach();
- // std::thread t1(demoFunction1,std::ref(myspcp));
- // std::thread t2(demoFunction2,std::ref(myspcp));
- // t1.detach();
- // t2.detach();
- std::this_thread::sleep_for(std::chrono::seconds(10000));
- return 0 ;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement