Проста система демонів для Yii2

В даній статті спробую розкрити основні нюанси реалізації системи демонів для PHP і навчити консольні команди Yii2 демонизироваться.

Останні 3 роки я займаюся розробкою та розвитком досить великого корпоративного порталу для однієї групи компаній. Я, як і багато, зіткнувся з проблемою, коли рішення задачі, яку вимагає бізнес, не вкладається ні в які таймаут. Зробіть отчетик в excel на 300 тис. рядків, відправте розсилку на 1500 листів і так далі. Природно, такі завдання повинні вирішуватися фоновими завданнями, демонами і crontab-ами. У рамках статті я не буду приводити порівняння кронов і демонів, ми для вирішення подібних завдань вибрали демонів. При цьому важливою вимогою для нас стала можливість мати доступ до всього, що вже написано для бекенду, відповідно, демони мають бути продовженням фрейворка Yii2. З цієї ж причини нам не підійшли вже готові рішення типу phpDaemon.

Під катом готове рішення для реалізації демонів на Yii2, що у мене вийшло.

Тема демонів на PHP піднімається із завидною регулярністю (раз, два, три, а хлопці з badoo навіть перезапускают їх без втрати з'єднань). Можливо мій велосипед швидкий спосіб запустити демони на популярному фреймворку буде корисний.

Трохи основ

Для того, щоб процес став демоном, треба:
  1. Відв'язати скрипт від консолі і стандартних потоків введення-виводу;
  2. Загорнути виконання основного коду в нескінченний цикл;
  3. Реалізувати механізми контролю над процесом.
Отвязываемся від консолі
Для початку закриваємо стандартні потоки STDIN, STOUT, STDERR. Але PHP без них не може, тому перший відкритий потік він зробить стандартним, так що відкриємо їх в /dev/null.

if (is_resource(STDIN)) {
fclose(STDIN);
$stdIn = fopen('/dev/null', 'r');
}
if (is_resource(STDOUT)) {
fclose(STDOUT);
$stdOut = fopen('/dev/null', 'ab');
}
if (is_resource(STDERR)) {
fclose(STDERR);
$stdErr = fopen('/dev/null', 'ab');
}

Далі форкаем процес і робимо форк основним процесом. Процес донор — завершуємо.
$pid = pcntl_fork();
if ($pid == -1) {
$this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
} elseif ($pid) {
$this->halt(self::EXIT_CODE_NORMAL);
} else {
posix_setsid();
}

Нескінченний цикл і контроль
Я думаю, з циклом все зрозуміло. А ось необхідні механізми контролю варто розглянути докладніше.

Фіксація вже запущених процесів
Тут все просто — після запуску демон кладе у файл зі своєю назвою свій PID, а при завершенні своєї роботи цей файл зносить.

Обробка POSIX сигналів
Демон повинен коректно обробляти сигнали від операційної системи, тобто при отриманні сигналу SIGTERM повинен плавно завершувати свою роботу. Досягається це кількома речами: перше, визначаємо функцію, яка буде обробляти отримані сигнали:

pcntl_signal(SIGTERM, ['MyClassName', 'mySignalHandlerFunction']);

Друге, у функцію обробки сигналів ставимо присвоєння деякого статичного властивості класу значення true.
static function signalHandler($signo, $pid = null, $status = null)
{
self::$stopFlag = true;
}

Ну і третє, наш нескінченний цикл тепер повинен бути не такий вже нескінченний:
while (!self::$stopFlag) {
pcntl_signal_dispatch();
}

Особливості обробки сигналів в різних версіях PHPPHP < 5.3.0 для розподілу сигналів використовувалася спеціальна директива declare(ticks = N). Де тік — це подія, яка трапляється кожні N низькорівневих операцій, виконаних парсером всередині блоку declare. Розподіл сигналів здійснювалося у відповідності з налаштуванням. Дуже маленьке значення призводило до провалу в продуктивності, а занадто велике — до несвоєчасної обробці сигналів.

В PHP >= 5.3.0 з'явилася функція pcntl_signal_dispatch(), яку можна викликати для ручного розподілу сигналів, що ми і робимо після кожної ітерації.
Ну і нарешті, в PHP 7.1 стане доступно асинхронне розподіл сигналів, що дозволить майже миттєво отримувати сигнали без оверхеда і ручного виклику функцій.

Тепер при отриманні команди операційної системи скрипт спокійно завершить поточну ітерацію і вийде з циклу.

Контроль за витоками пам'яті
На жаль, якщо демон довго працює без перезапуску — у нього починає текти пам'ять. Інтенсивність витоку залежить від того, які функції ви використовуєте. З нашої практики — найбільш сильно «текли» демони, які працюють з віддаленими SOAP-сервісами через стандартний клас SoapClient. Так що за цим потрібно стежити і періодично їх перезапускати. Доповнимо наш цикл умовою контролю за витоками:

while (!self::$stopFlag) {
if (memory_get_usage() > $this->memoryLimit) {
break;
}
pcntl_signal_dispatch();
}

Де ж код для Yii?

Вихідні матеріали викладені на Github — yii2-daemon, пакет також доступне для установки через composer.

Пакет складається всього з 2-х абстрактних класів — базовий клас DaemonController і клас WatcherDaemonController.

DaemonController
<?php

namespace vyants\daemon;

use yii\base\NotSupportedException;
use yii\console\Controller;
use yii\helpers\Console;

/**
* Class DaemonController
*
* @author Vladimir Yants <vladimir.yants@gmail.com>
*/
abstract class DaemonController extends Controller
{

const EVENT_BEFORE_JOB = "beforeJob";
const EVENT_AFTER_JOB = "afterJob";

const EVENT_BEFORE_ITERATION = "beforeIteration";
const EVENT_AFTER_ITERATION = "afterIteration";

/**
* @var $demonize boolean Run controller as Daemon
* @default false
*/
public $demonize = false;

/**
* @var $isMultiInstance boolean allow daemon create a few instances
* @see $maxChildProcesses
* @default false
*/
public $isMultiInstance = false;

/**
* @var $parentPID int main procces pid
*/
protected $parentPID;

/**
* @var $maxChildProcesses int max daemon instances
* @default 10
*/
public $maxChildProcesses = 10;

/**
* @var $currentJobs [] array of running instances
*/
protected static $currentJobs = [];

/**
* @var int Memory limit for daemon, must bee less than php memory_limit
* @default 32M
*/
protected $memoryLimit = 268435456;

/**
* @var boolean used for soft daemon stop, set 1 to stop
*/
private static $stopFlag = false;

/**
* @var int Delay between task list checking
* @default 5sec
*/
protected $sleep = 5;

protected $pidDir = "@runtime/daemons/pid";

protected $реєстрації = "@runtime/daemons/logs";

private $stdIn;
private $stdOut;
private $stdErr;

/**
* Function Init
*/
public function init()
{
parent::init();

//set PCNTL signal handlers
pcntl_signal(SIGTERM, ['vyants\daemon\DaemonController', 'signalHandler']);
pcntl_signal(SIGINT, ['vyants\daemon\DaemonController', 'signalHandler']);
pcntl_signal(SIGHUP, ['vyants\daemon\DaemonController', 'signalHandler']);
pcntl_signal(SIGUSR1, ['vyants\daemon\DaemonController', 'signalHandler']);
pcntl_signal(SIGCHLD, ['vyants\daemon\DaemonController', 'signalHandler']);
}

function __destruct()
{
$this->deletePid();
}

/**
* Adjusting logger. You can override it.
*/
protected function initLogger()
{
$targets = \Yii::$app->getLog()->targets;
foreach ($targets as $name => $target) {
$target->enabled = false;
}
$config = [
'levels' => ['error', 'warning', 'trace', 'info'],
'logFile' => \Yii::getAlias($this->реєстрації) . DIRECTORY_SEPARATOR . $this->getProcessName() . '.log',
'logVars' => [],
'except' => [
'yii\db\*', // don't include messages from db
],
];
$targets['daemon'] = new \yii\log\FileTarget($config);
\Yii::$app->getLog()->targets = $targets;
\Yii::$app->getLog()->init();
}

/**
* Daemon worker body
*
* @param $job
*
* @return boolean
*/
abstract protected function doJob($job);

/**
* Base action, you can\t override or create another actions
* @return bool
* @throws NotSupportedException
*/
public final function actionIndex()
{
if ($this->demonize) {
$pid = pcntl_fork();
if ($pid == -1) {
$this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
} elseif ($pid) {
$this->cleanLog();
$this->halt(self::EXIT_CODE_NORMAL);
} else {
posix_setsid();
$this->closeStdStreams();
}
}
$this->changeProcessName();

//run loop
return $this->loop();
}

/**
* Set new process name
*/
protected function changeProcessName()
{
//rename process
if (version_compare(PHP_VERSION, '5.5.0') >= 0) {
cli_set_process_title($this->getProcessName());
} else {
if (function_exists('setproctitle')) {
setproctitle($this->getProcessName());
} else {
\Yii::error('Can\'t find cli_set_process_title or setproctitle function');
}
}
}

/**
* Close std streams and to open /dev/null
* need some class properties
*/
protected function closeStdStreams()
{
if (is_resource(STDIN)) {
fclose(STDIN);
$this->stdIn = fopen('/dev/null', 'r');
}
if (is_resource(STDOUT)) {
fclose(STDOUT);
$this->stdOut = fopen('/dev/null', 'ab');
}
if (is_resource(STDERR)) {
fclose(STDERR);
$this->stdErr = fopen('/dev/null', 'ab');
}
}

/**
* Prevent non index action running
*
* @param \yii\base\Action $action
*
* @return bool
* @throws NotSupportedException
*/
public function beforeAction($action)
{
if (parent::beforeAction($action)) {
$this->initLogger();
if ($action->id != "index") {
throw new NotSupportedException(
"Only index action allowed in daemons. So, don't create and call another"
);
}

return true;
} else {
return false;
}
}

/**
* Повертає доступні опції
*
* @param string $actionID
*
* @return array
*/
public function options($actionID)
{
return [
'demonize',
'taskLimit',
'isMultiInstance',
'maxChildProcesses',
];
}

/**
* Extract current unprocessed jobs
* You can extract jobs from DB (DataProvider will be great), queue managers (ZMQ, RabbiMQ etc), redis and so on
*
* @return array jobs with
*/
abstract protected function defineJobs();

/**
* Fetch one task from array of tasks
*
* @param Array
*
* @return mixed one task
*/
protected function defineJobExtractor(&$jobs)
{
return array_shift($jobs);
}

/**
* Main Loop
*
* * @return boolean 0/1
*/
final private function loop()
{
if (file_put_contents($this->getPidPath(), getmypid())) {
$this->parentPID = getmypid();
\Yii::trace('Daemon' . $this->getProcessName() . ' pid ' . getmypid() . ' started.');
while (!self::$stopFlag) {
if (memory_get_usage() > $this->memoryLimit) {
\Yii::trace('Daemon' . $this->getProcessName() . ' pid ' .
getmypid() . 'used' . memory_get_usage() . 'bytes on' . $this->memoryLimit .
'bytes allowed by memory limit');
break;
}
$this->trigger(self::EVENT_BEFORE_ITERATION);
$this->renewConnections();
$jobs = $this->defineJobs();
if ($jobs && !empty($jobs)) {
while (($job = $this->defineJobExtractor($jobs)) !== null) {
//if no free workers, wait
if ($this->isMultiInstance && (count(static::$currentJobs) >= $this->maxChildProcesses)) {
\Yii::trace('Reached maximum number of child processes. Waiting...');
while (count(static::$currentJobs) >= $this->maxChildProcesses) {
sleep(1);
pcntl_signal_dispatch();
}
\Yii::trace(
'Free workers found:' .
($this->maxChildProcesses - count(static::$currentJobs)) .
'worker(s). Представник tasks.'
);
}
pcntl_signal_dispatch();
$this->runDaemon($job);
}
} else {
sleep($this->sleep);
}
pcntl_signal_dispatch();
$this->trigger(self::EVENT_AFTER_ITERATION);
}

\Yii::info('Daemon' . $this->getProcessName() . ' pid ' . getmypid() . ' is stopped.');

return self::EXIT_CODE_NORMAL;
}
$this->halt(self::EXIT_CODE_ERROR, 'Can\'t create pid file' . $this->getPidPath());
}

/**
* Delete pid file
*/
protected function deletePid()
{
$pid = $this->getPidPath();
if (file_exists($pid)) {
if (file_get_contents($pid == getmypid()) {
unlink($this->getPidPath());
}
} else {
\Yii::error('Can\'t unlink pid file' . $this->getPidPath());
}
}

/**
* PCNTL signals handler
*
* @param $signo
* @param null $pid
* @param null $status
*/
static final function signalHandler($signo, $pid = null, $status = null)
{
switch ($signo) {
case SIGINT:
case SIGTERM:
//shutdown
self::$stopFlag = true;
break;
case SIGHUP:
//restart, not implemented
break;
case SIGUSR1:
//user signal, not implemented
break;
case SIGCHLD:
if (!$pid) {
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
while ($pid > 0) {
if ($pid && isset(static::$currentJobs[$pid])) {
unset(static::$currentJobs[$pid]);
}
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
break;
}
}

/**
* Tasks runner
*
* @param string $job
*
* @return boolean
*/
public final function runDaemon($job)
{
if ($this->isMultiInstance) {
$this->flushLog();
$pid = pcntl_fork();
if ($pid == -1) {
return false;
} elseif ($pid !== 0) {
static::$currentJobs[$pid] = true;

return true;
} else {
$this->cleanLog();
$this->renewConnections();
//child process must die
$this->trigger(self::EVENT_BEFORE_JOB);
$status = $this->doJob($job);
$this->trigger(self::EVENT_AFTER_JOB);
if ($status) {
$this->halt(self::EXIT_CODE_NORMAL);
} else {
$this->halt(self::EXIT_CODE_ERROR, 'Child process #' . $pid ' return error.');
}
}
} else {
$this->trigger(self::EVENT_BEFORE_JOB);
$status = $this->doJob($job);
$this->trigger(self::EVENT_AFTER_JOB);

return $status;
}
}

/**
* Stop process and show or write message
*
* @param $code int -1|0|1
* @param $string message
*/
protected function halt($code, $message = null)
{
if ($message !== null) {
if ($code == self::EXIT_CODE_ERROR) {
\Yii::error($message);
if (!$this->demonize){
$message = Console::ansiFormat($message, [Console::FG_RED]);
}
} else {
\Yii::trace($message);
}
if (!$this->demonize) {
$this->writeConsole($message);
}
}
if ($code !== -1) {
\Yii::$app->end($code);
}
}

/**
* Renew connections
* @throws \yii\base\InvalidConfigException
* @throws \yii\db\Exception
*/
protected function renewConnections()
{
if (isset(\Yii::$app->db)) {
\Yii::$app->db->close();
\Yii::$app->db- > open();
}
}

/**
* Show message in console
*
* @param $message
*/
private function writeConsole($message)
{
$out = Console::ansiFormat('[' . date('d.m.Y H:i:s') . '] ', [Console::BOLD]);
$this->stdout($out . $message . "\n");
}

/**
* @param string $daemon
*
* @return string
*/
public function getPidPath($daemon = null)
{
$dir = \Yii::getAlias($this->pidDir);
if (!file_exists($dir)) {
mkdir($dir, 0744, true);
}
$daemon = $this->getProcessName($daemon);

return $dir . DIRECTORY_SEPARATOR . $daemon;
}

/**
* @return string
*/
public function getProcessName($route = null)
{
if (is_null($route)) {
$route = \Yii::$app->requestedRoute;
}

return str_replace(['/index', '/'], [", '.'], $route);
}

/**
* If in daemon mode - no write to console
*
* @param string $string
*
* @return bool|int
*/
public function stdout($string)
{
if (!$this->demonize && is_resource(STDOUT)) {
return parent::stdout($string);
} else {
return false;
}
}

/**
* If in daemon mode - no write to console
*
* @param string $string
*
* @return int
*/
public function stderr($string)
{
if (!$this->demonize && is_resource(\STDERR)) {
return parent::stderr($string);
} else {
return false;
}
}

/**
* Empty log queue
*/
protected function cleanLog()
{
\Yii::$app->log->logger->messages = [];
}

/**
* Empty log queue
*/
protected function flushLog($final = false)
{
\Yii::$app->log->logger->flush($final);
}
}


WatcherDaemonController
<?php

namespace vyants\daemon\controllers;

use vyants\daemon\DaemonController;

/**
* watcher-daemon - check another daemons and run it if need
*
* @author Vladimir Yants <vladimir.yants@gmail.com>
*/
abstract class WatcherDaemonController extends DaemonController
{
/**
* @var string subfolder in console/controllers
*/
public $daemonFolder = 'daemons';

/**
* @var boolean flag for first iteration
*/
protected $firstIteration = true;

/**
* Prevent double start
*/
public function init()
{
$pid_file = $this->getPidPath();
if (file_exists($pid_file) && ($pid = file_get_contents($pid_file)) && file_exists("/proc/$pid")) {
$this->halt(self::EXIT_CODE_ERROR, 'Another Watcher is already running.');
}
parent::init();
}

/**
* Job processing body
*
* @param $job array
*
* @return boolean
*/
protected function doJob($job)
{
$pid_file = $this->getPidPath($job['daemon']);

\Yii::trace('Check daemon' . $job['daemon']);
if (file_exists($pid_file)) {
$pid = file_get_contents($pid_file);
if ($this->isProcessRunning($pid)) {
if ($job['enabled']) {
\Yii::trace('Daemon' . $job['daemon'] . 'running and working fine');

return true;
} else {
\Yii::warning('Daemon' . $job['daemon'] . ' running, but disabled in config. Send SIGTERM signal.');
if (isset($job['hardKill']) && $job['hardKill']) {
posix_kill($pid, SIGKILL);
} else {
posix_kill($pid, SIGTERM);
}

return true;
}
}
}
\Yii::error('Daemon pid not found.');
if ($job['enabled']) {
\Yii::trace('Try to run daemon' . $job['daemon'] . '.');
$command_name = $job['daemon'] . DIRECTORY_SEPARATOR . 'index';
//flush log before fork
$this->flushLog(true);
//daemon run
$pid = pcntl_fork();
if ($pid === -1) {
$this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() returned error');
} elseif ($pid === 0) {
$this->cleanLog();
\Yii::$app->requestedRoute = $command_name;
\Yii::$app->runAction("$command_name", ['demonize' => 1]);
$this->halt(0);
} else {
$this->initLogger();
\Yii::trace('Daemon' . $job['daemon'] . 'is running with pid' . $pid);
}
}
\Yii::trace('Daemon' . $job['daemon'] . ' is checked.');

return true;
}

/**
* @return array
*/
protected function defineJobs()
{
if ($this->firstIteration) {
$this->firstIteration = false;
} else {
sleep($this->sleep);
}

return $this->getDaemonsList();
}

/**
* Daemons for check. Better way - get it from database
* [
* ['daemon' => 'one-daemon', 'enabled' => true]
* ...
* ['daemon' => 'another-daemon', 'enabled' => false]
* ]
* @return array
*/
abstract protected function getDaemonsList();

/**
* @param $pid
*
* @return bool
*/
public function isProcessRunning($pid)
{
return file_exists("/proc/$pid");
}
}


DaemonController
Це батьківський клас для всіх демонів. Ось мінімальний приклад демона:

<?php

namespace console\controllers\daemons;

use vyants\daemon\DaemonController;

class TestController extends DaemonController
{
/**
* @param $job
*
* @return boolean
*/
protected function doJob($job)
{
//do some job
return true;
}

/**
* @return array
*/
protected function defineJobs()
{
return [];
}
}

Функція defineJobs() повинна повертати набір завдань для виконання. За замовчуванням очікується, що вона буде повертати масив. Якщо ви хочете повертати, скажімо MongoCursor, потрібно ще перевизначити defineJobExtractor(). Функція doJob() повинна отримувати на вхід одну задачу для виконання, проводити з нею необхідні операції і позначати дану задачу в джерелі як відпрацьовану, щоб вона не впала вдруге.

Можливі параметри:
  • demonize — даний параметр визначає, чи буде скрипт демонизироваться або працювати як консольний додаток. Параметр доступний для завдання з консолі: --demonize=1
  • isMultiInstance і maxChildProcesses — визначає можна демону створювати свої власні копії і яке їх максимальна кількість може одночасно працювати. Дана функція дозволяє виконувати декілька завдань паралельно. doJob буде виконуватися в дочірніх процесах, а батьківський процес буде тільки делегувати завдання своїм нащадкам і стежити, щоб їх кількість не перевищувала допустимий максимум. Дуже корисно, якщо ресурсів сервера вистачає для того, щоб виконувати кілька досить тривалих за часом завдань. За замовчуванням така поведінка вимкнено. Параметри так само доступні з консолі: --isMultiInstance=1 --maxChildProcesses=2
  • memoryLimit — поріг споживання демоном пам'яті, якщо демон в режимі очікування перевищить цей поріг, то він благородно зробити сиппоку. Як вже було зазначено раніше, для зменшення розміру споживаної демонами пам'яті в результаті витоків.
  • sleep — час в секундах, на яке демон буде засипати між перевірками наявності завдань. Демон відправиться спати тільки якщо defineJob поверне empty і поки є завдання демон спати не буде. Тому defineJobs не повинна повертати статичний список завдань, інакше демон буде молотити їх без кінця і відпочинку.
  • pidDir і реєстрації — шляхи для зберігання логів і pid-ів, підтримують аліаси Yii. За замовчуванням "@runtime/daemons/pid" і "@runtime/daemons/logs"


Проблема втрати з'єднань
При здійсненні операції fork() встановлені в батьківському процесі з'єднання перестають працювати в дочірніх процесах. Для того, щоб уникнути цієї проблеми, після всіх форков проставлений виклик функції renewConnections(). За замовчуванням, дана функція переподключает тільки Yii::$app->db, але ви можете змінити її, і додати інші джерела, з'єднання з якими потрібно підтримувати в дочірніх процесах.

Логгирование
Демони перенастраивают стандартний логгер Yii під себе. Якщо вас не влаштовує поведінка за умовчанням- перевизначити функцію initLogger().

WatcherDaemonController
Це майже готовий демон-спостерігач. Завдання цього демона стежити за іншими демонами, запускати і зупиняти їх при необхідності. Він не може стартувати двічі, тому можна сміливо поставити його запуск в crontab. Для того, щоб почати його використовувати, потрібно в console/controllers створити папку daemons і покласти клас виду:

<?php

namespace console\controllers\daemons;

use vyants\daemon\controllers\WatcherDaemonController;

/**
* Class WatcherController
*/
class WatcherController extends WatcherDaemonController
{

protected $sleep = 10;

/**
* @return array
*/
protected function getDaemonsList()
{
return [
['daemon' => 'daemons/test', 'enabled' => true]
];
}
}

Потрібно визначити лише одну функцію — getDaemonsList(), яка повертає список демонів за якими потрібно стежити. У найпростішому вигляді — це зашитий в код масив, але в такому випадку ви не будете мати можливості змінювати список «на льоту». Покладіть список демонів в базу або окремий файлик і отримуйте його кожен раз звідти. В такому випадку, watcher зможе увімкнути або вимкнути демон без власного перезапуску.

Висновок

В даний момент у нас більше 50-ти демонів, які виконують різноманітні завдання, починаючи від надсилання поштових повідомлень і закінчуючи генерацією звітів та актуалізацією даних між різними системами.

Демони працюють з різними джерелами завдань — MySQL, RabbitMQ і навіть віддаленими веб-сервісами. Політ нормальний.
Безумовно, демони на php не зрівняються з тими ж демонами на Go. Але висока швидкість розробки, можливість повторного використання вже написаного коду і відсутність необхідності вчити команду іншої мови переважують мінуси.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.