Flume — управляємо потоками даних. Частина 3

Привіт, Хабр! Після довгої паузи ми нарешті повертаємося до розбору Apache Flume. У попередніх статтях ми познайомилися з Flume (Частина 1) і розібралися, як налаштовувати основні його компоненти (Частина 2). У цій заключній, частині циклу ми розглянемо наступні питання:

  • Як налаштувати моніторинг компонентів вузла.
  • Як написати власну реалізацію компонента Flume.
  • Проектування повноцінної транспортної мережі.

Моніторинг стану компонентів вузла
Отже, ми налаштували і запустили всі вузли, перевірили їх працездатність — дані успішно доставляються до пункту призначення. Але проходить якийсь час, ми дивимося на результат роботи нашої транспортної мережі (наприклад, папку з файлами, які упаковуються дані) і розуміємо, що виникла проблема — починаючи з якогось моменту нові файли не з'являються в нашому каталозі. Наступний крок здається очевидним — відкриваємо логи, шукаємо причину. Біда тільки в тому, що вузлів в нашій транспортної мережі може бути багато, а значить необхідно вручну переглядати логи всіх вузлів, що, м'яко кажучи, не дуже зручно. Коли подібні проблеми виникають, реагувати на них хотілося б максимально оперативно, а ще краще — взагалі не допускати таких критичних ситуацій.

Компоненти Flume в процесі роботи пишуть метрики, які дозволяють оцінити стан вузла. За значеннями цих показників досить легко визначити, що з вузлом не все в порядку.

Для зберігання лічильників та інших атрибутів своїх компонентів Flume використовує java.lang.management.ManagementFactoryреєструючи власні bean-класи для ведення метрик. Всі ці класи успадковані від MonitoredCounterGroup (для цікавих — посилання на вихідний код).

Якщо ви не плануєте розробляти власні компоненти Flume, то закопуватися в механізм ведення метрик зовсім необов'язково, достатньо розібратися, як їх дістати. Зробити це можна досить просто за допомогою утилітарного класу JMXPollUtil:

package ru.test.flume.monitoring;

import java.util.Map;
import org.apache.flume.instrumentation.util.JMXPollUtil;

public class FlumeMetrics {

public static Map<String, Map<String, String>> getMetrics() {
Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
return metricsMap;
} 
}

В результаті ви отримаєте метрики, згруповані за компонентів сайту, які виглядають приблизно так:

Метрики Flume-компонентів (JSON)
{
"SOURCE.my-source": {
"EventReceivedCount": "567393607",
"AppendBatchAcceptedCount": "5689696",
"Type": "ДЖЕРЕЛО",
"EventAcceptedCount": "567393607",
"AppendReceivedCount": "0",
"StartTime": "1467797931288",
"AppendAcceptedCount": "0",
"OpenConnectionCount": "1",
"AppendBatchReceivedCount": "5689696",
"StopTime": "0"
},
"CHANNEL.my-channel": {
"ChannelCapacity": "100000000",
"ChannelFillPercentage": "5.0 E-4",
"Type": "ДЖЕРЕЛО",
"ChannelSize": "500",
"EventTakeSuccessCount": "567393374",
"StartTime": "1467797930967",
"EventTakeAttemptCount": "569291443",
"EventPutSuccessCount": "567393607",
"EventPutAttemptCount": "567393607",
"StopTime": "0"
},
"SINK.my-sink": {
"ConnectionCreatedCount": "1",
"ConnectionClosedCount": "0",
"Type": "SINK",
"BatchCompleteCount": "2",
"EventDrainAttemptCount": "567393374",
"BatchEmptyCount": "959650",
"StartTime": "1467797930968",
"EventDrainSuccessCount": "567393374",
"BatchUnderflowCount": "938419",
"StopTime": "0",
"ConnectionFailedCount": "0"
}
}

Метрики отримали, тепер необхідно їх кудись відправити. Тут можна піти двома шляхами.

  1. Використовувати можливості Flume для надання метрик.
  2. Написати свою реалізацію опрацювання метрик.
Flume надає API, що дозволяє задати спосіб моніторингу — для цього використовуються реалізації інтерфейсу MonitorService. Для того, щоб підключити моніторинг, необхідно вказати клас, що реалізує
MonitorService
, в якості системного властивості при запуску сайту (або в коді).
java -Dflume.monitoring.type=org.apache.flume.instrumentation.http.HTTPMetricsServer ...

System.setProperty("flume.monitoring.type", "org.apache.flume.instrumentation.http.HTTPMetricsServer");

Клас
HTTPMetricsServer
пропонує стандартний спосіб відстеження стану вузла. Він являє собою невеликий web-сервер, який за запитом віддає повний список метрик вузла у вигляді JSON (як у прикладі вище). Щоб вказати порт, на якому цей сервер буде слухати запити, досить додати в конфігурацію Flume параметр (типово використовує порт 41414):

flume.monitoring.port = 61509

Запит до сервера виглядає так:
localhost:61509/metrics
.

Якщо ж такого способу стежити за метриками недостатньо, то доведеться піти другим шляхом і написати власну реалізацію
MonitorService
. Саме так ми і вчинили, щоб спостерігати за станом наших сайтів з допомогою Graphite. Нижче наведено простий приклад такої реалізації.

FlumeGraphiteMonitor

package ru.dmp.flume.monitoring;

import com.google.common.base.CaseFormat;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.util.JMXPollUtil;

public class FlumeGraphiteMonitor implements MonitorService {

// нормалізовані імена метрик, які не потрібно відправляти в Graphite
private static final Set<String> EXCLUDED_METRICS = new HashSet<String>() {{
add("start-time"); 
add("stop-time");
}};

private volatile long period = 60 * 1000; // інтервал відправлення, 1 хвилина

private volatile boolean switcher = true;
private Thread scheduler = new Thread(this::schedule);

@Override
public void configure(Context context) {
// Тут можна дістати які-небудь параметри з файлу конфігурації
} 

private void schedule() {
while (switcher) {
send();
synchronized (this) {
try {
wait(period);
} catch (InterruptedException ex) {}
}
}
}

@Override
public void start() { 
scheduler.start();
}

@Override
public void stop() {
switcher = false;
synchronized (this) {
notifyAll();
}
try {
scheduler.join();
} catch (InterruptedException ex){}
}

private void send() {
Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
for (Map.Entry<String, Map<String, String>> e: metricsMap.entrySet()) {
if (e.getValue() != null) {
// усі метрики від вузлів Flume починаємо з префікса "flume"
String group = "flume." + normalize(e.getKey().toLowerCase()) + ".";
for (Map.Entry<String, String> metric : e.getValue().entrySet()) {
try {
Double value = Double.valueOf(metric.getValue());
String metricName = normalize(metric.getKey());
if (!EXCLUDED_METRICS.contains(metricName)) {
String fullName = group + normalize(metric.getKey());
// Відправляємо дані в графіт або кудись ще
// Graphite.send(metricName, value);
}
} catch (NumberFormatException ex) {
// так відсіваємо значення не є числом
}
}
}
}
}

// приводимо до вигляду EventReceivedCount -> event-received-count (необов'язково)
private static String normalize(String str) {
return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str).replaceAll("_", "-");
} 

У підсумку отримуємо акуратну гілку Graphite з усіма метриками вузла.


Нижче наведені описи графіків та метрик, які ми використовуємо для одного з наших сервісів.

  1. Інтенсивність відправки сервісом повідомлень на сайт Flume. Графік будується не за метрик сайту — ці значення в Graphite відправляють сервіси, які генерують дані і є відправною точкою нашої транспортної системи. Якщо ваші джерела даних не дозволяють відслідковувати відправку даних у Flume, схожі графіки можна зняти з джерела(-ів) вузла.

    Якщо значення на цьому графіку падає до нуля, значить клієнт з якихось причин не може відправити повідомлення у Flume. Щоб діагностувати, хто винен в таких ситуаціях, ми окремо відображаємо графік помилок, що виникають на стороні клієнта. Відповідно, якщо він відмінний від нуля — проблема на сайті Flume, джерело не може прийняти дані. Якщо ж падіння інтенсивності не тягне зростання числа помилок — значить проблема на стороні сервісу, він перестав надсилати повідомлення.


  2. Заповненість каналів вузла. З цим графіком все просто — він завжди повинен бути дуже близький до нульового значення. Якщо канал не встигає спустошуватися, значить десь в нашій транспортної мережі виникло вузьке місце і необхідно шукати вузли, які не встигають справлятися з навантаженням. Метрика на графіку:
    flume.channel.{CHANNEL NAME}.channel-fill-percentage



  3. Інтенсивність роботи стоків вузла. Очікувані показники стоків на цьому сайті — «скільки отримали, стільки й відправили», оскільки події від сервісів не дублюються в канали. Таким чином, інтенсивність спустошення стоків повинна бути такою ж, як і інтенсивність відправлення даних клієнтами. Метрика на графіку:
    flume.sink.{SINK-NAME}.event-drain-success-count


    Падіння інтенсивності будь-якого з стоків до нуля свідчить про потенційну проблему на наступному, приймаючому, вузлі. Як наслідок, канал, опустошаемый «поламаним» стоком, почне заповнюватися. Також можлива ситуація, при якій приймаючі вузли працюють нормально, але просто не встигають обробляти вхідні дані — в цьому випадку графіки стоків будуть ненульовими, але канали будуть поступово заповнюватися.



Створення власних компонентів Flume
Незважаючи на те, що набір стандартних компонентів Flume досить великий, досить часто виникають ситуації, розв'язати які цим стандартними компонентами неможливо. У цьому випадку можна написати свій компонент Flume і задіяти його у вузлах. Свою реалізацію можна написати для будь-якого з компонентів Flume — sink, source, channel, interceptor і т. п.

Перше, що кинулося в очі при вивченні стоків Flume — це відсутність гнучкого стоку для файлової системи. Так, є File-Roll Sink, можливості якого описувалися під 2й частини циклу. Але цей стік повністю позбавлений можливості впливати на ім'я файлу, що не дуже зручно.

Ми вирішили розробити свій стік, що дозволяє формувати файли в локальній файловій системі. При розробці керувалися наступними міркуваннями.

  • У нас досить багато сервісів з порівняно невеликим навантаженням. Це означає, що в результаті у нас буде досить багато різнорідних файлів — не хотілося б під кожен з них налаштовувати окремий стік.

  • Файли повинні ротироваться по часу. Причому, для різних даних період ротації може відрізнятися (під ротацією мається на увазі «нарізка даних» на файли по часу — 15 хвилин, годину тощо).

  • Дані від кожного сервісу повинні складуватися в окрему папку. Причому, один сервіс може генерувати дані для декількох вкладених папок.
Виходячи з цих тез, ми прийшли до висновку, що завдання формування імені файла краще залишити клієнта (тобто сервісів, що генерує дані), інакше конфігурація стоку буде занадто громіздкою, а для кожного нового «клієнта» доведеться додавати окремий стік з індивідуальними настройками.

Примітка. Тут доречне порівняння з HDFS-стоком, про який ми говорили в попередній статті. Цей стік дозволяє виконати тонку настройку ротації і іменування файлів. Але ця гнучкість налаштування має і зворотну сторону — наприклад, для файлів, які чергуються раз в 15 раз в 30 хвилин доводиться робити різні стоки, навіть якщо в усьому іншому параметри ідентичні.

Отже, рішення по функціональності файлового стоку було прийнято наступне:

  • Ім'я файлу, в який повинні бути записані дані, клієнт визначає та передає його в заголовку разом з подією.

  • В імені файлу можуть бути вказані підпапки.

  • Файли, в які ведеться запис стоком, закриваються по деякому таймауту, коли для них перестають приходити події.
Схематично процес обробки даних цим стоком виглядає так:

Що це дало в результаті:

  • Не потрібно додавати стік для кожного нового сервісу або типу даних.

  • Сток позбавлений витрат на формування імені файла (у попередній частині ми розглядали ці витрати на прикладі HDFS-стоку)

  • Оскільки ім'я файлу однозначно ідентифікується одним заголовком, ми можемо користуватися угрупованням подій на стороні клієнта (цей прийом описаний у другій частині циклу).
Вихідний код файлового стоку і приклад його конфігурації викладені на GitHub. Докладно розбирати процес розробки цього стоку, я думаю, сенсу немає, обмежуся кількома тезами:

  • За основу компонента береться або абстрактний клас, або інтерфейс компонента (залежно від того, що ви розробляєте — сток, перехоплювач або щось ще).

  • Робимо власну реалізацію — найпростіше взяти що-небудь з готових Flume-компонентів в якості прикладу, благо всі вихідні коди доступні.

  • При конфігурації вказуємо не зарезервований псевдонім компоненту (типу 'avro' або 'logger'), а ім'я класу цілком.
Проектуємо транспортну мережу
Загальні прийоми управління даними ми розглянули у попередніх частинах циклу — події можна ділити між вузлами, дублювати, вибирати «напрям руху», використовуючи заголовки і т. п. тепер Спробуємо використовувати всі ці прийоми для того, щоб побудувати надійну транспортну мережу. Припустимо, завдання виглядає наступним чином.

  1. Постачальник даних — якийсь сервіс, який працює на кількох машинах (має кілька однакових инстансов).

  2. Дані, що генеруються сервісом, різнорідні — частина з них треба доставити в HDFS, частина — у файлову систему на якийсь лог-сервер.

  3. Необхідно в режимі реального часу вести деякі неатомарные обчислення, пов'язані з даними.
На умови 3 зупинюся детальніше. Припустимо, що завдання полягає в підрахунку унікальних користувачів сайту за останню годину. У цьому випадку ми не можемо дозволити собі розпаралелити потік даних з машин або обчислювати значення окремо на кожному веб-сервісі — вести підрахунок унікальних користувачів за їх кукам необхідно на єдиному потоці даних з усіх машин. В іншому випадку кожен інстанси буде мати свій набір унікальних користувачів, які не можна «взяти і скласти» для отримання кінцевого результату.

Примітка. Зрозуміло, приклад трохи надуманий — цю задачу можна вирішити і іншими, більш ефективними способами. Суть прикладу зводиться до ситуації, коли вам необхідно пропускати деякий потік даних централізовано через один обробник і розділити навантаження неможливо із-за особливостей задачі.

Отже, для початку підготуємо клієнтські та кінцеві вузли:

Для кожного веб-сервісів ставимо свій, індивідуальний вузол — на тій же машині, що і веб-сервіс. Робиться це з наступних причин:

  • Ці вузли відіграють роль буфера — якщо з якоїсь причини доставка подій на інші машини стане неможлива, ці вузли дозволять деякий час «протриматися» без втрат даних за рахунок товстого файлового каналу.

  • Зменшується час відгуку. Зрозуміло, відправка даних у Flume повинна виконуватися асинхронно — але при пікових навантаженнях або забитою мережі може виникнути ситуація, коли фоновий потік не буде встигати відправляти нові події. У цьому випадку чергу на відправку може сильно зрости, безжально поглинаючи пам'ять сервісу, що не дуже добре. У разі, коли вузол розташований на тій же машині, що і сервіс, ці витрати значно знижуються.

  • Якщо подальша логіка обробки даних зміниться і ви вирішите перебудувати транспортну мережу, то зміни потрібно буде внести тільки в конфігурацію клієнтського вузла, а не веб-сервісу. Для нього все залишиться як і раніше — «я відправляю дані на свій сайт, далі нехай сам вирішує, як бути».
Залишається питання — як доставити дані так, щоб нічого не втратити, якщо щось зламається? Ряд заходів ми вже зробили — дані для HDFS і для FS пишуться на кілька машин. При цьому дані не дублюються, а діляться. Таким чином, якщо одна з кінцевих машин виходить з ладу, вся навантаження піде на що залишилася в живих. Наслідком такої поломки буде дизбаланс по записаному обсягу даних на різних машинах, але з цим цілком можна жити.

Щоб забезпечити більшу стабільність, додамо кілька проміжних вузлів Flume, які будуть займатися безпосередньо розподілом даних:


Вийшла досить страшна павутина. Що тут відбувається:

  1. Веб-сервіс відправляє події на клієнтський вузол.

  2. Кожна подія має заголовок, який вказує «пункт призначення» (наприклад dist=FS або dist=HDFS), а також заголовок uniq з можливими значеннями 1 або 0.

  3. Кожен клієнтський вузол має по 3 стоку, які рівноправно спустошують канал і рівномірно розподіляють події між трьома проміжними вузлами — Splitter'ами (поки що без оглядки на заголовок dist).

  4. Кожний Splitter має кілька каналів — для HDFS, FS і лічильника унікальних користувачів. Необхідний канал вибирається по заголовкам dist та uniq.

  5. Кожен з цих каналів на Splitter'e має кілька стоків, які рівномірно розподіляють події між кінцевими машинами (FS, HDFS або UNQ).
Якщо з клієнтськими вузлами все відносно просто — вони просто ділять події між Splitter'ами, то структуру окремо взятого Splitter'a варто розглянути більш детально.


Тут видно, що кінцевий пункт для даних визначається за допомогою заголовка dist. При цьому, події, за якими вважаються унікальні користувачі, які не залежать від заголовка dist — вони орієнтуються на заголовок uniq. Це означає, що деякі події можуть бути продубльовані в кілька каналів, наприклад HDFS і UNQ.

Раніше я спеціально не вказав напрямку від Splitter'ів до вузлів UNQ. Справа в тому, що ці вузли не приймають відповідні дані, як HDFS або FS. Враховуючи специфіку задачі підрахунку унікальних користувачів, весь потік даних має відбуватися лише через один машину. Закономірне питання — навіщо нам тоді 2 вузла для підрахунку унікальних користувачів? Відповідь — тому що якщо один вузол зламається, його буде нікому замінити. Як тут бути — ділити події між вузлами ми не можемо залишити один — теж не можна?

Тут нам може допомогти ще один інструмент Flume, що дозволяє працювати стоків в групі за принципом «Якщо стік 1 зламався, використовуй сток 2». Цей компонент називається Failover Sink Processor. Його конфігурація виглядає наступним чином:

# Самі по собі стоки описуються як зазвичай
agent.sinks.sink-unq-1.type = avro
agent.sinks.sink-unq-1.batch-size = 5000
agent.sinks.sink-unq-1.channel = memchannel
agent.sinks.sink-unq-1.hostname = unq-counter-1.my-company.com
agent.sinks.sink-unq-1.port = 50001

agent.sinks.sink-unq-2.type = avro
agent.sinks.sink-unq-2.batch-size = 5000
agent.sinks.sink-unq-2.channel = memchannel
agent.sinks.sink-unq-2.hostname = unq-counter-2.my-company.com
agent.sinks.sink-unq-2.port = 50001

# Об'єднуємо їх в групу
agent.sinkgroups = failover-group
agent.sinkgroups.failover-group.sinks = sink-unq-1 sink-unq-2
# Тип процесингу вказуємо як failover
agent.sinkgroups.failover-group.processor.type = failover
# Пріоритети для стоків - стік з високим значенням буде задіяний першим
agent.sinkgroups.failover-group.processor.priority.sink-unq-1 = 10
agent.sinkgroups.failover-group.processor.priority.sink-unq-2 = 1
# Як часто перевіряти - повернувся сток в дію (мс)
agent.sinkgroups.failover-group.processor.maxpenalty = 10000

Наведена вище налаштування групи потоків дозволяє використовувати тільки один стік, але при цьому мати «запасний» на випадок аварії. Тобто доки стік з високим пріоритетом справно працює, стоки з низьким пріоритетом будуть простоювати.

Таким чином, поставлена задача виконана — дані розподіляються між HDFS і FS, лічильники унікальних користувачів працюють коректно. При цьому вихід з ладу будь-якої машини не призведе до втрати даних:

  1. Якщо ламається машина c Web-сервісом, то ця проблема вирішується балансером.

  2. Якщо з ладу вийшов один з Splitter'ів, навантаження буде розподілена між іншими.

  3. Кінцеві вузли також задублированы, поломка одного з них не призведе до застою або втрати даних.

  4. Вузол для підрахунку унікальних користувачів має «дублера» і в разі поломки буде замінений їм без порушення логіки обробки даних.
Для такої схеми задачі масштабування зводяться до простої зміни конфігурації вузлів Flume для відповідного рівня вузлів (Client Splitter або Final):

  1. Новий інстанси Web-сервісу — не вимагається зміни конфігурації, він просто встановлюється разом з клієнтським вузлом Flume.

  2. Новий Splitter — потрібно змінити конфігурацію клієнтських вузлів, додавши новий стік.

  3. Новий кінцевий вузол — потрібно змінити конфігурацію Splitter'a, додавши новий стік.
Висновок
На цьому ми завершуємо цикл статей про Apache Flume. Ми освітили всі самі ходові його компоненти, розібралися як управляти потоком даних і розглянули приклад повноцінної транспортної мережі. Тим не менш, можливості Flume не вичерпуються всім цим — в стандартному пакеті є ще досить багато не розглянутих нами компонентів, які можуть значно полегшити життя при вирішенні певних завдань. Сподіваємося, що цей цикл статей допоміг вам познайомитися з Flume і отримати досить повне уявлення про нього.

Спасибі за увагу!
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.