Коли старий MapReduce краще нового Tez



Як всім відомо, кількість даних у світі зростає, збирати і обробляти потік інформації стає все складніше. Для цього служить популярне рішення Hadoop c ідеєю спрощення методів розробки та налагодження багатопоточних додатків, що використовує парадигму MapReduce. Ця парадигма не завжди вдало справляється зі своїми завданнями, і через деякий час з'являється «надбудова» над Hadoop: Apache Tez з парадигмою DAG. Під поява Tez підлаштовується і HDFS-SQL-обробник Hive. Але не завжди нове краще старого. У більшості випадків HiveOnTez значно швидше HiveOnMapReduce, але деякі підводні камені можуть сильно вплинути на продуктивність вашого рішення. Тут я хочу розповісти, з якими нюансами зіткнувся. Сподіваюся, це допоможе вам прискорити ETL або інший Hadoop UseCase.

MapReduce, Tez і Hive
Як я сказав раніше, даних у світі все більше і більше. І для їх зберігання і обробки придумують все більш хитрі рішення, серед них і Hadoop. Щоб процес обробки зберігаються на HDFS даних був простий навіть для пересічного аналітика, є кілька SQL-надбудов над Hadoop. Найстаріша і «проста» з них — Hive. Суть Hive така: ми маємо дані в якому-небудь внятном column-store форматі, заносимо інформацію про них в метадані, пишемо стандартний SQL з рядом обмежень, і він генерує ланцюжок MapReduce-job'ів, які вирішують нашу задачу. Здорово, зручно, але повільно. Наприклад, ось простий запит:

select 
t1.column1, 
t2.column2 
from 
table1 t1
inner join table2 t2 on t1.column1 = t2.column1
union 
select 
t3.column1, 
t4.column2 
from 
table3 t3
inner join table4 t4 on t3.column1 = t4.column1
order by 
column1;

Цей запит породжує чотири джоба:

  • table1 inner join table2;
  • table3 inner join table4;
  • union;
  • sort.


Кроки виконуються послідовно, і кожен з них завершується записом даних на HDFS. Це виглядає досить неоптимальним. Наприклад, кроки 1 і 2 могли б виконуватися паралельно. А бувають і такі ситуації, коли у декількох кроків розумно застосувати один і той же Mapper, а потім вже на результати цих Mapper'ів накласти кілька видів Reducer'ів. Але концепція MapReduce в рамках одного job'а не дозволяє так робити. Для вирішення цієї проблеми досить швидко з'являється Apache Tez з концепцією DAG. Суть DAG зводиться до того, що замість пари Mapper-Reducer (+epsilon) ми будуємо нецикличный спрямований граф, кожна вершина якого є Mapper.Class'ом або Reduser.Class'ом, а ребра означають потоки даних / порядок виконання. Окрім DAG, Tez надав ще кілька бонусів: прискорений запуск job'ів (можна посилати DAG-job'и через вже запущений Tez-Engine), можливість утримувати ресурси в пам'яті ноди між кроками, самостійно запускати розпаралелювання і т. д. Природно, разом з Tez вийшла і відповідна надбудова над Hive. З цієї надбудовою наш запит перетвориться DAG-job приблизно наступної структури:

  1. Mapper зчитує table1.
  2. Mapper зчитує table2 і джойнит її з результатом кроку 1.
  3. Mapper зчитує table3 і фільтрує column1 IS NOT NULL.
  4. Mapper зчитує table4 і фільтрує column1 IS NOT NULL.
  5. Reducer джойнит результати кроків 3 і 4.
  6. Reducer, робить union.
  7. Reducer Group By і Sort.
  8. Збирає результат.


Фактично кроки 1 і 2 — це перший join, а 2, 3 і 4 — це другий join (я спеціально підібрав таблиці різних розмірів, щоб join'и оброблялися по-різному). При цьому два блоки один від одного не залежать і можуть виконуватися паралельно. Це вже дуже добре. Tez дійсно дає значний приріст в швидкості обробки складних запитів. Але іноді Tez може бути гірше MapReduce, і тому перед відправкою в production варто спробувати запит як з
set hive.execution.engine=tez
та
set hive.execution.engine=mr
.

Так що ж таке Tez?
Все, що треба знати про Tez: він змінює логіку MapReduce на логіку DAG (directed acyclic graph — спрямованого ациклічного графа), надаючи можливість в рамках одного DataFlow паралельно виконувати декілька різних процесів, будь то Mapper або Reducer. Головне, щоб його вхідні дані були готові. Зберігати дані можна локально на ноди між кроками, а іноді і просто в оперативній пам'яті ноди, не вдаючись до дискових операцій. Можна оптимізувати кількість і місце розташування Mapper'ів і Reducer'ів, щоб мінімізувати передачу даних по Мережі навіть з урахуванням багатокрокових розрахунків, переиспользовать контейнери, які вже відпрацювали у сусідніх процесах в рамках одного Tez-Job'а, і підлаштовувати паралельне виконання під статистику, зібрану на попередньому кроці. Крім того, движок дозволяє користувачеві створювати DAG-завдання з тією ж простотою, що і MapReduce, при цьому він сам буде займатися ресурсами, перезапуску і управлінням DAG на кластері. Tez дуже мобільний, додавання підтримки Tez не ламає вже працюють процеси, а тестування нової версії можливо локально «на стороні» тоді, коли у всіх завданнях кластера буде працювати стара версія Tez. Last but not least: зазначимо, що Tez може запускатися на кластері як служба і працювати в «фоновому режимі», що дозволяє йому відправляти завдання на виконання значно швидше, ніж це відбувається при стандартному запуску MapReduce. Якщо ви ще не пробували Tez і у вас залишилися сумніви, то подивитеся на порівняння швидкості, опубліковане в презентації HortonWorks:



І в парі з Hive:



Але при всій цій красі графіків і описів в HiveOnTez є і проблеми.

Tez менш стійкий до нерівномірного розподілу даних, ніж MapReduce
Перша і найбільша проблема лежить в різниці створення DAG-job і MapReduce-job. У них один принцип: кількість Mapper'ів і Reducer'ів розраховується в момент запуску job'а. Тільки коли запит виконується ланцюжком MapReduce-job'ів, Hadoop розраховує необхідну кількість завдань на основі результату попередніх кроків і зібраної аналітики за джерелами, а в разі DAG-job це відбувається до обчислення всіх кроків, тільки на основі аналітики.

Поясню на прикладі. Десь в середині запиту по мірі виконання вкладених запитів у нас з'являються дві таблиці. За оцінками статистики, у кожній з n рядків і k унікальних значень join-ключа. На виході очікуємо приблизно n*k рядків. І припустимо, це кількість добре вкладається в один контейнер, і Tez виділить на наступний крок (припустимо, сортування) один Reducer. І це число Reducer'ів вже в процесі виконання не зміниться незалежно ні від чого. Тепер припустимо, що насправді у цих таблиць дуже поганий skew: на одне значення доводиться n – k + 1 рядок, а всі інші — по одному рядку. Таким чином, на виході ми отримаємо n^2 + k^2 – 2kn – k + 2n рядків. Тобто (n + 2 – 2k)/k + (k – 1)/n більше n/k в два рази. І вже така кількість один Reducer буде виконувати вічність. А у випадку з MapReduce, отримавши на виході цього кроку n^2 + k^2 – 2kn – k + 2n, Hadoop об'єктивно оцінить свої сили і видасть потрібну кількість Mapper'ів і Reducer'ів. В результаті c MapReduce всі відпрацює набагато швидше.

Сухі обчислення можуть здатися надуманими, але насправді така ситуація реальна. І якщо її не сталося, то вважайте, що вам пощастило. З аналогічним ефектом Tez-DAG'а я стикався ще при використанні lateral view у складних запитах або кастомних Mapper'ах.

Особливості тюнінгу Tez
За іронією, остання відома мені важлива особливість Tez, яка може нашкодити, пов'язана з його силою — ДАГ. Найчастіше кластер — це не просто сховище інформації. Це ще й система, в якій ведеться постобробка даних, і важливо, щоб на цю частину кластера не впливала інша діяльність. Так як ноди — це ресурс, то зазвичай кількість ваших контейнерів не безмежно. А значить, коли ви запускаєте job, то краще не забивати всі контейнери, щоб сильно не гальмувати регулярні процеси. І тут DAG може підкласти вам свиню. DAG потрібно (в середньому по палаті) менше контейнерів за рахунок їх перевикористання, більш плавною навантаження і т. д. Але коли швидких кроків багато, контейнери починають розмножуватися в геометричній прогресії. Перші Mapper'и ще не допрацювали, але дані поширюються по іншим Mapper'ам, під все це виділяються контейнери, і — бум! Ваш кластер забитий в стелю, ніхто більше не може запустити жодного job'а. Ресурсів не вистачає, і ви дивіться, як повільно змінюються цифри на прогрес-барі. MapReduce з-за своєї послідовності від такого ефекту позбавлений, але платите ви за це, як завжди, швидкістю.

Ми давно знаємо, як боротися з тим, що стандартний MapReduce займає надто багато контейнерів. Регулюємо параметри:

  • mapreduce.input.fileinputformat.split.maxsize
    : зменшуючи — збільшуємо кількість Mapper'ів;
  • mapreduce.input.fileinputformat.split.minsize
    : збільшуючи — зменшуємо кількість Mapper'ів;
  • mapreduce.input.fileinputformat.split.minsize.per.node
    ,
    mapreduce.input.fileinputformat.split.minsize.per.rack
    : тонка настройка для контролю локальних (в сенсі node або rack) партіцій;
  • hive.exec.reducers.bytes.per.reducer
    : збільшуючи — зменшуємо кількість Reducer'ів;
  • mapred.tasktracker.reduce.tasks.maximum
    : виставляємо максимальна кількість Reducer'ів;
  • mapred.reduce.tasks
    : задаємо конкретне число Reducer'ів.
Обережно! У DAG всі reduce-кроки будуть мати стільки процесів, скільки ви вкажете тут! Але параметри Tez більш хитрі, і не завжди параметри, які ми поставили для MapReduce, на нього діють. По-перше, Tez дуже чутливий до
hive.tez.container.size
, і інтернет радить брати значення між
yarn.scheduler.minimum-allocation-mb
та
yarn.scheduler.maximum-allocation-mb
. По-друге, погляньте на параметри утримання невикористаного контейнера:

  • tez.am.container.ide.release-timeout-max.millis
    ;
  • tez.am.container.ide.release-timeout-min.millis
    .
Опція
tez.am.container.reuse.enabled
активує або дезактивує переиспользование контейнерів. Якщо вона відключена, то попередні два параметра не працюють. І по-третє, подивіться на параметри угруповання:

  • tez.grouping.split-waves
    ;
  • tez.grouping.max-size
    ;
  • tez.grouping.min-size
    .

Справа в тому, що заради розпаралелювання читання зовнішніх даних Tez змінив процес формування завдань: спочатку Tez оцінює, скільки хвиль (w) можна запустити на кластері, потім це кількість множиться на параметр
tez.grouping.split-waves
, і твір (N) ділиться на кількість стандартних сплитов на завдання. Якщо результат дій знаходиться між
tez.grouping.min-size
та
tez.grouping.max-size
, то все добре і завдання запускається N завдань. Якщо ні, то число адаптується до рамок. Документація по Tez радить «тільки в якості експерименту» виставляти параметр
tez.grouping.split-count
, який скасовує всю вищевикладену логіку і групує спліти у вказане в параметрі кількість груп. Але я цим властивістю намагаюся не користуватися, воно не дає гнучкості Tez'у і Hadoop'в цілому для оптимізації під конкретні вхідні дані.

Нюанси Tez
Крім великих проблем, Tez не позбавлений від маленьких недоліків. Наприклад, якщо ви користуєтеся http Hadoop ResourceManager'ом, то ви не побачите в ньому, скільки якою Tez-job займає контейнерів, а тим більше — в якому стані його Mapper'и і Reducer'и. Для моніторингу стану кластера я використовую цей маленький python-скрипт:

import os
import threading

result = []
e = threading.Lock()

def getContainers(appel):
attemptfile = os.popen("yarn applicationattempt -list " + appel[0])
attemptlines = attemptfile.readlines()
attemptfile.close()

del attemptlines[0]
del attemptlines[0]

for attempt in attemptlines:
splt = attempt.split('\t');
if ( splt[1].strip() == "RUNNING" ):
containerfile = os.popen("yarn container -list " + splt[0] )
containerlines = containerfile.readlines()
containerfile.close()
appel[2] += int( containerlines[0].split("Total number of containers :")[1].strip() )
e.acquire()
result.append(appel)
e.release()

appfile = os.popen("yarn application -list -appStates RUNNING")
applines = appfile.read()
appfile.close()

apps = applines.split('application_')
del apps[0]

appsparams = []

for app in apps:
splt = app.split('\t')
appsparams.append(['application_' + splt[0],splt[3], 0])

cnt = 0
threads = []

for app in appsparams:
threads.append(threading.Thread(target=getContainers, args=(app)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()

result.sort( key=lambda x:x[2] )

total = 0
for app in result:
print(app[0].strip() + '\t' + app[1].strip() + '\t' + str(app[2]) )
total += app[2]

print("Total:",total)

Незважаючи на запевнення HortonWorks, наша практика показує, що коли у Hive ви робите простий SELECT FROM smth table WHERE smth, то найчастіше MapReduce відпрацює швидше, правда, не набагато. До того ж на початку статті я вас трохи обдурив: розпаралелювання в HiveOnMapReduce можливо, але не таке інтелектуальне. Досить лише зробити
set hive.exec.parallel=true
налаштувати
set hive.exec.parallel.thread.number=
… — і незалежні кроки (пари Mapper + Reducer) будуть виконуватися паралельно. Так, в ньому немає можливості, що на виході одного Mapper'а буде запускатися кілька Reducer'ів або наступних Mapper'ів. Так, розпаралелювання куди більш примітивно, але теж прискорює роботу.

Ще одна цікава особливість Tez: він запускає свій движок на кластері і тримає його включеним деякий час. З одного боку, це дійсно прискорює роботу, так як завдання запускається на ноди значно швидше. Але з іншого боку — несподіваний мінус: важливі процеси в такому режимі запускати не можна, тому що TEZ-engine з часом породжує дуже багато класів і падає з GC-overflow. І буває так: ви запустили на ніч
nohup hive -f ....hql > hive.log &
, прийшли вранці, а воно десь посередині впало, хайвей тревел завершився, temporary tables віддалилися, і все треба вважати заново. Неприємно.

Додає в скарбничку дрібних проблем є те, що старий добрий MapReduce вже увійшов в стабільний реліз, а TEZ, незважаючи на популярність і прогресивність, досі перебуває у версії 0.8.4, і помилки в ній можуть зустрітися на будь-якому кроці. Найстрашніший баг для мене — це видалення інформації, але такого я не зустрічав. А ось з некоректним розрахунком на Tez ми стикалися, причому MapReduce вважає коректно. Наприклад, мій колега використовував дві таблиці table1 і table2, що мають унікальне поле EntityId. Зробив через Tez запит:

select 
table1.EntityId, count(1)
from 
table1
left join table2 on table1.EntityId = table2.EntityId
group by 
EntityId 
having 
count(1) > 1

І отримав на виході якісь рядки! Хоча MapReduce очікувано повернула пустий результат. Про схожу проблему є bugreport.

Висновок
Tez — безумовне благо, яке в більшості випадків робить життя простішим, дозволяє писати в Hive більш складні запити і чекати на них швидкого відповіді. Але, як і будь-яке благо, воно вимагає до себе обережного підходу, обачності та знання якихось нюансів. І як наслідок, використання старого, перевіреного, надійного MapReduce краще, ніж використання Tez. Я дуже здивувався, що не зміг знайти жодної статті (ні в інтернеті, ні в инглишнете) про мінуси HiveOnTez, і вирішив заповнити цей пробіл. Сподіваюся, що інформація виявиться комусь корисною. Спасибі! Всім удачі і поки!
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.