SHARE
TWEET

Решение задачи про асинхронность с Хабра

eao197 Oct 17th, 2014 (edited) 247 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #include <iostream>
  2. #include <unordered_map>
  3. #include <string>
  4.  
  5. #include <so_5/all.hpp>
  6.  
  7. // A helpers for trace messages.
  8. class locker_t
  9. {
  10. public :
  11.         locker_t() : m_lock( m_mutex ) {}
  12.         locker_t( locker_t && o ) : m_lock( std::move( o.m_lock ) ) {}
  13.         operator bool() const { return true; }
  14. private :
  15.         std::unique_lock< std::mutex > m_lock;
  16.         static std::mutex m_mutex;
  17. };
  18.  
  19. std::mutex locker_t::m_mutex;
  20.  
  21. #define TRACE() \
  22. if( auto l = locker_t{} ) std::cout  
  23.  
  24. struct msg_value_request : public so_5::rt::message_t
  25. {
  26.         long long m_id;
  27.         const std::string m_key;
  28.         const so_5::rt::mbox_t m_reply_to;
  29.  
  30.         msg_value_request(
  31.                 long long id,
  32.                 std::string key,
  33.                 so_5::rt::mbox_t reply_to )
  34.                 :       m_id( id )
  35.                 ,       m_key( std::move( key ) )
  36.                 ,       m_reply_to( std::move( reply_to ) )
  37.         {}
  38. };
  39.  
  40. struct msg_value_response : public so_5::rt::message_t
  41. {
  42.         long long m_id;
  43.         const std::string m_key;
  44.         const std::string m_value;
  45.  
  46.         msg_value_response(
  47.                 long long id,
  48.                 std::string key,
  49.                 std::string value )
  50.                 :       m_id( id )
  51.                 ,       m_key( std::move( key ) )
  52.                 ,       m_value( std::move( value ) )
  53.         {}
  54. };
  55.  
  56. struct msg_cache_check_result : public so_5::rt::message_t
  57. {
  58.         long long m_id;
  59.         bool m_found = false;
  60.         const std::string m_value;
  61.  
  62.         msg_cache_check_result( long long id )
  63.                 :       m_id( id )
  64.         {}
  65.  
  66.         msg_cache_check_result( long long id, std::string value )
  67.                 :       m_id( id ), m_found( true ), m_value( std::move( value ) )
  68.         {}
  69. };
  70.  
  71. struct msg_update_cache : public so_5::rt::message_t
  72. {
  73.         long long m_id;
  74.         const std::string m_key;
  75.         const std::string m_value;
  76.         const so_5::rt::mbox_t m_reply_to;
  77.  
  78.         msg_update_cache(
  79.                 long long id,
  80.                 std::string key,
  81.                 std::string value,
  82.                 so_5::rt::mbox_t reply_to )
  83.                 :       m_id( id )
  84.                 ,       m_key( std::move( key ) )
  85.                 ,       m_value( std::move( value ) )
  86.                 ,       m_reply_to( std::move( reply_to ) )
  87.         {}
  88. };
  89.  
  90. struct msg_cache_updated : public so_5::rt::message_t
  91. {
  92.         long long m_id;
  93.  
  94.         msg_cache_updated( long long id ) : m_id( id ) {}
  95. };
  96.  
  97. struct msg_network_result : public so_5::rt::message_t
  98. {
  99.         long long m_id;
  100.         std::string m_value;
  101.  
  102.         msg_network_result( long long id, std::string value )
  103.                 :       m_id( id ), m_value( std::move( value ) )
  104.         {}
  105. };
  106.  
  107. struct performers_info_t
  108. {
  109.         const so_5::rt::mbox_t m_mem_cache;
  110.         const so_5::rt::mbox_t m_disk_cache;
  111.         const so_5::rt::mbox_t m_network;
  112.  
  113.         performers_info_t(
  114.                 so_5::rt::mbox_t mem_cache,
  115.                 so_5::rt::mbox_t disk_cache,
  116.                 so_5::rt::mbox_t network )
  117.                 :       m_mem_cache( std::move( mem_cache ) )
  118.                 ,       m_disk_cache( std::move( disk_cache ) )
  119.                 ,       m_network( std::move( network ) )
  120.         {}
  121. };
  122.  
  123. using performers_info_ptr_t = std::shared_ptr< performers_info_t >;
  124.  
  125. class a_request_performer_t : public so_5::rt::agent_t
  126. {
  127. public :
  128.         a_request_performer_t(
  129.                 so_5::rt::environment_t & env,
  130.                 const performers_info_ptr_t & performers,
  131.                 long long id,
  132.                 std::string key,
  133.                 so_5::rt::mbox_t reply_to )
  134.                 :       so_5::rt::agent_t( env )
  135.                 ,       m_performers( performers )
  136.                 ,       m_id( id )
  137.                 ,       m_key( std::move( key ) )
  138.                 ,       m_reply_to( std::move( reply_to ) )
  139.         {}
  140.  
  141.         virtual void
  142.         so_define_agent() override
  143.         {
  144.                 this >>= st_wait_cache_resp;
  145.  
  146.                 st_wait_cache_resp
  147.                         .event( &a_request_performer_t::evt_cache_check_result )
  148.                         .event( &a_request_performer_t::evt_timeout );
  149.  
  150.                 st_wait_network_resp
  151.                         .event( &a_request_performer_t::evt_network_result )
  152.                         .event( &a_request_performer_t::evt_timeout );
  153.  
  154.                 st_wait_cache_updates
  155.                         .event( &a_request_performer_t::evt_cache_updated );
  156.         }
  157.  
  158.         virtual void
  159.         so_evt_start() override
  160.         {
  161.                 TRACE() << "(" << m_id << ":" << m_key << ") processing started"
  162.                                 << std::endl;
  163.  
  164.                 so_5::send_delayed_to_agent< msg_timeout >( *this,
  165.                                 std::chrono::seconds( 1 ),
  166.                                 "total operation timeout" );
  167.  
  168.                 so_5::send< msg_value_request >( m_performers->m_mem_cache,
  169.                                 m_id, m_key, so_direct_mbox() );
  170.                 so_5::send< msg_value_request >( m_performers->m_disk_cache,
  171.                                 m_id, m_key, so_direct_mbox() );
  172.         }
  173.  
  174. private :
  175.         struct msg_timeout : public so_5::rt::message_t
  176.         {
  177.                 const std::string m_what;
  178.  
  179.                 msg_timeout( std::string what ) : m_what( std::move( what ) ) {}
  180.         };
  181.  
  182.         const so_5::rt::state_t st_wait_cache_resp = so_make_state();
  183.         const so_5::rt::state_t st_wait_network_resp = so_make_state();
  184.         const so_5::rt::state_t st_wait_cache_updates = so_make_state();
  185.  
  186.         const performers_info_ptr_t m_performers;
  187.  
  188.         const long long m_id;
  189.         const std::string m_key;
  190.         std::string m_value;
  191.  
  192.         const so_5::rt::mbox_t m_reply_to;
  193.  
  194.         unsigned int m_cache_checks_results = 0;
  195.         unsigned int m_cache_update_results = 0;
  196.  
  197.         static const unsigned int m_cache_count = 2;
  198.  
  199.         void
  200.         evt_cache_check_result( const msg_cache_check_result & evt )
  201.         {
  202.                 ++m_cache_checks_results;
  203.  
  204.                 TRACE() << "(" << m_id << ":" << m_key << ") cache response ("
  205.                                 << m_cache_checks_results << "/" << m_cache_count << "), found="
  206.                                 << ( evt.m_found ? "Y" : "N" ) << std::endl;
  207.  
  208.                 if( evt.m_found )
  209.                 {
  210.                         TRACE() << "(" << m_id << ":" << m_key << ") found in cache"
  211.                                         << std::endl;
  212.                         m_value = evt.m_value;
  213.  
  214.                         return_result_and_finish_work();
  215.                 }
  216.                 else
  217.                 {
  218.                         if( m_cache_checks_results == m_cache_count )
  219.                                 initiate_network_request();
  220.                 }
  221.         }
  222.  
  223.         void
  224.         evt_network_result( const msg_network_result & evt )
  225.         {
  226.                 TRACE() << "(" << m_id << ":" << m_key << ") value from network: "
  227.                                 << evt.m_value << std::endl;
  228.  
  229.                 m_value = evt.m_value;
  230.  
  231.                 this >>= st_wait_cache_updates;
  232.  
  233.                 so_5::send< msg_update_cache >( m_performers->m_mem_cache,
  234.                                 m_id, m_key, m_value, so_direct_mbox() );
  235.                 so_5::send< msg_update_cache >( m_performers->m_disk_cache,
  236.                                 m_id, m_key, m_value, so_direct_mbox() );
  237.         }
  238.  
  239.         void
  240.         evt_cache_updated( const msg_cache_updated & )
  241.         {
  242.                 ++m_cache_update_results;
  243.  
  244.                 TRACE() << "(" << m_id << ":" << m_key << ") cache updated ("
  245.                                 << m_cache_update_results << "/" << m_cache_count << ")"
  246.                                 << std::endl;
  247.  
  248.                 if( m_cache_update_results == m_cache_count )
  249.                         return_result_and_finish_work();
  250.         }
  251.  
  252.         void
  253.         evt_timeout( const msg_timeout & evt )
  254.         {
  255.                 TRACE() << "(" << m_id << ":" << m_key << ") timedout: "
  256.                                 << evt.m_what << std::endl;
  257.  
  258.                 so_deregister_agent_coop_normally();
  259.         }
  260.  
  261.         void
  262.         initiate_network_request()
  263.         {
  264.                 this >>= st_wait_network_resp;
  265.  
  266.                 so_5::send_delayed_to_agent< msg_timeout >( *this,
  267.                                 std::chrono::milliseconds( 500 ),
  268.                                 "network operations timeout" );
  269.  
  270.                 so_5::send< msg_value_request >( m_performers->m_network,
  271.                                 m_id, m_key, so_direct_mbox() );
  272.         }
  273.  
  274.         void
  275.         return_result_and_finish_work()
  276.         {
  277.                 so_5::send< msg_value_response >( m_reply_to, m_id, m_key, m_value );
  278.  
  279.                 so_deregister_agent_coop_normally();
  280.         }
  281. };
  282.  
  283. class a_cache_t : public so_5::rt::agent_t
  284. {
  285. public :
  286.         a_cache_t(
  287.                 so_5::rt::environment_t & env,
  288.                 unsigned int min_pause,
  289.                 unsigned int max_pause )
  290.                 :       so_5::rt::agent_t( env )
  291.                 ,       m_min_pause( min_pause )
  292.                 ,       m_pause_delta( max_pause - min_pause )
  293.         {}
  294.  
  295.         virtual void
  296.         so_define_agent() override
  297.         {
  298.                 so_default_state().event( [=]( const msg_value_request & evt ) {
  299.                                 auto it = m_values.find( evt.m_key );
  300.                                 auto delay = random_delay();
  301.  
  302.                                 if( it == m_values.end() )
  303.                                         so_5::send_delayed< msg_cache_check_result >(
  304.                                                         *this, evt.m_reply_to, delay, evt.m_id );
  305.                                 else
  306.                                         so_5::send_delayed< msg_cache_check_result >(
  307.                                                         *this, evt.m_reply_to, delay, evt.m_id, it->second );
  308.                         } );
  309.  
  310.                 so_default_state().event( [=]( const msg_update_cache & evt ) {
  311.                                 m_values[ evt.m_key ] = evt.m_value;
  312.  
  313.                                 so_5::send_delayed< msg_cache_updated >(
  314.                                                 *this, evt.m_reply_to, random_delay(), evt.m_id );
  315.                         } );
  316.         }
  317.  
  318. private :
  319.         const unsigned int m_min_pause;
  320.         const unsigned int m_pause_delta;
  321.  
  322.         std::unordered_map< std::string, std::string > m_values;
  323.  
  324.         std::chrono::milliseconds
  325.         random_delay() const
  326.         {
  327.                 return std::chrono::milliseconds( m_min_pause +
  328.                                 static_cast< unsigned int >( std::rand() ) % m_pause_delta );
  329.         }
  330. };
  331.  
  332. class a_network_t : public so_5::rt::agent_t
  333. {
  334. public :
  335.         a_network_t(
  336.                 so_5::rt::environment_t & env,
  337.                 unsigned int min_pause,
  338.                 unsigned int max_pause )
  339.                 :       so_5::rt::agent_t( env )
  340.                 ,       m_min_pause( min_pause )
  341.                 ,       m_pause_delta( max_pause - min_pause )
  342.         {}
  343.  
  344.         virtual void
  345.         so_define_agent() override
  346.         {
  347.                 so_default_state().event( [=]( const msg_value_request & evt ) {
  348.                                 TRACE() << "(" << evt.m_id << ":" << evt.m_key << ") will be "
  349.                                                 "requested via network" << std::endl;
  350.  
  351.                                 so_5::send_delayed< msg_network_result >(
  352.                                                 *this, evt.m_reply_to, random_delay(),
  353.                                                 evt.m_id, "<" + evt.m_key + ">" );
  354.                         } );
  355.         }
  356.  
  357. private :
  358.         const unsigned int m_min_pause;
  359.         const unsigned int m_pause_delta;
  360.  
  361.         std::chrono::milliseconds
  362.         random_delay() const
  363.         {
  364.                 return std::chrono::milliseconds( m_min_pause +
  365.                                 static_cast< unsigned int >( std::rand() ) % m_pause_delta );
  366.         }
  367. };
  368.  
  369. class request_conductor_t
  370. {
  371. public :
  372.         request_conductor_t(
  373.                 so_5::rt::environment_t & env,
  374.                 const performers_info_ptr_t & performers )
  375.                 :       m_env( env )
  376.                 ,       m_performers( performers )
  377.         {}
  378.  
  379.         void
  380.         initiate_request(
  381.                 long long id,
  382.                 const std::string & key,
  383.                 const so_5::rt::mbox_t & reply_to )
  384.         {
  385.                 auto request_performer = std::unique_ptr< so_5::rt::agent_t >(
  386.                                 new a_request_performer_t(
  387.                                                 m_env, m_performers, id, key, reply_to ) );
  388.  
  389.                 m_env.register_agent_as_coop(
  390.                                 make_coop_name( id ),
  391.                                 std::move( request_performer ),
  392.                                 so_5::disp::thread_pool::create_disp_binder(
  393.                                                 "cpu",
  394.                                                 so_5::disp::thread_pool::params_t() ) );
  395.         }
  396.  
  397.         void
  398.         cancel_request(
  399.                 long long id )
  400.         {
  401.                 m_env.deregister_coop( make_coop_name( id ),
  402.                                 so_5::rt::dereg_reason::normal );
  403.         }
  404.  
  405. private :
  406.         so_5::rt::environment_t & m_env;
  407.         const performers_info_ptr_t m_performers;
  408.  
  409.         std::string
  410.         make_coop_name( long long id )
  411.         {
  412.                 return "request_" + std::to_string( id );
  413.         }
  414. };
  415.  
  416. using request_conductor_ptr_t = std::shared_ptr< request_conductor_t >;
  417.  
  418. class a_requests_producer_t : public so_5::rt::agent_t
  419. {
  420. public :
  421.         a_requests_producer_t(
  422.                 so_5::rt::environment_t & env,
  423.                 const request_conductor_ptr_t & request_conductor )
  424.                 :       so_5::rt::agent_t( env )
  425.                 ,       m_request_conductor( request_conductor )
  426.         {}
  427.  
  428.         virtual void
  429.         so_define_agent() override
  430.         {
  431.                 so_default_state()
  432.                         .event( [=]( const msg_initiate_request & evt ) {
  433.                                         TRACE() << "initiate request for: (" << evt.m_id << ":"
  434.                                                         << evt.m_key << ")" << std::endl;
  435.  
  436.                                         m_request_conductor->initiate_request(
  437.                                                         evt.m_id, evt.m_key, so_direct_mbox() );
  438.                                 } )
  439.                         .event( [=]( const msg_cancel_request & evt ) {
  440.                                         TRACE() << "cancel request for id: " << evt.m_id << std::endl;
  441.  
  442.                                         m_request_conductor->cancel_request( evt.m_id );
  443.                                 } )
  444.                         .event( [=]( const msg_value_response & evt ) {
  445.                                         TRACE() << "response for: (" << evt.m_id << ":"
  446.                                                         << evt.m_key << ") = " << evt.m_value << std::endl;
  447.                                 } )
  448.                         .event< msg_finish >( [=] { so_environment().stop(); } );
  449.         }
  450.  
  451.         virtual void
  452.         so_evt_start() override
  453.         {
  454.                 using millisec = std::chrono::milliseconds;
  455.  
  456.                 so_5::send_delayed_to_agent< msg_initiate_request >(
  457.                                 *this, millisec( 100 ), 1, "Hello" );
  458.                 so_5::send_delayed_to_agent< msg_initiate_request >(
  459.                                 *this, millisec( 200 ), 2, "Bye" );
  460.                 so_5::send_delayed_to_agent< msg_initiate_request >(
  461.                                 *this, millisec( 300 ), 3, "Hello" );
  462.                 so_5::send_delayed_to_agent< msg_cancel_request >(
  463.                                 *this, millisec( 305 ), 3 );
  464.                 so_5::send_delayed_to_agent< msg_initiate_request >(
  465.                                 *this, millisec( 400 ), 4, "Hello" );
  466.  
  467.                 so_5::send_delayed_to_agent< msg_finish >(
  468.                                 *this, std::chrono::seconds( 3 ) );
  469.         }
  470.  
  471. private :
  472.         struct msg_initiate_request : public so_5::rt::message_t
  473.         {
  474.                 long long m_id;
  475.                 const std::string m_key;
  476.  
  477.                 msg_initiate_request( long long id, std::string key )
  478.                         :       m_id( id ), m_key( std::move( key ) )
  479.                 {}
  480.         };
  481.  
  482.         struct msg_cancel_request : public so_5::rt::message_t
  483.         {
  484.                 long long m_id;
  485.  
  486.                 msg_cancel_request( long long id )
  487.                         :       m_id( id )
  488.                 {}
  489.         };
  490.  
  491.         struct msg_finish : public so_5::rt::signal_t {};
  492.  
  493.         const request_conductor_ptr_t m_request_conductor;
  494. };
  495.  
  496. void
  497. init( so_5::rt::environment_t & env )
  498. {
  499.         so_5::disp::thread_pool::params_t pool_params;
  500.         pool_params.fifo( so_5::disp::thread_pool::fifo_t::individual );
  501.  
  502.         auto performers_coop = env.create_coop( "performers" );
  503.  
  504.         auto mem_cache = performers_coop->add_agent(
  505.                         new a_cache_t( env, 5, 150 ),
  506.                         so_5::disp::thread_pool::create_disp_binder( "cpu", pool_params ) );
  507.         auto disk_cache = performers_coop->add_agent(
  508.                         new a_cache_t( env, 50, 550 ),
  509.                         so_5::disp::thread_pool::create_disp_binder( "cpu", pool_params ) );
  510.         auto network = performers_coop->add_agent(
  511.                         new a_network_t( env, 150, 750 ),
  512.                         so_5::disp::active_obj::create_disp_binder( "net" ) );
  513.  
  514.         env.register_coop( std::move( performers_coop ) );
  515.  
  516.         auto performers_info = std::make_shared< performers_info_t >(
  517.                         mem_cache->so_direct_mbox(),
  518.                         disk_cache->so_direct_mbox(),
  519.                         network->so_direct_mbox() );
  520.  
  521.         auto conductor = std::make_shared< request_conductor_t >(
  522.                         env, performers_info );
  523.  
  524.         env.register_agent_as_coop( "initiator",
  525.                         new a_requests_producer_t( env, conductor ),
  526.                         so_5::disp::one_thread::create_disp_binder( "ui" ) );
  527. }
  528.  
  529. void
  530. init_env_params( so_5::rt::environment_params_t & params )
  531. {
  532.         params.add_named_dispatcher( "ui",
  533.                         so_5::disp::one_thread::create_disp() );
  534.  
  535.         params.add_named_dispatcher( "cpu",
  536.                         so_5::disp::thread_pool::create_disp( 3 ) );
  537.  
  538.         params.add_named_dispatcher( "net",
  539.                         so_5::disp::active_obj::create_disp() );
  540. }
  541.  
  542. int
  543. main()
  544. {
  545.         try
  546.         {
  547.                 so_5::launch( init, init_env_params );
  548.  
  549.                 return 0;
  550.         }
  551.         catch( const std::exception & x )
  552.         {
  553.                 std::cerr << "Exception: " << x.what() << std::endl;
  554.         }
  555.  
  556.         return 2;
  557. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top