Big data від А до Я. Частина 3: Прийоми і стратегії розробки MapReduce-додатків

Привіт, Хабр! В попередніх статтях ми описали парадигму MapReduce, а також показали як на практиці реалізувати і виконати MapReduce-додаток на стеку Hadoop. Прийшла пора описати різні прийоми, які дозволяють ефективно використовувати MapReduce для вирішення практичних завдань, а також показати деякі особливості Hadoop, які дозволяють спростити розробку або істотно прискорити виконання MapReduce-завдання на кластері.



Map only job
Як ми пам'ятаємо, MapReduce складається з стадій Map, Shuffle і Reduce. Як правило, в практичних завданнях найважчою виявляється стадія Shuffle, так як на цій стадії відбувається сортування даних. Насправді існує ряд завдань, до яких можна обійтися тільки стадією Map. Ось приклади таких завдань:

  • Фільтрація даних (наприклад, «Знайти всі записи з IP-адреси 123.123.123.123» в логах web-сервера);
  • Перетворення даних («Видалити колонку в csv-логах»);
  • Завантаження і вивантаження даних із зовнішнього джерела («Вставити всі записи з журналу в базу даних»).
Такі завдання вирішуються за допомогою Map-Only. При створенні Map-Only завдання в Hadoop потрібно вказати нульове кількість reducer'ов:

Map Only Job

Приклад конфігурації map-only завдання на hadoop:
Native interface Hadoop Streaming Interface
Вказати нульове кількість редьюсеров при конфігурації job'a:

job.setNumReduceTasks(0);

Більш розгорнутий приклад посилання.
Не вказуємо редьюсер. Приклад:

hadoop jar hadoop-streaming.jar \
-input input_dir\
-output output_dir\
-mapper "python mapper.py"\
-file "mapper.py"
Map Only jobs насправді можуть бути дуже корисними. Наприклад, в платформі Facetz.DCA для виявлення характеристик користувачів по їх поведінці використовується саме один великий map-only, кожен маппер якого приймає на вхід і на вихід віддає його характеристики.

Combine
Як я вже писав, зазвичай найважча стадія при виконанні Map-Reduce завдання – це стадія shuffle. Відбувається це тому, що проміжні результати (вихід mapper'a) записуються на диск, сортуються і передаються по мережі. Однак існують задачі, в яких така поведінка здається не дуже розумним. Наприклад, у тієї ж задачі підрахунку слів у документах можна попередньо предагрегировать результати виходів декількох mapper'ів на одному вузлі map-reduce завдання, і передавати на reducer вже підсумовані значення по кожній машині.

Combine. Взято посилання

У hadoop для цього можна визначити комбинирующую функцію, яка буде обробляти вихід частини mapper-ів. Комбінує функція дуже схожа на reduce – вона приймає на вхід вихід частини mapper'ів і видає агрегований результат для цих mapper'ів, тому дуже часто reducer використовують і як combiner. Важлива відмінність від reduce – комбинирующую функцію потрапляють не всі значення, що відповідають одному ключу.

Більш того, hadoop не гарантує того, що комбінує функція взагалі буде виконана для виходу mapper'a. Тому що комбінує функція не завжди може бути застосована, наприклад, у разі пошуку медіанного значення по ключу. Тим не менш, в тих завданнях, де комбінує функція застосовна, її використання дозволяє домогтися істотного приросту до швидкості виконання MapReduce-завдання.

Використання Combiner'a на hadoop:

Native Interface Hadoop streaming
При конфігурації job-a вказати клас-Combiner. Як правило, він збігається з Reducer:

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);






В параметрах командного рядка вказати команду -combiner. Як правило, ця команда збігається з командою reducer'a. Приклад:

hadoop jar hadoop-streaming.jar \
-input input_dir\
-output output_dir\
-mapper "python mapper.py"\
-reducer "python reducer.py"\
-combiner "python reducer.py"\
-file "mapper.py"\
-file "reducer.py"\
Ланцюжка MapReduce-завдань
Бувають ситуації, коли для вирішення завдання одним MapReduce не обійтися. Наприклад, розглянемо трохи видозмінену завдання WordCount: є набір текстових документів, необхідно порахувати, скільки слів зустрілося від 1 до 1000 разів в наборі, скільки слів від 1001 до 2000, скільки від 2001 до 3000 і так далі.

Для рішення нам буде потрібно 2 MapReduce job'а:

  1. Видозмінений wordcount, який для кожного слова розрахувати, в який з інтервалів воно потрапило;
  2. MapReduce, підраховує, скільки разів на виході першого MapReduce зустрівся кожен з інтервалів.
Рішення на псевдокоде:
#map1
def map(doc):
for word in doc:
yield word, 1
#reduce1
def reduce(word, values):
yield int sum(values)/1000), 1

#map2
def map(doc):
interval, cnt = doc.split()
yield interval, cnt
#reduce2
def reduce(interval, values):
yield interval*1000, sum(values)

Для того, щоб виконати послідовність MapReduce-завдань на hadoop, достатньо просто в якості вхідних даних для другої задачі вказати папку, яка була вказана як output для першої і запустити їх по черзі.

На практиці ланцюжка MapReduce-завдань можуть являти собою досить складні послідовності, у яких MapReduce-завдання можуть бути підключені як послідовно, так і паралельно один одному. Для спрощення управління такими планами виконання завдань існують окремі інструменти типу oozie і luigi, яким буде присвячена окрема стаття даного циклу.

Приклад ланцюжка MapReduce-завдань.

Distributed cache
Важливим механізмом Hadoop є Distributed Cache. Distributed Cache дозволяє додавати файли (наприклад, текстові файли, архіви, jar-файли) до оточення, в якому виконується MapReduce-завдання.

Можна додавати файли, що зберігаються на HDFS, локальні файли (локальні для тієї машини, з якої виконується запуск завдання). Я вже неявно показував, як використовувати Distributed Cache разом з hadoop streaming: додаючи через опцію -file файли mapper.py і reducer.py. Насправді можна додавати не тільки mapper.py і reducer.py а взагалі довільні файли, і потім користуватися ними як ніби вони знаходяться в локальній теці.

Використання Distributed Cache:
Native API
//конфігурація Job'a
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);

//приклад використання в mapper-e:
public static class MapClass extends MapReduceBase 
implements Mapper<K, V, K, V> {

private Path[]localArchives;
private Path[] localFiles;

public void configure(JobConf job) {
// отримуємо кешовані дані з архівів
File f = new File("./map.zip/some/file/in/zip.txt");
}

public void map(K key, V value, 
OutputCollector<K, V> output, Reporter reporter) 
throws IOException {
// використовуємо дані тут
// ...
// ...
output.collect(k, v);
}
}
Hadoop Streaming
#перераховуємо файли, які необхідно додати в distributed cache в параметрі –files. Параметр –files повинен йти перед іншими параметрами.

yarn hadoop-streaming.jar\
-files mapper.py,reducer.py,some_cached_data.txt\
-input '/some/input/path' \
-output '/some/output/path' \ 
-mapper 'python mapper.py' \
-reducer 'python reducer.py' \

приклад використання:
import sys
#просто читаємо файл з локальної папки
data = open('some_cached_data.txt').read()

for line in sys.stdin()
#processing input
#use data here
Reduce Join
Ті, хто звик працювати з реляційними базами, часто користуються дуже зручною операцією Join, що дозволяє спільно опрацювати зміст деяких таблиць, об'єднавши їх за деякого ключа. При роботі з великими даними таке завдання теж іноді виникає. Розглянемо наступний приклад:

Є логи двох web-серверів, кожен лог має наступний вигляд:
<timestamp>\t<ip>\t<url>
. Приклад шматочка лода:

1446792139 178.78.82.1 /sphingosine/unhurrying.css
1446792139 126.31.163.222 /accentually.js
1446792139 154.164.149.83 /pyroacid/unkemptly.jpg
1446792139 202.27.13.181 /Chawia.js
1446792139 67.123.248.174 /morphographical/dismain.css
1446792139 226.74.123.135 /phanerite.php
1446792139 157.109.106.104 /bisonant.css

Необхідно порахувати для кожного IP-адреси на який з 2-х серверів він частіше заходив. Результат повинен бути представлений у вигляді:
<ip>\t<first or second>
. Приклад частини результату:

178.78.82.1 first
126.31.163.222 second
154.164.149.83 second
226.74.123.135 first

На жаль, на відміну від реляційних баз даних, в загальному випадку об'єднання двох логів по ключу (в даному випадку – за IP-адресою) являє собою досить важку операцію і вирішується за допомогою 3-х MapReduce і патерну Reduce Join:

Загальна схема ReduceJoin

ReduceJoin працює наступним чином:

  1. На кожен з вхідних логів запускається окремий MapReduce (Map only), що перетворює вхідні дані до наступного вигляду:

    key -> (type, value)

    Де key – це ключ, по якому потрібно об'єднувати таблиці, Type – тип таблиці (first або second в нашому випадку), а Value – це будь-які додаткові дані, прив'язані до ключа.

  2. Виходи обох MapReduce подаються на вхід 3-го MapReduce, який, власне, і виконує об'єднання. Цей MapReduce містить порожній Mapper, який просто копіює вхідні дані. Далі shuffle розкладає дані по ключам і подає на вхід редьюсеру у вигляді:

    key -> [(type, value)]
Важливо, що в цей момент на редьюсер потрапляють записи з обох логів і при цьому по полю type можна ідентифікувати, з якого з двох логів потрапило конкретне значення. Значить даних достатньо, щоб вирішити вихідну задачу. У нашому випадку reducere просто повинен порахувати для кожного ключа записів, з яким type зустрілося більше і вивести цей type.

MapJoin
Патерн ReduceJoin описує загальний випадок об'єднання двох логів по ключу. Однак є окремий випадок, при якому завдання можна істотно спростити і прискорити. Це випадок, при якому один з логів має розмір істотно меншого розміру, ніж інший. Розглянемо наступну задачу:

Є 2 лода. Перший лог містить лог web-сервера (такий же як у попередній задачі), другий файл (розміром до 100 кб) містить відповідність URL-> Тематика. Приклад 2-го файла:

/toyota.php auto
/football/spartak.html sport
/cars auto
/finances/money business

Для кожного IP-адреси необхідно розрахувати сторінки якої категорії з цієї IP-адреси завантажувалися найчастіше.

У цьому випадку нам теж необхідно виконати Join 2-х логів по URL. Однак у цьому випадку нам не обов'язково запускати 3 MapReduce, так як другий лог повністю влізе в пам'ять. Для того, щоб вирішити завдання за допомогою 1-го MapReduce, ми можемо завантажити другий лог в Distributed Cache, а при ініціалізації Mapper'a просто вважати його в пам'ять, поклавши його в словник -> topic.

Далі задача вирішується наступним чином:

Map:

# знаходимо тематику кожній з сторінок першого лода</em>
input_line -> [ip, topic]

Reduce:

Ip -> [topics] -> [ip, most_popular_topic]

Reduce отримує на вхід ip і список всіх тематик, просто обчислює, яка з тематик зустрілася найчастіше. Таким чином задача вирішена за допомогою 1-го MapReduce, а власне Join взагалі відбувається всередині map (тому якщо б не потрібна була додаткова агрегація по ключу – можна було б обійтися MapOnly job-ом):

Схема роботи MapJoin

Резюме
У статті ми розглянули кілька патернів і прийомів вирішення завдань за допомогою MapReduce, показали, як об'єднувати MapReduce-завдання в ланцюжки та join-ить логи по ключу.

У наступних статтях ми більш детально розглянемо архітектуру Hadoop, а також інструменти, що спрощують роботу з MapReduce і дозволяють обійти його недоліки.

Посилання на інші статті циклу
» Big Data від А до Я. Частина 1: Принципи роботи з великими даними, парадигма MapReduce
» Big Data від А до Я. Частина 2: Hadoop

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.