Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <algorithm>
- #include <iostream>
- #include <chrono>
- #include <iterator>
- #include <list>
- #include <map>
- #include <regex>
- #include <cctype>
- #include <so_5/all.hpp>
- #include <md5_cpp11/all.hpp>
- #include <various_helpers_1/ensure.hpp>
- using namespace std;
- using namespace std::chrono;
- using namespace so_5;
- using namespace so_5::rt;
- using namespace so_5::disp;
- // Just an alias for distinguish between ordinary strings and passwords.
- using password_t = string;
- // Description of one task.
- struct task_t
- {
- string m_md5; // MD5 value to find.
- password_t m_left; // The left (inclusive) border of range.
- password_t m_right; // The right (inclusive) border of range.
- };
- //
- // Messages
- //
- // Process next task.
- using msg_process_line = tuple_as_message_t< mtag<0>, string >;
- // Successful result of task processing (worker,password).
- using msg_password_found = tuple_as_message_t< mtag<1>, string, string >;
- // Unsuccessful result of task processing (worker).
- using msg_password_not_found = tuple_as_message_t< mtag<2>, string >;
- // Request for new task from stream.
- struct msg_load_next : public so_5::rt::signal_t {};
- // Signal about end-of-stream (no more task will be received).
- struct msg_no_more_lines : public so_5::rt::signal_t {};
- //
- // Worker agent.
- //
- class worker_t : public agent_t
- {
- public :
- worker_t( context_t ctx, string self_id, mbox_t manager )
- : agent_t( ctx )
- , m_self_id( self_id )
- , m_self_mbox( so_environment().create_local_mbox( self_id ) )
- , m_manager( manager )
- {}
- virtual void so_define_agent() override
- {
- so_subscribe( m_self_mbox ).event( &worker_t::evt_process_line );
- }
- private :
- static const regex m_line_format;
- const string m_self_id;
- const mbox_t m_self_mbox;
- const mbox_t m_manager;
- void evt_process_line( const msg_process_line & evt )
- {
- const auto task = parse_task( get<0>( evt ) );
- const auto etalon = md5_cpp11::from_hex_string( task.m_md5 );
- auto pw = task.m_left;
- auto stop_point = [&] { auto r = task.m_right; next_pass(r); return r; }();
- // This form of loop is important because there could be ranges
- // like [0000,zzzz] for which stop_point will be 0000.
- // Because of that we must do at least one iteration.
- do
- {
- if( md5_cpp11::make_digest( pw ) != etalon )
- next_pass( pw );
- else
- {
- so_5::send< msg_password_found >( m_manager, m_self_id, pw );
- return;
- }
- } while( pw != stop_point );
- so_5::send< msg_password_not_found >( m_manager, m_self_id );
- }
- static task_t parse_task( const string & line )
- {
- smatch parts;
- const auto match_result = regex_match( line, parts, m_line_format );
- ensure( match_result, "task line has wrong format, line: '" + line + "'" );
- const task_t task{ parts.str(1), parts.str(2), parts.str(3) };
- ensure_valid_password_range( task.m_left, task.m_right );
- return task;
- }
- static void ensure_valid_password_range(
- const password_t & left, const password_t & right )
- {
- ensure( left.size() == right.size(),
- "range borders must be same size [" + left + ", " + right + "]" );
- auto valid_alphabet = []( const password_t & p ) {
- return end(p) == find_if_not( begin(p), end(p),
- []( password_t::value_type ch ) {
- return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z');
- } );
- };
- ensure( valid_alphabet( left ), "illegal char in left border: " + left );
- ensure( valid_alphabet( right ), "illegal char in right border: " + right );
- ensure( !(right < left),
- "invalid range (left must not be "
- "greater than right): [" + left + ", " + right + "]" );
- }
- static password_t::value_type next_byte( password_t::value_type b )
- {
- switch( b )
- {
- case 'z' : return '0';
- case '9' : return 'a';
- default : return b + 1;
- }
- }
- static void next_pass( password_t & p )
- {
- for( auto e = p.rbegin(); e != p.rend(); ++e )
- {
- *e = next_byte( *e );
- if( '0' != *e )
- break;
- }
- }
- };
- const regex worker_t::m_line_format( "^\\s*(\\S+)\\s+(\\S+)\\s+(\\S+)\\s*$" );
- //
- // Task loader agent.
- //
- class input_loader_t : public agent_t
- {
- public :
- input_loader_t( context_t ctx )
- : agent_t( ctx )
- {}
- void set_manager( const mbox_t & manager )
- {
- m_manager = manager;
- }
- virtual void so_define_agent() override
- {
- // In normal state will try to load next line.
- so_default_state().event< msg_load_next >( &input_loader_t::evt_load_next );
- // Sends NO_MORE_TASK signal immediatelly if EOF reached.
- st_eof.event< msg_load_next >( [this] { send_eof_signal(); } );
- }
- private :
- const state_t st_eof = so_make_state( "EOF" );
- mbox_t m_manager;
- void evt_load_next()
- {
- bool read_more;
- do {
- read_more = false;
- string line;
- if( getline( cin, line ) )
- {
- strip_spaces( line );
- if( line.empty() )
- read_more = true;
- else
- send< msg_process_line >( m_manager, line );
- }
- else
- {
- this >>= st_eof;
- send_eof_signal();
- }
- } while( read_more );
- }
- void send_eof_signal()
- {
- send< msg_no_more_lines >( m_manager );
- }
- static void strip_spaces( string & line )
- {
- auto space = []( string::value_type c ) { return isspace( c ); };
- auto s = find_if_not( line.begin(), line.end(), space );
- auto e = find_if_not( line.rbegin(), string::reverse_iterator(s), space ).base();
- line = string( s, e );
- }
- };
- //
- // manager_t
- //
- class manager_t : public agent_t
- {
- public :
- manager_t( context_t ctx, mbox_t input_loader, size_t worker_count )
- : agent_t( ctx )
- , m_input_loader( input_loader )
- , m_worker_count( worker_count )
- , m_worker_disp(
- thread_pool::create_private_disp(
- so_environment(),
- worker_count ) )
- {}
- virtual void so_define_agent() override
- {
- so_subscribe_self()
- .event( &manager_t::evt_next_line_received )
- .event< msg_no_more_lines >( &manager_t::evt_no_more_lines )
- .event( &manager_t::evt_password_found )
- .event( &manager_t::evt_password_not_found )
- .event( &manager_t::evt_worker_coop_started )
- .event( &manager_t::evt_worker_coop_destroyed );
- }
- virtual void so_evt_start() override
- {
- for( size_t i = 0; i != m_worker_count; ++i )
- create_new_worker();
- }
- private :
- const mbox_t m_input_loader;
- const size_t m_worker_count;
- size_t m_worker_ordinal = { 0 };
- // Dedicated thread pool for workers.
- thread_pool::private_dispatcher_handle_t m_worker_disp;
- // List of free workers.
- // Items from this list will be extracted when next_task_received
- // arrived and placed back after getting task processing result.
- list< string > m_free_workers;
- // Map of scheduled tasks (worker_id -> line).
- map< string, string > m_scheduled_lines;
- void evt_next_line_received( const msg_process_line & evt )
- {
- auto worker_id = m_free_workers.front();
- m_free_workers.pop_front();
- // We must know what task is scheduled to the worker.
- const auto & line = get<0>( evt );
- m_scheduled_lines[ worker_id ] = line;
- cout << "*** " << line << " scheduled to " << worker_id << endl;
- send< msg_process_line >(
- so_environment().create_local_mbox( worker_id ),
- line );
- }
- void evt_no_more_lines()
- {
- if( m_scheduled_lines.empty() )
- // No more tasks to wait. Work can be finished.
- so_deregister_agent_coop_normally();
- }
- void evt_password_found( const msg_password_found & evt )
- {
- handle_worker_result( get<0>(evt), get<1>(evt) );
- }
- void evt_password_not_found( const msg_password_not_found & evt )
- {
- handle_worker_result( get<0>(evt), "NOT FOUND" );
- }
- void evt_worker_coop_started( const msg_coop_registered & evt )
- {
- // New worker available.
- m_free_workers.push_back( evt.m_coop_name );
- // We need new task for it.
- request_next_task();
- }
- void evt_worker_coop_destroyed( const msg_coop_deregistered & evt )
- {
- auto it = m_scheduled_lines.find( evt.m_coop_name );
- if( it != m_scheduled_lines.end() )
- {
- // Cooperation destroyed during task processing!
- cerr << "TASK(" << it->second << ") STOPPED! "
- << ", deregistration reason: " << evt.m_reason.reason()
- << endl;
- m_scheduled_lines.erase( it );
- create_new_worker();
- }
- }
- void create_new_worker()
- {
- const auto worker_id = "worker-" + to_string( m_worker_ordinal++ );
- introduce_child_coop( *this,
- // Worker id will be name of the worker cooperation.
- worker_id,
- // Worker will work on thread pool.
- m_worker_disp->binder( thread_pool::params_t{} ),
- [&]( agent_coop_t & coop ) {
- // We must receive notifications about start and deregistration
- // of this cooperation.
- coop.add_reg_notificator(
- make_coop_reg_notificator( so_direct_mbox() ) );
- coop.add_dereg_notificator(
- make_coop_dereg_notificator( so_direct_mbox() ) );
- // Cooperation with agent must be deregistered in the case of
- // exception.
- coop.set_exception_reaction(
- exception_reaction_t::deregister_coop_on_exception );
- // The single agent in the cooperation.
- coop.make_agent< worker_t >( worker_id, so_direct_mbox() );
- } );
- }
- void handle_worker_result(
- const string & worker_id, const string & result )
- {
- auto it = m_scheduled_lines.find( worker_id );
- if( it != m_scheduled_lines.end() )
- {
- cout << "result(" << it->second << "): " << result << endl;
- m_scheduled_lines.erase( it );
- }
- else
- throw std::runtime_error(
- "UNEXPECTED RESPONSE! worker_id: " + worker_id +
- ", result: " );
- m_free_workers.push_back( worker_id );
- request_next_task();
- }
- void request_next_task()
- {
- send< msg_load_next >( m_input_loader );
- }
- };
- void run( environment_t & env, size_t desired_workers )
- {
- const auto actual_workers = desired_workers ? desired_workers :
- thread::hardware_concurrency();
- env.introduce_coop( [&]( agent_coop_t & coop ) {
- // Task loader must work on its own thread.
- auto input_loader = coop.make_agent_with_binder< input_loader_t >(
- one_thread::create_private_disp( env )->binder() );
- // Manager will work on the default dispatcher.
- auto manager = coop.make_agent< manager_t >(
- // Manager must be bound to input_loader.
- input_loader->so_direct_mbox(), actual_workers );
- // And input_loader must be bound to manager.
- input_loader->set_manager( manager->so_direct_mbox() );
- } );
- }
- size_t detect_workers_count( int argc, char ** argv )
- {
- if( 2 == argc )
- return stoul( argv[ 1 ] );
- else
- return 0;
- }
- double time_spent( const steady_clock::time_point s )
- {
- const auto e = steady_clock::now();
- return duration_cast< milliseconds >( e - s ).count() / 1000.0;
- }
- int main( int argc, char ** argv )
- {
- try
- {
- auto workers = detect_workers_count( argc, argv );
- const auto started_at = steady_clock::now();
- so_5::launch( [workers]( environment_t & env ) {
- run( env, workers );
- } );
- cout << "time: " << time_spent( started_at ) << "s" << endl;
- return 0;
- }
- catch( const exception & x )
- {
- cerr << "Exception: " << x.what() << endl;
- }
- return 2;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement