SObjectizer: від простого до складного. Частина III

У черговій статті про SObjectizer продовжимо стежити за еволюцією простого спочатку агента, який все більше і більше ускладнюється по мірі свого розвитку. Розглянемо, як бути з відкладеними повідомленнями, в яких ми більше не зацікавлені. І скористаємося певною функціональністю ієрархічних кінцевих автоматів.

У попередній статті ми зупинилися на тому, що у нас з'явився агент email_analyzer, який можна вважати більш-менш надійно вирішальним своє завдання. Проте, він сам, послідовно, виконує три стадії перевірки email-а: спершу перевіряє заголовки, потім вміст, потім аттачи.
Швидше за все, кожна з цих операцій не буде виключно CPU-bound. Набагато ймовірніше, що вичленувавши якісь значення перевіряється фрагмента (наприклад, заголовків листа), потрібно зробити кудись запит для перевірки допустимості цього значення. Наприклад, запит до БД щоб перевірити, чи немає імені хоста-відправника в чорному списку. Поки буде виконуватися даний запит можна було б виконати ще якусь операцію, наприклад, розібрати зміст тексту листа на окремі ключові фрази, щоб їх можна було перевірити по якомусь словнику спам-маркерів. Або перевірити, чи є в аттачах архіви, та ініціювати їх перевірку антивірусом. Загалом, має сенс розпаралелити операції аналізу email-а.
Давайте спробуємо задіяти окремих агентів на кожну операцію. Тобто можна написати агентів виду:
class email_headers_checker : public agent_t {
public :
struct result { check_status status_ }; /* Повідомлення з результатом */
email_headers_checker( context_t ctx, ... /* Якісь параметри */ ) {...}
virtual void so_evt_start() override {
... /* Иницирование операцій по перевірці заголовків */
}
... /* Якісь деталі реалізації */
};
class email_body_checker : public agent_t {...};
class email_attachment_checker : public agent_t {...};

Кожен такий агент буде виконувати специфічні для своєї операції дії, а потім пошле результат email_analyzer у вигляді повідомлення. Нашому email_analyzer потрібно створити екземпляри цих агентів у себе і дочекатися від них повідомлень з результатами аналізу:
void on_load_succeed( const load_email_succeed & msg ) {
try {
auto parsed_data = parse_email( msg.content_ );
introduce_child_coop( *this,
// Агенти-checker-и будуть працювати на своєму власному
// thread-pool-диспетчера, який був створений заздалегідь
// під спеціальним ім'ям.
disp::thread_pool::create_disp_binder(
"checkers", disp::thread_pool::bind_params_t{} ),
[&]( coop_t & coop ) {
coop.make_agent< email_headers_checker >(
so_direct_mbox(), parsed_data->headers() );
coop.make_agent< email_body_checker >(
so_direct_mbox(), parsed_data->body() );
coop.make_agent< email_attach_checker >(
so_direct_mbox(), parsed_data->attachments() );
} );
}
catch( const exception & ) {...}
}

Тих, хто уважно читав попередні статті, фраза «дочекатися від них повідомлень» мала б насторожити. Чекати без обмеження часу не є добре, це прямий шлях отримати дарма що бовтається в системі і нічого не робить агента. Тому при очікуванні відповідей від checker-ів нам має сенс вчинити так само, як і при очікуванні результату IO-операції: надіслати самим собі якийсь відкладений сигнал, отримавши який ми зрозуміємо, що далі чекати безглуздо. Тобто нам треба було б написати щось на кшталт:
// Спроба представити агента email_analyzer з двома відкладеними сигналами.
class email_analyzer : public agent_t {
// Цей сигнал потрібно для того, щоб відстежувати відсутність
// відповіді від IO-агента протягом розумного часу.
struct io_agent_response_timeout : public signal_t {};
// Цей сигнал потрібно для того, щоб відстежувати відсутність
// результатів перевірки окремих частин email-а.
struct checkers_responses_timeout : public signal_t {};
...
virtual void so_evt_start() override {
... /* Відсилання запиту IO-агенту */
// І відразу ж починаємо відлік тайм-ауту для відповіді від IO-агента.
send_delayed< io_agent_response_timeout >( *this, 1500ms );
}
...
void on_load_succeed( const load_succeed & msg ) {
... /* Створення кооперацій з агентами checker-ами */
// Відразу ж починаємо відлік тайм-ауту для відповідей від агентів-checker-ів.
send_delayed< checkers_responses_timeout >( *this, 750ms );
}
...
void on_checkers_responses_timeout() {
... /* Відсилання негативної відповіді. */
}
};

Однак, пішовши по цьому шляху ми наступимо на граблі: очікуючи відповіді від checker-ів ми запросто можемо отримати відкладений сигнал io_agent_response_timeout. Адже його ніхто не відміняв. І коли це сигнал прийде, ми згенеруємо негативну відповідь з-за нібито наявного тайм-ауту вводу-виводу, якого-то і немає. Давайте спробуємо обійти ці граблі.
Часто розробники, не звиклі до асинхронного обміну повідомленнями, намагаються скасувати відкладений сигнал. Це можна зробити, якщо зберегти ідентифікатор таймера при зверненні до send_periodic:
// Спроба представити агент email_analyzer з відміною відкладеного
// сигналу io_agent_response_timeout.
class email_analyzer : public agent_t {
struct io_agent_response_timeout : public signal_t {};
...
virtual void so_evt_start() override {
... /* Відсилання запиту IO-агенту */
// Для того, щоб отримати ідентифікатор таймера використовуємо
// send_periodic замість send_delayed, але параметр period
// виставляємо в 0, що робить відсилається сигнал відкладеним,
// але не періодичним.
io_response_timer_ = send_periodic< io_agent_response_timeout >(
*this, 1500ms, 0ms );
}
...
void on_load_succeed( const load_succeed & msg ) {
// Скасовуємо відкладений сигнал.
io_response_timer_.reset();
... /* Створення кооперацій з агентами checker-ами */
// Відразу ж починаємо відлік тайм-ауту для відповідей від агентів-checker-ів.
send_delayed< checkers_responses_timeout >( *this, 750ms );
}
...
// Ідентифікатор таймера для відкладеного сигналу про тайм-аут для IO-операції.
timer_id_t io_response_timer_;
};

На жаль, цей простий спосіб не завжди працює. Проблема в тому, що відкладений сигнал може бути надісланий агенту email_analyzer буквально за мить до того, як агент email_analyzer виконає скидання таймера для цього відкладеного сигналу. Тут вже нічого не поробиш – чудеса багатопоточності, вони такі.
Агент email_analyzer може зайти в on_load_succeed на контексті своєї робочої нитки, може навіть встигнути увійти на виклик reset() для таймера… Але тут його нитка витіснять, управління отримає нитка таймера SObjectizer-а, на якій відбудеться відсилання відкладеного сигналу. Після чого управління знову отримає робоча нитка агента email_analyzer() і метод reset() для таймера зробить скасування вже відісланого сигналу. Однак, сигнал вже знаходиться в черзі повідомлень агента, звідки його вже ніхто не викине – раз вже повідомлення потрапило в чергу до агента, то вилучити його звідти не можна.
найгірше в цій ситуації те, що подібна помилка буде виникати епізодично. З-за чого зрозуміти, що саме відбувається і в чому саме помилка, буде складно. Так що потрібно пам'ятати, що скасування відкладеного повідомлення – це зовсім не гарантія того, що воно не буде відіслано.
Отже, якщо просто скасовувати відкладене повідомлення неправильно, то що ж робити?
Наприклад, можна використовувати стану агента. Коли email_analyzer чекає відповіді від IO-агента, він знаходиться в одному стані. Коли відповідь від IO-агента приходить, агент email_analyzer переходить в інший стан, в якому він буде чекати відповіді від checker-ів. Т. к. у другому стані email_analyzer на сигнал io_agent_response_timeout не підписаний, то цей сигнал буде просто проігноровано.
З введенням станів агент email_analyzer ми могли б отримати щось на кшталт:
// Спроба представити агент email_analyzer з використанням
// кількох станів.
class email_analyzer : public agent_t {
struct io_agent_response_timeout : public signal_t {};
struct checkers_responses_timeout : public signal_t {};

// Стан, в якому агент буде чекати результату IO-операції.
state_t st_wait_io{ this };
// Стан, в якому агент буде чекати відповіді від checker-ів.
state_t st_wait_checkers{ this };
...
virtual void so_define_agent() override {
// Підписуємо агента на різні події в різних станах.
// Для того, щоб це було наочно, використовуємо другу спосіб
// підписки агентів – через методи класу state_t.
st_wait_io
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
.event< io_agent_response_timeout >( &email_analyzer::on_io_timeout );
st_wait_checkers
.event( &email_analyzer::on_header_check_result )
.event( &email_analyzer::on_body_check_result )
.event( &email_analyzer::on_attach_check_result )
.event< checkers_responses_timeout >( &email_analyzer::on_checkers_timeout );
}
...
};

Проте, в SObjectizer можна зробити ще простіше: можна призначити часовий ліміт на перебування агента в конкретному стані. Коли цей ліміт закінчиться, агент буде примусово переведено в інший стан. Тобто ми можемо написати щось на кшталт:
// Спроба представити агента email_analyzer з використанням обмеження часу
// на перебування агента в конкретному стані.
class email_analyzer : public agent_t {
state_t st_wait_io{ this };
state_t st_io_timeout{ this };

state_t st_wait_checkers{ this };
state_t st_checkers_timeout{ this };
...
virtual void so_define_agent() override {
st_wait_io
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
// Обмежуємо час очікування.
.time_limit( 1500ms, st_io_timeout );
st_wait_checkers
.event( &email_analyzer::on_header_check_result )
.event( &email_analyzer::on_body_check_result )
.event( &email_analyzer::on_attach_check_result )
.time_limit( 750ms, st_checkers_timeout );
}
};

Але просто обмежити час перебування у певному стані недостатньо. Потрібно ще зробити якісь дії, коли цей час мине. Як це зробити?
Використовувати таку річ, як обробник входу в стан. Коли агент входить в конкретний стан, SObjectizer викликає функцію-обробник входу в цей стан, якщо користувач таку функцію призначив. Це означає, що на вхід в st_io_timeout ми можемо повісити обробник, який відсилає check_result з негативним результатом і завершує роботу агента:
st_io_timeout.on_enter( [this]{
send< check_result >( reply_to_, email_file_, check_status::check_failure );
so_deregister_agent_coop_normally();
} );

Точно такий же обробник ми повісимо і на вхід в st_checkers_timeout. А оскільки дії всередині цих обробників будуть однаковими, то ми можемо винести їх в окремий метод агента email_analyzer і вказати цей метод в якості обробника входу і для стану st_io_timeout, і для стану st_checkers_timeout:
class email_analyzer : public agent_t {
state_t st_wait_io{ this };
state_t st_io_timeout{ this };

state_t st_wait_checkers{ this };
state_t st_checkers_timeout{ this };
...
virtual void so_define_agent() override {
...
st_io_timeout
.on_enter( &email_analyzer::on_enter_timeout_state );
...
st_checkers_timeout
.on_enter( &email_analyzer::on_enter_timeout_state );
};
...
void on_enter_timeout_state() {
send< check_result >( reply_to_, email_file_, check_status::check_failure );
so_deregister_agent_coop_normally();
}
};

