Advertisement
Guest User

asio UDP sample

a guest
Nov 15th, 2010
3,662
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.26 KB | None | 0 0
  1. #include <cstdlib>
  2. #include <iostream>
  3. #include <boost/bind.hpp>
  4. #include <boost/asio.hpp>
  5. #include <boost/thread.hpp>
  6. #include <boost/date_time.hpp>
  7. #include <boost/foreach.hpp>
  8.  
  9. using boost::asio::ip::udp;
  10.  
  11. class server
  12. {
  13. public:
  14.   server(boost::asio::io_service& io_service, short port)
  15.     : io_service_(io_service),
  16.       socket_(io_service, udp::endpoint(udp::v4(), port)),
  17.       messages_sent_(0u),
  18.       should_run_(true)
  19.   {
  20.     boost::thread   t(boost::bind(&server::thread_func, this));
  21.     t_.swap(t);
  22.  
  23.     socket_.async_receive_from(
  24.         boost::asio::buffer(data_, max_length), sender_endpoint_,
  25.         boost::bind(&server::handle_receive_from, this,
  26.           boost::asio::placeholders::error,
  27.           boost::asio::placeholders::bytes_transferred));
  28.   }
  29.  
  30.   ~server() {
  31.       should_run_ = false;
  32.       t_.join();
  33.   }
  34.  
  35.   void handle_receive_from(const boost::system::error_code& error,
  36.       size_t bytes_recvd)
  37.   {
  38.     if (error) {
  39.         std::cout << "receive error!" << std::endl;
  40.         return;
  41.     }
  42.  
  43.       std::string  message(data_, bytes_recvd);
  44.  
  45.       // start receiving again
  46.       socket_.async_receive_from(
  47.           boost::asio::buffer(data_, max_length), sender_endpoint_,
  48.           boost::bind(&server::handle_receive_from, this,
  49.             boost::asio::placeholders::error,
  50.             boost::asio::placeholders::bytes_transferred));
  51.  
  52.       // store the received message in a vector
  53.       boost::lock_guard<boost::recursive_mutex>    lock(mutex_);
  54.       messages_.push_back(message);
  55.       messages_received_++;
  56.   }
  57.  
  58.   void send(udp::endpoint other_end, const std::string &message) {
  59.       unsigned int sent = socket_.send_to(
  60.                         boost::asio::buffer(message.c_str(), message.size()),
  61.                         other_end);
  62.  
  63.       if (sent < message.size()) {
  64.           std::cout << "only " << sent << " bytes sent from " << message.size()
  65.                     << std::endl;
  66.       }
  67.  
  68.       ++messages_sent_;
  69.   }
  70.  
  71.   void thread_func() {
  72.       while (should_run_) {
  73.           boost::this_thread::yield();
  74.           boost::this_thread::sleep(boost::posix_time::milliseconds(100u));
  75.  
  76.           // work on the messages received
  77.           boost::lock_guard<boost::recursive_mutex>    lock(mutex_);
  78.           BOOST_FOREACH(const std::string & str, messages_) {
  79.             boost::this_thread::sleep(boost::posix_time::milliseconds(1u));
  80.           }
  81.           messages_.clear();
  82.       }
  83.   }
  84.  
  85.  
  86.   std::vector<std::string>  messages_;
  87.   unsigned int messages_sent_;
  88.   unsigned int messages_received_;
  89.  
  90. private:
  91.   boost::asio::io_service& io_service_;
  92.   udp::socket socket_;
  93.   udp::endpoint sender_endpoint_;
  94.   enum { max_length = 1024 };
  95.   char data_[max_length];
  96.   boost::recursive_mutex  mutex_;
  97.   bool should_run_;
  98.   boost::thread t_;
  99. };
  100.  
  101. static boost::asio::io_service io_service;
  102. static bool                    should_run;
  103.  
  104. void service_thread(void) {
  105.     while (should_run) {
  106.         io_service.run();
  107.  
  108.         io_service.reset();
  109.     }
  110. }
  111.  
  112.  
  113. int main(int argc, char* argv[])
  114. {
  115.   try
  116.   {
  117.     server a(io_service, 4567);
  118.     server b(io_service, 4568);
  119.  
  120.     should_run = true;
  121.  
  122.     boost::thread   t(service_thread);
  123.  
  124.     // generate an endpoint for 'a'
  125.     udp::resolver           resolver(io_service);
  126.     udp::resolver::query    query(udp::v4(), "localhost", "4567");
  127.     udp::endpoint           endp_a = *resolver.resolve(query);
  128.  
  129.     for (unsigned int i = 0; i < 20000; ++i) {
  130.         std::stringstream   sstr;
  131.         for (unsigned int j = 0; j < 1000; ++j) {
  132.             sstr << i;
  133.         }
  134.         b.send(endp_a, sstr.str());
  135.  
  136.         if (i % 50 == 0) {
  137.             //boost::this_thread::yield();
  138.             //boost::this_thread::sleep(boost::posix_time::milliseconds(1));
  139.         }
  140.     }
  141.  
  142.     boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
  143.  
  144.     std::cout << "b sent " << b.messages_sent_ << " messages"
  145.               << std::endl;
  146.     std::cout << "a received " << a.messages_received_ << " messages"
  147.               << std::endl;
  148.  
  149.     // stop the listening thread
  150.     should_run = false;
  151.     io_service.stop();
  152.     t.join();
  153.   }
  154.   catch (std::exception& e)
  155.   {
  156.     std::cerr << "Exception: " << e.what() << "\n";
  157.   }
  158.  
  159.   return 0;
  160. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement