Python реалізація парадигми event-driven з допомогою сопрограмм

Стаття про те, як за допомогоюрозширених генераторів Python зробити власну реалізацію сопрограмм, що перемикаються по отриманню подій. Простота коду отриманого модуля вас приємно здивує і прояснить нові та мало використовуються можливості мови, які можна отримати, використовуючи такі генератори. Стаття допоможе розібратися з тим, як це влаштовано в серйозних реалізаціях: asyncio, tornado, etc.

Теоретичні моменти і disclaimer

Поняття сопрограмма має дуже широке тлумачення, тому слід визначитися, якими характеристиками вони будуть мати в нашій реалізації:
  • Виконуються спільно в одному потоці;
  • Виконання може перериватися для очікування певної події;
  • Виконання може поновитися після отримання очікуваного події;
  • Може повернути результат по завершенню.
Як наслідок отримуємо: подієво-орієнтоване програмування без функцій зворотного виклику і кооперативну багатозадачність. Ефект від використання такої парадигми програмування буде істотним тільки для завдань, які реагують на нерівномірно надходять події. В першу чергу це завдання обробки I/O: мережеві сервери, інтерфейси, тощо Інший можливий варіант застосування — це завдання розрахунку стану персонажів в ігровому світі. Але категорично не підійде для завдань, які виробляють довгі розрахунки.
Слід чітко розуміти, що поки виконувана сопрограмма не перервалася на очікування події, всі інші знаходяться в стані зупинки, навіть якщо очікуваний ними подія вже відбулася.

Основа всього

В Python хорошою основою для всього цього є генератори, якщо їх правильно приготувати в прямому і переносному сенсі. Точніше розширені генератори, API яких остаточно сформувався у версії Python 3.3. У попередніх версіях не було реалізовано повернення значення (результату) по завершенню роботи генератора і не було зручного механізму виклику одного генератора з іншого. Тим не менш, реалізації сопрограмм були і раніше, але з-за обмежень звичайних генераторів вони були не так «красиві» як те, що вийде у нас. Нижче розглянуті можливості додаткових генераторів, які нам знадобляться.
Передача повідомлень в сопрограмму у нас буде побудована на можливості задати генератору його стан. Скопіюйте код нижче у вікно запущеного інтерпретатора Python версія 3.3 і вище.
def gen_factory():
state = None
while True:
print("state:", state)
state = yield state

gen = gen_factory()

Генератор створений, його треба запустити.
>>> next(gen)
state: None

Отримано початковий стан. Змінимо стан:
>>> gen.send("OK")
state: OK
'OK'

Бачимо що стан змінився і повернуто в результаті. Наступні виклики next будуть повертати його.

Навіщо нам все це?

Уявіть завдання: передавати привіт Петрову раз на дві секунди, Іванову раз на три секунди, а всьому світу раз в п'ять секунд. У вигляді Python коду можна представити так:
def hello(name, timeout):
while True:
sleep(timeout)
print("Привіт, {}!".format(name))

hello("Петров", 2.0)
hello("Іванов", 3.0)
hello("Мир", 5.0)

Виглядає добре, але привіти буде отримувати тільки Петров. Однак! Невелика модифікація не впливає на зрозумілість коду, а навіть навпаки — уточнююча нашу думку, і це вже може заробити як годиться.
@coroutine
def hello(name, timeout):
while True:
yield from sleep(timeout)
print("Привіт, {}!".format(name))

hello("Петров", 2.0)
hello("Іванов", 3.0)
hello("Мир", 5.0)
run()

Код вийшов у стилі pythonic way — наочно ілюструє завдання, лінійний без калбэков, без зайвих наворотів з об'єктами, будь-які коментарі у ньому зайві. Залишилося лише реалізувати декоратор coroutine, свою версію функції sleep і функцію run. В реалізації, звичайно, без наворотів не обійдеться. Але це теж pythonic way, ховати за фасадом бібліотечних модулів всю магію.

найцікавіше

Назвемо модуль з реалізацією невигадливо — concurrency, зі змістом і відображає той факт, що це, фактично, буде реалізація кооперативної багатозадачності. Зрозуміло, що декоратор повинен буде зробити зі звичайної функції генератор і запустити його (зробити перший виклик next). Конструкція мови yield from прокидає виклик наступного генератор. Тобто функція sleep повинна створити генератор, в якому можна сховати всю магію. В генератор, її викликав, повернеться тільки код отриманого події. Тут повертається результат не обробляється, код тут може отримати по суті лише один результат, що означає що тайм-аут минув. Очікування ж вводу-виводу може повертати різні види подій, наприклад (читання/запис/тайм аут). Більш того, генератори породжувані функціями типу sleep можуть повернути за yield from будь-який тип даних і відповідно їх функціонал може бути не обмежений очікуванням подій. Функція run запустить диспетчер подій, його завдання — отримати подія ззовні та/або згенерувати всередині, визначити його одержувача і власне відправити.
Почнемо з декоратора:
class coroutine(object):
"""Робить з функції сопрограмму на базі розширеного генератора."""
_current = None

