RabbitMQ tutorial 6 - Віддалений виклик процедур

У продовження п'ятого уроку з вивчення азів RabbitMQ, публікую переклад шостого уроку з офіційного сайту . Всі приклади написані на python (використовується pika версії 0.9.8), але як і раніше їх можна реалізувати на більшості популярних ЯП .
 
Під другому уроці ми розглянули використання черг завдань для розподілу ресурсоємних завдань між декількома передплатниками.
Але що якщо ми захочемо запустити функцію на віддаленій машині і дочекатися результату? Ну, це зовсім інша історія. Цей шаблон широко відомий як Віддалений виклик процедур (Remote Procedure Call або RPC, далі в тексті RPC).
У цьому керівництві ми побудуємо, використовуючи RabbitMQ, RPC систему, яка включатиме клієнт і масштабований RPC сервер. Так як у нас немає реальної трудомісткою завдання вимагає розподілу, ми створимо простий RPC сервер, який повертає числа Фібоначчі.
 
 

Інтерфейс клієнта

Для ілюстрації використання RPC служби, створимо простий клієнтський клас. Цей клас буде містити метод call , який буде відправляти RPC запити і блокуватися до отримання відповіді:
 
 
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)

 
 
Зауваження про RPC
Незважаючи на те, що RPC досить поширений шаблон, його часто критикують. Проблеми звичайно виникають, коли розробник не знає точно, яку функцію він використовує: локальну або повільну, виконується за допомогою RPC. Плутанина, на зразок цієї, може вилитися в непередбачуваність поведінки системи, а також вносить зайву складність в процес налагодження. Таким чином, замість спрощення програмного забезпечення невірне використання RPC може привести до не обслуговується і не читаемому коду.
 
З урахуванням вищевикладеного можна дати наступні рекомендації:
 
     
Переконайтеся, що це очевидно, яка функція викликається в кожному конкретному випадку: локальна або дистанційна;
 Документуйте вашу систему. Робіть залежності між компонентами явними;
 Обробляйте помилки. Як має реагувати клієнт, якщо RPC сервер не відповідає протягом тривалого проміжку часу?
 Якщо сумніваєтеся — не використовуйте RPC. Якщо це можливо, використовуйте асинхронний конвеєр замість блокуючого RPC, коли результати асинхронно передаються на наступний рівень обробки.
 
 

Черга результатів

Взагалі, здійснювати RPC через RabbitMQ легко. Клієнт відправляє запит і сервер відповідає на запит. Щоб отримати відповідь, клієнт повинен передати чергу для розміщення результатів разом із запитом. Давайте подивимося як це виглядає в коді:
 
 
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ...какой-то код для чтения ответного сообщения из callback_queue ...

 
 

Властивості повідомлень

В протоколі AMQP мається 14 зумовлених властивостей повідомлень. Більшість з них використовуються вкрай рідко, за винятком таких:
 
 
     
delivery_mode : відзначає повідомлення як «стійке» (зі значенням 2) або «тимчасове» (будь-яке інше значення). Ви повинні пам'ятати це властивість по другого уроку .
 content_type : використовується для опису формату представлення даних (mime). Приміром, для часто використовуваного JSON формату хорошим тоном вважається встановлювати це властивість в application / json.
 reply_to : зазвичай використовується для вказівки черги результатів
 correlation_id : властивість використовується для зіставлення RPC відповідей із запитами.
 
 
 

Correlation id

У методі, представленому вище, ми пропонували створювати чергу відповідей для кожного RPC запиту. Це кілька надлишково, але, на щастя, є спосіб краще — давайте створимо загальну чергу результатів для кожного клієнта.
 
Це піднімає нове питання, отримавши відповідь з цієї черги не зовсім ясно, якому запиту відповідає ця відповідь. І тут нам стане в нагоді властивість correlation_id . Ми будемо присвоювати цій властивості унікальне значення при кожному запиті. Пізніше, коли ми витягнемо отриману відповідь з черги відповідей, грунтуючись на значенні цієї властивості ми зможемо однозначно зіставити запит з відповіддю. Якщо зустрінемо невідоме значення у властивості correlation_id , ми можемо спокійно ігнорувати це повідомлення, оскільки воно не відповідає жодному з наших запитів.
 
Ви могли б поцікавитися, чому ми плануємо просто ігнорувати невідомі повідомлення з черги відповідей, замість того, щоб перервати виконання сценарію? Це пов'язано з ймовірністю виникнення race condition на стороні сервера. Хоча це і малоймовірно, але цілком можливий сценарій, при якому RPC сервер відправить нам відповідь, але не встигне відправити підтвердження обробки запиту. Якщо це відбудеться, перезапущена RPC сервер знову буде обробляти даний запит. Ось чому на клієнті ми повинні коректно обробляти повторні відповіді. Крім того, RPC, в ідеалі, має бути ідемпотентів.
 
 

Підсумки

 image
 
Наш RPC працюватиме таким чином:
 - Коли Клієнт стартує, він створює анонімну унікальну чергу результатів.
 - Для вчинення RPC запиту, Клієнт відправляє повідомлення з двома властивостями: reply_to , де в якості значення вказується чергу результатів і correlation_id , установлюваний в унікальне значення для кожного запиту.
 - Запит відправляється в чергу rpc_queue .
 - Сервер очікує запити з цієї черги. Коли запит отримано, Сервер виконує своє завдання і відправляє повідомлення з результатом назад Клієнту, використовуючи чергу з властивості reply_to .
 - Клієнт очікує результат з черги результатів. Коли повідомлення отримано, Клієнт перевіряє властивість correlation_id . Якщо воно відповідає значення із запиту, то результат відправляється з додатком.
 
 

Збираючи все разом

Код сервера rpc_server.py:
 
 
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)"  % (n,)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()

 
Серверний код досить простий:
 
 
     
(4) Як зазвичай, ми встановлюємо з'єднання і оголошуємо чергу.
 (11) Оголошуємо нашу функцію, яка повертає числа Фібоначчі, яка приймає як аргумент тільки цілі позитивні числа (ця функція навряд чи буде працювати з великими числами, найімовірніше це сама повільна з можливих реалізацій).
 (19) Ми оголошуємо функцію зворотного виклику on_request для basic_consume , яка і є ядром RPC сервера. Вона виповнюється коли запит отримано. Виконавши роботу, функція відправляє результат назад.
 (32) Ймовірно, ми захочемо коли-небудь запустити більше одного сервера. Для рівномірного розподілу навантаження між декількома серверами ми встановлюємо prefetch_count .
 
 
Код клієнта rpc_client.py:
 
 
#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)

 
Код Клієнта дещо складніше:
 
 
     
(7) Ми встановлюємо з'єднання, канал і оголошуємо унікальну чергу результатів для отриманих відповідей.
 (16) Ми підписуємося на чергу результатів для отримання відповідей від RPC.
 (18) Функція зворотного виклику 'on_response ', ісполнямая при отриманні кожної відповіді, виконує досить тривіальну задачу — для кожного, хто переходив відповіді вона перевіряє чи відповідає correlation_id того що ми очікуємо. Якщо це так, вона зберігає відповідь в self.response і перериває цикл.
 (23) Далі, ми визначаємо наш метод call , який, власне, і виконує RPC запит.
 (24) В цьому методі ми спочатку генеруємо унікальний correlation_id і зберігаємо його — функція зворотного виклику 'on_response ' буде використовувати це значення для відстеження потрібної відповіді
 (25) Далі ми поміщаємо запит з властивостями reply_to і correlation_id в чергу.
 (32) Далі починається процес очікування відповіді.
 (33) І, врешті, ми повертаємо результат назад користувачеві.
 
 
Наш RPC сервіс готовий. Ми можемо запустити сервер:
 
 
$ python rpc_server.py
 [x] Awaiting RPC requests

 
Для отримання чисел Фібоначчі запускаємо Клієнт:
 
 
$ python rpc_client.py
 [x] Requesting fib(30)

 
Представлений варіант реалізації RPC не є єдиним можливим, але він має такі переваги: ​​
 
 
     
Якщо RPC сервер занадто повільний, ви можете легко додати ще один. Спробуйте запустити другий rpc_server.py в новій консолі.
 На стороні Клієнта, RPC вимагає відправки та отримання тільки одного повідомлення. Не потрібно синхронний виклик queue_declare . Як результат, RPC клієнт обходиться одним циклом запит-відповідь для одного RPC запиту.
 
 
Наш код, тим не менш, є спрощеним і навіть не намагається вирішувати складніші (але, безумовно, важливі) проблеми на зразок таких:
 
 
     
Як має реагувати Клієнт, якщо сервер не запущений?
 Чи повинен Клієнт мати таймоут для RPC?
 Якщо Сервер в якийсь момент «зламається» і викине виняток, чи повинно воно передаватися Клієнту?
 Захист від неприпустимих вхідних повідомлень (наприклад, перевірка допустимих меж) перед обробкою.
 
 
 

Всі статті керівництва

 RabbitMQ tutorial 1 — Hello World (python)
 RabbitMQ tutorial 2 — Черга завдань (python)
 RabbitMQ tutorial 3 — Публікація / Підписка (php)
 RabbitMQ tutorial 4 — Роутінг (php)
 RabbitMQ tutorial 5 — Тематики (php)
RabbitMQ tutorial 6 — Віддалений виклик процедур (ця стаття, python)

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.