Як просто написати розподілена веб-сервіс на Python + AMQP

Привіт, Хабр. Я вже досить давно пишу на Python. Нещодавно довелося розбиратися з RabbitMQ. Мені сподобалося. Тому що він без всяких проблем (зрозуміло, що з деякими тонкощами) збирається в кластер. Тут я подумав: а непогано б його використовувати в якості черги повідомлень у шматочку API проекту, над яким я працюю. Сам API написаний на tornado, основна думка була у виключенні блокуючого коду з API. Всі синхронні операції виконувалися в пулі тредів.

Перше, що я вирішив це зробити окремий процес(и) «worker», який би брав на себе всю синхронну роботу. Задумав, щоб «worker» був максимально простий, і робив завдання з черги одну за одною. Скажімо, вибрав з бази що-небудь, відповів, що взяв на себе таку задачу і так далі. Самих «worker»ов можна запустити багато і тоді AMQP виступає вже в ролі якоїсь подібності IPC.

Через деякий час з цього виріс модуль, який бере на себе всю рутину пов'язану з AMQP та передачею повідомлень туди і назад, а також стискає їх gzipом, якщо даних занадто багато. Так народився crew. Власне, використовуючи його, ми з вами напишемо простий API, який буде складатися з сервера на tornado і простих і нехитрих «worker» процесів. Забігаючи вперед скажу, що весь код доступний на github, а те, про що я буду розповідати далі, зібрано в папці example.

Підготовка

Отже, давайте розберемося по порядку. Перше, що нам потрібно буде зробити — це встановити RabbitMQ. Як це робити я описувати не буду. Скажу лише те, що на тій-же убунте він ставиться і працює з коробки. У мене на маці єдине, що довелося зробити, це поставити LaunchRocket, який зібрав всі сервіси, що були встановлені через homebrew і вивів в GUI:

LaunchRocket

Далі створимо наш проект virtualenv і встановимо сам модуль через pip:

mkdir-p api
cd api
virtualenv env
source env/bin/activate
pip install crew tornado


В залежності модуля навмисно не вказано tornado, так як на хості з workerом його може і не бути. А на веб-частини зазвичай створюють requirements.txt, де вказані всі інші залежності.

Код я буду писати частинами, щоб не порушувати порядок оповідання. Те, що у нас вийде в підсумку, можна подивитися тут.

Пишемо код

Сам tornado сервер складається з двох частин. У першій частині ми визначаємо обробники запитів handlers, а в другій запускається event-loop. Давайте напишемо сервер і створимо наш перший метод api.

Файл master.py:
# encoding: utf-8

import tornado.ioloop
import tornado.gen
import tornado.web
import tornado.options


class MainHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
# Викликаємо завдання test c пріоритетом 100
resp = yield self.application.crew.call('test', priority=100)
self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


application = tornado.web.Application(
[
('/', MainHandler),
],
autoreload=True,
debug=True,
)


if __name__ == "__main__":
tornado.options.parse_command_line()
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()


Завдяки coroutine в торнадо, код виглядає просто. Можна написати теж саме без coroutine.

Файл master.py:
class MainHandler(tornado.web.RequestHandler):
def get(self):
# Викликаємо завдання test c пріоритетом 100
self.application.crew.call('test', priority=100, callback=self._on_response)

def _on_response(resp, headers):
self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


Наш сервер готовий. Але якщо ми його запустимо, і сходимо на /, то не дочекаємося відповіді, його нікому обробляти.

Тепер напишемо простий worker:

Файл worker.py:
# encoding: utf-8

from crew.worker import run, context, Task

@Task('test')
def long_task(req):
context.settings.counter += 1
return 'Wake up Neo.\n'

run(
counter=0, # This is a part of this context worker
)


Отже, як видно в коді, є проста функція, загорнута декоратором Task(«test»), де test — це унікальний ідентифікатор завдання. У вашому кіоску не може бути двох завдань з однаковими ідентифікаторами. Звичайно, правильно було б назвати завдання «crew.example.test» (так зазвичай і називаю в продакшн середовищі), але для нашого прикладу достатньо просто «test».

Відразу кидається в очі context.settings.counter. Це певний контекст, який ініціалізується в worker процесі при виклику функції run. Також в контексті вже є context.headers — це заголовки відповіді для відділення метаданих від відповіді. У прикладі з callback-функцією саме цей словник передається в _on_response.

Заголовки скидаються після кожної відповіді, а ось context.settings — ні. Я використовую context.settings для передачі у функції worker(и) з'єднання з базою даних і взагалі будь-якого іншого об'єкта.

Також worker обробляє ключі запуску, їх не багато:

$ python worker.py --help
Usage: worker.py [options]

Options:
-h, --help show this help message and exit
-v, --verbose make lots of noise
--logging=LOGGING Logging level
-H HOST, --host=HOST RabbitMQ host
-P PORT, --port=PORT port RabbitMQ


URL підключення до бази та інші змінні можна брати з змінний оточення. Тому worker в параметрах чекає тільки як йому з'єднатися c AMQP (хост і порт) і рівень логування.

Отже, запускаємо все і перевіряємо:

$ python master.py & python worker.py


image

Працює, але що сталося за ширмою?

При запуску tornado-сервера tornado підключився до RabbitMQ, створив Exchange DLX і почав слухати чергу DLX. Це Dead-Letter-Exchange — спеціальна чергу, в яку потрапляють завдання, які не взяв жоден worker за певний timeout. Також він створив чергу з унікальним ідентифікатором, куди будуть надходити відповіді від workerов.

Після запуску worker створив по черзі на кожну обгорнуту декоратором Task чергу і підписався на них. При вступі завдання воркер main-loop створює один потік, контролюючи в головному потоці час виконання завдання і виконує обгорнуту функцію. Після повернення з оберненої функції серіалізует його і ставить в чергу відповідей сервера.

Після надходження запиту tornado-сервер ставить завдання у відповідну чергу, вказуючи при цьому ідентифікатор своєю унікальною черги, в яку має надійти відповідь. Якщо жоден воркер не взяв завдання, тоді RabbitMQ перенаправляє завдання exchange DLX і tornado-сервер отримує повідомлення про те, що минув таймаут перебування черги, генеруючи виняток.

Зависла завдання

Щоб продемонструвати, як працює механізм завершення завдань, які зависли в процесі виконання, напишемо ще один веб-метод і завдання worker.

В файл master.py додамо:

class FastHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
try:
resp = yield self.application.crew.call(
'dead', persistent=False, priority=255, expiration=3,
)
self.write("{0}: {1}".format(type(resp).__name__, str(resp)))
except TimeoutError:
self.write('Timeout')
except ExpirationError:
self.write('All workers are gone')


І додамо його до списку хендлеров:

application = tornado.web.Application(
[
(r"/", MainHandler),
(r"/stat", StatHandler),
],
autoreload=True,
debug=True,
)


А в worker.py:
@Task('dead')
def infinite_loop_task(req):
while True:
pass


Як видно з наведеного вище прикладу, завдання піде в нескінченний цикл. Однак, якщо завдання не виконається за 3 секунди (враховуючи час отримання з черги), main-loop у воркере пошле потоку виняток SystemExit. І так, вам доведеться обробити його.

Контекст

Як вже згадувалося вище, контекст — це такий спеціальний об'єкт, який імпортується і має кілька вбудованих змінних.

Давайте зробимо просту статистику по відповідям нашого worker.

В файл master.py додамо наступний handler:

class StatHandler(tornado.web.RequestHandler):

@tornado.gen.coroutine
def get(self):
resp = yield self.application.crew.call('stat', persistent=False, priority=0)
self.write("{0}: {1}".format(type(resp).__name__, str(resp)))


Також зареєструємо у списку обробників запитів:

application = tornado.web.Application(
[
(r"/", MainHandler),
(r"/fast", FastHandler),
(r"/stat", StatHandler),
],
autoreload=True,
debug=True,
)


Цей handler не дуже відрізняється від попередніх, просто повертає значення, яке йому передав worker.

Тепер сама задача.

В файл worker.py додамо:

@Task('stat')
def get_counter(req):
context.settings.counter += 1
return 'I\'m worker "%s". And I serve %s tasks' % (context.settings.uuid, context.settings.counter)


Функція повертає рядок з інформацією про кількість завдань, оброблених workerом.

PubSub і Long polling

Тепер реалізуємо кілька обробників. Один при запиті буде просто висіти і чекати, а другий буде приймати POST дані. Після передачі останніх перший буде їх віддавати.

master.py:

class LongPoolingHandler(tornado.web.RequestHandler):
LISTENERS = []

@tornado.web.asynchronous
def get(self):
self.LISTENERS.append(self.response)

def response(self, data):
self.finish(str(data))

@classmethod
def responder(cls, data):
for in cb cls.LISTENERS:
cb(data)

cls.LISTENERS = []

class PublishHandler(tornado.web.RequestHandler):

@tornado.gen.coroutine
def post(self, *args, **kwargs):
resp = yield self.application.crew.call('publish', self.request.body)
self.finish(str(resp))

...

application = tornado.web.Application(
[
(r"/", MainHandler),
(r"/stat", StatHandler),
(r"/fast", FastHandler),
(r'/subscribe', LongPoolingHandler),
(r'/publish', PublishHandler),
],
autoreload=True,
debug=True,
)

application.crew = Client()
application.crew.subscribe('test', LongPoolingHandler.responder)

if __name__ == "__main__":
application.crew.connect()
tornado.options.parse_command_line()
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()


Напишемо завдання publish.

worker.py:

@Task('publish')
def publish(req):
context.pubsub.publish('test', req)


Якщо ж вам не потрібно передавати управління в worker, можна просто публікувати прямо з tornado-сервера

class PublishHandler2(tornado.web.RequestHandler):

def post(self, *args, **kwargs):
self.application.crew.publish('test', self.request.body)


Паралельне виконання завдань

Часто буває ситуація, коли ми можемо виконати кілька завдань паралельно. У crew є для цього невеликий синтаксичний цукор:

class Multitaskhandler(tornado.web.RequestHandler):

@tornado.gen.coroutine
def get(self, *args, **kwargs):
with self.application.crew.parallel() as mc:
# mc - multiple calls
mc.call('test')
mc.call('stat')
test_result, stat_result = yield mc.result()
self.set_header('Content-Type', 'text/plain')
self.write("Test result: {0}\nStat result: {1}".format(test_result, stat_result))


У цьому разі завдання будуть поставлені дві задачі паралельно і вихід із with буде зроблений по закінченні останньої.

Але потрібно бути обережним, так як якась завдання може викликати виключення. Воно буде прирівняне безпосередньо змінної. Таким чином, вам потрібно перевірити, чи не є test_result і stat_result екземплярами класу Exception.

Плани на майбутнє

Коли eigrad запропонував написати прошарок, якій можна запустити будь wsgi додаток з допомогою crew, мені ця ідея зразу сподобалася. Тільки уявіть, запити не хлинуть на ваше wsgi додаток, а будуть рівномірно надходити через чергу на wsgi-worker.

Я ніколи не писав wsgi сервер і навіть не знаю, з чого почати. Але ви можете мені допомогти, pull-requestы я приймаю.

Також думаю дописати client для ще одного популярного асинхронного фреймворку, для twisted. Але поки розбираюся з ним, та й вільного часу не вистачає.

Подяки

Спасибі розробникам RabbitMQ і AMQP. Чудові ідеї.

Також спасибі вам, читачі. Сподіваюся, що ви не даремно витратили час.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.