Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <chrono>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

#include <so_5/all.hpp>
// Helper for trace messages.
//
// An instance holds one process-wide mutex for its whole lifetime, so
// everything written while the instance is alive cannot interleave with
// output produced under another instance on a different thread.
//
// std::mutex is not recursive: do not create a second instance on a thread
// that already holds one.
class locker_t
{
public :
    // Blocks until the shared mutex is acquired.
    locker_t() : m_lock( m_mutex ) {}

    // Transfers the held lock from `o`. Moving std::unique_lock cannot
    // throw, so the constructor is declared noexcept.
    locker_t( locker_t && o ) noexcept
        : m_lock( std::move( o.m_lock ) )
    {}

    // Always true. Exists only so that an instance can be declared inside
    // the condition of an `if` statement. It is `explicit` to forbid
    // accidental conversions to bool/int in other contexts; contextual
    // conversions (if/while conditions, operator!) still work.
    explicit operator bool() const { return true; }

private :
    // Ownership of m_mutex for the lifetime of this object.
    std::unique_lock< std::mutex > m_lock;

    // The single mutex shared by all instances.
    static std::mutex m_mutex;
};

std::mutex locker_t::m_mutex;
// Yields std::cout while the trace lock is held.
//
// The macro is a single expression: a temporary locker_t is created first
// and, being a temporary, is destroyed only at the end of the full
// expression, i.e. after the whole `TRACE() << a << b << std::endl;`
// chain has been written. Because no hidden `if` is involved the macro can
// be used safely as the unbraced body of an `if` that has an `else`.
#define TRACE() \
    ( static_cast< void >( locker_t{} ), std::cout )
