Реалізація гарантованої асинхронної доставки повідомлень

Стаття за авторством Олександра Романова, розробника інтеграційних рішень.
У процесі інтеграції систем ми часто стикаємося з необхідністю гарантованої доставки повідомлень між системами. У таких випадках на допомогу нам приходять черги. Але не всі завдання так прості, як доставка повідомлень з системи А у систему Б. Бувають випадки, коли потрібно збагатити доставляються повідомлення даними з суміжних, які беруть участь в інтеграції систем. Які не завжди можуть інтегруватися через черги, а мають лише синхронні сервіси. І ось вже в нашій інтеграції виникають такі явища, як недоступність, відмови та інші «приємні» особливості застосування «синхронів». Можна було б перекласти обробку проміжних відмов на систему-джерело, але це некультурно, та й неможливо, якщо ми публікуємо події відразу для декількох систем (топік).



Зручним і працюючим рішенням проблеми, з нашої точки зору, є асинхронна покрокова обробка повідомлення через внутрішню чергу з викликом зовнішнього сервісу на кожному кроці. У разі відмови сервісу за помилку або через його тимчасової непрацездатності повідомлення потрапляє у внутрішню чергу відмов і відправляється повторно після розборів з виниклими в сервісі проблемами.



Дане рішення так само усуває проблему неможливості відкоту транзакції при роботі з зовнішніми сервісами. Ніякої виклик не пройде двічі — обробка починається саме з того кроку, на якому стався збій.

Все вищеописане дуже легко реалізується на інтеграційній шині, в якій асинхронне взаємодія між компонентами через внутрішні черзі йде «з коробки». Але надто високі ціни за «коробку» можуть сильно ускладнити використання інтеграційної шини. Ми наведемо приклад реалізації простого додатка на Spring Integration (далі SI) + Rabbit MQ. Обмовимося, що Rabbit MQ у себе в production ми не використовуємо з-за неможливості його роботи з XA.

Серцем всього додатка є spring-integration-context.xml. Там описана компонентна модель, ініціалізуються ресурсні бины і менеджер транзакцій для роботи з MQ. Опишемо його докладніше.

Підключаємо вбудований в SI драйвер і прописуємо ресурси:

<rabbit:queue name="si.test.queue.to"/>
<rabbit:queue name="si.test.queue.from"/>

Нам необхідний низькорівневий бін amqpTemplate, через який здійснюється взаємодія з ресурсами. Даний бін ми використовуємо безпосередньо в тестах, і він потрібен для компонент SI, які працюють з Rabbit MQ. ConnectionFactory, необхідний для підключення до ресурсів, конфигурит Spring Boot налаштуванням application.yml (org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration).

<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" mandatory="true"/>

Для транзакційний роботи з Rabbit MQ потрібно TransactionManager (потрібен для відкату повідомлення назад в чергу, якщо в процесі роботи відбудеться помилка). На жаль, Rabbit MQ не підтримує XA-транзакції, інакше менеджер транзакцій сконфигурил б Spring Boot. Конфигурим надається Spring-му вручну.

<bean id="rabbitTransactionManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<constructor-arg name="connectionFactory" ref="rabbitConnectionFactory"/>
</bean>

А тепер найприємніше. «Малюємо» flow! В лапках, бо пишемо у вигляді xml, що менш приємно.

Flow

Нам потрібні:

  • inbound-adapter для читання з вхідної черги;
  • асинхронні компоненти для дзвінків REST-сервісів;
  • outbound-channel-adapter для запису у вихідну чергу.
Для організації гарантованої доставки в цьому додатку ми використовуємо асинхронні виклики через внутрішню чергу. Так як компонент багато, потрібно кілька черг, що незручно ні з точки зору розробки, ні адміністратору. Постараємося обійтися однією.

Розглянемо сценарій взаємодії між двома компонентами. SomeComponentOne отримує повідомлення з каналу, викликає якийсь синхронний REST-сервіс (працює з БД, пише у файл тощо) і відправляє повідомлення на подальшу обробку, якою повинна займатися SomeComponentTwo. Якщо SomeComponentOne не змогла виконати доручену їй шматок роботи, вона повинна відкотити транзакцію і повернути отримане повідомлення туди, звідки вона його забрала. Якщо все добре, надіслати повідомлення у внутрішню чергу і завершити транзакцію. SomeComponentOne забирає повідомлення з внутрішньої черги і відправляє повідомлення в неї ж, при цьому не обов'язково в тому ж вигляді, в якому отримала. Повідомлення може бути збагачене або змінено, нам не важливо. Воно призначене для роботи компоненти SomeComponentTwo. Виникає проблема роутінга. Повідомлення потрапляє у внутрішню чергу і має забиратися звідти потрібної в даний момент часу компонентою. Іншими словами, необхідний роутинг.

В даному додатку продемонстрований роутинг, заснований на заголовках повідомлення. У повідомлення заповнюється кастомный заголовок PartnerComponent, що вказує на компоненту, яка повинна працювати з повідомленням.

Розпишемо технічні деталі представленого flow.

Адаптер для читання з вхідної черги. Отримує повідомлення і в транзакції кидає його відразу у внутрішню чергу.

<int-amqp:inbound-channel-adapter channel="innerChannel"
queue-names="si.test.queue.to"
connection-factory="rabbitConnectionFactory"
transaction-manager="rabbitTransactionManager"/>
<int-amqp:channel id="innerChannel" queue-name="si.test.queue.inner" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/>

Ми використовували спеціалізований для роботи з чергами асинхронний канал, що надається Spring-му. Отримали інтерфейс SI-channel, а зберігання повідомлень безпосередньо в черзі, в нашому випадку у внутрішній mq-черзі програми. При отриманні повідомлення з даного каналу-черзі буде відкриватися транзакція, т. до. ми підключили наш менеджер транзакцій.

До даного каналу-черзі підключаємо SI-роутер, який працює на заголовках повідомлень.

<int:header value-router id="wireRouter" input-channel="innerChannel" header-name="PartnerComponent"
default-output-channel="component1Channel">
<int:mapping value="ComponentTwo" channel="component2Channel"/>
<int:mapping value="ComponentThree" channel="component3Channel"/>
<int:mapping value="OutboundComponent" channel="outboundRabbitChannel"/>
</int:header value-router>

Нове для flow повідомлення не має технічного заголовка PartnerComponent, тому за замовчуванням буде оброблятися компонентою someComponentOne, обов'язком якої є вказівка в заголовку повідомлення PartnerComponent наступній компоненти і відправлення повідомлення у внутрішню чергу. Роутер знову забирає повідомлення з внутрішньої черги і відправляє його на обробку в зазначену компоненту.

Опис компонент, які надсилаються повідомлення з роутера.

<int:channel id="component1Channel"/>
<int:service-activator input-channel="component1Channel"
ref="someComponentOne" method="process"/>
<int:channel id="component2Channel"/>
<int:service-activator input-channel="component2Channel"
ref="someComponentTwo" method="process"/>
<int:channel id="component3Channel"/>
<int:service-activator input-channel="component3Channel"
ref="someComponentThree" method="process"/>
<int:channel id="outboundRabbitChannel"/>
<int:service-activator input-channel="outboundRabbitChannel"
ref="outboundRabbitComponent" method="process"/

Адаптер для відправки на вихідну чергу.
<int:channel id="toRabbit"/>
<int-amqp:outbound-channel-adapter channel="toRabbit" amqp-template="amqpTemplate" routing-key="si.test.queue.from"/>

Складання (pom.xml)

Старий добрий Maven. Стандартна збірка від Spring Boot. Залежно від SI і AMQP надають всі необхідні бібліотеки. Також підключаємо spring-boot-starter-test для реалізації перевірочних кейсів на JUnit.

Робота SomeComponent*.java

Транзакційні бины, підключені як service-activator до flow SI. Виклик REST через RestTemplate і відправлення у внутрішню чергу через innerChannel. Достатньо, щоб продемонструвати роботу з сервісом і зручно за-mock-ить в тестах.

Тестуємо

У тесті testHappyPath ми перевірили працездатність flow, коли немає збоїв при виклику REST. Mock-отримаємо всі виклики REST-сервісів без збоїв, кидаємо повідомлення у вхідну чергу, чекаємо у вихідний, перевіряємо проходження всіх компонент по контенту тіла отриманого повідомлення.

У тесті testGuaranteedDelivery ми перевірили гарантовану доставку при збої в одному з REST. Емуляціях одноразовий помилка виклику сервісу з третьої компоненти, чекаємо доставку повідомлення до вихідної черги, перевіряємо тіло отриманого повідомлення.

Висновок

Ми зробили додаток з гарантованою доставкою. Залишився ряд дрібних невирішених питань: повторна відправка повідомлень відбувається нескінченно і некеровано, хто підтримує дане рішення — адміністратор або цю підтримку можна автоматизувати? Ми вирішуємо ці питання за допомогою самописних додатків і індивідуальних налаштувань для кожного типу повідомлення. Можливо, в майбутньому ми опишемо ці рішення.

Gриложение цілком на git-hub
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.