Розподілений xargs, або Виконання гетерогенних додатків на Hadoop-кластері

enter image description here
Привіт, Хабр!
Мене звати Олександр Крашенинников, я керую DataTeam в Badoo. Сьогодні я поділюся з вами простий і елегантною утилітою для розподіленого виконання команд в стилі xargs, а заодно розповім історію її виникнення.
Наш відділ BI працює з обсягами даних, для обробки яких потрібні ресурси більш ніж однієї машини. У наших процесах ETL (Extract Transform Load) в хід йдуть звичні світу Big Data розподілені системи Hadoop і Spark в зв'язці з OLAP-базою Exasol. Використання цих інструментів дозволяє нам горизонтально масштабуватися як по дисковому простору, так і за CPU/ RAM.
Безумовно, в наших процесах ETL існують не тільки великі завдання на кластері, але і машинерія простіше. Широкий пласт завдань вирішується поодинокими PHP/ Python-скриптами без залучення гб оперативної пам'яті і дюжини жорстких дисків. Але в один прекрасний день нам треба було адаптувати одну CPU-bound завдання для виконання в 250 паралельних инстансов. Настала пора маленькому Python-скрипту покинути межі рідного хоста і полинути у великий кластер!
Варіанти маневру
Отже, ми маємо наступні вхідні умови завдання:
  1. Довгограюча (близько однієї години) CPU-bound завдання на мові Python.
  2. Потрібно виконати завдання 250 разів з різними вхідними параметрами.
  3. Результат виконання отримати синхронно, тобто запустити щось, почекати, вийти з exit code згідно з результатами.
  4. Мінімальний час виконання – вважаємо, що у нас є достатня кількість обчислювальних ресурсів для паралелізації.
Варіанти реалізації

Один фізичний хост

Той факт, що запускаються програми є однопоточными і не використовують понад 100% одного ядра CPU, дає нам можливість нехитро здійснювати послідовність fork-/ exec-дій при реалізації кожного завдання.
з використанням xargs:
commands.list:
/usr/bin/uptime
/bin/pwd

krash@krash:~$ cat commands.list | xargs -n 1 -P `nproc` bash -c
/home/krash
18:40:10 up 14 days, 9:20, 7 users, load average: 0,45, 0,53, 0,59

Підхід проста як валянок і добре себе зарекомендував. Але в нашому випадку ми його відкидаємо, оскільки при виконанні нашого завдання на машині з 32 ядрами результат ми отримаємо через ~вісім годин, а це не відповідає формулюванню «мінімальний час виконання».

Кілька фізичних хостів

Наступний інструмент, який можна застосувати для такого рішення, – GNU Parallel. Крім локального режиму, схожого по функціоналу з xargs, він має можливість виконання програм через SSH на декількох серверах. Вибираємо декілька хостів, на яких будемо виконувати завдання («хмара»), ділимо список команд між ними і за допомогою parallel виконуємо завдання.
Створюємо файл
nodelist
зі списком машин і кількістю ядер, які ми там можемо утилізувати:
1/ cloudhost1.domain
1/ cloudhost2.domain

Запускаємо:
commands.list:
/usr/bin/uptime
/usr/bin/uptime

krash@krash:~$ parallel --sshloginfile nodelist echo "Run on host \`hostname\`: "\; {} ::: `cat commands.list`
Run on host cloudhost1.domain:
15:54 up 358 days 19:50, 3 users, load average: 25,18, 21,35, 20,48
Run on host cloudhost2.domain:
15:54 up 358 days 15:37, 2 users, load average: 24,11, 21,35, 21,46

Однак цей варіант ми теж відмітаємо чинності експлуатаційних особливостей: у нас немає відомостей про поточне завантаження та доступності хостів кластера, і можливе попадання в ситуацію, коли паралелізація принесе тільки шкоду, так як один із цільових хостів буде перевантажений.

Hadoop-based рішення

У нас є перевірений інструмент BI, який ми знаємо і вміємо використовувати, зв'язка Hadoop+Spark. Щоб втиснути наш код у рамках кластера, є два рішення:
Spark Python API (PySpark)
Оскільки вихідна задача написана на Python, а у Spark є відповідний API для цієї мови, можна спробувати перенести код на парадигму map/reduce. Але і цей варіант нам довелося відкинути, так як вартість адаптації була неприйнятною в рамках цього завдання.
Hadoop Streaming
Map/reduce-фреймворк Hadoop дозволяє виконувати завдання, написані не тільки на JVM-сумісних мовах програмування. У нашому конкретному випадку задача називається map-only – немає reduce-стадії, так як результати виконання не піддаються будь-якої подальшої агрегації. Запуск задачі виглядає так:
hadoop jar $path_to_hadoop_install_dir/lib/hadoop-streaming-2.7.1.jar \
-D mapreduce.job.reduces=0 \
-D mapred.map.tasks=$number_of_jobs_to_run \
-input hdfs:///path_for_list_of_jobs/ \
-output hdfs:///path_for_saving_results \
-mapper "my_python_job.py" \
-file "my_python_job.py"

