Реактивні розширення



Це друга публікація за матеріалами нашій внутрішньої конференції Sync.NET. Перша публікація була присвячена багатопоточності .NET.

Реактивні розширення — звучить настільки круто, що напрошується зв'язок з реактивними літаками. Ніякого зв'язку, звичайно, немає, але це дійсно відмінний інструмент. Reactive походить від слова react (реагувати), мається на увазі, що система реагує на зміни стану. У процесі розвитку програмного забезпечення виникла потреба, щоб система вміла реагувати на безліч джерел даних, була стійка і щоб різні модулі не були тісно пов'язані.

Як правило, ми пишемо код, в якому є методи і функції, які ми викликаємо, отримуємо результат і його обробляємо. Rx в свою чергу дозволяє створювати події і обробники, які будуть реагувати на них. Таким чином, система буде складатися з послідовності подій, які будуть повідомляти про зміну стану і належним чином реагувати на них.

Rx складається з двох базових абстракцій в просторі імен System починаючи з .NET 4.0, а саме
System.IObserver
та
System.IObservable
. Як видно з назви, це реалізація патерну «спостерігач» (Observer). У даній реалізації
IObservable
виступає як
subject
, і очевидно, що
IObserver
це спостерігач, який може підписуватися на зміни. У платформі .NET вже є реалізація спостерігача у вигляді подій (Events). Як вже згадувалося, Rx дозволяє створювати послідовність подій, і само собою зрозуміло, що це можна зробити з допомогою івентів. Способи роботи з Rx і Events відрізняються, але про це трохи пізніше.

IObserver<in T>

Надає механізм отримання повідомлень. Інтерфейс оголошує три методу:

void OnNext(T value)
— надає наступний елемент послідовності.

void OnError(Exception ex)
— дозволяє передати Exception і адекватно його обробити. Мається на увазі, що після цього повідомлення послідовність закінчується і спостерігачам більше не потрібно стежити за змінами.

void OnCompleated()
— повідомляється, що послідовність закінчилася і більше не буде нових повідомлень, не треба їх чекати.

IObservable<out T>

Виробляє повідомлення і дозволяє підписуватися спостерігачам. Оголошує один метод:

IDisposable Subscribe(IObserver<T> observer)
— приймає спостерігача (IObserver) параметром і підписує його на повідомлення. Зверніть увагу, що метод повертає
IDisposable
, за допомогою чого можна потім викликати метод
Dispose
, тим самим відписавши і знищивши спостерігача.

Якщо ми захочемо реалізувати
IObservable
, то потрібно буде крім методу
Subscribe
також реалізувати логіку, яка може надсилати повідомлення помилки або повідомляти про закінчення послідовності. Виходить, що також потрібно буде реалізувати інтерфейс
IObservable
, для таких цілей можна використовувати тип
Subject
. Але щоб його використовувати, потрібно буде з
Nuget
встановити додаткову бібліотеку (
Install-Package Rx-Mail
), яка також надає додаткові розширення і можливість використовувати
LINQ
.

using System;
using System.Reactive.Subjects;
namespace Demo
{
class Program
{ 
static Subject<int> sub = new Subject<int>();//Declare
static void Main()
{
sub.Subscribe(Console.WriteLine); //Subscribe

sub.OnNext(234); //Publish
}
}
}


У цьому прикладі створюється нова послідовність, тобто
Subject<int>
(також можна назвати послідовність int'ів), потім на неї підписується спостерігач (в даному випадку просто виводиться на консоль кожне значення послідовності), та передається значення, яке виводиться на консоль за допомогою спостерігача. Кожен раз, коли підписується новий спостерігач, йому починають поставлятися елементи послідовності. Але є ще декілька реалізацій з іншим поведінкою:

ReplaySubject

using System;
using System.Reactive.Subjects;

namespace Demo
{
class Program
{ 
static ReplaySubject<int> sub = new ReplaySubject<int>();

static void Main()
{
sub.OnNext(222);

sub.Subscribe(Console.WriteLine);

sub.OnNext(354);
}
}
}


ReplaySubject
— постачає всі елементи послідовності незалежно від того, коли був підписаний спостерігач.

BehaviorSubject

using System;
using System.Reactive.Subjects;

namespace DemoData
{
class Program
{ 
static BehaviorSubject<int> sub = new BehaviorSubject<int>(666);

static void Main()
{
sub.OnNext(222);

sub.Subscribe(Console.WriteLine); // 222
}
}
}


BehaviorSubject
— не може бути порожнім, завжди містить у собі елемент, але тільки останній.

AsyncSubject

using System;
using System.Reactive.Subjects;

namespace DemoData
{
class Program
{ 
static AsyncSubject<int> sub = new AsyncSubject<int>();

static void Main(string[] args)
{
sub.OnNext(222);

sub.Subscribe(Console.WriteLine);

sub.OnCompleted(); // Publish 222
}
}
}


AsyncSubject
також повертає останнє значення, але, на відміну від інших реалізацій, дані будуть публікуватися при виклику
OnCompleated
.

Тепер порівняємо з Подієві ами, ось як виглядав би код:

using System;
namespace Demo
{
class Program
{ 
static event Action<int> Ev; //Declare

static void Main(string[] args)
{ 
Ev += Console.WriteLine; //Subscribe

Ev(234); //Publish
}
}
}


Все гранично просто, виконання буде проходити так само, але в Rx є ряд переваг перед івентами:

  • Реалізація
    IObservable
    — це класи, в яких можна робити все, що хочеш. Методи, які оголошує
    IObserver
    , дозволяють більш коректно керувати послідовністю.
  • Можна повідомити, що послідовність закінчилася і тим самим зробити останні потрібні дії і відписатися. Є можливість управляти помилками.
  • В івентах щоб відписатись, потрібно зберегти спостерігача в якій-то змінної і як ними управляти. В Rx метод
    Subscribe
    повертає
    IDisposable
    і йому можна просто викликати
    Dispose()
    , щоб відписатися.
var toDispose = sub.Subscribe(Console.WriteLine);
toDispose.Dispose();


  • Rx містить безліч корисні фіч, перевантажень методів і розширень
  • LINQ!
LINQ

Спочатку
LINQ
дозволяв робити запити до статичних джерел даних. Але так як кількість даних зростає, а підходи змінюються, то треба до цього пристосовуватися. Rx дозволяє виконувати запити до динамічних послідовності.

using System.Reactive.Linq; // дозволяє застосовувати LINQ

namespace Demo
{
class Program
{ 
static void Main()
{
var sequence = Вами.Range(1, 10, Scheduler.Default); // створюється послідовність від 1 до 10
var query = from s in sequence
where s % 2 == 0
select s; // створюється запит, нічого не виконується
sequence.Subscribe(Console.WriteLine); // підписується спостерігач (1,2,3,4,5,6,7,8,9,10)
query.Subscribe(Console.WriteLine); // підписується спостерігач (2,4,6,8,10) 
}
}
}


У прикладі спочатку створюється послідовність, яка надає дані типу int від 1 до 10, потім до неї застосовується LINQ-вираз, який вибирає з послідовності тільки значення, кратні 2. Таким чином, виходить дві різні послідовності, на які можна підписати різних спостерігачів. Це вкрай простий приклад, але Rx надає дуже багато методів, які дають величезну гнучкість.

Висновки
Reactive extensions дозволяє створювати окремі модулі, які будуть стежити за станом системи і реагувати на нього. Кожна частина системи буде повністю незалежна, так як вона не знає нічого про інших модулях. Спостерігачі очікують зміни послідовності, а їй, у свою чергу, все одно, хто спостерігає за її змінами. Тим самим досягається зв'язаність модулів. Rx має сенс застосовувати для обробки UI-подій, доменних подій, змін навколишнього середовища, змін на сторонніх сервісах (RSS, Twitter тощо). Rx також надає можливість перетворювати події в
IObservable
, що дозволяє інтегруватися в систему.

Не варто застосовувати Rx для того, щоб перетворити статичні послідовності
IObservable
, це буде марна трата ресурсів і не принесе ніякої вигоди. Також не варто реалізовувати черзі, так як це абсолютно різні підходи. Величезною перевагою є той факт, що Rx підтримує
LINQ
та нічого нового не потрібно вивчати.

Rx — відмінний інструмент, який дозволяє створювати реактивні системи, але це не означає, що потрібно все кинути і почати писати в цьому стилі. Головне — завжди використовувати сіра речовина!

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.