Початкові граблі в роботі з Service Bus for Windows Server

Є у Майкрософта така не дуже відома річ, як Service Bus for Windows Server. І так сталося, що в декількох проектах поспіль довелося з нею попрацювати. У підсумку вийшло зібрати невеликий набір підводних каменів, які зустрічалися в проектах частіше інших. Чим і ділюся.

Короткий опис про те, що таке Service Bus for Windows Server взагаліЦе реалізація Service Bus від Microsoft, яка дуже близька до Windows Azure Service Bus on Windows, але не вимагає самого Azure. Тобто така собі досить зручна і сучасна шина. Вміє надавати як стандартні черги (queue), так і їх просунутий варіант топіки (topic), який вміє віддавати одне і те ж повідомлення на кілька різних своїх підписок (subscription). Так як стикатися на практиці мені вдалося тільки з топіки/підписками, то далі мова піде тільки про них.
image
Тобто споживачі публікують свої повідомлення топік. Топік передає їх у всі свої підписки. Підписки, в свою чергу, перевіряють повідомлення на предмет того, чи потрібні вони їм чи ні, зіставляючи їх зі своїм списком правил (фільтр). Всі придатні повідомлення далі передаються тим клієнтам, хто підписаний на ці самі підписки. Причому якщо на одну і ту ж підписку підписано кілька клієнтів, то повідомлення отримає лише один з них. Все досить стандартно.

Перші кроки і перші ж граблі
З чого починається використання цієї штуки? З спроби відправити і отримати повідомлення, звичайно.
Увага, тут і далі наведено не продакшн-код, прикладений код лише покликаний служити функціонуючої ілюстрацією тексту
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
var listener = messageFactory.CreateSubscriptionClient(topicName, subscriptionName);

listener.OnMessage(message => Console.WriteLine($"Message received: {message.GetBody<string>()}"));

var brokeredMessage = new BrokeredMessage("some test message");
publisher.Send(brokeredMessage);

Все просто, повідомлення в консолі з'являється. Спробуємо опублікувати багато повідомлень і дуже грубо оцінити, скільки займе часу відправлення та отримання:

var stopwatch = new Stopwatch();
int messagesToSent = 200;
int messagesProccessed = 0;

listener.OnMessage(message => 
{
Console.WriteLine($"Message received: {message.GetBody<string>()}");
messagesProccessed++;

if (messagesProccessed == messagesToSent)
{
stopwatch.Stop();
Console.WriteLine($"Time passed: {секундомір.Elapsed}");
}
});

stopwatch.Start();
for (var i = 0; i < messagesToSent; i ++)
{
var brokeredMessage = new BrokeredMessage($"Message №{i}");
publisher.Send(brokeredMessage);
}

Якщо виконати цей код, то виходить, що на моєму старенькому комп'ютері-ветерана процес йде близько шести секунд.

А ось наступний крок часто наводить на перші граблі. Справа в тому, що користувач може отримувати повідомлення в одному з двох режимів:

  • PeekLock — повідомлення виходить, але не видаляється з передплати, а лише отримує лок. Щоб воно віддалилось, клієнт повинен явно підтвердити його успішну обробку викликавши Commit(). Інакше ж по закінченню лока або за викликом Abandon() від самого клієнта буде зроблена спроба доставити це повідомлення знову.

  • ReceiveAndDelete — повідомлення виходить і відразу віддаляється з підписки. Якщо обробка пішла не так, повторно це повідомлення вже не отримати. Зате працює трохи швидше PeekLock, так як не вішає локі.
За замовчуванням messageFactory.CreateSubscriptionClient створює PeekLock варіант. Але зважаючи неочевидності я практично не бачив, щоб клієнт створювали без явної вказівки режиму роботи. І, якщо вірити документації, при зазначеному PeekLock треба викликати .Complete() для кожного повідомлення. Спробуємо це зробити:

listener.OnMessage(message =>
{
Console.WriteLine($"Message received: {message.GetBody<string>()}");
messagesProccessed++;

if (messagesProccessed == messagesToSent)
{
stopwatch.Stop();
Console.WriteLine($"Time passed: {секундомір.Elapsed}");
}

message.Complete(); // Це все, що змінилося від минулого прикладу з виміром
});

І тут трапляється несподіване. Незважаючи на те, що ніяких эксепшнов не кидається, рядки з «Message №X» тікають, все відбувається ДУЖЕ повільно. Ці 200 повідомлень замість шести секунд зажадали цілих чотири хвилини і дев'ять секунд! Це не виправдати старим залізом. А адже цю проблему я разок знайшов у коді живого проекту, просто за малим числом повідомлень осідання продуктивності не впадала в очі.

Чому так відбувається? Адже якщо щось було б не так, можна було б очікувати эксепшна? Насправді, эксепшн є. Просто з якоїсь не зовсім зрозумілої причини, Microsoft зробили вкрай неочевидний спосіб отримання інформації про цих самих винятки.

Метод підписки на повідомлення OnMessage приймає необов'язковий параметр OnMessageOptions, який дозволяє підписатися на подію ExceptionReceived. Тобто ті самі «приховані виключення».

var onMessageOptions = new OnMessageOptions();
onMessageOptions.ExceptionReceived += (sender, args) => 
Console.WriteLine($"Exception received: {args.Exception}");

listener.OnMessage(message => 
{
Console.WriteLine($"Message received: {message.GetBody<string>()}");
messagesProccessed++;

if (messagesProccessed == messagesToSent)
{
stopwatch.Stop();
Console.WriteLine($"Time passed: {секундомір.Elapsed}");
}

message.Complete();
}, onMessageOptions); // не забуваємо передавати onMessageOptions

Запустивши такий код, ми побачимо, що на кожне повідомлення кидається эксепшн Microsoft.ServiceBus.Messaging.MessageLockLostException:
Надана блокування недійсна. У неї закінчився термін дії, або повідомлення вже видалено з черги..TrackingId:54630ae4-6e4f-4979-8fc8-b66e5314079c_GAPC_BAPC,TimeStamp:24.08.2016 21:20:08
Чому так відбувається? Тому що у onMessageOptions є ще один параметр: AutoCommit. І він за замовчуванням виставляється в true. Таким чином, для коректної роботи у разі, якщо ви хочете самостійно керувати циклом життя повідомлення, це поле потрібно виставляти в false. Спробуємо зробити так:

var stopwatch = new Stopwatch();
int messagesToSent = 200;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions
{
AutoComplete = false // неочевидний параметр
};
onMessageOptions.ExceptionReceived += (sender, args) => 
Console.WriteLine($"Exception received: {args.Exception}");

listener.OnMessage(message => 
{
Console.WriteLine($"Message received: {message.GetBody<string>()}");
messagesProccessed++;

if (messagesProccessed == messagesToSent)
{
stopwatch.Stop();
Console.WriteLine($"Time passed: {секундомір.Elapsed}");
}

message.Complete();
}, onMessageOptions);

stopwatch.Start();
for (var i = 0; i < messagesToSent; i ++)
{
var brokeredMessage = new BrokeredMessage($"Message №{i}");
publisher.Send(brokeredMessage);
}

І вуаля: немає эксепшнов, обробка повідомлень займає всього дві з половиною секунди. Схоже на нормальний режим роботи.

Резюмуючи:

  • Практично завжди треба підписуватися на onMessageOptions.ExceptionReceived, інакше можна не помітити ряд проблем у роботі коду
  • Пам'ятати, що за замовчуванням сервисбас намагається коммитить повідомлення за вас. Найчастіше це поведінка варто відключати
Абстрации і граблі два
Другий, не менш часто зустрічається момент, теж помічений у продакшн коді, це неправильне створення обгортки для передплатника. Створення класу, який би приховував всередині себе роботу з сервисбасом, справа взагалі гарний. Але є нюанси. Ось ілюстрація до того, як не треба робити, але подібний код не раз був помічений в реальності.

Створюється ось такого виду клас:

class Listener : IListener
{
private readonly MessagingFactory _messageFactory;
private readonly SubscriptionClient _client;

public event Func<string, Task> OnReceivedAsync;

public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode)
{
_messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
_client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode);

var onMessageOptions = new OnMessageOptions
{
AutoComplete = false
};

onMessageOptions.ExceptionReceived += (sender, args) =>
Console.WriteLine($"Exception received: {args.Exception}");

_client.OnMessageAsync(bm => OnReceivedAsync?.Invoke(bm.GetBody<string>()), onMessageOptions);
}
}

Який далі використовується приблизно так:

var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);

