Принципи реактивного програмування на прикладі простого RSS-агрегатора з використанням ReactiveX для Python


В останні роки реактивне програмування в цілому, а технологія ReactiveX зокрема, знаходить все більшу популярність серед розробників. Одні вже активно використовують усі переваги цього підходу, а інші тільки «щось чули». Зі свого боку я постараюся допомогти вам уявити, наскільки деякі концепції реактивного програмування здатні змінити погляд на звичні, здавалося б, речі.

Існує два принципово різних способи організації великих систем: у відповідності з об'єктами і станами, які живуть у системі, і у відповідності з потоками даних, які проходять через неї. Парадигма реактивного програмування передбачає легкість вираження потоків даних, а також розповсюдження змін завдяки цим потокам. Наприклад, імперативний програмуванні операція присвоювання означає кінцівку результату, тоді як в реактивному значення буде перераховано при отриманні нових вхідних даних. Потік значень проходить в системі ряд трансформацій, які необхідні для вирішення певної задачі. Оперування потоками дозволяє системі бути розширюваної і асинхронної, а правильна реакція на виникаючі помилки – відмовостійкої.

ReactiveX – бібліотека, що дозволяє створювати асинхронні і подієво-орієнтовані програми, що використовують спостережувані послідовності. Вона розширює шаблон Спостерігача для підтримки послідовностей даних, додає оператори для їх декларативного з'єднання, позбавляючи від необхідності піклуватися про синхронізацію і безпеки потоків, поділюваних структурах даних і неблокірующіх I/O.

Одним з основних відмінностей бібліотеки ReactiveX від функціонального реактивного програмування є те, що вона оперує не безперервно змінюються, а дискретними значеннями, які «випускаються» протягом тривалого часу.

Варто трохи розповісти про те, що таке Observer, Вами, Subject. Модель Вами є джерелом даних і дозволяє обробляти потоки асинхронних подій схожим чином з тим, який ви використовуєте для колекцій даних, таких як масиви. І все це замість колбэков, а значить, код є більш читабельним і менш схильним до помилок.

У ReactiveX спостерігач (Observer) підписується на Вами і згодом реагує на елемент або послідовність елементів, які той відправляє. У кожного Observer, підписаного Вами, викликається метод Observer.on_next() на кожен елемент потоку даних, після якого може бути викликаний як Observer.on_complete(), так і Observer.on_error(). Часто Вами застосовується таким чином, що він не починає віддавати дані до тих пір, поки хто-небудь не підписується на нього. Це так звані «ледачі обчислення» – значення обчислюються лише тоді, коли в них виникає потреба.

observer
Бувають завдання, для вирішення яких потрібно з'єднати Observer та Вами, щоб отримувати повідомлення про події і повідомляти про них своїм передплатникам. Для цього існує Subject, що має, крім стандартної, ще кілька реалізацій:

  • ReplaySubject має можливість кешувати всі надійшли в нього дані, а при появі нового передплатника – віддавати всю цю послідовність спочатку, працюючи далі у звичайному режимі.

  • BehaviorSubject зберігає останнє значення, за аналогією з ReplaySubject віддаючи його з'явився передплатнику. При створенні він отримує значення за замовчуванням, яке буде отримувати кожен новий передплатник, якщо останнього значення ще не було.

  • AsyncSubject також зберігає останнє значення, але не віддає дані, поки не завершиться вся послідовність.
Вами і Observer – тільки початок ReactiveX. Вони не несуть в собі всю міць, яку являють собою оператори, що дозволяють трансформувати, об'єднувати, маніпулювати послідовностями елементів, які віддають Вами.

В документації ReactiveX опис операторів включає в себе використання Marble Diagram. Приміром, ось як ці діаграми представляють Вами та їх трансформації:

вами

Дивлячись на діаграму нижче, легко зрозуміти, що оператор map трансформує елементи, що віддаються Вами, шляхом застосування функції до кожного з них.

map

Гарною ілюстрацією можливостей ReactiveX є додаток RSS-агрегатора. Тут виникає необхідність асинхронної завантаження даних, фільтрації і трансформації значень, підтримки актуального стану шляхом періодичного оновлення.

У цій статті приклади для представлення основних принципів ReactiveX написані з використанням бібліотеки rx для мови програмування Python. Ось так, наприклад, виглядає абстрактна реалізація спостерігача:

class Observer(metaclass=ABCMeta):
@abstractmethod
def on_next(self, value):
return NotImplemented

@abstractmethod
def on_error(self, error):
return NotImplemented

@abstractmethod
def on_completed(self):
return NotImplemented

Наше додаток в режимі реального часу буде обмінюватися повідомленнями з браузером за допомогою веб-сокетів. Можливість легко реалізувати це надає Tornado.

Робота програми починається з запуску сервера. При зверненні браузера до сервера відкривається веб-сокет.

Код

import json
import os

import feedparser
from rx import config, Вами
from rx.subjects import Subject
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.websocket import WebSocketHandler

asyncio = config['asyncio']

class WSHandler(WebSocketHandler):
urls = ['https://lenta.ru/rss/top7',
'http://wsrss.bbc.co.uk/russian/index.xml']
def open(self):
print("WebSocket opened")
# тут буде основна логіка нашого додатка

def on_message(self, message):
obj = json_decode(message)
# Відправляє повідомлення, яке отримує user_input
self.subject.on_next(obj['term'])

def on_close(self):
# Відписатись від Вами; по ланцюжку зупинить роботу усіх вами
self.combine_latest_sbs.dispose()
print("WebSocket closed")

class MainHandler(RequestHandler):
def get(self):
self.render("index.html")

def main():
AsyncIOMainLoop().install()

port = os.environ.get("ПОРТУ", 8080)
app = Application([
url(r"/", MainHandler),
(r'/ws', WSHandler),
(r'/static/(.*)', StaticFileHandler, {'path': "."})
])
print("Starting server at port: %s" % port)
app.слухати(port)
asyncio.get_event_loop().run_forever()


Для обробки введеного користувачем запиту створюється Subject, при підписці на який він надсилає значення за замовчуванням (в нашому випадку — порожній рядок), а потім раз в секунду відправляє те, що введене користувачем і задовольняє умовам: довжина 0 або більше 2, значення змінилося.

# Subject одночасно і Вами, і Observer
self.subject = Subject()
user_input = self.subject.throttle_last(
1000 # На заданому часовому проміжку отримувати останнє значення
).start_with(
"# Відразу ж після підписки відправляє значення за замовчуванням
).filter(
lambda text: len(text) == 0 або len(text) > 2
).distinct_until_changed() # якщо значення змінилося

Також для періодичного оновлення новин передбачений Вами, який раз у 60с віддає значення.


interval_obs = Вами.interval(
60000 # Віддає значення раз на 60с (для періодичного оновлення)
).start_with(0)

Два цих потоку з'єднуються оператором combine_latest, в ланцюжок вбудовується Вами для отримання списку новин. Після чого на цей Вами створюється підписка, вся ланцюжок починає працювати тільки в цей момент.


# combine_latest збирає 2 потоку запитів користувача і тимчасових
# інтервалів, спрацьовує на будь-повідомлення з кожного потоку
self.combine_latest_sbs = user_input.combine_latest(
interval_obs, lambda input_val, i: input_val
).do_action( # Спрацьовує на кожен випущений елемент
# Відправляє повідомлення для очищення списку на фронтенд
lambda x: send_response('clear')
).flat_map(
# В ланцюжок вбудовується Вами для отримання списку
self.get_data
).subscribe(send_response, on_error)
# Створюється підписка; весь ланцюжок починає працювати тільки в цей момент

Слід детальніше зупинитися на тому, що таке «Вами для отримання списку новин». Зі списку url для отримання новин ми створюємо потік даних, елементи якого приходять в функцію, де за допомогою HTTP-клієнта Tornado AsyncHTTPClient відбувається асинхронне завантаження даних для кожного елемента списку urls. З них також створюється потік даних, який фільтрується по запиту, введеному користувачем. З кожного потоку ми беремо по 5 новин, які наводимо до потрібного формату для відправки на фронтенд.

Код

def get_rss(self, rss_url):
http_client = AsyncHTTPClient()
return http_client.fetch(rss_url, method='GET')

def get_data(self, query):
# Вами створюється зі списку url
return Вами.from_list(
self.urls
).flat_map(
# Для кожного url створюється Вами, який завантажує дані
lambda url: Вами.from_future(self.get_rss(url))
).flat_map(
# Отримані дані парсятся, з них створюється Вами
lambda x: Вами.from_list(
feedparser.parse(x.body)['entries']
).filter(
# Фільтрує щодо входження запиту в заголовок або текст новости
lambda val, i: query in val.title or query in val.summary
).take(5) # Беремо тільки по 5 новин по кожному url
).map(lambda x: {'title': x.title, 'link': x.link,
'published': x.published, 'summary': x.summary})
# Перетворює дані для відправки на фронтенд


Після того, як потік вихідних даних сформовано, його передплатник починає поелементно отримувати дані. Функція send_response надсилає отримані значення у фронтенд, який додає новину в список.


def send_response(x):
self.write_message(json.dumps(x))

def on_error(ex):
print(ex)

У файлі feeder.js

Код
ws.onmessage = function(msg) {
var value = JSON.parse(msg.data);
if (value === "clear") {$results.empty(); return;}

// Append the results
$('<li><a tabindex="-1" href="' + value.link +
'">' + value.title +'</a> <p>' + value.published +
'</p><p>' + value.summary + '</p></li>'
).appendTo($results);
$results.show();
}


Таким чином, реалізується push-технологія, в якій дані надходять від сервера до фронтенду, який лише відправляє введений користувачем запит для пошуку по новинах.

В якості висновку пропоную задуматися про те, яка реалізація вийшла б при звичайному підході з використанням колбэков замість Вами, без можливості легко об'єднати потоки даних, без можливості миттєвої відправки даних споживачеві-фронтенду і з необхідністю відстежувати зміни в рядку запиту. Серед Python-розробників технологія поки що практично не поширена, проте я бачу вже кілька можливостей її застосувати на поточних проектах.

Приклад використання ReactiveX для Python ви можете знайти в github репозиторії з демо-проектом RSS-агрегатора.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.