Цей механізм працює наступним чином:
  1. Ми запитуємо у Hadoop-кластера (YARN) ресурси на виконання завдання.
  2. YARN виділяє якусь кількість фізичних JVM (YARN containers) на різних хостах кластера.
  3. Між контейнерами ділиться вміст файлів(а), що лежать в папці hdfs://path_for_list_of_jobs.
  4. Кожен з контейнерів, отримавши свій список рядків з файлу, запускає скрипт my_python_job.py і передає йому послідовно в STDIN ці рядки, інтерпретуючи вміст STDOUT як поворотне значення.
Приклад з запуском дочірнього процесу:
#!/usr/bin/python

import sys
import subprocess

def main(argv):
command = sys.stdin.readline()
subprocess.call(command.split())

if __name__ == "__main__":
main(sys.argv)

І варіант з «контролером», запускає бізнес-логіку:
#!/usr/bin/python

import sys

def main(argv):
line = sys.stdin.readline()
args = line.split()
MyJob(args).run()

if __name__ == "__main__":
main(sys.argv)

Цей підхід найбільш повно відповідає нашому завданні, але має ряд недоліків:
  1. Ми позбавляємося потоку STDOUT виконуваного завдання (він використовується в якості каналу комунікації), а хотілося б після завершення завдання мати можливість подивитися логи.
  2. Якщо в майбутньому ми захочемо запускати ще якісь завдання на кластері, нам доведеться робити для них wrapper.
В результаті аналізу вищеописаних варіантів реалізації ми прийняли рішення створити свій велосипед продукт.
Hadoop xargs
Вимоги, що пред'являються до розроблюваної системі:
  1. Виконання списку завдань з оптимальним використанням ресурсів Hadoop-кластера.
  2. Умова успішного завершення – «всі підзадачі відпрацювали успішно, інакше – fail».
  3. Можливість збереження підзадач для подальшого аналізу.
  4. Опціональний перезапуск завдання при коді виходу, відмінному від нуля.
В якості платформи для реалізації ми вибрали Apache Spark – ми з нею добре знайомі і вміємо її «готувати».
Алгоритм роботи:
  1. Отримати з STDIN список завдань.
  2. Зробити з нього Spark RDD (розподілений масив).
  3. Запитати у кластеру контейнери для виконання.
  4. Розподілити масив завдань по контейнерах.
  5. Для кожного контейнера запустити map-функції, що приймає на вхід текст зовнішньої програми і виробляє fork-exec.
Код всього додатка до непристойності простий, і безпосередньо інтерес представляє, власне, код функції:
package com.badoo.bi.hadoop.xargs;

import lombok.extern.log4j.Log4j;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.lang.NullArgumentException;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.function.VoidFunction;

import java.io.IOException;
import java.util.Arrays;

/**
* Executor one of command
* Created by krash on 01.02.17.
*/
@Log4j
public class JobExecutor implements VoidFunction<String> {

@Override
public void call(String command) throws Exception {

if (null == command || command.isEmpty()) {
throw new NullArgumentException("Command can not be empty");
}

log.info("Going to launch '" + command + "'");
Process process = null;
try {

CommandLine line = CommandLine.parse(command);

ProcessBuilder builder = getProcessBuilder();
// quotes for in bash-style in order to pass correctly to execve()
String[] mapped = Arrays.stream(line.toStrings()).map(s -> s.replace("\'", "")).toArray(String[]::new);
builder.command(mapped);
process = builder.start();

int exitCode = process.waitFor();
log.info("Process " + command + " out with code " + exitCode);
if (0 != exitCode) {
throw new InstantiationException("Process " + command + " exited with non-zero exit code (" + exitCode + ")");
}
} catch (InterruptedException err) {
if (process.isAlive()) {
process.destroyForcibly();
}
} catch (IOException err) {
throw new InstantiationException(err.getMessage());
}
}

ProcessBuilder getProcessBuilder() {
return new ProcessBuilder().inheritIO();
}
}

Збірка

Збірка програми здійснюється стандартним для Java-світу інструментом – Maven. Єдина відмінність – у середовищі, в якій буде запускатися програма. Якщо ви не використовуєте Spark для вашого кластера, то збірка виглядає так:
mvn install clean

В цьому випадку отриманий JAR-файл буде містити в собі вихідний код Spark'а. У разі, якщо на машині, з якої здійснюється запуск програми, встановлений клієнтський код Spark, він повинен бути виключений зі складання:
mvn install clean -Dwork.scope=provided

В результаті такої збірки файл програми буде істотно менше (15 Кб проти 80 Мб).

Запуск

Нехай у нас є файл
commands.list
зі списком завдань такого виду:
/bin/sleep 10
/bin/sleep 20
/bin/sleep 30

Запускаємо додаток:
akrasheninnikov@cloududs1.mlan:~> cat log.log | /local/spark/bin/spark-submit --conf "spark.master=yarn-client" hadoop-xargs-1.0.jar
17/02/10 15:04:26 INFO Application: Starting application
17/02/10 15:04:26 INFO Application: Got 3 jobs:
17/02/10 15:04:26 INFO Application: /bin/sleep 10
17/02/10 15:04:26 INFO Application: /bin/sleep 20
17/02/10 15:04:26 INFO Application: /bin/sleep 30
17/02/10 15:04:26 INFO Application: Application name: com.badoo.bi.hadoop.xargs.Main
17/02/10 15:04:26 INFO Application: Execution environment: yarn-client
17/02/10 15:04:26 INFO Application: Explicit executor count was not specified, making same as job count
17/02/10 15:04:26 INFO Application: Initializing Spark
17/02/10 15:04:40 INFO Application: Initialization completed, starting jobs
17/02/10 15:04:52 INFO Application: Command '/bin/sleep 10' out on host bihadoop40.mlan
17/02/10 15:05:02 INFO Application: Command '/bin/sleep 20' out on host bihadoop31.mlan
17/02/10 15:05:12 INFO Application: Command '/bin/sleep 30' out on host bihadoop18.mlan
17/02/10 15:05:13 INFO Application: All the jobs completed in 0:00:32.258

Після завершення роботи через GUI YARN ми можемо отримати логи додатків, які запускали (приклад для команди
uptime
):
enter image description here
У разі неможливості виконання команди весь процес виглядає наступним чином:
akrasheninnikov@cloududs1.mlan:~> echo "/bin/unexistent_command" | /local/spark/bin/spark-submit --conf "spark.master=yarn-client" --conf "spark.yarn.queue=uds.misc" --conf "spark.driver.host=10.10.224.14" hadoop-xargs-1.0.jar
17/02/10 15:12:14 INFO Application: Starting application
17/02/10 15:12:14 INFO Main: Expect commands to be passed to STDIN, one per line
17/02/10 15:12:14 INFO Application: Got 1 jobs:
17/02/10 15:12:14 INFO Application: /bin/unexistent_command
17/02/10 15:12:14 INFO Application: Application name: com.badoo.bi.hadoop.xargs.Main
17/02/10 15:12:14 INFO Application: Execution environment: yarn-client
17/02/10 15:12:14 INFO Application: Explicit executor count was not specified, making same as job count
17/02/10 15:12:14 INFO Application: Initializing Spark
17/02/10 15:12:27 INFO Application: Initialization completed, starting jobs
17/02/10 15:12:29 Application ERROR: Command '/bin/unexistent_command' failed on host bihadoop36.mlan, 1 times
17/02/10 15:12:29 Application ERROR: Command '/bin/unexistent_command' failed on host bihadoop36.mlan, 2 times
17/02/10 15:12:30 Application ERROR: Command '/bin/unexistent_command' failed on host bihadoop36.mlan, 3 times
17/02/10 15:12:30 Application ERROR: Command '/bin/unexistent_command' failed on host bihadoop36.mlan, 4 times
17/02/10 15:12:30 ERROR Main: FATAL ERROR: Failed to execute all the jobs
java.lang.InstantiationException: Cannot run program "/bin/unexistent_command": error=2, No such file or directory
at com.badoo.bi.hadoop.xargs.JobExecutor.call(JobExecutor.java:56)
at com.badoo.bi.hadoop.xargs.JobExecutor.call(JobExecutor.java:16)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachAsync$1.apply(JavaRDDLike.scala:690)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachAsync$1.apply(JavaRDDLike.scala:690)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$15.apply(AsyncRDDActions.scala:118)
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$foreachAsync$1$$anonfun$apply$15.apply(AsyncRDDActions.scala:118)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Висновок
Розроблене рішення дозволило нам дотримати всі умови вихідної задачі:
  1. Ми отримуємо у Hadoop ядра для запуску нашого додатка, згідно вимогам (числу ядер) – максимальний рівень паралелізації.
  2. За видачу ресурсів враховуються завантаження і доступність хостів (за рахунок API YARN).
  3. Ми зберігаємо вміст STDOUT/ STDERR всіх завдань, які запускаємо.
  4. Не довелося переписувати вихідне додаток.
  5. "Write once, run anywhere" © Sun Microsystems – розроблене рішення тепер можна використовувати для запуску будь-яких інших завдань.
Радість від отриманого результату була настільки велика, що ми не могли не поділитися нею з вами. Вихідні коди Hadoop xargs ми опублікували на GitHub.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.