Трохи фактів про python asyncio

Всім привіт! Хотілося б поділитися досвідом використання python asyncio. За півтора року використання в продакшені накопичився певний досвід, загальні прийоми, що полегшують життя. Природно, були і граблі, про які також варто згадати, бо це допоможе заощадити купу часу тим, хто тільки починає використовувати в своїх додатках asyncio. Кому цікаво — прошу під кат.

Трохи історії
Asyncio з'явився в Pyhton версії 3.4, 3.5 був доданий більш приємний оку async/await синтаксис. Asyncio надає з коробки Event loop, Future, Task, Coroutine, I/O multiplexing, Synchronization primitives. Це, звичайно, не мало, але для повноцінної розробки недостатньо. Для цього є сторонні бібліотеки. Відмінна добірка є ось тут. У себе в компанії ми використовуємо asyncio разом з набором сторонніх бібліотек для написання микросервисов. За своєю природою наші сервіси більше орієнтовані на I/O ніж на CPU, так що для нас asyncio відмінно підходить.

Власне факти
Це не підручник по asyncio. Я не буду пояснювати, чому асинхронний ввід/вивід це добре, або чому б не використовувати потоки. Не буде розповідей про корутинах, генераторах, event loop'ах і т. д. Також тут не буде ніяких бенчмарків і порівнянь з іншими мовами. Поїхали!

Debug
По-перше, PYTHONASYNCIODEBUG. Це змінна оточення, яка включає дебаг режим. Наприклад, можна побачити повідомлення про те, що ви оголосили функцію як корутину, але викликаєте як звичайну функцію(актуально для python3.4). Також необхідно налаштувати asyncio logger на рівень дебаг і ще дозволити висновок ResourseWarning. Можна побачити багато цікавого: повідомлення про те, що ви забули закрити транспорт або сам event loop(читай — забули звільнити ресурси). Порівняйте запуск наступного коду з параметром інтерпретатора -Wdefault і змінної оточення PYTHONASYNCIODEBUG=1 і без них (тут і далі в прикладах коду я буду опускати деякі несуттєві частини такі як import або обробка виключень):

@asyncio.coroutine
def test():
pass

loop = asyncio.get_event_loop()
test()

Правильне завершення
До речі про звільнення ресурсів. Event loop треба вміти правильно зупинити, дочекавшись коректного заверешения всі тасок, закриття з'єднань і т. д. І якщо з використанням run_until_complete() особливих проблем немає, то з run_forever() все трохи складніше. Метод close() у event loop'а можна викликати, тільки якщо він вже зупинено — тобто після методу stop(). Краще всього це зробити за допомогою сигналів:

def handler(loop):
loop.remove_signal_handler(signal.SIGTERM)
loop.stop()

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, handler, loop)

try:
loop.run_forever()
finally:
loop.close()

Далі в прикладах коду я все ж буду концентруватися на самій суті, а не на правильному завершення програми.

Запуск блокуючого коду
Природно, не для всього є асинхронні бібліотеки. Деякий код так і залишається блокуючим, і його треба якось запускати, щоб він не блокував наш event loop. Для цього є хороший метод run_in_executor(), який запускає те, що ви йому передали в одному з потоків вбудованого пулу, не блокуючи основний потік з event loop'ом. Все б добре, але з цим є 2 проблеми. По-перше, розмір стандартного пулу всього 5. По-друге, в asyncio синхронний dns resolver, який запускається саме таким чином у вбудованому пулі. Отже, за пул всього в 5 потоків будуть конкурувати ваші синхронні операції, плюс всі кому треба зробити getaddrinfo(). Вихід — використовувати свій пул. Завжди:

def blocking_function():
time.sleep(42)

pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
loop = asyncio.get_event_loop()
loop.run_in_executor(pool, blocking_function())
loop.close()

Підступні Future
У Future є одна дуже цікава особливість: якщо в ній відбудеться виняток — ви про це нічого не дізнаєтесь, якщо тільки явно не запитайте про це у самої future. В документації є хороший приклад на цю тему. Ви побачите, що було виключення, тільки коли gc буде видаляти об'єкт future. Звідси випливає просте правило — завжди перевіряєте результат вашої future. Навіть якщо з вашої задумом код всередині future повинен просто крутитися в нескінченному циклі, і, здавалося б, ніде перевіряти результат — все одно треба обробити винятки, наприклад так:

async def handle_exception():
try:
await bug()
except Exception:
print('TADA!')

async def bug():
raise Exception()

loop = asyncio.get_event_loop()
loop.create_task(handle_exception())
loop.run_forever()
loop.close()

await і __init__()
Неможливо. Магічний метод __init__() не може містити асинхронний код. Є два шляхи. Або зробити у класу ще один метод, наприклад, initialize(), який вже буде корутиной. Він буде містити весь асинхронний код для ініціалізації, і його треба буде викликати після створення об'єкта. Виглядає жахливо. Тому прийнято використовувати функції-фабрики. Поясню на прикладі:

class Foo:
def __init__(self, reader, writer, loop, *args, **kwargs):
self._reader = reader
self._writer = writer
self._loop = loop

async def create_foo(loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)
return Foo(reader, writer, loop)

loop = asyncio.get_event_loop()
foo = loop.run_until_complete(create_foo(loop))
print(foo)
loop.close()

Wake up, Neo
Скажімо, у вас є таска, яка крутиться в event loop'е і періодично скидає якийсь буфер. Можна написати такий код:

async def flush_task():
while True:
# flushing...
await asyncio.sleep(FLUSH_TIMEOUT)

Зробити create_task() — і все начебто добре, крім одного: що робити, якщо по завершенні вам необхідно примусово скинути вміст буфера? Як змусити сум «проснусться»? Тут на допомогу приходять примітиви синхронізації:

class Foo:

def __init__(self, loop, *args, **kwargs):
self._loop = loop
self._waiter = asyncio.Event()
self._flush_future = self._loop.create_task(self.flush_task())

async def flush_task(self):
while True:
try:
await asyncio.wait_for(self._waiter.wait(), timeout=FLUSH_TIMEOUT, loop=self._loop)
except asyncio.TimeoutError:
pass
# flushing ...
self._waiter.clear()

def force_flush():
self._waiter.set()

loop = asyncio.get_event_loop()
foo = Foo(loop)
loop.run_forever()
loop.close()

Тестування
Тестувати асинхронний код можна і потрібно. І робити це так само просто, як і у випадку синхронного коду:

class TestCase(unittest.TestCase):

def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)

def tearDown(self):
self.loop.close()

def test_001(self):
async def func():
self.assertEqual(42, 42)
self.loop.run_until_complete(func())

Тести відмінно ізольовані, т. к. в кожному новому тесті використовується свій event loop. А можна піти далі і використовувати pytest, де є зручні декоратори.

Джерела натхнення
Насамперед — особистий досвід. Багато чого з перерахованого було усвідомлено в результаті «лову граблів», а потім вивчення документації та вихідних asyncio. Також чудовими прикладами послужили исходники популярних бібліотек, таких як aiohttp, aioredis, aiopg.

Спасибі всім, хто дочитав статтю до кінця. Удачи з asyncio!
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.