Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * File: main.cpp
- * Author: david
- *
- * Created on November 28, 2014, 4:16 PM
- */
- #include <cstdlib>
- #include <iostream>
- #include <future>
- #include <list>
- #include "pstreams/pstream.h"
- #include "thread_pool.hpp"
- /*
- *
- */
- int main(int argc, char** argv) {
- // Variables
- std::string debug = argv[1];
- std::string newFile = argv[2];
- std::string completePacket;
- std::cout << "Filename: " << newFile << std::endl;
- std::string input_line;
- int totalPackets;
- thread_pool tp( 8 );
- std::list<std::future<int>> fut;
- // External command (tshark)
- redi::ipstream inFile("/home/monitor/scripts/exec_tshark.sh " + newFile);
- /// Start reading from tshark output
- while (getline(inFile, input_line)) {
- input_line += "\n";
- if (input_line == "\n") {
- totalPackets++;
- completePacket = "";
- } else {
- completePacket += input_line;
- }
- };
- std::cout << std::endl << "Total Packets Processed: " << totalPackets << std::endl;
- inFile.clear();
- inFile.close();
- /*
- int file_removal_result = remove( newFile.c_str() );
- if ( file_removal_result != 0) {
- cerr << "Error deleting file: " << file_removal_result << endl;
- } else {
- cerr << "File successfully deleted" << endl;
- }
- */
- std::cout << "End @ " << std::endl << "==================================================" << std::endl;
- return 0;
- }
- /**
- thread_pool.hpp
- **/
- #include <atomic>
- #include <deque>
- #include <functional>
- #include <future>
- #include <list>
- class thread_pool{
- public:
- thread_pool(unsigned int);
- ~thread_pool();
- template<typename Ret, typename... Args>
- std::future<Ret> async(std::function<Ret(Args...)> f, Args... args){
- typedef std::function<Ret(Args...)> F;
- std::atomic<bool> *ready = new std::atomic<bool>(false);
- std::promise<Ret> *p = new std::promise<Ret>;
- auto task_wrapper = [p, ready](F&& f, Args&&... args){
- p->set_value(f(args...));
- ready->store(true);
- };
- auto ret_wrapper = [p, ready]() -> Ret{
- while(!ready->load())
- std::this_thread::yield();
- auto temp = p->get_future().get();
- // Clean up resources
- delete p;
- delete ready;
- return temp;
- };
- task_mutex.lock();
- tasks.emplace_back(
- std::async(
- std::launch::deferred,
- task_wrapper,
- std::move(f),
- std::move(args...)
- )
- );
- task_mutex.unlock();
- return std::async(std::launch::deferred,
- ret_wrapper);
- }
- template<typename Ret>
- std::future<Ret> async(std::function<Ret()> f){
- typedef std::function<Ret()> F;
- std::atomic<bool> *ready = new std::atomic<bool>(false);
- std::promise<Ret> *p = new std::promise<Ret>;
- auto task_wrapper = [p, ready](F&& f){
- p->set_value(f());
- ready->store(true);
- };
- auto ret_wrapper = [p, ready]() -> Ret{
- while(!ready->load())
- std::this_thread::yield();
- auto temp = p->get_future().get();
- // Clean up resources
- delete p;
- delete ready;
- return temp;
- };
- task_mutex.lock();
- tasks.emplace_back(std::async(std::launch::deferred,
- task_wrapper, std::move(f)));
- task_mutex.unlock();
- return std::async(std::launch::deferred,
- ret_wrapper);
- }
- template<typename... Args>
- std::future<void> async(std::function<void(Args...)> f, Args... args){
- typedef std::function<void(Args...)> F;
- std::atomic<bool> *ready = new std::atomic<bool>(false);
- std::promise<void> *p = new std::promise<void>;
- auto task_wrapper = [p, ready](F&& f, Args&&... args){
- f(args...);
- p->set_value();
- ready->store(true);
- };
- auto ret_wrapper = [p, ready](){
- while(!ready->load())
- std::this_thread::yield();
- p->get_future().get();
- // Clean up resources
- delete p;
- delete ready;
- return;
- };
- task_mutex.lock();
- tasks.emplace_back(std::async(std::launch::deferred,
- task_wrapper, std::move(f), std::move(args...)));
- task_mutex.unlock();
- return std::async(std::launch::deferred,
- ret_wrapper);
- }
- std::future<void> async(std::function<void()> f){
- typedef std::function<void()> F;
- std::atomic<bool> *ready = new std::atomic<bool>(false);
- std::promise<void> *p = new std::promise<void>;
- auto task_wrapper = [p, ready](F&& f){
- f();
- p->set_value();
- ready->store(true);
- };
- auto ret_wrapper = [p, ready](){
- while(!ready->load())
- std::this_thread::yield();
- p->get_future().get();
- // Clean up resources
- delete p;
- delete ready;
- return;
- };
- task_mutex.lock();
- tasks.emplace_back(std::async(std::launch::deferred,
- task_wrapper, std::move(f)));
- task_mutex.unlock();
- return std::async(std::launch::deferred,
- ret_wrapper);
- }
- protected:
- void thread_func();
- void init_threads();
- private:
- bool join = false;
- unsigned int num_threads;
- std::mutex task_mutex;
- std::deque<std::future<void>> tasks;
- std::list<std::thread> threads;
- };
- //**** compiling with:
- cc main.cpp -o signallingTracerV2 -std=c++11 -lpthread -lssl -lcrypto -lmongoclient -lboost_thread -lboost_filesystem -lboost_program_options -lboost_system -lboost_regex -pthread
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement