Flume — управляємо потоками даних. Частина 2

Привіт, Хабр! Ми продовжуємо цикл статей, присвячений Apache Flume. попередній частині ми побіжно розглянули цей інструмент, розібралися з тим, як його налаштувати і запустити. Цього разу стаття буде присвячена ключовим компонентам Flume, з допомогою яких не страшно маніпулювати вже справжніми даними.


File Channel
У минулій статті ми розглянули Memory Channel. Очевидно, що канал, що використовує для зберігання даних пам'ять, не є надійним. Перезапуск сайту призведе до того, що всі дані, що зберігаються в каналі, будуть втрачені. Це не робить Memory Channel марним, є деякі випадки, коли його використання дуже навіть виправдано в силу швидкодії. Проте для дійсно надійної транспортної системи необхідно більш стійке до неполадок рішення.

Таким рішенням є файловий канал — File Channel. Нескладно здогадатися, що цей канал зберігає дані у файлах. При цьому канал використовує Random Access для роботи з файлом, дозволяючи таким чином і додавати та забирати події, зберігаючи їх послідовність. Для швидкої навігації канал використовує систему міток (checkpoints), з допомогою яких реалізується механізм WAL. Все це, загалом-то, захований «під капотом» каналу, а для його установки використовуються наступні параметри (жирним шрифтом — обов'язкові параметри).
Налаштування Опис За замовчуванням
type Реалізація каналу, повинно бути зазначено file -
checkpointDir Папка для зберігання файлів з мітками. Якщо не вказана, канал буде використовувати домашню теку Flume. $HOME/...
useDualCheckpoints Робити бекап папки з мітками. false
backupCheckpointDir Папка для бекапів файлів з мітками, потрібно обов'язково вказувати, якщо useDualCheckpoints=true (зрозуміло, цей бекап краще тримати подалі від оригіналу — наприклад, на іншому диску). -
dataDirs Список папок через кому, в яких будуть розміщуватися файли з даними. Краще вказувати кілька папок на різних дисках для підвищення продуктивності. Якщо папки не вказані, канал також буде використовувати домашню теку Flume. $HOME/...
capacity Місткість каналу, вказується число подій. 1000000
transactionCapacity Максимальне число подій в одній транзакції. Дуже важливий параметр, від якого може залежати працездатність всієї транспортної системи. Докладніше про це буде написано нижче. 10000
checkpointInterval Інтервал між створенням нових позначок, у миллисекуднах. Мітки відіграють важливу роль при перезапуску, дозволяючи «перестрибувати» ділянки файлів з даними при відновленні стану каналу. У підсумку канал не читає файли з даними цілком, що істотно прискорює запуск при «забитому» каналі. 30000
checkpointOnClose Записувати мітку при закритті каналу. Замикає мітка дозволить каналу відновитися при перезапуску максимально швидко — але її створення займе деякий час при закритті каналу (насправді, дуже незначне). true
keep-alive Таймаут (у секундах) для операції додавання канал. Тобто, якщо канал забитий, транзакція «дасть йому шанс», почекавши деякий час. І якщо вільного місця в каналі так і не з'явилося, то транзакція відкотиться. 3
maxFileSize Максимальний розмір файлу каналу, в байтах. Значення цього параметра не визначає, скільки місця може «відкусити» ваш канал — воно задає розмір одного файлу з даними, а цих файлів канал може створити кілька. 2146435071 (2ГБ)
minimumRequiredSpace Якщо на вашому диску менше вільного місця, ніж зазначено у цьому пункті, то канал не буде приймати нові події. У разі, якщо папки з даними розташовані на декількох дисках, Flume буде використовувати 524288000 (500МБ)
Інші налаштування відносяться до шифруванню даних у файлах каналу і процесу відновлення (replay). Тепер пара слів про те, що потрібно враховувати при роботі з файловим каналом.

  • Переконайтеся, що Flume має право записувати дані в папки.
    Або, якщо бути точніше, користувач, від чийого імені запущений Flume, має права запису в папках для checkpoints data.
  • SSD значно прискорює роботу каналу.
    На графіку нижче показано час відправки пачки з 500 подій на вузли Flume, використовують файлові канали. Один з вузлів використовує SSD для зберігання даних каналу, інший — SATA. Різниця суттєва.


    Якщо виконати просте ділення, то отримаємо, що вузол Flume з файловим каналом на SSD може перетравлювати до 500/0.025 = 20000 подій у секунду (для довідки — розмір повідомлень у даному прикладі близько 1КБ, а канал використовує для зберігання тільки один диск).
  • Capacity каналу дуже чутлива до змін.
    Якщо ви раптом вирішили поміняти місткість вашого каналу, то вас може чекати неприємний сюрприз — канал запустить replay для відновлення даних. Це означає, що замість використання файлів checkpoints для подальшої навігації/роботи з даними канал повністю пробіжить по всіх файлів з даними. Якщо у даних в каналі багато, процес може зайняти пристойне час.
  • Нештатна зупинка каналу може призвести до втрати даних.
    Це може статися, якщо ви вбили процес Flume (або hard reset). А може і не відбутися. На моїй пам'яті у нас таке траплялося лише один раз — файл з даними був «зіпсований» і довелося вручну видалити всі файли з даними каналу (благо, канали у нас не забивалися і втрат вдалося уникнути). Таким чином, 100% надійності канал все-таки не дає — завжди є ймовірність, що хтось «смикне рубильник» і станеться непоправне. Що ж, якщо таке відбулося і канал відмовляється запускатися, ваші дії можуть бути такими:

    1. Спробуйте видалити файли міток (checkpoints) — у цьому випадку канал спробує відновитися тільки за файлів з даними.

    2. Якщо попередній пункт не допоміг і канал пише щось в стилі «Unable to read data from channel, channel will be closed», значить файл з даними зіпсований. Тут допоможе тільки повна чистка всіх папок з даними. На жаль.
В якості альтернативи File-Channel Flume пропонує ще декілька каналів, зокрема JDBC-channel, що використовує в якості буфера базу даних, і Kafka-channel. Зрозуміло, що для використання таких каналів потрібно окремо розгортати базу даних і Kafka.

Avro Source і Avro Sink
Avro — це один з інструментів серіалізації даних, завдяки якому джерело і стік отримали свої назви. Мережеве взаємодія цих компонентів реалізовано з допомогою Netty. У порівнянні з Netcat Source, розглянутих в попередній статті, Avro Source володіє наступними перевагами:

  • Може використовувати заголовки в подіях (тобто передавати разом з даними допоміжну інформацію).

  • Може приймати інформацію від декількох клієнтів одночасно. Netcat працює на звичайному сокеті і приймає вхідні з'єднання послідовно, а значить, може обслуговувати тільки одного клієнта в момент часу.
Отже, розглянемо налаштування, які нам пропонує Avro Source.
Параметр Опис параметри
type Реалізація джерела, повинно бути зазначено avro. -
channels Канали, в які джерело буде відправляти події (через пробіл). -
bind Хост/IP, за яким закріплюємо джерело. -
port Порт, на якому джерело буде приймати підключення від клієнтів. -
threads Кількість потоків, що обробляють вхідні події (I/O workers). При виборі значення слід орієнтуватися на число потенційних клієнтів, які будуть слати події цього джерела. Необхідно виставляти як мінімум 2 потоку, інакше ваш джерело може «зависнути», навіть якщо клієнт у нього всього один. Якщо не упевнені, скільки потоків необхідно — не вказуйте цей параметр конфігурації. не обмежено
compression-type Стиснення даних, тут варіантів небагато — або none, або deflate. Вказувати необхідно тільки в тому випадку, якщо клієнт передає дані в стислому вигляді. Стиснення допоможе вам істотно заощадити трафік, і чим більше подій за раз ви передаєте — тим сильніше буде ця економія. none
Як і для будь-якого іншого джерела, для Avro Source можна вказати:

  1. selector.type — селектор каналів, про них я вже згадував у попередній статті. Дозволяють ділити або дублювати події в кілька каналів за деякими правилами. Детальніше селектори будуть розглянуті нижче.

  2. interceptors — список перехоплювачів, через пробіл. Перехоплювачі спрацьовують ДО того, як події потраплять в канал. Їх використовують, щоб якимось чином модифікувати події (наприклад, додати заголовки або змінити вміст події). Про них мова піде нижче.
Також для цього джерела передбачена налаштування фільтрів Netty і параметри шифрування даних. Для відправки подій цього джерела можна використовувати ось такий код.

Примітивний Java-client для Avro Source
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.event.SimpleEvent;

public class FlumeSender {

public static void main(String[] args) throws EventDeliveryException {
RpcClient avroClient = RpcClientFactory.getDefaultInstance("127.0.0.1", 50001);

Map<String, String> headers = new HashMap<>();
headers.put("type", "common");

Event event = EventBuilder.withBody("Тіло події".getBytes(), headers);

avroClient.append(event);
avroClient.close();
}
}

Тепер розглянемо конфігурацію Avro-стоку.
Налаштування Опис За замовчуванням
type Реалізація стоку, повинно бути зазначено avro. -
channel Канал, з якого стік буде витягувати події. -
hostname Хост/IP, на який стік буде відправляти події. -
port Порт, на якому зазначена машина (hostname) очікує підключення клієнтів. -
batch-size дуже важливий параметр: розмір «пачки» подій, які надсилаються клієнту за один запит. У той же час, це значення використовується при спустошенні каналу. Тобто, це ще й число подій, прочитуваних з каналу за одну транзакцію. 100
connect-timeout Таймаут з'єднання (handshake) у мілісекундах. 20000
request-timeout Таймаут запиту (надсилання пачки подій), в мілісекундах. 20000
reset-connection-interval Інтервал «зміни хоста». Мається на увазі, що за зазначеним hostname може ховатися кілька машин, що обслуговуються балансером. Цей параметр примусово змушує сток перемикатися між машинами через вказаний інтервал часу. Зручність, за задумом творців стоку, полягає в тому, що якщо в зону відповідальності балансера додається нова машина, відсутня необхідність перезавантажувати вузол Flume — сток сам зрозуміє, що з'явився ще один «пункт призначення». За замовчуванням сток не здійснює зміни хостів. -1
maxIoWorkers Аналог threads для Avro Source. 2 * PROC_CORES
compression-type Те ж саме, що і для Avro Source. Різниця в тому, що стік стискає дані, а джерело, навпаки, розпаковує. Відповідно, якщо Avro Sink шле події на Avro Source, тип стиснення на обох повинен бути однаковий. none
compression level Рівень стиснення, тільки якщо compression-type=deflate (0 — не стискати, 9 — максимальне стиснення). 6
Тепер поговоримо про те, що важливо враховувати при налаштуванні цих компонентів.

  • Акуратно вибирайте Batch Size.

    Як я вже говорив, це дуже важливий параметр, непродуманий вибір якого може значно зіпсувати вам життя. Насамперед, batch-size обов'язково повинен бути менше або дорівнює місткості транзакції каналу (transactionCapacity). Це явно стосується Avro Sink і неявно — Avro Source. Розглянемо на прикладі:


    Тут TC — це transactionCapacity, а BS — batch-size. Умова нормальної роботи полягає в тому, що: BS <= TC1 і BS <= TC2. Тобто, необхідно враховувати не тільки місткість транзакції каналу, з яким працює сток, але місткість транзакції каналу (ів), з яким працює приймає Avro Source. В іншому випадку сток не зможе спустошувати свій канал, а джерело — додавати події у свій. У таких випадках Flume починає інтенсивно лити в лог повідомлення про помилки.
    Випадок з практики. В одному зі стоків ми поставили batch-size = 10000, в той час як на приймальному вузлі для каналу була виставлена TC = 5000. І все працювало чудово. Поки обсяг даних був невеликим, сток просто не витягав з каналу дозволені 10000 подій за раз — у каналі не встиг накопичитися стільки подій. Але через деякий час обсяг даних зріс і у нас почалися проблеми. Приймає вузол почав відхиляти великі пачки даних. Помилку вчасно помітили, змінили параметри і скупчилися в каналі дані веселим струмочком дотекли до місця призначення.
  • Надсилайте події великими пачками.
    Трансакція — операція досить дорога ресурсів. Менше транзакцій — більше продуктивність. Знову ж таки, стиснення при передачі великого числа подій працює набагато ефективніше. Відповідно, крім batch-size доведеться збільшити і transactionCapacity ваших каналів.
  • Перевизначити залежність netty для ваших сайтів.
    Ми використовуємо версію netty 3.10.5 Final, в той час як Flume підтягує більш стару netty 3.6.2 Final. Проблема старої версії полягає в невеликому ба, з-за якого Avro Sink / Avro Source не можуть періодично підключитися один до одного. Це призводить до того, що в передачі даних періодично виникають простої на кілька хвилин (потім все приходить в норму). У разі, якщо дані повинні діяти максимально швидко, такі «пробки» можуть стати проблемою.


    У разі, якщо ви запускаєте Flume засобами Java, перевизначити залежність можна засобами Maven. Якщо ж ви налаштовуєте Flume засобами Cloudera або виді сервісу, то залежність Netty доведеться змінювати вручну. Знайти їх можна в таких папок:

    • Cloudera /opt/cloudera/parcels/CDH-${VERSION}/lib/flume-ng/lib;
    • Service (stand-aloone) — $FLUME_HOME/lib.
File-Roll Sink
Отже, ми розібралися, як налаштувати транспортні вузли на основі Avro Source/Sink і файлового каналу. Залишилося тепер розібратися з компонентами, які замикають (тобто виводять дані із зони відповідальності Flume) нашу транспортну мережу.


Перший замикає стік, який варто розглянути, це File-Roll Sink. Я б сказав, що це сток для ледачих. Він підтримує мінімум налаштувань і може робити тільки одну річ — записувати події в файли.
Налаштування Опис За замовчуванням
type Реалізація стоку, повинно бути зазначено file_roll. -
channel Канал, з якого стік буде витягувати події. -
directory Папка, в якій будуть зберігатися файли. -
rollInterval Інтервал між створенням нових файлів (0 — писати все в один файл), в секундах. 30
serializer Серіалізація подій. Можна вказати: TEXT, HEADER_AND_TEXT, AVRO_EVENT або свій клас, який реалізує інтерфейс EventSerializer.Builder. TEXT
batch-size Аналогічно Avro Sink, розмір пачки подій, забираемых за транзакцію з каналу. 100
Чому я вважаю його стоком для лінивих? Тому що в ньому абсолютно нічого не можна налаштувати. Ні стиснення, ні наименоваия файлів (в якості імені буде використаний timestamp створення), ні угруповання з теки — нічого. Навіть розмір файлу не можна обмежити. Цей сток підходить, мабуть, тільки для випадків, коли «немає часу пояснювати — нам потрібно терміново почати приймати дані!».
Примітка. Оскільки необхідність записувати дані у файли все-таки є, ми прийшли до висновку, що доцільніше реалізувати свій файловий стік, ніж використовувати. Враховуючи, що всі вихідні коди Flume відкриті, зробити його виявилося нескладно, ми вклалися за день. На другий день поправили дрібні баги — і сток вже більше року справно працює, розкладаючи дані по папках в акуратні архіви. Цей стік я викладу на GitHub після третьої частини циклу.
HDFS Sink
Цей стік вже серйозніше — він підтримує безліч налаштувань. Трохи дивно, що File-Roll Sink не зроблено аналогічним чином.
Налаштування Опис За замовчуванням
type Реалізація стоку, повинно бути зазначено hdfs. -
channel Канал, з якого стік буде витягувати події. -
hdfs.path Папка, в яку будуть записуватися файли. Переконайтеся, що для цієї папки виставлені потрібні права доступу. Якщо ви налаштовуєте сток засобами Cloudera, то дані будуть писатися від імені користувача flume. -
hdfs.filePrefix Префікс імені файлу. Базове ім'я файлу, як і для File-Roll — timestamp його створення. Соответстенно, якщо ви вкажете my-data, підсумкове ім'я файлу my-data1476318264182. FlumeData
hdfs.fileSuffix Постфікс імені файлу. Додається в кінець імені файлу. Можна використовувати, щоб вказати розширення, наприклад, .gz. -
hdfs.inUsePrefix Аналогічно filePrefix, але для тимчасового файла, в який ще ведеться запис даних. -
hdfs.inUseSuffix Аналогічно fileSuffix, але для тимчасового файлу. По суті, тимчасове розширення. .tmp
hdfs.rollInterval Період створення нових файлів, в секундах. Якщо файли не потрібно закривати за таким критерієм, ставимо 0. 30
hdfs.rollSize Тригер для закриття файлів за обсягом, вказується в байтах. Також ставимо 0, якщо цей критерій нам не підходить. 1024
hdfs.rollCount Тригер для закриття файлів по числу подій. Також можна поставити 0. 10
hdfs.idleTimeout Тригер для закриття файлів через неактивність, в секундах. Тобто, якщо файл деякий час нічого не записується — він закривається. Цей тригер за замовчуванням вимкнено. 0
hdfs.batchSize Те ж саме, що і для інших стоків. Хоча у документації до стоку написано, що це число подій, які записуються у файл, перш ніж вони будуть скинуті в HDFS. При виборі також орієнтуємося на обсяг транзакції каналу. 100
hdfs.fileType Тип файлу — SequenceFile (Hadoop-файл з парами ключ-значення, як правило, в якості ключа використовується timestamp з хидера «timestamp» або поточний час), DataStream (текстові дані, по суті, порядкова запис із зазначеної серіалізацією, як в File-Roll Sink) або CompressedStream (аналог DataStream, але з стисненням). SequenceFile
hdfs.writeFormat Формат запису — Text або Writable. Тільки для SequenceFile. Відмінність — в якості значення буде писатися текст (TextWritable) або байти (BytesWritable). 5000
serializer Налаштовується для DataStream CompressedStream, за аналогією з File-Roll Sink. TEXT
hdfs.codeC Цей параметр необхідно вказувати, якщо ви використовуєте тип файлу CompressedStream. Пропонуються такі варіанти стиснення: gzip, bzip2, lzo, lzop, snappy. -
hdfs.maxOpenFiles Максимально допустиме число одночасно відкритих файлів. Якщо цей поріг буде перевищено, то найбільш старий файл буде закритий. 5000
hdfs.minBlockReplicas Важливий параметр. Мінімальна кількість реплік на блок HDFS. Якщо не вказано, береться з конфігурації Hadoop, зазначеної в classpath при запуску (тобто налаштувань вашого кластера). Чесно кажучи, я не можу пояснити причину поведінки Flume, пов'язаного з цим параметром. Суть в тому, що якщо значення цього параметра відрізняється від 1, то сток почне закривати файли без оглядки на інші тригери і в рекордні терміни наплодит безліч дрібних файлів. -
hdfs.maxOpenFiles Максимально допустиме число одночасно відкритих файлів. Якщо цей поріг буде перевищено, то найбільш старий файл буде закритий. 5000
hdfs.callTimeout Таймаут звернення до HDFS (відкрити/закрити файл, скинути дані) у мілісекундах. 10000
hdfs.closeTries Кількість спроб закрити файл (якщо з першого разу не вийшло). 0 — намагатися до переможного кінця. 0
hdfs.retryInterval Як часто намагатися закрити файл в разі невдачі, в секундах. 180
hdfs.threadsPoolSize Кількість потоків, які здійснюють IO операції з HDFS. Якщо у вас «солянка» з подій, які розфасовуються по багатьом файлів, то краще поставити це число побільше. 10
hdfs.rollTimerPoolSize На відміну від попереднього пулу, цей пул потоків виконує завдання по расписнию (закриває файли). Причому, він працює на основі двох параметрів — rollInterval retryInterval. Тобто цей пул виконує як планове закриття за тригеру, так і періодичні повторні спроби закрити файл. Одного потоку повинно бути достатньо. 1
hdfs.useLocalTimeStamp HDFS сток передбачає використання елементів дати в назании формуються файлів (наприклад, hdfs.path = /logs/%Y-%m-%d дозволить вам групувати файли по днях). Використання дати припускає, що вона звідкись повинна бути отримана. Цей параметр пропонує два варіанти: використовувати час на момент обробки події (true) або використовувати час, вказаний у події — а саме, у заголовку «timestamp» (false). Якщо ви використовуєте timestamp події, то переконайтеся, що ваші собтия мають цей заголовок. Інакше не будуть записані в HDFS. false
hdfs.round Округляти timestamp до деякого значення. false
hdfs.roundValue Наскільки округляти timestamp. 1
hdfs.roundUnit В яких одиницях округляти (second,minute або hour). second
Ось такий величезний перелік налаштувань для HDFS-стоку. Цей стік дозволяє нарізати дані у файли практично як завгодно — особливо приємно те, що можна використовувати елементи дати. Офіційна документація по цьому стоку знаходиться на тій же сторінці.

Можливо ви помітили цікаву особливість конфігурації HDFS-стоку — тут немає параметра, що вказує адресу HDFS. Творці стоку припускають, що даний сток повинен бути використаний на тих же машинах, що і HDFS.

Отже, що ж необхідно враховувати при налаштуванні цього стоку.

  • Використовуйте великі batch-size та transactionCapacity.
    Загалом-то, тут все аналогічно з іншими стоками транзакція досить дорога в плані ресурсів, тому краще лити великими порціями.
  • Не зловживайте макросами в іменуванні файлів.
    Використання елементів дати в іменах файлів/папок або плейсхолдеров для заголовків — це, безумовно, зручний інструмент. Але не дуже швидкий. Мені здається, підстановку дати творці могли зробити оптимальніше — якщо ви заглянете в исходники, то здивуєтеся кількістю виконуваних операцій для форматування цих рядків. Припустимо, ми вирішили задати ось таку структуру папок:
    hdfs.path = /logs/%{dir}
    hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{host}.%{src}
    Тут dir й src — значення заголовків подій з соотв. ключами. Результуючий файл буде мати вигляд /logs/web/my-source/2016-04-15/2016-04-15-12-00-00.my-host.my-source.gz. На моєму комп'ютері генерація цього імені для 1 млн. подій займає майже 20 секунд! Тобто для 10000 подій це займе приблизно 200мс. Робимо висновок: якщо ви претендуєте на швидкість запису 10000 подій в секунду, будьте готові віддати 20% часу на генерацію імені файлу. Це жахливо. Вилікувати це можна, взявши на себе відповідальність за генерацію імені файлу на стороні клієнта. Так, для цього доведеться написати трохи коду, але зате можна буде змінити налаштування стоку на ось такі:
    hdfs.path = /logs
    hdfs.filePrefix = %{file-name}
    Передаючи сформований ім'я файлу в заголовку file-name ви заощадите ресурси і час. Шляхи формування файлу з таким заголовком займає вже не 20 секунд, а 500-600 мілісекунд 1 млн. подій. Тобто, майже в 40 разів швидше.
  • Об'єднуйте події.
    Ще один маленький хак, що дозволяє істотно підвищити продуктивність стоку. Якщо ви пишете події у файл порядково, то можна об'єднувати їх на стороні клієнта. Наприклад, ваш сервіс генерує логи, які повинні йти в один і той же файл. Так чому б не об'єднати кілька рядків в один, використавши в якості роздільника \n? Сама по собі запис даних в HDFS або файлову систему займає набагато менше часу, ніж вся ця «цифрова бюрократія» навколо даних.


    Об'єднуючи події у співвідношенні хоча б 5 до 1 ви вже отримаєте істотний приріст продуктивності. Природно, тут потрібно бути обережним — якщо події на клієнті генеруються по одному, то наповнення буфера для об'єднання подій може зайняти деякий час. Все це час події будуть зберігатися в пам'яті, очікуючи формування групи для об'єднання. А отже, підвищуються шанси втратити дані. Резюме:

    1. Для невеликих обсягів даних клієнтові краще відправляти події у Flume по одному — менше шансів їх втратити.

    2. Для великих обсягів даних переважно використовувати об'єднання подій. Якщо події генеруються інтенсивно, буфер для 5-10 подій буде набиратися досить швидко. При цьому ви істотно підвищите продуктивність стоків.
  • Розгорніть стоки на декількох машинах HDFS-кластера.
    При налаштуванні Flume через Cloudera є можливість запустити на кожній ноде кластера окремий вузол Flume. І цією можливістю краще скористатися, оскільки таким чином навантаження розподіляється між усіма машинами кластера. При цьому, якщо ви використовуєте загальну конфігурацію (тобто один і той же файл конфігурації на всіх машинах), переконайтеся, що у вас не виникне конфліктів імен файлів. Зробити це можна, задіявши перехоплювач подій, додає в заголовки назва хоста. Відповідно, вам залишиться тільки вказати шаблон імені файлу цей заголовок (див. нижче).
    Примітка. Насправді при прийнятті такого рішення варто задуматися — адже кожен стік буде писати однорідні дані в файл. В результаті ви можете отримати купу дрібних файлів на HDFS. Рішення має бути зваженим — якщо обсяг даних невеликий, то можна обмежитися одним вузлом Flume для запису в HDFS. Це так звана консолідація даних — коли дані з різних джерел в результаті потрапляють на один стік. Однак якщо дані «течуть рікою», то одного вузла може бути недостатньо. Детальніше про проектування всієї транспортної мережі ми поговоримо в наступній статті цього циклу.
Перехоплювачі подій (Flume Interceptors)
Я багато разів згадував ці таємничі перехоплювачі, мабуть тепер саме час розповісти про те, що це таке. Перехоплювачі — це обробники подій, які працюють на етапі між отриманням подій на джерелі і відправкою їх в канал. Перехоплювачі можуть перетворювати події, змінювати їх або фільтрувати.

Flume надає за замовчуванням безліч перехоплювачів, дозволяють:

  • Додавати статичні заголовки (константи, timestamp, hostname).
  • Генерувати випадкові UUID в заголовках.
  • Витягувати значення з тіла події (регулярними виразами) і використовувати їх як заголовки.
  • Редагувати вміст подій (знову регулярними виразами).
  • Фільтрувати події на основі вмісту.
Приклад конфігурації різних перехоплювачів
# ============================ Avro-джерело з перехоплювачами ============================ #
# Обов'язкові параметри для Vvro-джерела
my-agent.sources.avro-source.type = avro
my-agent.sources.avro-source.bind = 0.0.0.0
my-agent.sources.avro-source.port = 50001
my-agent.sources.avro-source.channels = my-agent-channel

# Додаємо до джерела перехоплювачі, вказуємо їх назви (назви значення не мають)
my-agent.sources.avro-source.interceptors = ts directory host replace group-replace filter extractor

# ------------------------------------------------------------------------------ #

# Перший перехоплювач додає статичний заголовок до всіх подій.
# Найменування заголовка буде "dir", а значення — "test-folder".
my-agent.sources.avro-source.interceptors.directory.type = static
my-agent.sources.avro-source.interceptors.directory.key = dir
my-agent.sources.avro-source.interceptors.directory.value = test-folder

# Якщо такий заголовок вже є — зберегти наявний (за замовчуванням — false)
my-agent.sources.avro-source.interceptors.directory.preserveExisting = true

# ------------------------------------------------------------------------------ #

# Другої перехоплювач додає заголовок "timestamp" до всіх подій з поточним значенням часу, в мілісекундах
my-agent.sources.avro-source.interceptors.ts.type = timestamp
my-agent.sources.avro-source.interceptors.ts.preserveExisting = true

# ------------------------------------------------------------------------------ #

# Третій перехоплювач додає заголовок з хостом/IP поточної машини
my-agent.sources.avro-source.interceptors.host.type = host
my-agent.sources.avro-source.interceptors.host.useIP = true

# Найменування заголовка (аналог directory.key)
my-agent.sources.avro-source.interceptors.host.hostHeader = host
my-agent.sources.avro-source.interceptors.host.preserveExisting = true

# ------------------------------------------------------------------------------ #

# Цей перехоплювач замінює всі символи табуляції ; в тілі події
my-agent.sources.avro-source.interceptors.replace.type = search_replace
my-agent.sources.avro-source.interceptors.replace.searchPattern = \t
my-agent.sources.avro-source.interceptors.replace.replaceString = ;

# Тіло передається як byte[], тому необхідно вказати кодування (за замовчуванням — UTF-8)
my-agent.sources.avro-source.interceptors.replace.charset = UTF-8

# ------------------------------------------------------------------------------ #

# Більше "розумний" варіант заміни
my-agent.sources.avro-source.interceptors.group-replace.type = search_replace

# Припустимо, наша рядок починається з дати 2014-01-20 і нам потрібно поміняти її формат на 20/01/2014
# при цьому зберігши все інше. Ми "розбиваємо" рядок на 4 блоку () і потім виконуємо підстановку,
# використовуючи індекси цих блоків в кінцевому рядку
my-agent.sources.avro-source.interceptors.group-replace.searchPattern = (\\d{4})-(\\d{2})-(\\d{2})(.*)
my-agent.sources.avro-source.interceptors.group-replace.replaceString = $3/$2/$1$4

# ------------------------------------------------------------------------------ #

# Перехоплювач-фільтр, виключає події по регулярному виразу
my-agent.sources.avro-source.interceptors.filter.type = regex_filter
my-agent.sources.avro-source.interceptors.filter.regex = error$
# Якщо true — то фільтрувати події, тіло яких підходить під регулярний вираз,
# в іншому випадку — фільтрувати те, що не підходить під регулярку
my-agent.sources.avro-source.interceptors.filter.excludeEvents = true

# ------------------------------------------------------------------------------ #

# Перехоплювач, извлекающий дані події і додає їх в заголовки
my-agent.sources.avro-source.interceptors.extractor.type = regex_extractor

# Наприклад, ми передаємо події вигляду: "2016-04-15;WARINING;ЯКАСЬ ІНФОРМАЦІЯ"
my-agent.sources.avro-source.interceptors.extractor.regex = (\\d{4}-\\d{2}-\\d{2});(.*);

# тут важливо — сериализаторы повинні бути перераховані у тому ж порядку, 
# що і соотв. групи в регулярному виразі 
# (\\d{4}-\\d{2}-\\d{2}) -> $1 -> ts 
# (.*) -> $2 -> loglevel
my-agent.sources.avro-source.interceptors.extractor.serializers = ts loglevel

# Першу групу будемо сериализации спеціальним класом, який витягуючи з дати TS
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.name = timestamp
my-agent.sources.avro-source.interceptors.extractor.serializers.ts.pattern = yyyy-MM-dd

# Другу групу будемо сериализации as is
my-agent.sources.avro-source.interceptors.extractor.serializers.loglevel.name = level

Серед стандартних перехоплювачів, на жаль, не виявилося фільтра по заголовкам. Втім, при бажанні такий перехоплювач можна написати самому. Тепер, щоб повноцінно використовувати транспорт Flume, нам необхідно розглянути ще один тип компонентів Flume — селектори.

Канальні селектори (Flume Channel Selectors)
Селектор необхідний каналу для того, щоб розуміти, у який канал які події відправляти. Всього існує 2 типи селекторів:

  1. replicating — селектор, завдяки якому джерело дублює події у всі пов'язані канали. Саме він використовується Flume за замовчуванням. При цьому, цей селектор дозволяє виділити «опціональні» канали. На відміну від основних, джерело буде ігнорувати невдалі додавання подій в такі канали.

  2. multiplexing — селектор, розподіляє події між каналами за деякими правилами. Реалізація стандартного multiplexing-селектора дозволяє розподіляти події між каналами на основі значень заголовків.
Приклад конфігурації multiplexing-селектора
# ============================ Avro-джерело з селектором ============================ #
my-source.sources.avro-source.type = avro
my-source.sources.avro-source.port = 50002
my-source.sources.avro-source.bind = 127.0.0.1
my-source.sources.avro-source.channels = hdfs-channel file-roll-channel null-channel

# Оголошуємо селектор — multiplexing, будемо сортувати події
# Припустимо, що ми раніше позначали події як "важливі" і "звичайні" і хочемо,
# щоб важливі події записувалися в файлову систему і HDFS, а звичайні — тільки файли
my-source.sources.avro-source.selector.type = multiplexing

# вказуємо назву заголовка, за яким будемо ділити події
my-source.sources.avro-source.selector.header = type

# якщо type = important, то відправляємо події і в HDFS, і в файловий сток
my-source.sources.avro-source.selector.mapping.important = hdfs-channel file-roll-channel

# якщо type = common, то тільки в файловий сток
my-source.sources.avro-source.selector.mapping.common = file-roll-channel

# якщо заголовок type не знайдений або значення якесь інше, відправляємо подія на фільтрацію
# (як правило, для фільтрації використовуємо невеликий memchannel і null-sink)
my-source.sources.avro-source.selector.mapping.default = hdfs-null-channel


Селектори обробляють події після перехоплювачів. Це означає, що ви можете виконати над подіями деякі маніпуляції перехоплювачами (наприклад, понаизвлекать різні заголовки) і використовувати результати цих маніпуляцій вже в селекторі.

Висновок
Стаття несподівано вийшла великий, тому обіцяний моніторинг сайту я вирішив розглянути в наступній частині цього циклу статей. На закінчення хочу продемонструвати одну з робочих конфігурацій Flume для HDFS. Вона непогано підходить для доставки та організації невеликих обсягів даних — приблизно до 2000 подій в секунду на одну ноду. Цей сайт вимагає наявності у подіях заголовків roll («15m» або «60м»), dir і ѕгс — з допомогою них виходить дворівнева ієрархія папок.

Конфігурація Flume для HDFS
flume-hdfs.sources = hdfs-source
flume-hdfs.channels = hdfs-15m-channel hdfs-60м-channel hdfs-null-channel
flume-hdfs.sinks = hdfs-15m-sink hdfs-60м-sink

# =========== Avro-джерело, з селектором і додаванням заголовка host ============ #
flume-hdfs.sources.hdfs-source.type = avro
flume-hdfs.sources.hdfs-source.port = 50002
flume-hdfs.sources.hdfs-source.bind = 0.0.0.0
flume-hdfs.sources.hdfs-source.interceptors = hostname
flume-hdfs.sources.hdfs-source.interceptors.hostname.type = host
flume-hdfs.sources.hdfs-source.interceptors.hostname.hostHeader = host
flume-hdfs.sources.hdfs-source.channels = hdfs-null-channel hdfs-15m-channel
flume-hdfs.sources.hdfs-source.selector.type = multiplexing
flume-hdfs.sources.hdfs-source.selector.header = roll
flume-hdfs.sources.hdfs-source.selector.mapping.15m = hdfs-15m-channel
flume-hdfs.sources.hdfs-source.selector.mapping.60m = hdfs-60м-channel
flume-hdfs.sources.hdfs-source.selector.mapping.default = hdfs-null-channel

# ============================ Файловий канал, 15 хвилин ============================ #
flume-hdfs.channels.hdfs-15m-channel.type = file
flume-hdfs.channels.hdfs-15m-channel.maxFileSize = 1073741824
flume-hdfs.channels.hdfs-15m-channel.capacity = 10000000
flume-hdfs.channels.hdfs-15m-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-15m-channel.dataDirs = /flume/flume-hdfs/hdfs-60м-channel/data1,/flume/flume-hdfs/hdfs-60м-channel/data2
flume-hdfs.channels.hdfs-15m-channel.checkpointDir = /flume/flume-hdfs/hdfs-15m-channel/checkpoint

# ============================ Файловий канал, 60 хвилин ============================ #
flume-hdfs.channels.hdfs-60м-channel.type = file
flume-hdfs.channels.hdfs-60м-channel.maxFileSize = 1073741824
flume-hdfs.channels.hdfs-60м-channel.capacity = 10000000
flume-hdfs.channels.hdfs-60м-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-60м-channel.dataDirs =/flume/flume-hdfs/hdfs-60м-channel/data1,/flume/flume-hdfs/hdfs-60м-channel/data2
flume-hdfs.channels.hdfs-60м-channel.checkpointDir = /flume/flume-hdfs/hdfs-60м-channel/checkpoint

# =========== Стік для файлів, заворачиваемых кожні 15 хвилин (5 хв. неактивність) =========== #
flume-hdfs.sinks.hdfs-15m-sink.type = hdfs
flume-hdfs.sinks.hdfs-15m-sink.channel = hdfs-15m-channel
flume-hdfs.sinks.hdfs-15m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log
flume-hdfs.sinks.hdfs-15m-sink.hdfs.path = /logs/%{dir}
flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileSuffix = .gz
flume-hdfs.sinks.hdfs-15m-sink.hdfs.writeFormat = Text
flume-hdfs.sinks.hdfs-15m-sink.hdfs.codeC = gzip
flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileType = CompressedStream
flume-hdfs.sinks.hdfs-15m-sink.hdfs.minBlockReplicas = 1
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollInterval = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollSize = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollCount = 0
flume-hdfs.sinks.hdfs-15m-sink.hdfs.idleTimeout = 300
flume-hdfs.sinks.hdfs-15m-sink.hdfs.round = true
flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundValue = 15
flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundUnit = minute
flume-hdfs.sinks.hdfs-15m-sink.hdfs.threadsPoolSize = 8
flume-hdfs.sinks.hdfs-15m-sink.hdfs.batchSize = 10000

# =========== Стік для файлів, заворачиваемых кожні 60 хвилин (20 хв. неактивність) =========== #
flume-hdfs.sinks.hdfs-60м-sink.type = hdfs
flume-hdfs.sinks.hdfs-60м-sink.channel = hdfs-60м-channel
flume-hdfs.sinks.hdfs-60м-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log
flume-hdfs.sinks.hdfs-60м-sink.hdfs.path = /logs/%{dir}
flume-hdfs.sinks.hdfs-60м-sink.hdfs.fileSuffix = .gz
flume-hdfs.sinks.hdfs-60м-sink.hdfs.writeFormat = Text
flume-hdfs.sinks.hdfs-60м-sink.hdfs.codeC = gzip
flume-hdfs.sinks.hdfs-60м-sink.hdfs.fileType = CompressedStream
flume-hdfs.sinks.hdfs-60м-sink.hdfs.minBlockReplicas = 1
flume-hdfs.sinks.hdfs-60м-sink.hdfs.rollInterval = 0
flume-hdfs.sinks.hdfs-60м-sink.hdfs.rollSize = 0
flume-hdfs.sinks.hdfs-60м-sink.hdfs.rollCount = 0
flume-hdfs.sinks.hdfs-60м-sink.hdfs.idleTimeout = 1200
flume-hdfs.sinks.hdfs-60м-sink.hdfs.round = true
flume-hdfs.sinks.hdfs-60м-sink.hdfs.roundValue = 60
flume-hdfs.sinks.hdfs-60м-sink.hdfs.roundUnit = minute
flume-hdfs.sinks.hdfs-60м-sink.hdfs.threadsPoolSize = 8
flume-hdfs.sinks.hdfs-60м-sink.hdfs.batchSize = 10000

# ================ NULL-сток + невеликий канал для нього =============== #
flume-hdfs.channels.hdfs-null-channel.type = memory
flume-hdfs.channels.hdfs-null-channel.capacity = 30000
flume-hdfs.channels.hdfs-null-channel.transactionCapacity = 10000
flume-hdfs.channels.hdfs-null-channel.byteCapacityBufferPercentage = 20

flume-hdfs.sinks.hdfs-null-sink.channel = hdfs-null-channel
flume-hdfs.sinks.hdfs-null-sink.type = null

У наступній, заключній статті циклу, ми розглянемо:

  • Процес побудови повноцінного транспорту даних на основі Flume.
  • Приклади розробки власних компонентів.
  • Обіцяний моніторинг вузлів, який не увійшов у цю статтю.


Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.