Досвід використання MassTransit 3.0

MassTransit це open source бібліотека, розроблена на мові C#.NET платформи, яка спрощує роботу з шиною даних, яка використовується при побудові розподілених додатків і реалізації SOA (service oriented architecture).

Як message broker можуть виступати RabbitMq, Azure Service Bus або In-Memory менеджер (у випадку з In-Memory область видимості обмежується процесом, в якому ініціалізованим першим примірник).

Зміст:

Контракти повідомлень
Роутинг

Консьюмеры (Consumer)
<a href="#di>Конфігурація контейнера DI
Спостерігачі (Observer)
Нове в MassTransit 3.0
Висновок
Опитування: А яку .NET бібліотеку використовуєте ви?

Команди та події
В бібліотеці закладено 2 основних типи повідомлень: команди і події.

Команди
Сигналізують про необхідність виконати якусь дію. Для найбільш змістовного найменування команди бажано використовувати структуру дієслово + іменник: EstimateConnection, SendSms, NotifyCustomerOrderProcessed.

Робота з командами здійснюється з допомогою методу Send (інтерфейсу ISendEndpoint) і вказівки одержувача endpoint (черги):

Відправка команди
private static async Task SendSmsCommand(IBus busControl)
{
var command = new SendSmsCommand
{
CommandId = Guid.NewGuid(),
PhoneNumber = "89031112233",
Message = "Thank you for your reply"
};

var endpoint = await busControl.GetSendEndpoint(AppSettings.CommandEndpoint);
await endpoint.Send(command);
}


Події
Сигналізують про подію, що може бути цікаво якомусь набору передплатників (патерн Observer), які реагують на ці події, наприклад: ConnectionEstimated, CallTerminated, SmsSent, CustomerNotified.

Робота з подіями здійснюється з допомогою методу Publish (інтерфейсу IPublishEndpoint).

У термінології закладено і основна відмінність цих типів повідомлень — команда доставляється єдиному виконавцю (щоб уникнути дублювання виконання):


Зображення із статті MassTransit Send vs. Publish

У той час як подія орієнтоване на сповіщення n-передплатників, кожен з яких реагує на те, що трапилося подію по-своєму.


Зображення із статті MassTransit Send vs. Publish

Іншими словами, при запущених n-консьюмеров (від англ. consumer — споживач, обробник), оброблювальних команду, після її публікації тільки один з них отримає повідомлення про неї, в той час як повідомлення про подію отримає кожен.

Контракти повідомлень
Згідно документації MassTransit, при оголошенні контрактів повідомлень рекомендується вдаватися до інтерфейсів:

Контракт: команда на відправку СМС повідомлення
public interface ISendSms {
Guid CommandId { get; }
string PhoneNumber { get; }
string Message { get; }
}


Як згадувалося раніше, відправка команд повинна здійснюватися виключно за допомогою методу Send (інтерфейсу IBus) і вказівки адресата (endpoint).

Контракт: подія про успішну відправку СМС повідомлення
public interface ISmsSent {
Guid EventId { get; }
DateTime SentAtUtc { get; } 
}


Події відправляються за допомогою методу Publish.

Роутинг
Як розподіл повідомлень за exchange, так і вибір консьюмеров (про них в цій статті буде розказано трохи пізніше) для обробки базуються на runtime типах цих повідомлень,- найменування використовуються namespace та ім'я типу, у випадку з generic ім'я батьківського типу і перелік аргументів.

Exchange
При конфігурації receive endpoint'a (підключення раніше зареєстрованих консьюмеров) у разі використання в якості каналу доставки повідомлень RabbitMq на підставі зазначених до обробки консьюмерами типів повідомлень формируются найменування необхідних exchange, які потім ці повідомлення і будуть поміщатися.

Аналогічні дії на етапі конфігурації send endpoint'a виконуються і для команд, для відправлення яких також потрібні власні exchange.

На зображенні можна побачити створені в рамках мого сценарію exchange:



У тому випадку, якщо конфігуруючи receive endpoint ми вказуємо найменування черзі:

cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));

то в прив'язці exchange повідомлень можна буде побачити наступну картину:



Підсумковий шлях повідомлення, тип якого імплементує ISmsEvent, буде мати наступний вигляд:



Якщо ж конфігурація здійснюється без вказівки черзі:

cfg.ReceiveEndpoint(host, e=> e.LoadFrom(container));

Імена для останнього exchange і черги формуються автоматично, а по завершенню роботи вони будуть видалені:



Формат повідомлення
Говорячи про формат повідомлення, хотілося б детальніше зупинитися на найменування (або messageType). За його формування (заголовків urn:message:) відповідальна функція GetUrnForType(Type type). Для прикладу я додав для команди ISendSms спадкування від ICommand та generic тип:

Контракт: команда на відправку СМС повідомлення ICommand<string>
public interface ICommand<T>
{
}

public interface ISendSms<T> : ICommand<T>
{
Guid CommandId { get; }
string PhoneNumber { get; }
string Message { get; }
}

class SendSmsCommand : ISendSms<string>
{
public Guid CommandId { get; set; }
public string PhoneNumber { get; set; }
public string Message { get; set; }
}


Сформоване повідомлення у такому випадку буде містити наступне значення у полі messageType (на підставі якого після отримання повідомлення потім обирається відповідальний за обробку консьюмер):

"messageType": [
"urn:message:PlayWithMassTransit30.Extension:SendSmsCommand",
"urn:message:PlayWithMassTransit30.Contract.Command:ISendSms[[System:String]]",
"urn:message:PlayWithMassTransit30.Contract.Command:ICommand[[System:String]]"
]

Крім messageType повідомлення містить інформацію про host, яким воно було відправлено:

"host": {
"machineName": "DESKTOP-SI9OHUR",
"processName": "PlayWithMassTransit30.vshost",
"processId": 1092,
"assembly": "PlayWithMassTransit30",
"assemblyVersion": "1.0.0.0",
"frameworkVersion": "4.0.30319.42000",
"massTransitVersion": "3.4.1.808",
"operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
}

Значну частину payload:

"message": {
"commandId": "7388f663-82dc-403a-8bf9-8952f2ff262e",
"phoneNumber": "89031112233",
"message": "Thank you for your reply"
}

та інші службові поля та заголовки.

Консьюмеры (Consumer)
Консьюмер — це клас, який обробляє один або кілька типів повідомлень, що вказуються при оголошенні в спадкуванні інтерфейсу IConsumer, де T це тип оброблюваного даними консьюмером повідомлення.

Приклад консьюмера, обробного команду ISendSms і публікує подія ISmsSent:

SendSmsConsumer: обробник команди на відправку повідомлення
public class SendSmsConsumer : IConsumer<ISendSms<string>>
{
public SendSmsConsumer(IBusControl busControl)
{
_busControl = busControl;
}

public async Task Consume(ConsumeContext<ISendSms<string>> context)
{
var message = context.Message;

Console.WriteLine($"[IConsumer<ISendSms>] Send sms command consumed");
Console.WriteLine($"[IConsumer<ISendSms>] CommandId: {message.CommandId}");
Console.WriteLine($"[IConsumer<ISendSms>] Phone number: {message.PhoneNumber}");
Console.WriteLine($"[IConsumer<ISendSms>] Message: {message.Message}");

Console.Write(Environment.NewLine);
Console.WriteLine("Публікація події: Смс повідомлення надіслано");
await _busControl.SmsSent(DateTime.UtcNow);
}

private readonly IBus _busControl;
}


Після того, як ми отримали команду на відправку смс повідомлення та виконали необхідні дії, ми формуємо і відправляємо подія про те, що смс доставлено.

Код відправки повідомлень я виніс в окремий Extension клас над IBusControl, там же знаходиться і імплементація самих повідомлень:

Методи розширення над IBus для інкапсуляції логіки міжсистемної взаємодії
public static class BusExtensions
{
/// <summary>
/// Відправлення смс повідомлення
/ / / < /summary>
/ / / < param name="bus"></param>
/ / / < param name="host"></param>
/ / / < param name="phoneNumber"></param>
/ / / < param name="message"></param>
/ / / < returns></returns>
public static async Task SendSms(
this IBus bus, Uri host, string phoneNumber, string message
)
{
var command = new SendSmsCommand
{
CommandId = Guid.NewGuid(),
PhoneNumber = phoneNumber,
Message = message
};

await bus.SendCommand(host, command);
}

/// <summary>
/// Публікація події про відправку смс повідомлення
/ / / < /summary>
/ / / < param name="bus"></param>
/ / / < param name="sentAtUtc"></param>
/ / / < returns></returns>
public static async Task SmsSent(
this IBus bus, DateTime sentAtUtc
)
{
var @event = new SmsSentEvent
{
EventId = Guid.NewGuid(),
SentAtUtc = sentAtUtc
};

await bus.PublishEvent(@event);
}

/// <summary>
/// Відправлення команди
/ / / < /summary>
/// <typeparam name="T"></typeparam>
/ / / < param name="bus"></param>
/ / / < param name="address"></param>
/ / / < param name="command"></param>
/ / / < returns></returns>
private static async Task SendCommand<T>(this IBus bus, Uri address, T command) where T : class
{
var endpoint = await bus.GetSendEndpoint(address);
await endpoint.Send(command);
}

/// <summary>
/// Публікація події
/ / / < /summary>
/// <typeparam name="T"></typeparam>
/ / / < param name="bus"></param>
/ / / < param name="message"></param>
/ / / < returns></returns>
private static async Task PublishEvent<T>(this IBus bus, T message) where T : class
{
await bus.Publish(message);
}
}

class SendSmsCommand : ISendSms<string>
{
public Guid CommandId { get; set; }
public string PhoneNumber { get; set; }
public string Message { get; set; }
}

class SmsSentEvent : ISmsSent
{
public Guid EventId { get; set; }
public DateTime SentAtUtc { get; set; }
}


На мій погляд, дане рішення цілком вдало дозволяє відокремити код бізнес-логіки від деталей реалізації міжсистемної (компонентного) взаємодії і инкапсулировать їх в одному місці.

Конфігурація контейнера DI
На даний момент MassTransit надає можливість використовувати наступні популярні контейнери:
  1. Autofac;
  2. Ninject;
  3. StructureMap;
  4. Unity;
  5. Windsor Castle.
У випадку з UnityContainer буде потрібно встановити nuget package MassTransit.Unity, після чого стане доступний метод розширення LoadFrom:

public static class UnityExtensions
{
public static void LoadFrom(this IReceiveEndpointConfigurator configurator, IUnityContainer container);
}

Приклад використання виглядає наступним чином:

Конфігурація IBusControl з допомогою UnityContainer
public static IBusControl GetConfiguredFactory(IUnityContainer container)
{
if (container == null)
{
throw new ArgumentNullException(nameof(container));
}

var control = Bus.Factory.CreateUsingRabbitMq(cfg => {
var host = cfg.Host(AppSettings.Host, h => { });


// cfg.ReceiveEndpoint(host, e => e.LoadFrom(container));
cfg.ReceiveEndpoint(host, "play_with_masstransit_queue", e => e.LoadFrom(container));
});

control.ConnectConsumeObserver(new ConsumeObserver());
control.ConnectReceiveObserver(new ReceiveObserver());
control.ConnectConsumeMessageObserver(new ConsumeObserverSendSmsCommand());
control.ConnectSendObserver(new SendObserver());
control.ConnectPublishObserver(new PublishObserver());

return control;
}


В якості терміну життя консьюмеров в контейнері документація пропонує використовувати ContainerControlledLifetimeManager().

Спостерігачі (Observer)
Для моніторингу процесу обробки повідомлень доступне підключення спостерігачів (Observer). Для цього MassTransit надає наступний набір інтерфейсів для обробників:
  1. IReceiveObserver — спрацьовує відразу ж після отримання повідомлення та створення RecieveContext;
  2. IConsumeObserver — спрацьовує після створення ConsumeContext;
  3. IConsumeMessageObserver<T> — для спостереження за повідомленнями типу T, методи якого буде доступно строго-типізоване вміст повідомлення;
  4. ISendObserver — для спостереження за відправляються командами;
  5. IPublishObserver — для спостереження за відправляються подіями.
Для кожного з них інтерфейс IBusControl надає власний метод підключення, виконання якого повинно бути здійснене безпосередньо перед IBusControl.Start().

В якості прикладу далі представлена реалізація ConsumeObserver:

Реалізація IConsumeObsever
public class ConsumeObserver :IConsumeObserver
{
public Task PreConsume<T>(ConsumeContext<T> context) where T : class
{
Console.WriteLine($"[ConsumeObserver] PreConsume {context.MessageId}");
return Task.CompletedTask;
}

public Task PostConsume<T>(ConsumeContext<T> context) where T : class
{
Console.WriteLine($"[ConsumeObserver] PostConsume {context.MessageId}");
return Task.CompletedTask;
}

public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
{
Console.WriteLine($"[ConsumeObserver] ConsumeFault {context.MessageId}");
return Task.CompletedTask;
}
}


Я не буду приводити код кожного з консьюмеров, т. к. за принципом роботи і структурі вони схожі. Імплементацію кожного з них можна побачити в документації або в исходниках на Github.

Підсумковий pipeline отримання команди на відправку смс повідомлення, її обробки і публікації про події її успішному виконанні виглядає наступним чином:



Нове в MassTransit 3.0
Із змінами, які торкнулися нової версії бібліотеки, ви можете ознайомитися в 2-х оглядових статтях автора бібліотеки Chris Patterson's а на сторінках його блогу: MassTransit 3 API Changes і MassTransit v3 Update.

Висновок
Тут повинно було бути порівняння найбільш популярних бібліотек для роботи з чергами повідомлень, однак, я вирішив залишити це для окремої статті.

Сподіваюся, мені вдалося провести для вас поверхневе знайомство з бібліотекою MassTransit, за межею якого ще залишаються такі цікаві речі, як транзакционность, персистування (інтеграція з NHibernate, MondoDb, EntityFramework), планувальник відправки повідомлень (інтеграція з Quartz), state machine (Automatonymous і Saga), логування (Log4Net, NLog), багатопоточність і багато іншого.

Вихідні коди прикладів доступні на Github.

Використовувані матеріали:
  1. Документація MassTransit.


Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.