- struct msg_value_request : public so_5::rt::message_t
- {
- long long m_id;
- const std::string m_key;
- const so_5::rt::mbox_t m_reply_to;
- msg_value_request(
- long long id,
- std::string key,
- so_5::rt::mbox_t reply_to )
- : m_id( id )
- , m_key( std::move( key ) )
- , m_reply_to( std::move( reply_to ) )
- {}
- };
- struct msg_value_response : public so_5::rt::message_t
- {
- long long m_id;
- const std::string m_key;
- const std::string m_value;
- msg_value_response(
- long long id,
- std::string key,
- std::string value )
- : m_id( id )
- , m_key( std::move( key ) )
- , m_value( std::move( value ) )
- {}
- };
- struct msg_cache_check_result : public so_5::rt::message_t
- {
- long long m_id;
- bool m_found = false;
- const std::string m_value;
- msg_cache_check_result( long long id )
- : m_id( id )
- {}
- msg_cache_check_result( long long id, std::string value )
- : m_id( id ), m_found( true ), m_value( std::move( value ) )
- {}
- };
- struct msg_update_cache : public so_5::rt::message_t
- {
- long long m_id;
- const std::string m_key;
- const std::string m_value;
- const so_5::rt::mbox_t m_reply_to;
- msg_update_cache(
- long long id,
- std::string key,
- std::string value,
- so_5::rt::mbox_t reply_to )
- : m_id( id )
- , m_key( std::move( key ) )
- , m_value( std::move( value ) )
- , m_reply_to( std::move( reply_to ) )
- {}
- };
- struct msg_cache_updated : public so_5::rt::message_t
- {
- long long m_id;
- msg_cache_updated( long long id ) : m_id( id ) {}
- };
- struct msg_network_result : public so_5::rt::message_t
- {
- long long m_id;
- std::string m_value;
- msg_network_result( long long id, std::string value )
- : m_id( id ), m_value( std::move( value ) )
- {}
- };
- struct performers_info_t
- {
- const so_5::rt::mbox_t m_mem_cache;
- const so_5::rt::mbox_t m_disk_cache;
- const so_5::rt::mbox_t m_network;
- performers_info_t(
- so_5::rt::mbox_t mem_cache,
- so_5::rt::mbox_t disk_cache,
- so_5::rt::mbox_t network )
- : m_mem_cache( std::move( mem_cache ) )
- , m_disk_cache( std::move( disk_cache ) )
- , m_network( std::move( network ) )
- {}
- };
- using performers_info_ptr_t = std::shared_ptr< performers_info_t >;
- class a_request_performer_t : public so_5::rt::agent_t
- {
- public :
- a_request_performer_t(
- so_5::rt::environment_t & env,
- const performers_info_ptr_t & performers,
- long long id,
- std::string key,
- so_5::rt::mbox_t reply_to )
- : so_5::rt::agent_t( env )
- , m_performers( performers )
- , m_id( id )
- , m_key( std::move( key ) )
- , m_reply_to( std::move( reply_to ) )
- {}
- virtual void
- so_define_agent() override
- {
- this >>= st_wait_cache_resp;
- st_wait_cache_resp
- .event( &a_request_performer_t::evt_cache_check_result )
- .event( &a_request_performer_t::evt_timeout );
- st_wait_network_resp
- .event( &a_request_performer_t::evt_network_result )
- .event( &a_request_performer_t::evt_timeout );
- st_wait_cache_updates
- .event( &a_request_performer_t::evt_cache_updated );
- }
- virtual void
- so_evt_start() override
- {
- TRACE() << "(" << m_id << ":" << m_key << ") processing started"
- << std::endl;
- so_5::send_delayed_to_agent< msg_timeout >( *this,
- std::chrono::seconds( 1 ),
- "total operation timeout" );
- so_5::send< msg_value_request >( m_performers->m_mem_cache,
- m_id, m_key, so_direct_mbox() );
- so_5::send< msg_value_request >( m_performers->m_disk_cache,
- m_id, m_key, so_direct_mbox() );
- }
- private :
- struct msg_timeout : public so_5::rt::message_t
- {
- const std::string m_what;
- msg_timeout( std::string what ) : m_what( std::move( what ) ) {}
- };
- const so_5::rt::state_t st_wait_cache_resp = so_make_state();
- const so_5::rt::state_t st_wait_network_resp = so_make_state();
- const so_5::rt::state_t st_wait_cache_updates = so_make_state();
- const performers_info_ptr_t m_performers;
- const long long m_id;
- const std::string m_key;
- std::string m_value;
- const so_5::rt::mbox_t m_reply_to;
- unsigned int m_cache_checks_results = 0;
- unsigned int m_cache_update_results = 0;
- static const unsigned int m_cache_count = 2;
- void
- evt_cache_check_result( const msg_cache_check_result & evt )
- {
- ++m_cache_checks_results;
- TRACE() << "(" << m_id << ":" << m_key << ") cache response ("
- << m_cache_checks_results << "/" << m_cache_count << "), found="
- << ( evt.m_found ? "Y" : "N" ) << std::endl;
- if( evt.m_found )
- {
- TRACE() << "(" << m_id << ":" << m_key << ") found in cache"
- << std::endl;
- m_value = evt.m_value;
- return_result_and_finish_work();
- }
- else
- {
- if( m_cache_checks_results == m_cache_count )
- initiate_network_request();
- }
- }
- void
- evt_network_result( const msg_network_result & evt )
- {
- TRACE() << "(" << m_id << ":" << m_key << ") value from network: "
- << evt.m_value << std::endl;
- m_value = evt.m_value;
- this >>= st_wait_cache_updates;
- so_5::send< msg_update_cache >( m_performers->m_mem_cache,
- m_id, m_key, m_value, so_direct_mbox() );
- so_5::send< msg_update_cache >( m_performers->m_disk_cache,
- m_id, m_key, m_value, so_direct_mbox() );
- }
- void
- evt_cache_updated( const msg_cache_updated & )
- {
- ++m_cache_update_results;
- TRACE() << "(" << m_id << ":" << m_key << ") cache updated ("
- << m_cache_update_results << "/" << m_cache_count << ")"
- << std::endl;
- if( m_cache_update_results == m_cache_count )
- return_result_and_finish_work();
- }
- void
- evt_timeout( const msg_timeout & evt )
- {
- TRACE() << "(" << m_id << ":" << m_key << ") timedout: "
- << evt.m_what << std::endl;
- so_deregister_agent_coop_normally();
- }
- void
- initiate_network_request()
- {
- this >>= st_wait_network_resp;
- so_5::send_delayed_to_agent< msg_timeout >( *this,
- std::chrono::milliseconds( 500 ),
- "network operations timeout" );
- so_5::send< msg_value_request >( m_performers->m_network,
- m_id, m_key, so_direct_mbox() );
- }
- void
- return_result_and_finish_work()
- {
- so_5::send< msg_value_response >( m_reply_to, m_id, m_key, m_value );
- so_deregister_agent_coop_normally();
- }
- };
- class a_cache_t : public so_5::rt::agent_t
- {
- public :
- a_cache_t(
- so_5::rt::environment_t & env,
- unsigned int min_pause,
- unsigned int max_pause )
- : so_5::rt::agent_t( env )
- , m_min_pause( min_pause )
- , m_pause_delta( max_pause - min_pause )
- {}
- virtual void
- so_define_agent() override
- {
- so_default_state().event( [=]( const msg_value_request & evt ) {
- auto it = m_values.find( evt.m_key );
- auto delay = random_delay();
- if( it == m_values.end() )
- so_5::send_delayed< msg_cache_check_result >(
- *this, evt.m_reply_to, delay, evt.m_id );
- else
- so_5::send_delayed< msg_cache_check_result >(
- *this, evt.m_reply_to, delay, evt.m_id, it->second );
- } );
- so_default_state().event( [=]( const msg_update_cache & evt ) {
- m_values[ evt.m_key ] = evt.m_value;
- so_5::send_delayed< msg_cache_updated >(
- *this, evt.m_reply_to, random_delay(), evt.m_id );
- } );
- }
- private :
- const unsigned int m_min_pause;
- const unsigned int m_pause_delta;
- std::unordered_map< std::string, std::string > m_values;
- std::chrono::milliseconds
- random_delay() const
- {
- return std::chrono::milliseconds( m_min_pause +
- static_cast< unsigned int >( std::rand() ) % m_pause_delta );
- }
- };
- class a_network_t : public so_5::rt::agent_t
- {
- public :
- a_network_t(
- so_5::rt::environment_t & env,
- unsigned int min_pause,
- unsigned int max_pause )
- : so_5::rt::agent_t( env )
- , m_min_pause( min_pause )
- , m_pause_delta( max_pause - min_pause )
- {}
- virtual void
- so_define_agent() override
- {
- so_default_state().event( [=]( const msg_value_request & evt ) {
- TRACE() << "(" << evt.m_id << ":" << evt.m_key << ") will be "
- "requested via network" << std::endl;
- so_5::send_delayed< msg_network_result >(
- *this, evt.m_reply_to, random_delay(),
- evt.m_id, "<" + evt.m_key + ">" );
- } );
- }
- private :
- const unsigned int m_min_pause;
- const unsigned int m_pause_delta;
- std::chrono::milliseconds
- random_delay() const
- {
- return std::chrono::milliseconds( m_min_pause +
- static_cast< unsigned int >( std::rand() ) % m_pause_delta );
- }
- };
- class request_conductor_t
- {
- public :
- request_conductor_t(
- so_5::rt::environment_t & env,
- const performers_info_ptr_t & performers )
- : m_env( env )
- , m_performers( performers )
- {}
- void
- initiate_request(
- long long id,
- const std::string & key,
- const so_5::rt::mbox_t & reply_to )
- {
- auto request_performer = std::unique_ptr< so_5::rt::agent_t >(
- new a_request_performer_t(
- m_env, m_performers, id, key, reply_to ) );
- m_env.register_agent_as_coop(
- make_coop_name( id ),
- std::move( request_performer ),
- so_5::disp::thread_pool::create_disp_binder(
- "cpu",
- so_5::disp::thread_pool::params_t() ) );
- }
- void
- cancel_request(
- long long id )
- {
- m_env.deregister_coop( make_coop_name( id ),
- so_5::rt::dereg_reason::normal );
- }
- private :
- so_5::rt::environment_t & m_env;
- const performers_info_ptr_t m_performers;
- std::string
- make_coop_name( long long id )
- {
- return "request_" + std::to_string( id );
- }
- };
- using request_conductor_ptr_t = std::shared_ptr< request_conductor_t >;
- class a_requests_producer_t : public so_5::rt::agent_t
- {
- public :
- a_requests_producer_t(
- so_5::rt::environment_t & env,
- const request_conductor_ptr_t & request_conductor )
- : so_5::rt::agent_t( env )
- , m_request_conductor( request_conductor )
- {}
- virtual void
- so_define_agent() override
- {
- so_default_state()
- .event( [=]( const msg_initiate_request & evt ) {
- TRACE() << "initiate request for: (" << evt.m_id << ":"
- << evt.m_key << ")" << std::endl;
- m_request_conductor->initiate_request(
- evt.m_id, evt.m_key, so_direct_mbox() );
- } )
- .event( [=]( const msg_cancel_request & evt ) {
- TRACE() << "cancel request for id: " << evt.m_id << std::endl;
- m_request_conductor->cancel_request( evt.m_id );
- } )
- .event( [=]( const msg_value_response & evt ) {
- TRACE() << "response for: (" << evt.m_id << ":"
- << evt.m_key << ") = " << evt.m_value << std::endl;
- } )
- .event< msg_finish >( [=] { so_environment().stop(); } );
- }
- virtual void
- so_evt_start() override
- {
- using millisec = std::chrono::milliseconds;
- so_5::send_delayed_to_agent< msg_initiate_request >(
- *this, millisec( 100 ), 1, "Hello" );
- so_5::send_delayed_to_agent< msg_initiate_request >(
- *this, millisec( 200 ), 2, "Bye" );
- so_5::send_delayed_to_agent< msg_initiate_request >(
- *this, millisec( 300 ), 3, "Hello" );
- so_5::send_delayed_to_agent< msg_cancel_request >(
- *this, millisec( 305 ), 3 );
- so_5::send_delayed_to_agent< msg_initiate_request >(
- *this, millisec( 400 ), 4, "Hello" );
- so_5::send_delayed_to_agent< msg_finish >(
- *this, std::chrono::seconds( 3 ) );
- }
- private :
- struct msg_initiate_request : public so_5::rt::message_t
- {
- long long m_id;
- const std::string m_key;
- msg_initiate_request( long long id, std::string key )
- : m_id( id ), m_key( std::move( key ) )
- {}
- };
- struct msg_cancel_request : public so_5::rt::message_t
- {
- long long m_id;
- msg_cancel_request( long long id )
- : m_id( id )
- {}
- };
- struct msg_finish : public so_5::rt::signal_t {};
- const request_conductor_ptr_t m_request_conductor;
- };
- void
- init( so_5::rt::environment_t & env )
- {
- so_5::disp::thread_pool::params_t pool_params;
- pool_params.fifo( so_5::disp::thread_pool::fifo_t::individual );
- auto performers_coop = env.create_coop( "performers" );
- auto mem_cache = performers_coop->add_agent(
- new a_cache_t( env, 5, 150 ),
- so_5::disp::thread_pool::create_disp_binder( "cpu", pool_params ) );
- auto disk_cache = performers_coop->add_agent(
- new a_cache_t( env, 50, 550 ),
- so_5::disp::thread_pool::create_disp_binder( "cpu", pool_params ) );
- auto network = performers_coop->add_agent(
- new a_network_t( env, 150, 750 ),
- so_5::disp::active_obj::create_disp_binder( "net" ) );
- env.register_coop( std::move( performers_coop ) );
- auto performers_info = std::make_shared< performers_info_t >(
- mem_cache->so_direct_mbox(),
- disk_cache->so_direct_mbox(),
- network->so_direct_mbox() );
- auto conductor = std::make_shared< request_conductor_t >(
- env, performers_info );
- env.register_agent_as_coop( "initiator",
- new a_requests_producer_t( env, conductor ),
- so_5::disp::one_thread::create_disp_binder( "ui" ) );
- }
- void
- init_env_params( so_5::rt::environment_params_t & params )
- {
- params.add_named_dispatcher( "ui",
- so_5::disp::one_thread::create_disp() );
- params.add_named_dispatcher( "cpu",
- so_5::disp::thread_pool::create_disp( 3 ) );
- params.add_named_dispatcher( "net",
- so_5::disp::active_obj::create_disp() );
- }
- int
- main()
- {
- try
- {
- so_5::launch( init, init_env_params );
- return 0;
- }
- catch( const std::exception & x )
- {
- std::cerr << "Exception: " << x.what() << std::endl;
- }
- return 2;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement