Управління діями процесів. Не перевищення ліміту RPS (QPS) API

image
Структурно-функціональна схема модуля
Хочу розповісти про розроблений і використовується в продакшне модулі Publisher Pulsar (github), який дозволяє синхронізувати дії процесів.

Наприклад, є безліч (десятки або сотні) процесів, незалежно один від одного звертаються до API Google Analytics з одного IP.

При цьому, GA встановлений ліміт 10 queries per second з одного IP.

Якщо не регулювати звернення до API, то постійно буде повертатися помилка перевищення ліміту деякими процесами, і або вони не зможуть виконати своє завдання без даних API, або будуть знову і знову намагатися отримати дані у циклі, створюючи проблему для інших процесів, збільшуючи кількість помилок. Тобто буде хаос і відсутність прогнозованості в частині відсотка коректно одержуваних даних і відсотка помилок при зверненнях до API.



І остільки виникає завдання уникнути перевищення ліміту RPS (QPS), щоб всі процеси могли коректно отримувати дані.

Даний модуль як раз справляється з цим (можливо існують інші реалізації подібної логіки для PHP, але вони не були знайдені).

Система покликана функціонувати подібно пульсару — здійснювати регулярні («пульсуючі») розсилки.

Загальну структуру дій з використання модуля можна описати так:

1. Вказати параметри і запустити Пульсар як демон.

2. Налаштувати код процесу (Сервісу), що звертається до API (прим. — виконує будь-яку дію, яке необхідно синхронізувати), для конекту до Пульсару, щоб перш виконання дії (наприклад, здійснення запиту до API) процес звертався б до Пульсару і чекав дозволу на виконання дії. І тільки після отримання дозволу виконував би його.

В результаті Пульсар згідно налаштувань дозволяє одночасно бути передплатниками тільки [наприклад] 10 процесів (які вийшли з FIFO стека; тобто 10-ти дозволили стати передплатниками, а решта N знаходяться в ZMQ черги).

І після того, як необхідна кількість процесів стало передплатниками, їм надсилається дозвіл, після якого вони можуть зробити свою дію (наприклад, звернутися до API).

Таким чином, ліміт буде дотримуватися незалежно від кількості паралельно працюючих процесів (в межах можливостей стека ZMQ).

3. Після цього передплатник (виконавець) повинен послати в Пульсар повідомлення про виконане дії — присутні помилки чи все в порядку.

Оскільки якщо при виконанні дії присутні помилки, пов'язані з кількістю одночасно виконаних дій, то Пульсар може скоригувати свою поведінку — тимчасово, до нормалізації ситуації (зникнення помилок) зменшити число передплатників, збільшити інтервал між публікаціями (дозволами дій), або навіть на якийсь час припинити роботу (у разі помилки, що потребує перерви в діях; наприклад, перевищення добового ліміту 403 DailyLimitExceeded).

1) Налагодження та запуск Пульсара:

$pulsar = new \React\PublisherPulsar\Pulsar();

$publisherPulsarDto = new \React\PublisherPulsar\Inventory\PublisherPulsarDto();
$publisherPulsarDto->setPulsationIterationPeriod(1); // кількість секунд між публікаціями (в результаті розмір буде не меншим, ніж зазначений у цьому параметрі; і може бути більшим при певних умовах)
$publisherPulsarDto->setSubscribersPerIteration(10); // кількість передплатників, яким надсилається дозвіл на дію (у т. ч. одночасне; і це одночасність чи не одночасність залежить вже від коду процесу-виконавця/передплатника)
$publisherPulsarDto->setModuleName('react:pulsar-ga'); //довільне ім'я
$publisherPulsarDto->setReplyStackCommandName('php artisan react:pulsar-reply-stack'); // Виклик субсидіарного скрипта, що виконує роль стека для виконавців. Код цього скрипта не вимагає установки, він приведений трохи нижче. В даному випадку вказаний шлях виклику консольної команди Laravel
$publisherPulsarDto->setPerformerContainerActionMaxExecutiontime(7); // кількість секунд очікування результуючих повідомлень від виконавців для можливої корекції поведінки
$publisherPulsarDto->setLogger(\Log::getMonolog()); // щоб використовувати наявні StreamHandlers. Якщо не зробити set, то створить новий Logger з виводом інформації в STDOUT
$publisherPulsarDto->setMaxWaitReplyStackResult(7); // кількість секунд очікування підключення потрібної кількості передплатників, зазначеного у властивості subscribersPerIteration вище. Якщо за цей час потрібну кількість не підключиться до Стеку, то Пульсар запустить процес імітації підключення виконавців, щоб дібрати потрібну кількість у вигляді "фантомів" і продовжити роботу
$pulsarSocketsParams = new \React\PublisherPulsar\Inventory\PulsarSocketsParamsDto();

//можуть бути будь-які вільні порти
$pulsarSocketsParams->setReplyToReplyStackSocketAddress('tcp://127.0.0.1:6261');
$pulsarSocketsParams->setPushToReplyStackSocketAddress('tcp://127.0.0.1:6262');
$pulsarSocketsParams->setPublishSocketAddress('tcp://127.0.0.1:6263');
$pulsarSocketsParams->setPullSocketAddress('tcp://127.0.0.1:6264');
$pulsarSocketsParams->setReplyStackSocketAddress('tcp://127.0.0.1:6265');

$publisherPulsarDto->setPulsarSocketsParams($pulsarSocketsParams);
$pulsar->setPublisherPulsarDto($publisherPulsarDto);

$pulsar->manage();

Код скрипта ReplyStack:

$replyStack = new \React\PublisherPulsar\ReplyStack();
$replyStack->startCommunication();


Note: важливо, щоб Пульсар був запущений раніше процесів, що підключаються до нього, інакше процеси будуть стукати в порожнечу за адресами, які ще не пов'язані з Пульсаром, і просто зависнуть в очікуванні відповіді, який ніколи не настане.

2) Налаштування коду виконавця (підписувача):

Включаємо об'єкт Performer пакету модуля код (у вигляді властивості, якщо код процесу на ООП) процесу:

$performerDto = new \React\PublisherPulsar\Inventory\PerformerDto();
$performerDto->setModuleName("PerformerCommand"); // для розуміння в логах який тип виконавців виконує дію 

$performer = new \React\PublisherPulsar\Performer($performerDto);

$performerSocketParams = new \React\PublisherPulsar\Inventory\PerformerSocketsParamsDto();

//ці адреси повинні відповідати адресами Пульсара в рамках ZMQ-парності (Publish/Subscribe, Push/Pull, Request/Reply)
$performerSocketParams->setPublisherPulsarSocketAddress('tcp://127.0.0.1:6273');
$performerSocketParams->setPushPulsarSocketAddress('tcp://127.0.0.1:6274');
$performerSocketParams->setRequestPulsarRsSocketAddress('tcp://127.0.0.1:6275');

$performer->setSocketsParams($performerSocketParams);

$this->zmqPerformer = $performer; 

І далі в необхідному місці, перед викликом цільового дії, що вимагає синхронізації/координації, викликаємо метод, відповідальний за отримання дозволу від Пульсара:

$this->zmqPerformer->connectToPulsarAndWaitPermissiontoact();


3) Після виконання цільового дії необхідно відправити результуюче повідомлення про те, виникли помилки. Наприклад, у такому вигляді:


//має місце помилка перевищення 10 QPS
if (strpos($e->getMessage(), GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED) !== false) {

$actionResultWithError = new ActionResultingPushDto();

$actionResultWithError->setActionCompleteCorrectly(false); 
$actionResultWithError->setSlowDown(true);

$actionResultWithError->setErrorMessage($e->getMessage());
$actionResultWithError->setErrorReason(GaErrorResponsesConstants::USER_RATE_LIMIT_EXCEEDED);

$this->zmqPerformer->pushActionResultInfo($actionResultWithError);

// денний ліміт перевищений і необхідно на час заснути
} elseif (strpos($e->getMessage(), GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED) !== false) {

$actionResultWithError = new ActionResultingPushDto();

$actionResultWithError->setActionCompleteCorrectly(false);

$sleepForPeriod = new ErrorSleepForPeriod();
$sleepForPeriod->setSleepPeriod((60 * 60 * 1000000)); //на годину, в мікросекундах
$actionResultWithError->setSleepForPeriod($sleepForPeriod);

$actionResultWithError->setErrorMessage($e->getMessage());
$actionResultWithError->setErrorReason(GaErrorResponsesConstants::DAILY_LIMIT_EXCEEDED);

$this->zmqPerformer->pushActionResultInfo($actionResultWithError);

//все коректно
} else {

$this->zmqPerformer->pushActionResultInfoWithoutPulsarcorrectionbehavior();

}

Це актуально також для випадку, якщо до Пульсару підключена тільки частину процесів, а інша працює в довільному хаотичному вигляді. І така механіка дозволить знижувати кількість помилок, створюваних спільної активністю упорядкованих і неупорядкованих процесів.

***

При цьому, як вже було сказано, модуль можна використовувати для будь-якої періодичної передачі інформації процесам. Для цього достатньо при ініціалізації засетить свій клас, отнаследованный від PublisherToSubscribersDto, що містить логіку управління процесами, які його отримають.

Тобто при ініціалізації демона в пункті 1) додати:

$publisherToSubscribersDto = new YourNameExtendedByPublisherToSubscribersdto(); 
$publisherToSubscribersDto->setYourProperty();

$publisherPulsarDto->setPublisherToSubscribersDto($publisherToSubscribersDto);

І цей об'єкт буде передаватися процесів.

***

Основна частина коду була написана на минулій роботі в компанії Adventum в рамках вирішення комерційних завдань і публікується з її дозволу.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.