Але і це ще не все. Раз вже ми торкнулися теми станів агентів і їх можливостей, то можна розвинути її далі і провести рефакторинг коду email_analyzer.
Неважко помітити, що в коді дуже часто дублюється парочка дій: відсилання повідомлення check_result і дерегистрация кооперації агента. Таке дублювання не є добре, слід від нього позбутися.
По суті, робота агента email_analyzer зводиться до того, щоб у підсумку агент виявився в одному з двох станів: або все завершилося нормально і слід надіслати позитивний результат, після чого завершити свою роботу, або ж все завершилося помилкою, потрібно відіслати негативний результат і, знову таки, завершити роботу агента. Так давайте це і виразимо прямо в коді за допомогою двох станів агента: st_success і st_failure.
// Спроба представити агента email_analyzer зі спеціальними фінальними
// станами st_success і st_failure.
class email_analyzer : public agent_t {
state_t st_wait_io{ this };
state_t st_wait_checkers{ this };

state_t st_failure{ this };
state_t st_success{ this };
...
virtual void so_define_agent() override {
st_wait_io
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
// Обмежуємо час очікування.
.time_limit( 1500ms, st_failure );
st_wait_checkers
.event( &email_analyzer::on_header_check_result )
.event( &email_analyzer::on_body_check_result )
.event( &email_analyzer::on_attach_check_result )
.time_limit( 750ms, st_failure );
st_failure
.on_enter( [this]{
send< check_result >( reply_to_, email_file_, status_ );
so_deregister_agent_coop_normally();
} );
st_success
.on_enter( [this]{
send< check_result >( reply_to_, email_file_, check_status::safe );
so_deregister_agent_coop_normally();
} );
};
...
// Новий атрибут потрібен для збереження актуального негативного результату.
check_status status_{ check_status::check_failure };
};

Це дозволить нам у коді агента просто змінювати стан для завершення роботи агента тим або іншим чином:
void on_load_failed( const load_email_failed & ) {
st_failure.activate();
}

void on_checker_result( check_status status ) {
// На першому ж невдалому результаті перериваємо свою роботу.
if( check_status::safe != status ) {
status_ = status;
st_failure.activate();
}
else {
++checks_passed_;
if( 3 == checks_passed_ )
// Всі результати отримані. Можна завершувати перевірку з
// позитивним результатом.
st_success.activate();
}
}

Але можна піти ще далі. Для станів st_failure і st_success є одна спільна дія, яку потрібно виконати при вході в будь-який з цих станів – звернення до so_deregister_agent_coop_normally(). І це не випадково, адже обидва ці стани відповідають за завершення роботи агента. А раз так, то ми можемо скористатися вкладеними станами. Тобто ми введемо стан st_finishing, для якого st_failure і st_success будуть подсостояниями. При вході в st_finishing буде викликатися so_deregister_agent_coop_normally(). А при вході в st_failure і st_success – буде тільки надіслано відповідне повідомлення.
Т. к. стану st_failure і st_success вкладені в st_finishing, то при вході в будь-який з них спочатку буде викликатися обробник входу в st_finishing, а вже потім – обробник входу в st_failure або st_success. Вийде, що ми при вході в st_finishing ми дерегистрируем агента, а слідом, при вході в st_failure або st_success, надсилаємо повідомлення check_result.
Якщо хтось із читачів відчуває себе не комфортно при згадці вкладених станів, обробників входу до стану, обмежень на час перебування в стані, то має сенс ознайомитися з однією з основоположних статей на тему ієрархічних кінцевих автоматів: David Harel, Statecharts: A visual formalism for complex systems. Science of Computer Programming. Стану агентів в SObjectizer реалізують неабияку частину описаних там можливостей.
У результаті всіх цих перетворень агент email_analyzer отримає такий вигляд.
// Сьома версія агента email_analyzer, з розпаралелюванням роботи по перевірці
// вмісту email-а і використанням вкладених станів.