int messagesToSent = 20;
for (var i = 0; i < messagesToSent; i++)
{
var brokeredMessage = new BrokeredMessage($"Message №{i}");
publisher.Send(brokeredMessage);
}

var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);
listener.OnReceivedAsync += x =>
{
Console.WriteLine($"Message received: {x}");
return Task.FromResult(true);
};

Якщо виконати цей код, то все начебто працює, але замість першого повідомлення буде помилка "NullReferenceException: Object reference not set to an instance of an object."

Причому помилка буде виловлена тільки в разі підписки на onMessageOptions.ExceptionReceived, якщо цього не зробити (а не роблять це чомусь дуже часто), то про наявність проблеми можна дізнатися тільки по непрямим і іноді дуже невловимим багам в поведінці коду.

Що тут не так? Ну, відповідь досить очевидна, і якщо б не зустрічався так часто, я би про нього, напевно, і не згадував. Коли в конструкторі абстракції Listener викликається _client.OnMessageAsync — передплатник вже починає приймати повідомлення. Тому ряд з них (в залежності від того, як далеко рознесені конструктор і підписка на listener.OnReceivedAsync буде пропущений і потрапить на порожній OnReceivedAsync?.Invoke, логічно повертаючи null. Звідси і NullReferenceException.

Що з цим робити? Найпростіше — рознести створення екземпляра і підписку, наприклад так:

class Listener : IListener
{
private readonly MessagingFactory _messageFactory;
private readonly SubscriptionClient _client;

public Listener(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode)
{
_messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
_client = _messageFactory.CreateSubscriptionClient(topicName, subscriptionName, receiveMode);
}

public void Subscribe(Func<string, Task> handleMessage)
{
var onMessageOptions = new OnMessageOptions
{
AutoComplete = false
};

onMessageOptions.ExceptionReceived += (sender, args) =>
Console.WriteLine($"Exception received: {args.Exception}");

_client.OnMessageAsync(bm => handleMessage(bm.GetBody<string>()), onMessageOptions);
}
}

І підписуватися приблизно так:

var listener = new Listener(connectionString, topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);
listener.Subscribe(x =>
{
Console.WriteLine($"Message received: {x}");
return Task.FromResult(true);
});


Тепер втрати повідомлень при створенні класу не відбувається.

Резюмуючи:

  • Підписка на onMessageOptions.ExceptionReceived актуальніша актуального
  • Пам'ятати, що отримання повідомлень починається відразу після виклику client.OnMessageAsync і враховувати це у проектуванні абстраций
Граблі номер три
Є у передплатника чудовий метод Close(). Але його поведінка не зовсім передбачувано умоглядно. Спробуємо виконати ось такий код, який після відправки першої половини повідомлень викликає цей самий Close() і отримує другу половину повідомлень вже через інший примірник передплатника.

var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
var listener1 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);
var listener2 = messageFactory.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.ReceiveAndDelete);

int messagesToSent = 10;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions
{
AutoComplete = false
};
onMessageOptions.ExceptionReceived += (sender, args) =>
Console.WriteLine($"Exception received: {args.Exception}");

listener1.OnMessage(message =>
{
Console.WriteLine($"listener1: message received: {message.GetBody<string>()}, listener1 is closed: {listener1.IsClosed}");
messagesProccessed++;
}, onMessageOptions);

for (var i = 0; i < messagesToSent; i++)
{
var brokeredMessage = new BrokeredMessage($"Message №{i}");
publisher.Send(brokeredMessage);
Thread.Sleep(50);

if (i == 4)
{
Console.WriteLine("Closing listener1");
listener1.Close();
}
}

listener2.OnMessage(message =>
{
Console.WriteLine($"listener2: message received : {message.GetBody<string>()}, listener2 is closed: {listener2.IsClosed}");
messagesProccessed++;
}, onMessageOptions);

Але результат в консолі буде ось такий:
listener1: message received: Message №0, listener1 is closed: False
listener1: message received: Message №1, listener1 is closed: False
listener1: message received: Message №2, listener1 is closed: False
listener1: message received: Message №3, listener1 is closed: False
listener1: message received: Message №4, listener1 is closed: False
Closing listener1
listener1: message received: Message №5, listener1 is closed: True
listener2: message received: Message №6, listener2 is closed: False
listener2: message received: Message №7, listener2 is closed: False
listener2: message received: Message №8, listener2 is closed: False
listener2: message received: Message №9, listener2 is closed: False
Неочевидно, правда? Якщо зробити те ж саме, але для режиму роботи PeekLock замість ReceiveAndDelete, то результат буде схожий, хіба що .Complete() викине эксепшн System.OperationCanceledException: This messaging entity has already been closed, aborted, or disposed. Причому якщо ловити помилки в обробника повідомлень, щоб робити Abandon() руками, то і сам Abandon() викине помилку. Причому обидва ці эксепшна звичайні, не ховаються всередині OnMessageOptions.

А саме пропущене повідомлення, на відміну від ReceiveAndDelete, все таки буде опрацьовано пізніше, коли відбудеться повторна відправка.

Код з Complete і вивід на консоль
var messageFactory = MessagingFactory.CreateFromConnectionString(connectionString);
var messageFactory1 = MessagingFactory.CreateFromConnectionString(connectionString);
var messageFactory2 = MessagingFactory.CreateFromConnectionString(connectionString);
var publisher = messageFactory.CreateTopicClient(topicName);
var listener1 = messageFactory1.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock);
var listener2 = messageFactory2.CreateSubscriptionClient(topicName, subscriptionName, ReceiveMode.PeekLock);

int messagesToSent = 10;
int messagesProccessed = 0;

var onMessageOptions = new OnMessageOptions
{
AutoComplete = false
};
onMessageOptions.ExceptionReceived += (sender, args) =>
Console.WriteLine($"Exception received: {args.Exception}");

listener1.OnMessage(message =>
{
try
{
Console.WriteLine($"listener1: message received: {message.GetBody<string>()}, listener1 is closed: {listener1.IsClosed}");
messagesProccessed++;
message.Complete();
}
catch (Exception ex1)
{
Console.WriteLine($"listener1 Complete() exception: {ex1.Message}");
try
{
message.Abandon();
}
catch (Exception ex2)
{
Console.WriteLine($"listener1 Abandon() exception: {ex2.Message}");
}
}
}, onMessageOptions);

for (var i = 0; i < messagesToSent; i++)
{
var brokeredMessage = new BrokeredMessage($"Message №{i}");
publisher.Send(brokeredMessage);
Thread.Sleep(50);

if (i == 4)
{
Console.WriteLine("Closing listener1");
listener1.Close();
}
}

listener2.OnMessage(message =>
{
Console.WriteLine($"listener2: message received : {message.GetBody<string>()}, listener2 is closed: {listener2.IsClosed}");
messagesProccessed++;
message.Complete();
}, onMessageOptions);


listener1: message received: Message №0, listener1 is closed: False
listener1: message received: Message №1, listener1 is closed: False
listener1: message received: Message №2, listener1 is closed: False
listener1: message received: Message №3, listener1 is closed: False
listener1: message received: Message №4, listener1 is closed: False
Closing listener1
listener1: message received: Message №5, listener1 is closed: True
listener1 Complete() exception: This messaging entity has already been closed, aborted, or disposed.
listener1 Abandon() exception: This messaging entity has already been closed, aborted, or disposed.

listener2: message received: Message №6, listener2 is closed: False
listener2: message received: Message №7, listener2 is closed: False
listener2: message received: Message №8, listener2 is closed: False
listener2: message received: Message №9, listener2 is closed: False
listener2: message received: Message №5, listener2 is closed: False

Що з цим робити і як з цим жити? Ну, про це треба просто пам'ятати і враховувати це в коді. Варіантів боротися з подібною поведінкою всезнаючий stackoverflow пропонує достатньо. Наприклад, там, де це доречно, можна викликати messageFactory.Close() спільно з закриттям передплатника. Або перевіряти в процесорі, не закритий зараз передплатник чимось типу if(listener.IsClosed) { /***/ }, і т. д.

Резюмуючи:

  • Не всі экспешны від сервисбаса прилітають тільки в onMessageOptions.ExceptionReceived
  • завершення роботи з передплатником треба враховувати особливості роботи методу Close()
Висновок
В цілому, Service Bus for Windows Server досить непогана штука і цілком справляється зі своїми завданнями, але деякі дрібниці на старті можуть попити крові. Сподіваюся, викладені у статті пункти виявляться кому-небудь корисними і вбережуть від набивання на них власних шишок.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.