Advertisement
eao197

Решение задачи про асинхронность с Хабра

Oct 17th, 2014
375
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 13.57 KB | None | 0 0
  1. #include <iostream>
  2. #include <unordered_map>
  3. #include <string>
  4.  
  5. #include <so_5/all.hpp>
  6.  
  7. // A helpers for trace messages.
  8. class locker_t
  9. {
  10. public :
  11.     locker_t() : m_lock( m_mutex ) {}
  12.     locker_t( locker_t && o ) : m_lock( std::move( o.m_lock ) ) {}
  13.     operator bool() const { return true; }
  14. private :
  15.     std::unique_lock< std::mutex > m_lock;
  16.     static std::mutex m_mutex;
  17. };
  18.  
  19. std::mutex locker_t::m_mutex;
  20.  
  21. #define TRACE() \
  22. if( auto l = locker_t{} ) std::cout  
  23.  
  24. struct msg_value_request : public so_5::rt::message_t
  25. {
  26.     long long m_id;
  27.     const std::string m_key;
  28.     const so_5::rt::mbox_t m_reply_to;
  29.  
  30.     msg_value_request(
  31.         long long id,
  32.         std::string key,
  33.         so_5::rt::mbox_t reply_to )
  34.         :   m_id( id )
  35.         ,   m_key( std::move( key ) )
  36.         ,   m_reply_to( std::move( reply_to ) )
  37.     {}
  38. };
  39.  
  40. struct msg_value_response : public so_5::rt::message_t
  41. {
  42.     long long m_id;
  43.     const std::string m_key;
  44.     const std::string m_value;
  45.  
  46.     msg_value_response(
  47.         long long id,
  48.         std::string key,
  49.         std::string value )
  50.         :   m_id( id )
  51.         ,   m_key( std::move( key ) )
  52.         ,   m_value( std::move( value ) )
  53.     {}
  54. };
  55.  
  56. struct msg_cache_check_result : public so_5::rt::message_t
  57. {
  58.     long long m_id;
  59.     bool m_found = false;
  60.     const std::string m_value;
  61.  
  62.     msg_cache_check_result( long long id )
  63.         :   m_id( id )
  64.     {}
  65.  
  66.     msg_cache_check_result( long long id, std::string value )
  67.         :   m_id( id ), m_found( true ), m_value( std::move( value ) )
  68.     {}
  69. };
  70.  
  71. struct msg_update_cache : public so_5::rt::message_t
  72. {
  73.     long long m_id;
  74.     const std::string m_key;
  75.     const std::string m_value;
  76.     const so_5::rt::mbox_t m_reply_to;
  77.  
  78.     msg_update_cache(
  79.         long long id,
  80.         std::string key,
  81.         std::string value,
  82.         so_5::rt::mbox_t reply_to )
  83.         :   m_id( id )
  84.         ,   m_key( std::move( key ) )
  85.         ,   m_value( std::move( value ) )
  86.         ,   m_reply_to( std::move( reply_to ) )
  87.     {}
  88. };
  89.  
  90. struct msg_cache_updated : public so_5::rt::message_t
  91. {
  92.     long long m_id;
  93.  
  94.     msg_cache_updated( long long id ) : m_id( id ) {}
  95. };
  96.  
  97. struct msg_network_result : public so_5::rt::message_t
  98. {
  99.     long long m_id;
  100.     std::string m_value;
  101.  
  102.     msg_network_result( long long id, std::string value )
  103.         :   m_id( id ), m_value( std::move( value ) )
  104.     {}
  105. };
  106.  
  107. struct performers_info_t
  108. {
  109.     const so_5::rt::mbox_t m_mem_cache;
  110.     const so_5::rt::mbox_t m_disk_cache;
  111.     const so_5::rt::mbox_t m_network;
  112.  
  113.     performers_info_t(
  114.         so_5::rt::mbox_t mem_cache,
  115.         so_5::rt::mbox_t disk_cache,
  116.         so_5::rt::mbox_t network )
  117.         :   m_mem_cache( std::move( mem_cache ) )
  118.         ,   m_disk_cache( std::move( disk_cache ) )
  119.         ,   m_network( std::move( network ) )
  120.     {}
  121. };
  122.  
  123. using performers_info_ptr_t = std::shared_ptr< performers_info_t >;
  124.  
  125. class a_request_performer_t : public so_5::rt::agent_t
  126. {
  127. public :
  128.     a_request_performer_t(
  129.         so_5::rt::environment_t & env,
  130.         const performers_info_ptr_t & performers,
  131.         long long id,
  132.         std::string key,
  133.         so_5::rt::mbox_t reply_to )
  134.         :   so_5::rt::agent_t( env )
  135.         ,   m_performers( performers )
  136.         ,   m_id( id )
  137.         ,   m_key( std::move( key ) )
  138.         ,   m_reply_to( std::move( reply_to ) )
  139.     {}
  140.  
  141.     virtual void
  142.     so_define_agent() override
  143.     {
  144.         this >>= st_wait_cache_resp;
  145.  
  146.         st_wait_cache_resp
  147.             .event( &a_request_performer_t::evt_cache_check_result )
  148.             .event( &a_request_performer_t::evt_timeout );
  149.  
  150.         st_wait_network_resp
  151.             .event( &a_request_performer_t::evt_network_result )
  152.             .event( &a_request_performer_t::evt_timeout );
  153.  
  154.         st_wait_cache_updates
  155.             .event( &a_request_performer_t::evt_cache_updated );
  156.     }
  157.  
  158.     virtual void
  159.     so_evt_start() override
  160.     {
  161.         TRACE() << "(" << m_id << ":" << m_key << ") processing started"
  162.                 << std::endl;
  163.  
  164.         so_5::send_delayed_to_agent< msg_timeout >( *this,
  165.                 std::chrono::seconds( 1 ),
  166.                 "total operation timeout" );
  167.  
  168.         so_5::send< msg_value_request >( m_performers->m_mem_cache,
  169.                 m_id, m_key, so_direct_mbox() );
  170.         so_5::send< msg_value_request >( m_performers->m_disk_cache,
  171.                 m_id, m_key, so_direct_mbox() );
  172.     }
  173.  
  174. private :
  175.     struct msg_timeout : public so_5::rt::message_t
  176.     {
  177.         const std::string m_what;
  178.  
  179.         msg_timeout( std::string what ) : m_what( std::move( what ) ) {}
  180.     };
  181.  
  182.     const so_5::rt::state_t st_wait_cache_resp = so_make_state();
  183.     const so_5::rt::state_t st_wait_network_resp = so_make_state();
  184.     const so_5::rt::state_t st_wait_cache_updates = so_make_state();
  185.  
  186.     const performers_info_ptr_t m_performers;
  187.  
  188.     const long long m_id;
  189.     const std::string m_key;
  190.     std::string m_value;
  191.  
  192.     const so_5::rt::mbox_t m_reply_to;
  193.  
  194.     unsigned int m_cache_checks_results = 0;
  195.     unsigned int m_cache_update_results = 0;
  196.  
  197.     static const unsigned int m_cache_count = 2;
  198.  
  199.     void
  200.     evt_cache_check_result( const msg_cache_check_result & evt )
  201.     {
  202.         ++m_cache_checks_results;
  203.  
  204.         TRACE() << "(" << m_id << ":" << m_key << ") cache response ("
  205.                 << m_cache_checks_results << "/" << m_cache_count << "), found="
  206.                 << ( evt.m_found ? "Y" : "N" ) << std::endl;
  207.  
  208.         if( evt.m_found )
  209.         {
  210.             TRACE() << "(" << m_id << ":" << m_key << ") found in cache"
  211.                     << std::endl;
  212.             m_value = evt.m_value;
  213.  
  214.             return_result_and_finish_work();
  215.         }
  216.         else
  217.         {
  218.             if( m_cache_checks_results == m_cache_count )
  219.                 initiate_network_request();
  220.         }
  221.     }
  222.  
  223.     void
  224.     evt_network_result( const msg_network_result & evt )
  225.     {
  226.         TRACE() << "(" << m_id << ":" << m_key << ") value from network: "
  227.                 << evt.m_value << std::endl;
  228.  
  229.         m_value = evt.m_value;
  230.  
  231.         this >>= st_wait_cache_updates;
  232.  
  233.         so_5::send< msg_update_cache >( m_performers->m_mem_cache,
  234.                 m_id, m_key, m_value, so_direct_mbox() );
  235.         so_5::send< msg_update_cache >( m_performers->m_disk_cache,
  236.                 m_id, m_key, m_value, so_direct_mbox() );
  237.     }
  238.  
  239.     void
  240.     evt_cache_updated( const msg_cache_updated & )
  241.     {
  242.         ++m_cache_update_results;
  243.  
  244.         TRACE() << "(" << m_id << ":" << m_key << ") cache updated ("
  245.                 << m_cache_update_results << "/" << m_cache_count << ")"
  246.                 << std::endl;
  247.  
  248.         if( m_cache_update_results == m_cache_count )
  249.             return_result_and_finish_work();
  250.     }
  251.  
  252.     void
  253.     evt_timeout( const msg_timeout & evt )
  254.     {
  255.         TRACE() << "(" << m_id << ":" << m_key << ") timedout: "
  256.                 << evt.m_what << std::endl;
  257.  
  258.         so_deregister_agent_coop_normally();
  259.     }
  260.  
  261.     void
  262.     initiate_network_request()
  263.     {
  264.         this >>= st_wait_network_resp;
  265.  
  266.         so_5::send_delayed_to_agent< msg_timeout >( *this,
  267.                 std::chrono::milliseconds( 500 ),
  268.                 "network operations timeout" );
  269.  
  270.         so_5::send< msg_value_request >( m_performers->m_network,
  271.                 m_id, m_key, so_direct_mbox() );
  272.     }
  273.  
  274.     void
  275.     return_result_and_finish_work()
  276.     {
  277.         so_5::send< msg_value_response >( m_reply_to, m_id, m_key, m_value );
  278.  
  279.         so_deregister_agent_coop_normally();
  280.     }
  281. };
  282.  
  283. class a_cache_t : public so_5::rt::agent_t
  284. {
  285. public :
  286.     a_cache_t(
  287.         so_5::rt::environment_t & env,
  288.         unsigned int min_pause,
  289.         unsigned int max_pause )
  290.         :   so_5::rt::agent_t( env )
  291.         ,   m_min_pause( min_pause )
  292.         ,   m_pause_delta( max_pause - min_pause )
  293.     {}
  294.  
  295.     virtual void
  296.     so_define_agent() override
  297.     {
  298.         so_default_state().event( [=]( const msg_value_request & evt ) {
  299.                 auto it = m_values.find( evt.m_key );
  300.                 auto delay = random_delay();
  301.  
  302.                 if( it == m_values.end() )
  303.                     so_5::send_delayed< msg_cache_check_result >(
  304.                             *this, evt.m_reply_to, delay, evt.m_id );
  305.                 else
  306.                     so_5::send_delayed< msg_cache_check_result >(
  307.                             *this, evt.m_reply_to, delay, evt.m_id, it->second );
  308.             } );
  309.  
  310.         so_default_state().event( [=]( const msg_update_cache & evt ) {
  311.                 m_values[ evt.m_key ] = evt.m_value;
  312.  
  313.                 so_5::send_delayed< msg_cache_updated >(
  314.                         *this, evt.m_reply_to, random_delay(), evt.m_id );
  315.             } );
  316.     }
  317.  
  318. private :
  319.     const unsigned int m_min_pause;
  320.     const unsigned int m_pause_delta;
  321.  
  322.     std::unordered_map< std::string, std::string > m_values;
  323.  
  324.     std::chrono::milliseconds
  325.     random_delay() const
  326.     {
  327.         return std::chrono::milliseconds( m_min_pause +
  328.                 static_cast< unsigned int >( std::rand() ) % m_pause_delta );
  329.     }
  330. };
  331.  
  332. class a_network_t : public so_5::rt::agent_t
  333. {
  334. public :
  335.     a_network_t(
  336.         so_5::rt::environment_t & env,
  337.         unsigned int min_pause,
  338.         unsigned int max_pause )
  339.         :   so_5::rt::agent_t( env )
  340.         ,   m_min_pause( min_pause )
  341.         ,   m_pause_delta( max_pause - min_pause )
  342.     {}
  343.  
  344.     virtual void
  345.     so_define_agent() override
  346.     {
  347.         so_default_state().event( [=]( const msg_value_request & evt ) {
  348.                 TRACE() << "(" << evt.m_id << ":" << evt.m_key << ") will be "
  349.                         "requested via network" << std::endl;
  350.  
  351.                 so_5::send_delayed< msg_network_result >(
  352.                         *this, evt.m_reply_to, random_delay(),
  353.                         evt.m_id, "<" + evt.m_key + ">" );
  354.             } );
  355.     }
  356.  
  357. private :
  358.     const unsigned int m_min_pause;
  359.     const unsigned int m_pause_delta;
  360.  
  361.     std::chrono::milliseconds
  362.     random_delay() const
  363.     {
  364.         return std::chrono::milliseconds( m_min_pause +
  365.                 static_cast< unsigned int >( std::rand() ) % m_pause_delta );
  366.     }
  367. };
  368.  
  369. class request_conductor_t
  370. {
  371. public :
  372.     request_conductor_t(
  373.         so_5::rt::environment_t & env,
  374.         const performers_info_ptr_t & performers )
  375.         :   m_env( env )
  376.         ,   m_performers( performers )
  377.     {}
  378.  
  379.     void
  380.     initiate_request(
  381.         long long id,
  382.         const std::string & key,
  383.         const so_5::rt::mbox_t & reply_to )
  384.     {
  385.         auto request_performer = std::unique_ptr< so_5::rt::agent_t >(
  386.                 new a_request_performer_t(
  387.                         m_env, m_performers, id, key, reply_to ) );
  388.  
  389.         m_env.register_agent_as_coop(
  390.                 make_coop_name( id ),
  391.                 std::move( request_performer ),
  392.                 so_5::disp::thread_pool::create_disp_binder(
  393.                         "cpu",
  394.                         so_5::disp::thread_pool::params_t() ) );
  395.     }
  396.  
  397.     void
  398.     cancel_request(
  399.         long long id )
  400.     {
  401.         m_env.deregister_coop( make_coop_name( id ),
  402.                 so_5::rt::dereg_reason::normal );
  403.     }
  404.  
  405. private :
  406.     so_5::rt::environment_t & m_env;
  407.     const performers_info_ptr_t m_performers;
  408.  
  409.     std::string
  410.     make_coop_name( long long id )
  411.     {
  412.         return "request_" + std::to_string( id );
  413.     }
  414. };
  415.  
  416. using request_conductor_ptr_t = std::shared_ptr< request_conductor_t >;
  417.  
  418. class a_requests_producer_t : public so_5::rt::agent_t
  419. {
  420. public :
  421.     a_requests_producer_t(
  422.         so_5::rt::environment_t & env,
  423.         const request_conductor_ptr_t & request_conductor )
  424.         :   so_5::rt::agent_t( env )
  425.         ,   m_request_conductor( request_conductor )
  426.     {}
  427.  
  428.     virtual void
  429.     so_define_agent() override
  430.     {
  431.         so_default_state()
  432.             .event( [=]( const msg_initiate_request & evt ) {
  433.                     TRACE() << "initiate request for: (" << evt.m_id << ":"
  434.                             << evt.m_key << ")" << std::endl;
  435.  
  436.                     m_request_conductor->initiate_request(
  437.                             evt.m_id, evt.m_key, so_direct_mbox() );
  438.                 } )
  439.             .event( [=]( const msg_cancel_request & evt ) {
  440.                     TRACE() << "cancel request for id: " << evt.m_id << std::endl;
  441.  
  442.                     m_request_conductor->cancel_request( evt.m_id );
  443.                 } )
  444.             .event( [=]( const msg_value_response & evt ) {
  445.                     TRACE() << "response for: (" << evt.m_id << ":"
  446.                             << evt.m_key << ") = " << evt.m_value << std::endl;
  447.                 } )
  448.             .event< msg_finish >( [=] { so_environment().stop(); } );
  449.     }
  450.  
  451.     virtual void
  452.     so_evt_start() override
  453.     {
  454.         using millisec = std::chrono::milliseconds;
  455.  
  456.         so_5::send_delayed_to_agent< msg_initiate_request >(
  457.                 *this, millisec( 100 ), 1, "Hello" );
  458.         so_5::send_delayed_to_agent< msg_initiate_request >(
  459.                 *this, millisec( 200 ), 2, "Bye" );
  460.         so_5::send_delayed_to_agent< msg_initiate_request >(
  461.                 *this, millisec( 300 ), 3, "Hello" );
  462.         so_5::send_delayed_to_agent< msg_cancel_request >(
  463.                 *this, millisec( 305 ), 3 );
  464.         so_5::send_delayed_to_agent< msg_initiate_request >(
  465.                 *this, millisec( 400 ), 4, "Hello" );
  466.  
  467.         so_5::send_delayed_to_agent< msg_finish >(
  468.                 *this, std::chrono::seconds( 3 ) );
  469.     }
  470.  
  471. private :
  472.     struct msg_initiate_request : public so_5::rt::message_t
  473.     {
  474.         long long m_id;
  475.         const std::string m_key;
  476.  
  477.         msg_initiate_request( long long id, std::string key )
  478.             :   m_id( id ), m_key( std::move( key ) )
  479.         {}
  480.     };
  481.  
  482.     struct msg_cancel_request : public so_5::rt::message_t
  483.     {
  484.         long long m_id;
  485.  
  486.         msg_cancel_request( long long id )
  487.             :   m_id( id )
  488.         {}
  489.     };
  490.  
  491.     struct msg_finish : public so_5::rt::signal_t {};
  492.  
  493.     const request_conductor_ptr_t m_request_conductor;
  494. };
  495.  
  496. void
  497. init( so_5::rt::environment_t & env )
  498. {
  499.     so_5::disp::thread_pool::params_t pool_params;
  500.     pool_params.fifo( so_5::disp::thread_pool::fifo_t::individual );
  501.  
  502.     auto performers_coop = env.create_coop( "performers" );
  503.  
  504.     auto mem_cache = performers_coop->add_agent(
  505.             new a_cache_t( env, 5, 150 ),
  506.             so_5::disp::thread_pool::create_disp_binder( "cpu", pool_params ) );
  507.     auto disk_cache = performers_coop->add_agent(
  508.             new a_cache_t( env, 50, 550 ),
  509.             so_5::disp::thread_pool::create_disp_binder( "cpu", pool_params ) );
  510.     auto network = performers_coop->add_agent(
  511.             new a_network_t( env, 150, 750 ),
  512.             so_5::disp::active_obj::create_disp_binder( "net" ) );
  513.  
  514.     env.register_coop( std::move( performers_coop ) );
  515.  
  516.     auto performers_info = std::make_shared< performers_info_t >(
  517.             mem_cache->so_direct_mbox(),
  518.             disk_cache->so_direct_mbox(),
  519.             network->so_direct_mbox() );
  520.  
  521.     auto conductor = std::make_shared< request_conductor_t >(
  522.             env, performers_info );
  523.  
  524.     env.register_agent_as_coop( "initiator",
  525.             new a_requests_producer_t( env, conductor ),
  526.             so_5::disp::one_thread::create_disp_binder( "ui" ) );
  527. }
  528.  
  529. void
  530. init_env_params( so_5::rt::environment_params_t & params )
  531. {
  532.     params.add_named_dispatcher( "ui",
  533.             so_5::disp::one_thread::create_disp() );
  534.  
  535.     params.add_named_dispatcher( "cpu",
  536.             so_5::disp::thread_pool::create_disp( 3 ) );
  537.  
  538.     params.add_named_dispatcher( "net",
  539.             so_5::disp::active_obj::create_disp() );
  540. }
  541.  
  542. int
  543. main()
  544. {
  545.     try
  546.     {
  547.         so_5::launch( init, init_env_params );
  548.  
  549.         return 0;
  550.     }
  551.     catch( const std::exception & x )
  552.     {
  553.         std::cerr << "Exception: " << x.what() << std::endl;
  554.     }
  555.  
  556.     return 2;
  557. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement