Потокова обробка даних за допомогою Akka

Привіт, Хабр! Всі звикли асоціювати обробку великих даних з Hadoop або Spark), які реалізують парадигму MapReduce (або його розширення). У цій статті я розповім про недоліки MapReduce, про те, чому ми прийняли рішення відмовлятися від MapReduce, і як ми пристосували Akka + Akka Cluster на заміну MapReduce.



Data Management Platform
Завдання, для вирішення якої нам знадобилися інструменти роботи з великими даними, – сегментація користувачів. Клас систем, які вирішують завдання сегментації користувачів у всьому світі прийнято називати Data Management Platform або скорочено DMP. На вхід DMP надходять дані про дії користувачів (в першу чергу, це факти відвідувань тих або інших сторінок в інтернеті), на виході DMP видає «профіль користувача» — його стать, вік, інтереси, наміри і так далі. Цей профіль надалі використовується для таргетування реклами, персональних рекомендацій і для персоналізації контенту в цілому. Продробнее про DMP можна почитати тут: http://digitalmarketing-glossary.com/What-is-DMP-definition.

Оскільки DMP працює з даними великої кількості користувачів, об'єми даних, які потрібно обробляти, можуть досягати дуже значних розмірів. Наприклад, наша DMP Facetz.DCA обробляє дані 600 мільйонів браузерів, щодня обробляючи майже полпетабайта даних.

Архітектура DMP Facetz.DCA
Для того, щоб обробляти такі обсяги даних необхідна хороша масштабована архітектура. Спочатку ми побудували систему на основі стека hadoop. Детальний опис архітектури цілком заслуговує окремої статті, в цьому ж матеріалі я обмежуся коротким описом:

  1. Логи дій користувача складаються на HDFS – розподілену файлову систему, яка є однією з базових компонент екосистеми hadoop

  2. З HDFS дані складаються в сховище сирих даних, реалізоване на основі Apache HBase – розподіленої масштабованої бази даних, побудованої на основі ідей Big Table. По суті, HBase – дуже зручна для масової обробки key-value база даних. Всі дані користувачів зберігаються в одній великій таблиці facts. Дані одного користувача відповідають одному рядку HBase, що дозволяє дуже швидко та зручно отримати всю необхідну інформацію про нього.

  3. Один раз на добу запускається Analytic Engine – великий MapReduce job, який, власне, і виконує сегментацію користувачів. По суті, Analytic Engine – контейнер для правил сегментації, які готуються окремо аналітиками. Наприклад, один сценарій може розмічати стать користувача, інший — його інтереси і так далі.

  4. Готові сегменти користувачів складаються в Aerospike – key-value базу даних, яка дуже добре заточена на швидку віддачу – 99% запитів на читання відпрацьовують менш ніж за 1 мс навіть при великих навантаженнях в десятки тисяч запитів в секунду.


Архітектура Facetz.DCA

Проблеми MapReduce
Розроблена архітектура показала себе добре – дозволила швидко смасштабироваться до обробки профілів користувачів всього Рунета і розмічати їх за допомогою сотень скриптів (кожен може розмічати користувача по кільком сегментам). Проте вона виявилася не позбавленою недоліків. Основна проблема – відсутність інтерактивності при обробці. MapReduce, за своєю природою, – парадигма offline–обробки даних. Так, наприклад, якщо користувач подивився квитки на футбол сьогодні, в сегмент «Цікавиться футболом» він може потрапити тільки завтра. У деяких випадках така затримка є критичною. Типовий приклад – ретаргетінг – реклама користувачеві товарів, які він вже подивився. На графіку наведено ймовірність здійснення покупки користувачем після перегляду товару по закінченні часу:


Графік ймовірності конверсії після перегляду товару. При отстутсвіі real-time движка для нас доступна тільки зелена частина, в той час як максимальна вірогідність припадає на перші години.

Видно, що найбільша ймовірність покупки – протягом перших кількох годин. При такому підході система дізналася б, що користувач хоче купити товар, тільки після доби – коли ймовірність покупки практично вийшла на плато.

Очевидно, що необхідний механізм потокового real-time обробки даних, який зводить до мінімуму затримку. При цьому хочеться зберегти універсальність обробки – можливість будувати як завгодно складні правила сегментації користувачів.

Модель Акторів
Поміркувавши, ми прийшли до висновку, що найкраще для розв'язання задачі нам підходить парадигма реактивного програмування і модель акторів. Актор – це примітив паралельного програмування, який вміє:

  • Приймати повідомлення
  • Посилати повідомлення
  • Створювати нових актор'ів
  • Встановлювати реакцію на повідомлення
Модель акторів зародилася в erlang-співтоваристві, зараз реалізації цієї моделі існують для багатьох мов програмування.

Для мови scala, на якому написана наша DMP, дуже хорошим тулкитом є akka. Вона лежить в основі кількох популярних фреймворків, добре задокументированна. Крім того, на Coursera є прекрасний курс принципи реактивного програмування, в якому ці самі принципи розповідаються як раз на прикладі akka. Окремо варто згадати модуль akka cluster, який дозволяє масштабувати рішення (що базуються на актора) на кілька серверів.

Архітектура Real-Time DMP
Підсумкова архітектура виглядає наступним чином:



Постачальник даних складає інформацію про дії користувачів в RabbitMQ.

  1. З RabbitMQ повідомлення про дії користувача вичитує Dispatcher. Dispatcher-ів може бути кілька, вони працюють незалежно.

  2. Для кожного онлайн-користувача в системі заводиться актор. Dispatcher надсилає повідомлення про нову подію (вычитанном з RabbitMQ) відповідного актору (або заводить новий актор, якщо це перша дія користувача і для нього ще немає актора).

  3. Актор, відповідний користувачеві, додає інформацію про дії в список користувальницьких дій і запускає скрипти сегментації (ті ж, що запускають Analytic Engine при MapReduce-обробці).

  4. Дані про розмічених сегментах складаються в Aerospike. Також дані про сегментах і діях користувачів доступні за API, підключеним безпосередньо до акторів.

  5. Якщо користувача не надходило даних протягом години, сесія вважається закінченою і актор знищується.
Шардированием акторів по кластеру, їх життям і знищенням управляє akka, що істотно спростило розробку.

Поточні результати:
  • Akka-кластер з 6 нсд;
  • Потік даних 3000 подій в секунду;
  • 4-6 мільйонів користувачів онлайн (залежно від дня тижня);
  • Середній час виконання одного скрипта сегментації користувачів менше п'яти мілісекунд;
  • Середнє час між подією та сегментацією на основі цієї події – одна секунда.


Подальший розвиток
Наш Real-Time Engine показав себе добре і ми плануємо розвивати його далі. Список кроків, які ми плануємо зробити:

  • Персистування – зараз Real-Time Engine сегментує користувачів тільки на основі останньої сесії. Ми плануємо додати підтягування більш старої інформації з HBase при появі нового користувача.
  • На поточний момент лише частина наших даних переведена на realtime-обробку. Ми плануємо поступово перевести всі наші джерела даних на потокову обробку, після цього потік оброблюваних даних зросте до 30000 подій в секунду.
  • Після завершення перекладу на Realtime ми зможемо відмовитися від щоденного розрахунку MapReduce, що дозволить заощадити на серверах за рахунок того, що будуть оброблятися тільки ті користувачі, які реально сьогодні проявили активність в інтернеті.
Посилання на схожі вирішення
Наприкінці хотілося б привести кілька посилань на фреймворки, на основі яких можна будувати потокову обробку даних:
Apache Storm
Spark Streaming
Apache Samza

Спасибі за увагу, готові відповісти на ваші питання.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.