Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Псевдокод event-loop-а, в котором принимаются новые подключения,
- // читаются данные, отслеживаются моменты отключения клиентов.
- void event_loop( handle_t fd, ... )
- {
- // Окружение, в рамках которого работают агенты.
- so_5::rt::environment_t & env = ...;
- // Для осуществления связи между каналами и агентами.
- std::unordered_map< handle_t, so_5::rt::mbox_ref_t > channels;
- if( <новое подключение> )
- {
- // Создаем для него агента-обработчика.
- std::unique_ptr< a_parser_t > parser{ new a_parser_t( ... ) };
- // Нам нужно запомнить mbox этого агента, чтобы
- // отсылать ему данные для разбора.
- auto mbox = parser->so_direct_mbox();
- // Агент должен быть зарегистрирован.
- env->register_agent_as_coop(
- // У кооперации должно быть уникальное имя.
- // Формируем его каким-то образом.
- create_new_coop_name(...),
- // За судьбу агента отвечает SObjectizer.
- std::move( parser ) );
- // Сохраняем связь между каналами и агентами.
- channels.emplace( fd, mbox );
- }
- else if( <готовность к приему данных> )
- {
- // Буфер для чтения.
- std::unique_ptr< msg_in_data > data{ new msg_in_data( ... ) };
- // Читаем данные + какой-то контроль ошибок I/O.
- read_data( fd, msg_in_data->m_buffer );
- if( <чтение прошло успешно> )
- // Отсылаем прочитанные данные агенту для обработки.
- channels[ fd ]->deliver_message( std::move( data ) );
- else
- {
- // Считаем, что канал закрыт.
- // Агент должен завершить свою работу.
- channels[ fd ]->deliver_signal< msg_channel_closed >();
- // Про канал нужно забыть.
- channels.erase( fd );
- }
- }
- }
- // Cообщения, необходимые для агента-парсера.
- // Сообщение об очередной порции данных.
- struct msg_in_data : public so_5::rt::message_t
- {
- std::vector< std::uint8_t > m_data;
- ... // Какой-то конструктор(ы)
- };
- // Сигнал о том, что канал закрылся удаленной строной.
- struct msg_channel_closed : public so_5::rt::signal_t {};
- //
- // Сам агент-парсер.
- //
- class a_parser_t : public so_5::rt::agent_t
- {
- public :
- // Конструктор.
- a_parser_t(
- // Это обязательный параметр. Без него никак.
- // Агент должен быть связан с тем окружением,
- // в котором он будет работать.
- so_5::rt::environment_t & env,
- // Возможно, еще какой-то набор параметров.
- ... )
- : so_5::rt::agent_t( env )
- ...
- {}
- // Подготовка агента к работе внутри SObjectizer.
- // В данном случае только подписка на сообщения,
- // которые приходят на собственный почтовый ящик агента.
- virtual void so_define_agent() override
- {
- so_subscribe( so_direct_mbox() ).event( &a_parser_t::evt_in_data );
- so_subscribe( so_direct_mbox() )
- .event( so_5::signal< msg_channel_closed >,
- &a_parser_t::evt_channel_closed );
- ... // Возможно, подписка еще на какие-то сообщения.
- }
- // Реакция на поступление входящих данных.
- void evt_in_data( const msg_in_data & evt )
- {
- // Попытка разбора.
- ...
- if( <получены данные для zip-ования> )
- // Отсылаем их следующему агенту.
- m_zipper->deliver_message( new msg_data_to_zip( ... ) );
- }
- // Реакция на закрытие канала.
- void evt_channel_closed()
- {
- // Какие-то действия (логирование, отчистка ресурсов...).
- ...
- // Мы больше не нужны, завершаем свою работу.
- so_deregister_agent_coop_normally();
- }
- private :
- ... // Какие-то атрибуты, необходимые для разбора входящих данных.
- // Почтовый ящик агента, который занимается zip-ованием.
- const so_5::rt::mbox_ref_t m_zipper;
- };
- // Сообщение о необходимости зазиповать данные.
- struct msg_data_to_zip : public so_5::rt::message_t
- {
- ... // Какие-то потроха: сами данные, информация о том,
- // куда их отсылать дальше и т.д.
- // + Конструктор(ы).
- };
- //
- // Агент для зипования данных.
- //
- class a_zipper_t : public so_5::rt::agent_t
- {
- public :
- // Опять же нужен конструктор.
- a_zipper_t( so_5::rt::environment_t & env,
- // + какие-то параметры для агента.
- ... )
- : so_5::rt::agent_t( env )
- ...
- {}
- // Настройка агента для работы внутри SObjectizer.
- virtual void so_define_agent() override
- {
- // Подписываемся на сообщение для собственного почтового ящика.
- // При этом считаем, что zip-ование -- это stateless процедура,
- // поэтому указываем, что обработчик thread-safe и он может
- // задействоваться для параллельной обработки событий.
- so_subscribe( so_direct_mbox() )
- .event( &a_zipper_t::evt_zip_data, so_5::thread_safe );
- ...
- }
- // Выполнение зипования данных и их последующая отсылка куда-то.
- void evt_zip_data( const msg_data_to_zip & evt )
- {
- ...
- }
- };
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement