I am Groot. Робимо свою аналітику на події



Навесні цього року я дізнався про можливості бази даних HP Vertica створювати запити з матчингом патернів подій. Так званий Events Pattern Matching добре лягав під завдання аналізувати поведінку користувачів в продуктах ivi.ru. Ми вирішили спробувати розібратися з воронками оплати, з пошуком проблемних місць на пристроях, глибше зануритися в аналіз трафіку. Нашій команді дуже подобається, як реалізована аналітика у Mixpanel і Localytics (вона якраз заснована на подіях та їх властивості), тому багато ідеї були запозичені у них.

Що взагалі відбувається?Історично для аналітики ми, як і більшість інших проектів, використовували Google Analytics. У якийсь момент на наших обсягах семплірування даних досягло небачених масштабів — вибірки будувалися менш ніж на 0,5% аудиторії. Це унеможливлювало роботу з невеликими вибірками — вони або взагалі не було видно, або похибка була катастрофічною. Плюс в GA неможливо було прокинути купу внутрішніх даних про вміст, що робило неможливим глибокий аналіз.

Цей факт послужив приводом для того, щоб зайнятися розробкою власної системи. Так народився Groot — внутрішня аналітика ivi.ru.

Ми почали зі списку вимог, яким повинен був відповідати Groot:

  • Відсутність семплювання, всі дані повинні зберігатися в сирому вигляді;
  • Кроссплатфоременность. Оскільки у нас крім сайту є дуже популярні додатки для мобільних платформ і Smart TV, система повинна вміти збирати дані навіть з праски, якщо він підключений до інтернету і на ньому стоїть наше додаток;
  • Можливість швидкого масштабування;
  • Відсутність SPOF;
  • Простота установки і розгортання.

Архітектура

Крім колонковою бази HP Vertica, вирішили використовувати Apache Kafka і Apache Storm, тим самим відкривши для себе великий і жахливий світ Java.

Apache Kafka — pub/sub система. Основною відмінністю від звичайних реалізацій pub/sub є те, що передплатник може почати читання повідомлень не з кінця, а з початку або середини. Це рішення дозволяє не турбуватися про втрату даних, коли користувач не працює.

Apache Storm — розподілена система для обчислень великого обсягу даних. Взагалі, на тему Storm можна говорити довго. Нам у ньому сподобалася інтеграція з kafka з коробки, можливість горизонтально масштабувати систему і досить швидка швидкість роботи.

Погляд зверху
В цілому система працює наступним чином:

  • Клієнт надсилає запит з JSON-інформацією про подію;
  • web-сервер на flask асинхронно відправляє пачку подій у kafka;
  • storm постійно забирає нові повідомлення з kafka;
  • storm топологія парсити, розбирає подія і будує batch запит в vertica і зберігає в базу дані.
Перші незграбні кроки


Перша версія працювала дуже погано. Точніше, проблем відправкою даних у kafka не було зовсім (все працює з коробки). А з apache storm довелося повозитися, так як нам треба було написати свою топологію на java, яку у нас в компанії ніхто не знає.

Топологія у storm складається з наступних частин:

  • spout — краник з якого постійно (або ні) прилітають дані. У нашому випадку це стандартний KafkaSpout;
  • bolt — власне обробник даних. У «болтах» відбувається вся магія роботи з даними;
  • tuple — стандартна структура даних. У tuple може зберігати що завгодно, від простого числа до об'єкта.
Я реалізував найпростіший bolt, який отримував подія, парсил json і відправляв в базу пачку. Перші тести виявили наступні проблеми:

  • Vertica блокує таблицю під час запису;
  • Дуже складно відстежити проблемні місця в топології;
  • Thread з вставкою в базу міг відправляти 1 запис, то відразу 100. Не було розуміння чому так відбувається;
Перша версія була дуже простий: є колонки з id, name, subsite_id, user_id, ivi_id, ts. При цьому виникли труднощі з таблицями в Vertica теж виявилося складно.

Як бачите, більше ніяких даних ми не записували. Потім, правда, вирішили записувати ще браузер, операційну систему, розмір вікна браузера, версію флеш плеєра. «Ха!», — подумали ми і зробили таку таблицю:


| id | event_id | name | int_value | рядок_значення | double_value | datetime_value | added |


Зробили другий болт, який з JSON дістає додаткові параметри, перевіряє тип і записує все це в нову табличку.

Все було чудово, я радів, що так круто вийшло реалізувати, аналітики раділи, що можна додавати будь-які параметри події і потім за ним будувати звіти. В той час у нас головним джерелом подій був сам сайт ivi.ru мобільні додатки ще нічого не відправляли. Коли ж вони почали відправляти, ми зрозуміли, що все дуже погано.

Спочатку давайте подивимося на наш запит для простої воронки «натиснув» -> «купив» для браузера Chrome:

WITH groupped_events AS (
SELECT MIN(e.ts) added as, MIN(e.user_id) as user_id, e.name,
MIN(CASE WHEN ep.name = 'browser' THEN рядок_значення ELSE NULL END) as browser
from events.events as e 
LEFT JOIN events.event_properties as ep ON ep.event_id = e.id 
WHERE e.added >= '2014-07-28' and e.added < '2014-07-29' and e.subsite_id = '10' 
GROUP BY e.id, e.name
) 
SELECT COUNT(q.match_id) as count, name 
FROM (
SELECT event_name() as name, user_id, match_id() as match_id 
FROM groupped_events as e 
WHERE e.name IN ('click', 'buy') 
MATCH ( 
PARTITION BY user_id ORDER BY e.added ASC 
DEFINE 
click as e.name = 'click' and e.browser = 'Chrome',
buy as e.name = 'buy'
PATTERN P as (click buy | click) 
)
) as q 
GROUP BY q.match_id, q.name;

Бачите підступ? Ми джойним табличку (зараз там більше мільярда записів), групуємо її і вытаскием через CASE потрібне значення. Звичайно ж, коли у нас стало багато подій, все це стало гальмувати. Запити працювали по кілька хвилин, що нас не влаштовувало. Аналітики скаржилися на запити півгодини, продуктологи хотіли влаштувати мені темну.

Чому?
Окремо хочеться пояснити факт, що все-таки HP Vertica це колоночная база даних. Вона дуже компактно зберігає купу даних в колонках і дозволяє, наприклад, додавати нову колонку нальоту, без перелопачивания всіх даних. З нашої ж табличкою «все-в-одному» вертика справлялася дуже погано — вона не розуміла як оптимізувати цю купу.

Тоді було прийнято рішення перетягнути основні параметри в таблицю events окремими колонками, і сформувати список параметрів, які часто використовуються в запитах. Таку процедуру ми виконали 2 рази. У перший раз у нас з'явилася таблиця з 30 колонками, вдруге, вже з 50. Після всіх цих маніпуляцій, середній час виконання запитів зменшилася в 6-8 разів.

Після всіх маніпуляцій, попередній запит перетворився в простий:

SELECT COUNT(q.match_id) as count, name 
FROM (
SELECT event_name() as name, user_id, match_id() as match_id 
FROM events.events as e 
WHERE e.name IN ('click', 'buy') 
MATCH ( 
PARTITION BY user_id ORDER BY e.added ASC 
DEFINE 
click as e.name = 'click' and e.browser = 'Chrome',
buy as e.name = 'buy'
PATTERN P as (click buy | click) 
)
) as q 
GROUP BY q.match_id, q.name;

На цьому муки з базою у нас припинилися, в такому вигляді все живе вже близько 3х місяців і нарікань у нас до неї не було.

Ми все одно залишили таблицю event_properties, щоб можна було розробляти програми швидше, а не чекати оновлення структури основної таблиці.

Apache Storm
Розібравшись з HP Vertica, ми стали розбиратися з Apache Storm: потрібно було стабілізувати роботу, прибрати окремий Thread і бути готовим до великих навантажень.

Є мінімум два способи batch-процесингу в storm:

  1. Окремий thread з заповнюваних списком;
  2. Використання стандартної можливості приймати tickTuple;
Спочатку ми випробували перший варіант і відкинули його поведінка нестабільним, запити йшли майже в холосту. Другий варіант показав нам всю красу Storm:

За допомогою простого налаштування при створенні топології ми можемо вказати, коли хочемо отримати tickTuple (у нас 10 секунд). TickTuple це пустий запис, яка відправляється в основний потік раз в 10 секунд. Можемо спокійно відстежити таку запис, додати в чергу або запису в базу.

private static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

@Override
public void execute(Tuple tuple) {
if( isTickTuple(tuple) ) {
executeTickTuple(tuple);
} else {
executeTuple(tuple);
}
}

В
executeTuple
ми зберігаємо подія в чергу
LinkedBlockingQueue
, і, відповідно, до
executeTickTuple
ми проходимо по черзі і вставляємо пачкою в базу.

Нашу топологію ми розділили на кілька
Bolt
:

  • KafkaRecieverBolt — отримує дані з KafkaSpout, парсити JSON і відправляє в PropertiesParserBolt;
  • PropertiesParserBolt — парсити нестандратные параметри, відправляє їх EventPropertiesBatchBolt, відправляє всі подія далі в EventsBatchBolt
  • EventsBatchBolt — зберігає дані в основну таблицю;
  • EventPropertiesBatchBolt — зберігає дані в таблицю доппараметров
Тепер ми можемо подивитися який з «болтів» гальмує і скільки даних через нього ганяється: Статистика роботи топології з Storm UI

Післямова

У наступній статті я постараюся розповісти як це все адмініструвати та моніторити.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.