Advertisement
Guest User

Untitled

a guest
Nov 28th, 2014
150
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 6.41 KB | None | 0 0
  1. /*
  2.  * File:   main.cpp
  3.  * Author: david
  4.  *
  5.  * Created on November 28, 2014, 4:16 PM
  6.  */
  7.  
  8. #include <cstdlib>
  9. #include <iostream>
  10. #include <future>
  11. #include <list>
  12. #include "pstreams/pstream.h"
  13. #include "thread_pool.hpp"
  14.  
  15. /*
  16.  *
  17.  */
  18. int main(int argc, char** argv) {
  19.  
  20.     // Variables
  21.     std::string debug = argv[1];
  22.     std::string newFile = argv[2];
  23.     std::string completePacket;
  24.     std::cout << "Filename: " << newFile << std::endl;
  25.     std::string input_line;
  26.     int totalPackets;
  27.     thread_pool tp( 8 );
  28.  
  29.    
  30.     std::list<std::future<int>> fut;
  31.    
  32.     // External command (tshark)
  33.     redi::ipstream inFile("/home/monitor/scripts/exec_tshark.sh " + newFile);
  34.  
  35.     /// Start reading from tshark output
  36.     while (getline(inFile, input_line)) {
  37.  
  38.         input_line += "\n";
  39.  
  40.         if (input_line == "\n") {
  41.  
  42.             totalPackets++;
  43.             completePacket = "";
  44.  
  45.         } else {
  46.  
  47.             completePacket += input_line;
  48.  
  49.         }
  50.  
  51.     };
  52.  
  53.     std::cout << std::endl << "Total Packets Processed: " << totalPackets << std::endl;
  54.  
  55.     inFile.clear();
  56.     inFile.close();
  57.  
  58.     /*
  59.     int file_removal_result = remove( newFile.c_str() );
  60.     if ( file_removal_result != 0) {
  61.         cerr << "Error deleting file: " << file_removal_result << endl;
  62.     } else {
  63.         cerr << "File successfully deleted" << endl;
  64.     }
  65.     */
  66.  
  67.     std::cout << "End @ " << std::endl << "==================================================" << std::endl;
  68.  
  69.     return 0;
  70.    
  71.    
  72. }
  73.  
  74.  
  75. /**
  76.  
  77. thread_pool.hpp
  78.  
  79. **/
  80.  
  81. #include <atomic>
  82. #include <deque>
  83. #include <functional>
  84. #include <future>
  85. #include <list>
  86.  
  87. class thread_pool{
  88. public:
  89.     thread_pool(unsigned int);
  90.     ~thread_pool();
  91.    
  92.     template<typename Ret, typename... Args>
  93.     std::future<Ret> async(std::function<Ret(Args...)> f, Args... args){
  94.         typedef std::function<Ret(Args...)> F;
  95.        
  96.         std::atomic<bool> *ready = new std::atomic<bool>(false);
  97.         std::promise<Ret> *p = new std::promise<Ret>;
  98.        
  99.         auto task_wrapper = [p, ready](F&& f, Args&&... args){
  100.             p->set_value(f(args...));
  101.             ready->store(true);
  102.         };
  103.        
  104.         auto ret_wrapper = [p, ready]() -> Ret{
  105.             while(!ready->load())
  106.                 std::this_thread::yield();
  107.             auto temp = p->get_future().get();
  108.            
  109.             // Clean up resources
  110.             delete p;
  111.             delete ready;
  112.             return temp;
  113.         };
  114.        
  115.         task_mutex.lock();
  116.         tasks.emplace_back(
  117.             std::async(
  118.                 std::launch::deferred,
  119.                     task_wrapper,
  120.                     std::move(f),
  121.                     std::move(args...)
  122.             )
  123.         );
  124.         task_mutex.unlock();
  125.        
  126.         return std::async(std::launch::deferred,
  127.                           ret_wrapper);
  128.     }
  129.    
  130.     template<typename Ret>
  131.     std::future<Ret> async(std::function<Ret()> f){
  132.         typedef std::function<Ret()> F;
  133.        
  134.         std::atomic<bool> *ready = new std::atomic<bool>(false);
  135.         std::promise<Ret> *p = new std::promise<Ret>;
  136.        
  137.         auto task_wrapper = [p, ready](F&& f){
  138.             p->set_value(f());
  139.             ready->store(true);
  140.         };
  141.        
  142.         auto ret_wrapper = [p, ready]() -> Ret{
  143.             while(!ready->load())
  144.                 std::this_thread::yield();
  145.             auto temp = p->get_future().get();
  146.            
  147.             // Clean up resources
  148.             delete p;
  149.             delete ready;
  150.             return temp;
  151.         };
  152.        
  153.         task_mutex.lock();
  154.         tasks.emplace_back(std::async(std::launch::deferred,
  155.                                       task_wrapper, std::move(f)));
  156.         task_mutex.unlock();
  157.        
  158.         return std::async(std::launch::deferred,
  159.                           ret_wrapper);
  160.     }
  161.    
  162.     template<typename... Args>
  163.     std::future<void> async(std::function<void(Args...)> f, Args... args){
  164.         typedef std::function<void(Args...)> F;
  165.        
  166.         std::atomic<bool> *ready = new std::atomic<bool>(false);
  167.         std::promise<void> *p = new std::promise<void>;
  168.        
  169.         auto task_wrapper = [p, ready](F&& f, Args&&... args){
  170.             f(args...);
  171.             p->set_value();
  172.             ready->store(true);
  173.         };
  174.        
  175.         auto ret_wrapper = [p, ready](){
  176.             while(!ready->load())
  177.                 std::this_thread::yield();
  178.             p->get_future().get();
  179.            
  180.             // Clean up resources
  181.             delete p;
  182.             delete ready;
  183.             return;
  184.         };
  185.        
  186.         task_mutex.lock();
  187.         tasks.emplace_back(std::async(std::launch::deferred,
  188.                                       task_wrapper, std::move(f), std::move(args...)));
  189.         task_mutex.unlock();
  190.        
  191.         return std::async(std::launch::deferred,
  192.                           ret_wrapper);
  193.     }
  194.    
  195.     std::future<void> async(std::function<void()> f){
  196.         typedef std::function<void()> F;
  197.        
  198.         std::atomic<bool> *ready = new std::atomic<bool>(false);
  199.         std::promise<void> *p = new std::promise<void>;
  200.        
  201.         auto task_wrapper = [p, ready](F&& f){
  202.             f();
  203.             p->set_value();
  204.             ready->store(true);
  205.         };
  206.        
  207.         auto ret_wrapper = [p, ready](){
  208.             while(!ready->load())
  209.                 std::this_thread::yield();
  210.             p->get_future().get();
  211.            
  212.             // Clean up resources
  213.             delete p;
  214.             delete ready;
  215.             return;
  216.         };
  217.        
  218.         task_mutex.lock();
  219.         tasks.emplace_back(std::async(std::launch::deferred,
  220.                                       task_wrapper, std::move(f)));
  221.         task_mutex.unlock();
  222.        
  223.         return std::async(std::launch::deferred,
  224.                           ret_wrapper);
  225.     }
  226.    
  227. protected:
  228.     void thread_func();
  229.    
  230.     void init_threads();
  231.    
  232. private:
  233.     bool join = false;
  234.     unsigned int num_threads;
  235.    
  236.     std::mutex task_mutex;
  237.     std::deque<std::future<void>> tasks;
  238.    
  239.     std::list<std::thread> threads;
  240. };
  241.  
  242.  
  243. //**** compiling with:
  244.  
  245. cc main.cpp -o signallingTracerV2  -std=c++11 -lpthread -lssl -lcrypto -lmongoclient -lboost_thread -lboost_filesystem -lboost_program_options -lboost_system  -lboost_regex -pthread
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement