Подієва модель на основі async і await

У далекому 2012, коли ціна на нафту була тризначного, а трава зеленіша, майкрософтом був випущений .NET 4.5, а з ним і конструкція async/await. Про неї написано вже досить багато статей (Async в C#), а більшість розробників C# добре її вивчили. Але всі варіанти використання були розглянуті, можна вичавити з await трохи більше?

Самим очевидним варіантом використанням цієї конструкції є очікування завершення якоїсь асинхронної операції. Перше, що приходить на розум — це очікування вводу-виводу. Наприклад, ми послали запити і чекаємо відповіді, тоді використовуючи await ми зможемо продовжити виконання коду після отримання відповіді, а сам код при цьому буде виглядати синхронним. Але що якщо під час очікування виникне необхідність перервати виконання цієї операції? Тоді нам доведеться використовувати CancellationToken, причому якщо таких кілька операцій, то токени необхідно буде линковать або використовувати один загальний токен. При цьому причина скасування буде прихована від коду, що використовує цей CancellationToken. Крім скасування, код повинен підтримувати обробку втрати з'єднання, таймауту, що повертаються помилок і т. д.

У класичному варіанті це виллється у використання CancellationToken для обробки скасування, try catch для обробки розриву з'єднання та аналізу код повернутих даних, для оцінки результату запиту. Але чи можна вмістити все це в єдиній парадигмі? В цей статті я пропоную розглянути альтернативний підхід, заснований на подієвої моделі з використанням синтаксичного цукру async/await.

Бібліотека Eventing.
Все необхідне для подієвої моделі на async/await було оформлено у вигляді бібліотеки Eventing і викладено на GitHub під ліцензією MIT.

Бібліотека протестована і успішно використовується на бойовій системі більше двох років.

Використання
Описаний на початку приклад з використанням Eventing буде виглядати так:

var @event = await this.EventManager.WaitFor<MessageReceived, CancelRequested>(TimeSpan.FromSeconds(50));

if (@event == null)
Log.Info("timeout");
else if (@event is CancelRequested)
Log.Info("Cancelled, reason: {0}", ((CancelRequested) @event).Reason);
else
Log.Info("Message received");

Тут ми використовуємо EventManager — менеджер подій реалізує інтерфейс IEventManager, для очікування подій MessageReceived і CancelRequested з таймаутом в 50 секунд. З допомогою виклику WaitFor ми формуємо підписку на зазначені події, а виклик await блокує подальше виконання коду(але не потоку). Воно залишиться заблокованим до тих пір, поки не станеться одне з зазначених подій або закінчиться час таймауту, після чого виконання продовжиться в поточному контексті синхронізації. Але що якщо зв'язок з клієнтом буде втрачена під час формування підписки? У цьому випадку код зависне на 50 секунд, так як подія відключення клієнта буде втрачено. Виправимо це:

// Створення підписки
var eventAwait = this.EventManager.WaitFor<MessageReceived, ClientDisconnected, CancelRequested>(TimeSpan.FromSeconds(50), 
e => !(e is ClientDisconnected) || ((ClientDisconnected)e).id == client.Id); // Фільтр події

if (!client.Connected || cancelRequested) {
// Випадок відключення клієнта або запиту на скасування під час створення підписки
Log.Info("Client disconnected or cancel requested");
return;
}

// Переривання коду до настання події
var @event = await eventAwait;
...

Тут ми додали подія ClientDisconnected і розділили створення awaitable змінної eventAwait і безпосередньо очікування події. Якщо б ми не розділили їх, то клієнт міг би відключитися після перевірки client.Connected і очікуванням події, що призвело б до втрати події. Також був доданий фільтр подій, який виключає події ClientDisconnected не відносяться до поточного клієнту.

Як створити подію?
Для цього треба створити клас, имплементирующий IEvent:

class CancelRequested : IEvent {
public string Reason { get; set; }
}

А потім викликати IEventManager.RaiseEvent, наприклад:

this.EventManager.RaiseEvent(new CancelRequested()). 


Спадкування від IEvent відокремлює події від інших класів і запобігає використання невідповідних примірників у методі RaiseEvent. Також підтримується спадкування:

class UserCancelRequested : CancelRequested {
}

class SystemCancelRequested : CancelRequested {
}

var @event = await this.EventManager.WaitFor<CancelRequested>();
if (@event is UserCancelRequested)
...

Якщо у вас складна система в якій безліч одночасно очікуваних подій, використання події CancelRequested замість токенів скасування, дозволить уникнути прокидывания і линкование глобального і локального CancellationToken. Це важливо, так як складне линкование підвищує ймовірність пропустити витік пам'яті з-за утримання токенів.

Як підписатися на подію?
Деякі події носять періодичний характер, такі події можна отримувати методом IEventManager.StartReceiving:

void StartReceiving<T>(Action<T> handler, object listener, Func<T, bool> filter = null, SynchronizationContext context = null) 
where T : IEvent;

Обробник handler буде викликаний в контексті синхронізації context при кожному подію T, що задовольняє фільтру filter, якщо він заданий. Якщо контекст синхронізації не задано, буде використано SynchronizationContext.Current.

Як це працює?
Використовується все той-же механізм тасков, на якому заснований async/await. При виклику WaitFor менеджер подій створює таск використовуючи TaskCompletionSource і формує підписку за обраними типами подій в шині повідомлень.

// EventManager.cs, створення підписки
var taskCompletionSource = new TaskCompletionSource<IEvent>();

var subscription = new MessageSubscription(
subscriptionId,
message => {
var @event = message as IEvent;
if (filter != null && !filter(@event))
return;

// Встановлюємо результат виконання завдання
if (taskCompletionSource.TrySetResult(@event))
this.trace.TraceEvent(TraceEventType.Information, 0, "Wait ended: '{0}' - '{1}'",
subscriptionId, message.GetType());
},
this, UnsubscribePolicy.Auto, this.defaultContext, eventTypes);

this.messageBus.Subscribe(subscription);
...
return taskCompletionSource.Task;

При генерації події викликається метод RaiseEvent, який передає подія в шину, а вона у відповідності з типом події вибирає підписки, в яких eventTypes включає в себе цей тип. Далі викликається обробник підписки і якщо він задовольняє фільтру, то встановлюється результат виконання завдання і розблокує виклик await.

// EventManager.cs, генерація події
public void RaiseEvent(IEvent @event) {
this.trace.TraceEvent(TraceEventType.Information, 0, "Event: {0}", @event);

this.messageBus.Send(@event);
}

// MessageBus.cs, відправлення повідомлення
public void Send(object message) {
var messageType = message.GetType();
IMessageSubscription[] subscriptionsForMessage;

lock (this.subscriptions) {
subscriptionsForMessage = this.subscriptions
.Where(s => s.MessagesTypes.Any(type => messageType == type || type.IsAssignableFrom(messageType)))
.ToArray();
}

...

foreach (var subscription in subscriptionsForMessage)
subscription.ProccessMessage(message);

this.UnsubscribeAutoSubscriptions(subscriptionsForMessage);
...

// MessageSubscription.cs
public void ProccessMessage(object message) {
var messageHandler = this.handler;
this.SynchronizationContext.Post(o => messageHandler(message), null);
}

У MessageSubscription.ProccessMessage повідомлення передається в заданий користувачем контекст синхронізації, що дозволяє уникнути задержок при надсиланні повідомлення.

Визволи мій клас від багатопоточності!
Кожен, хто працював з async/await знає, що після завершення await код продовжує своє виконання у поточному потоці, а в поточному контексті синхронізації. Це може бути проблемою, якщо ви підпишетеся на подію з допомогою StartReceiving, а потім викличте WaitFor, що призведе до того, що код класу буде виконуватися одночасно в різних потоках(обробник подій з StartReceiving та код після await // як страшно жити!). Це легко виправити однопоточним контектстом синхронізації, що входять в бібліотеку:

this.serverSynchronizationContext = new SingleThreadSynchronizationContext("Server thread");
this.clientSynchronizationContext = new SingleThreadSynchronizationContext("Client thread");

this.serverSynchronizationContext.Post(async o => await this.RunServer(), null);
this.clientSynchronizationContext.Post(async o => await this.RunClient(), null);

Таким чином у нас клієнт завжди буде виконуватися в потоці «Client thread», а сервер «Server thread». Ви зможете писати багатопотоковий код не замислюючись про race condition. В якості бонусу це дозволить максимально утилізувати окремо взятий потік.

У чому перевага?
Головною перевагою є простота та тестування коду. Якщо щодо першого можна сперечатися, простоту кожен розуміє по своєму, то з другим пунктом все очевидно. Багатопотокове додаток можна протестувати в одному потоці, емулюючи будь-яку послідовність подій, причому для цього не потрібно створювати mock об'єкти, будь взаємодію можна звести до подій, а їх перевірку до викликом RaiseEvent. Приклад NUnit:

/// < summary>
/// This test demonstrates how to test application that uses Eventing
/// All code executes sequently in one thread
/ / / < /summary>
[TestFixture]
public class TestCase : TestBase {
[Test]
public async Task ClientDoesWork() {
var client = new Client(this.EventManager);
var doWorkAwaitable = client.DoWork();

this.EventManager.RaiseEvent(new Connected());

// We can and debug step into 
this.React();

await doWorkAwaitable;

Assert.AreEqual(true, client.Connected);
}
}

Як це можна використовувати?
Щоб не переповнювати статтю листингами, наведу лише короткий текстовий опис однієї з системи, де використовується Eventing. Це горизонтально масштабована розподілена система, що складається з чотирьох типів вузлів, один з яких є майстром. Майстер безперервно спілкується з усіма вузлами і керує виконанням різних операцій. Кожну операцію можна представити у вигляді кінцевого автомата, де це перехід настання події(в тому числі таймаут або скасування). Хоча для кожної операції та можна було автомат реалізувати в його класичному вигляді(що ми і зробили), набагато простіше виявилося представити його використовуючи Eventing, де поточний стан визначалося точкою виконання коду, а не окремої змінної. При це на кожному кроці були явно перераховані всі очікувані події, що спрощувало тестування білого ящика.

Висновок
У статті розглянуто ключові можливості і варіанти використання бібліотеки Eventing. Бібліотека не претендує на універсальність і підтримку високонавантажених систем, але закликає трохи по іншому поглянути на звичні речі, дозволяє писати безпечний і легко протестований з точки зору багатопоточності код.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.