class email_analyzer : public agent_t {
state_t st_wait_io{ this };
state_t st_wait_checkers{ this };

state_t st_finishing{ this };
state_t st_failure{ initial_substate_of{ st_finishing } };
state_t st_success{ substate_of{ st_finishing } };

public :
email_analyzer( context_t ctx,
string email_file,
mbox_t reply_to )
: agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to))
{}

virtual void so_define_agent() override {
st_wait_io
.event( &email_analyzer::on_load_succeed )
.event( &email_analyzer::on_load_failed )
// Призначаємо тайм-аут для очікування відповіді.
.time_limit( 1500ms, st_failure );

st_wait_checkers
.event( [this]( const email_headers_checker::result & msg ) {
on_checker_result( msg.status_ );
} )
.event( [this]( const email_body_checker::result & msg ) {
on_checker_result( msg.status_ );
} )
.event( [this]( const email_attach_checker::result & msg ) {
on_checker_result( msg.status_ );
} )
// Ще один тайм-аут для відповідей.
.time_limit( 750ms, st_failure );

// Для станів, які відповідають за завершення роботи,
// потрібно визначити тільки обробники входу.
st_finishing.on_enter( [this]{ so_deregister_agent_coop_normally(); } );
st_failure.on_enter( [this]{
send< check_result >( reply_to_, email_file_, status_ );
} );
st_success.on_enter( [this]{
send< check_result >( reply_to_, email_file_, check_status::safe );
} );
}

virtual void so_evt_start() override {
// Починаємо працювати в стан за замовчуванням, тому
// потрібно примусово перейти в потрібний стан.
st_wait_io.activate();

// При старті відразу ж відправляємо запит IO-агенту для завантаження
// вмісту email файлу.
send< load_email_request >(
so_environment().create_mbox( "io_agent" ),
email_file_,
so_direct_mbox() );
}

private :
const string email_file_;
const mbox_t reply_to_;

// Зберігаємо останній негативний результат для того, щоб надіслати
// його при вході в стан st_failure.
check_status status_{ check_status::check_failure };

int checks_passed_{};

void on_load_succeed( const load_email_succeed & msg ) {
// Змінюємо стан т. к. переходимо до наступної операції.
st_wait_checkers.activate();

try {
auto parsed_data = parse_email( msg.content_ );
introduce_child_coop( *this,
// Агенти-checker-и будуть працювати на своєму власному
// thread-pool-диспетчера, який був створений заздалегідь
// під спеціальним ім'ям.
disp::thread_pool::create_disp_binder(
"checkers", disp::thread_pool::bind_params_t{} ),
[&]( coop_t & coop ) {
coop.make_agent< email_headers_checker >(
so_direct_mbox(), parsed_data->headers() );
coop.make_agent< email_body_checker >(
so_direct_mbox(), parsed_data->body() );
coop.make_agent< email_attach_checker >(
so_direct_mbox(), parsed_data->attachments() );
} );
}
catch( const exception & ) {
st_failure.activate();
}
}

void on_load_failed( const load_email_failed & ) {
st_failure.activate();
}

void on_checker_result( check_status status ) {
// На першому ж невдалому результаті перериваємо свою роботу.
if( check_status::safe != status ) {
status_ = status;
st_failure.activate();
}
else {
++checks_passed_;
if( 3 == checks_passed_ )
// Всі результати отримані. Можна завершувати перевірку з
// позитивним результатом.
st_success.activate();
}
}
};

Ну а тепер має сенс подивитися на код отриманого агента email_analyzer і задати собі просте, але важливе питання: а воно того варте?
Очевидно, що з відповіддю на це питання все не так однозначно. Але поговорити про це ми спробуємо вже в наступній статті. В якій торкнемося теми уроків, які ми отримали після більш ніж десяти років використання SObjectizer у розробці програмних систем.
Вихідні коди до показаним у статті прикладів можна знайти на у цьому репозиторії.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.