def __init__(self, callable):
self._callable = callable

def __call__(self, *args, **kwargs):
corogen = self._callable(*args, **kwargs)
cls = self.__class__
if cls._current is None:
try:
cls._current = corogen
next(corogen)
finally:
cls._current = None
return corogen

Він виконаний у вигляді класу, типовий прийом, як і обіцяв, він створює і запускає генератор. Конструкція з _current додана для того, щоб уникнути запуску генератора, якщо декорована функція, що його створює викликається всередині тіла іншого генератора. У цьому випадку перший дзвінок буде і так зроблено. Так само це допоможе розібратися, в якій генератор має бути передано подія, щоб воно потрапило по ланцюжку в генератор, створений функцією sleep.
def sleep(timeout):
"""Зупиняє виконання до отримання події "таймаут закінчився"."""
corogen = coroutine._current
dispatcher.setup_timeout(corogen, timeout)
revent = yield
return revent

Тут бачимо виклик dispatcher.setup_sleep це повідомляє диспетчеру подій, що генератор такий-то чекає подія «тайм-аут» по закінченню заданого параметра timeout кількості секунд.
from collections import deque
from time import time, sleep as sys_sleep


class Dispatcher(object):
"""Об'єкт реалізує диспечер подій."""
def __init__(self):
self._pending = deque()
self._deadline = time() + 3600.0

def setup_timeout(self, corogen, timeout):
deadline = time() + timeout
self._deadline = min([self._deadline, deadline])
self._pending.append([corogen, deadline])
self._pending = deque(sorted(self._pending, key=lambda a: a[1]))

def run(self):
"""Запускає цикл обробки подій."""
while len(self._pending) > 0:
timeout = self._deadline - time()
self._deadline = time() + 3600.0
if timeout > 0:
sys_sleep(timeout)
while len(self._pending) > 0:
if self._pending[0][1] <= time():
corogen, _ = self._pending.popleft()
try:
coroutine._current = corogen
corogen.send("timeout")
except StopIteration:
pass
finally:
coroutine._current = None
else:
break

dispatcher = Dispatcher()
run = lambda: dispatcher.run()

У коді диспетчера подій теж немає нічого незвичайного. Куди передавати події визначається за допомогою змінної класу coroutine._current. При завантаженні модуля створюється екземпляр класу, робочої реалізації це звичайно ж повинен бути сінглетон. Клас collections.deque задіяний замість списку, так як швидше і корисний своїм методом popleft. Ну ось власне і все, і немає якоїсь особливої магії. Вся вона на повірку захована ще глибше, у реалізації розширених генераторів Python. Їх залишається тільки правильно приготувати.

Файл: concurrency.py
# concurrency.py
from collections import deque
from time import time, sleep as sys_sleep


class coroutine(object):
"""Робить з функції сопрограмму на базі розширеного генератора."""
_current = None

def __init__(self, callable):
self._callable = callable

def __call__(self, *args, **kwargs):
corogen = self._callable(*args, **kwargs)
cls = self.__class__
if cls._current is None:
try:
cls._current = corogen
next(corogen)
finally:
cls._current = None
return corogen


def sleep(timeout):
"""Зупиняє виконання до отримання події "таймаут закінчився"."""
corogen = coroutine._current
dispatcher.setup_timeout(corogen, timeout)
revent = yield
return revent


class Dispatcher(object):
"""Об'єкт реалізує диспечер подій."""
def __init__(self):
self._pending = deque()
self._deadline = time() + 3600.0

def setup_timeout(self, corogen, timeout):
deadline = time() + timeout
self._deadline = min([self._deadline, deadline])
self._pending.append([corogen, deadline])
self._pending = deque(sorted(self._pending, key=lambda a: a[1]))

def run(self):
"""Запускає цикл обробки подій."""
while len(self._pending) > 0:
timeout = self._deadline - time()
self._deadline = time() + 3600.0
if timeout > 0:
sys_sleep(timeout)
while len(self._pending) > 0:
if self._pending[0][1] <= time():
corogen, _ = self._pending.popleft()
try:
coroutine._current = corogen
corogen.send("timeout")
except StopIteration:
pass
finally:
coroutine._current = None
else:
break

dispatcher = Dispatcher()
run = lambda: dispatcher.run()


Файл: sample.py
# sample.py
from concurency import coroutine, sleep, run

@coroutine
def hello(name, timeout):
while True:
yield from sleep(timeout)
print("Привіт, {}!".format(name))

hello("Петров", 2.0)
hello("Іванов", 3.0)
hello("Мир", 5.0)
run()



Outro

Якщо тема цікава, можна продовжити в бік реалізації очікування подій вводу/виводу з асинхронним TCP Echo сервером в якості прикладу. З реальним диспетчером подій, реалізованим у вигляді динамічної бібліотеки, написаної на іншому, більш швидкому, ніж мовою Python.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.