Apache Spark в «бойових» проектах — досвід виживання

Пропонуємо вашій увазі матеріали за мотивами виступу Олександра Сербула на конференції BigData Conference. Я, як автор і доповідач, текст трохи відредагував і додав сучасних думок і актуальних проблем, тому сподіваюся пост принесе вам додаткові корисні практичні знання в галузі, так і їжу для роздумів — куди податися зі своїми знаннями. Отже — до бою!


Bigdata
В моєму розумінні, BigData — це щось божевільне, тобто термін є, а звивин всередині нього — ні. З одного боку, адміністратори ламають голову, куди складати значний обсяг інформації. З іншого боку, це высоконагруженная розробка. Необхідно створювати кластер, шукати хардкорних розробників, дуже досвідчених. Крім того, необхідно використовувати вищу математику: диференціальні обчислення, лінійну алгебру, теорію ймовірності, машинне навчання, графи і навчання на них, логістичну регресію, лінійний дискримінантний аналіз і так далі. Як вижити в цій ситуації, які інструменти взяти, які ще сирі (на жаль, більшість), як принести фірмі гроші. Адже це основна мета використання BigData, а все інше — популізм.

MapReduce
MapReduce — це парадигма згортки функцій для виконання великих і складних запитів за даними, запропонована Google. Вона дозволяє паралельно обробляти інформацію в дусі data parallelizm. В ідеалі, потрібно всі алгоритми, використовувані для роботи з великими і не тільки великими даними, переробити на MapReduce. Але ніхто не робить це безкоштовно ;-) В старій книжці, у бабусі на полиці в павутині ви досить швидко знайдете хороший алгоритм кластеризації K-means і він буде надійний працювати. Але раптом, перед релізом, або коли даних виявиться більше, ніж ви очікували, ви виявите, що цей алгоритм не працює в «паралельному режимі», не працює через MapReduce — їм можна завантажити лише одне ядро процесора. Тому вам потрібно буде екстрено і заново винайти ще один алгоритм, який вміє працювати паралельно, і придумати, як він повинен працювати в парадигмі MapReduce. А це вдається далеко не всім – це доля Computer Science (насправді іноді вдається без PhD знань переробити алгоритм на MapReduce алгоритм методом міцної кави і дошки з маркерами).
Ми в своїх проектах почали з використання алгоритмів на платформі Hadoop MapReduce, але потім перейшли на Spark, тому що він виявився більш розумно і практично влаштованим. Класичний Hadoop MapReduce працює дуже просто: виконав завдання, поклав результат у файл. Взяв, виконав, поклав. Spark — бере, виконує всі завдання, а потім вивантажує результат. На мій погляд і не тільки мій, Hadoop MapReduce, як не крути — застарілий конгломерат, який постійно і конвульсионно намагається змінитися, із-за чого розробникам і сисадмінам постійно доводиться переучуватися, а бізнесу – грати в російську рулетку з «сирими технологіями». Але… вибору у нас майже немає (так, ми дивилися Apache Storm — але він зовсім з іншої області: task parallel computng).

Альтернатив Apache Spark, якщо чесно, поки що не видно. Це і найактивніший open-source проект в інфраструктурі Apache, це і об'єкт для наслідування — подивіться хоча б на Prajna від Microsoft.

Можна кинути у відповідь Apache Tez або відшукати що-небудь дрібне в зоопарку Apache — але, повірте, для зниження ризиків краще використовувати mainstream-технології, які розвиваються в ногу з ринком.

Десь поруч, не зовсім з цієї області, але цікавого і якщо дуже хочеться — подивіться також на Apache Giraph і TensorFlow. І запитайте: це TaskParallel або DataParallel технології — і все стане зрозуміло.

Для яких завдань ми використовуємо Spark
Ми використовуємо технології паралельної обробки даних приблизно так. На MySQL-серверах крутяться сотні тисяч баз даних клієнтів-компаній, зі штатом від одиниць до тисяч співробітників. Spark використовується в основному в сервісі персональних рекомендацій, про який розповідалося в одній з минулих публікацій.

Збираємо події — перегляди замовлень, додавання в корзину, оплати замовлень, — обробляємо, кладемо в Amazon Kinesis, обробляємо worker'ами, зберігаємо в NoSQL (DynamoDB) і, нарешті, відправляємо в Spark для генерації моделей і аналітики.

На перший погляд, схема виглядає надмірно ускладненою, але поступово приходить розуміння для чого все це потрібно. Це — мінімум необхідних інструментів, які потрібні для виживання.

Отримані результати ми вивантажуємо в Apache Mahout і на виході отримуємо конкретні рекомендації для клієнта. Загальний обсяг подій, що реєструються сервісом рекомендацій, досягає десятків мільйонів на день.

Поки ми інтенсивно використовуємо і розвиваємо алгоритми колаборативної фільтрації, але бачимо приблизно таку дорожню карту розвитку алгоритмів:
• Мультимодальность
• Content-based рекомендації
• Кластеризація
• Machine learning, deep learning
• Таргеттинг

Зараз, як ніколи раніше, цінується мультимодальность — тобто використання декількох, різних алгоритмів, для відповіді на запитання (у нашому випадку — видачі персональної рекомендації). Недостатньо просто запустити сервіс на базі Apache Mahout, це не дасть ніякого виграшу перед конкурентами. Колаборативної фільтрації сьогодні вже нікого не здивуєш. Потрібно враховувати і хмара тегів користувача, коли він ходив по магазину і отримував якусь інформацію. Кластеризація дозволяє більш гнучко організувати таргетування інформації. Тут, звичайно, не обійтися без machine learning. Deep learning — це, простими словами, «якісне» машинне навчання, що передбачає дуже детальне вивчення проблеми машиною та часто використання багатошарової рекурентної нейронної мережі. При грамотному застосуванні це допомагає ще більше збільшувати конверсію, більш ефективно працювати з клієнтами.

Зворотна сторона різноманітності
Сьогодні на ринку існує безліч програмних середовищ, засобів, інструментів і продуктів для розробки та аналізу даних. Спасибі opensource (так, там повно сирих глюковатых вовкулаків, але є й відмінні рішення)! З одного боку, різноманітність — це безсумнівне благо. З іншого — процес розробки може стати досить хаотичним і безглуздий. Наприклад, спочатку пробують використовувати Apache Pig, коли щось не виходить, звертаються до Apache Hive. Подивилися, погралися, починають писати на Spark SQL. Коли запити починають падати — кидаються до Impala (а там все ще гірше). Під загрозою суїциду деякі проклинають світ BigData і повертаються до старих добрих РСУБД. Таке іноді враження, що створена купа іграшок для «фахівців», часто такими «фахівцями». А бізнес не розуміє всіх цих шукань, йому потрібно заробляти гроші, він вимагає конкретики в строк.
Сьогодні, мабуть, лише Apache Hive можна вважати класичним і надійним інструментом для роботи з SQL-запитами з розподіленими даними, як і HDFS є класикою серед кластерних файлових систем (так, є звичайно ще GusterFS, CEPH). Тим не менш, багато хто переходить на Spark SQL, тому що цей фреймворк написаний (хочеться вірити) з урахуванням інтересів бізнесу. Також, все більш активно використовуються HBase, Casandra, Mahout, Spark MLLib. Однак вимагати, щоб кожен розробник і/або сисадмін вільно орієнтувався у всіх цих інструментах — нерозумно. Це профанація. Технології — глибокі, з купою налаштувань і кожна вимагає глибокого занурення на місяці. Не поспішайте — за гонки за кількістю неминуче буде страждати якість.

Що почитати
В першу чергу хочу порекомендувати всім, хто працює або має намір почати працювати з паралельними алгоритмами і MapReduce, прочитати книгу «Mining of Massive Datasets», що знаходиться у відкритому доступі. Її треба прочитати кілька разів, з блокнотиком і олівцем, інакше каші в голові не уникнути. Спочатку нічого не буде зрозуміло (мені стало відкриватися рази з 3). Але це одна з базових і, що важливо, доступних для інженерів не володіють чорним поясом з математики, книга за алгоритмами роботи з великими даними. Зокрема, глава 2.3.6 присвячена реляційної алгебри та способів її проектування операцій на MapReduce. Дуже корисний матеріал, по суті, тут представлено готові поради для розробників, достатньо їх тільки уважно реалізувати.

Читаючи літературу, повну математичних деталей, згадуйте анекдот і посміхайтеся :-)

Двоє летять на повітряній кулі, потрапили в туман, заблукали. Раптом їх
притискає до землі, і вони бачать людини. Один з них кричить вниз: "Де
ми?". Людина, подумавши, відповідає: "Ви на повітряній кулі...". Черговим
поривом вітру куля несеться вгору.
— Він ідіот?
— Ні, математик.
— ??????
— По-перше, він подумав, перш ніж відповісти. По-друге, дав абсолютно
точний відповідь. А по-третє, ця відповідь зовсім нікому не потрібен.

Смаколики Apache Spark
• DAG (directed acyclic graph) vs Hadoop MapReduce vs Hadoop Streaming. Можна написати SQL-запит, який представляє собою кілька MapReduce-операцій в ланцюжку, який буде виконуватися коректно. Streaming реалізований в Spark набагато краще, ніж у Hadoop, їм набагато зручніше користуватися і працює часто ефективніше, за рахунок кешування даних в пам'яті.
• Spark Programming Guide + API. Документація вельми розумна і корисна.
• Можна програмувати на Java. С++ вважався складним мовою, але Scala набагато… ні, не складніше, швидше высокоумнее на мій погляд. Scala — класний мова, незважаючи на якусь академічну протухшесть і неорганічну зв'язок з живими мерцями з широко витріщеними очима типу Haskel. Я дуже люблю Scala, але від нього можна збожеволіти, а час компіляції залишає бажати кращого собі і своїм дітям. Тому, при бажанні, до Spark можна підключитися і з Java і Python і з R.
• Зручна абстракція: Resilient Distributed Dataset (RDD). Прекрасна, просто божественна концепція світу з функціонального програмування, що дозволяє распараллеливать файли величезного обсягу — на сотні гігабайт, або навіть терабайти.
• Зручні колекції: filter, map, flatMap. Зручний підхід до Spark — колекції та операції над ними: filter, map, flatMap. Звідки це взялося? Це прийшло з функціонального програмування, яке зараз активно проповідується Scala (і не тільки в ньому).

Java7 і Spark
Історично склалося так, що ми пишемо в Spark на Java 7, а не на Java 8. На жаль, в «сімці» немає нормальної підтримки функціональних об'єктів, тому ми змушені займатися садомазохізмом і створювати об'єкти типу PairFunction, Tuple2, TupleN. Загалом готуйтеся — коли Java7 інтегрується зі Scala, то виходить моторошно нечитаний код. Я, звісно, трохи перебільшую, але в ньому все перемішано і хочеться одягнути окуляри з 13 окулярами.
JavaRDD<String> combinedRowsOrdered = combinedRows.mapToPair(new PairFunction<String,String,String>() {...
public Tuple2<String,String> call( String row ) {
...return new Tuple2<String, String>...

Якщо ви не хочете лізти в нетрі Scala, то краще використовуйте Java 8. Код виходить більш читається і коротше.

Ще трохи про Scala
Назва Scala походить від англійського scalable. Це мова для розробки великих компонентних програм, створений вченими-математиками. У мене особисто складається враження, що Scala — це тренд. Тренд пожвавлення функціонального програмування (знову привіт Haskel, F#, ...). Як то «раптом» виявилося (хоча вчені про це здогадувалися набагато раніше), що обробляти масиви даних у парадигмі Data-Parallel – зручніше у функціональному стилі, вау! :-) Spark активно використовує і Scala Actors (здрастуй Erlang). Вони дозволяють писати простою, читаний, однопотоковий код, який виконується на великій кількості серверів. Ви позбавляєтеся від ризиків багатопоточного програмування, яким змушені займатися в Java — це складно, довго і дорого (зате круто). Крім того, із-за складності багатопоточного програмування виникає чимало помилок. А завдяки акторам життя «раптом» спрощується.

Spark-кластер
Для розгортання в Amazon Spark пропонує нам скрипт під назвою Spark-ес2. Він викачує половину репозиторію Берклі, щось творить на Ruby під капотом (здрастуй Японія) і встановлює купу якогось софту. Вийшла система досить крихке, чутлива до змін конфігурації машин. Також є нарікання на логування та оновлення компонентів системи.
Якийсь час ми існували зі скриптом Spark-ec2, але виявилося, що краще самостійно написати інсталятор Spark. Крім того, інсталятор зможе враховувати можливість підключення нових spot-машин.
Все це для нас болісно, оскільки у нас немає великого штату сисадмінів — ми більше програмісти Якщо б у нас було 30 сисадмінів, я б сказав: «Хлопці, я буду програмувати на Scala, а ви тут, будь ласка, не спите ночами і займайтеся кластерами Spark». Куди більш привабливим варіантом виявилася зв'язка з Spark і Elastic MapReduce. Також колеги хвалять рішення з Spark від Cloudera і HortonWorks — може вони вам теж виявляться корисними.

Amazon EMR
Амазон пропонує нам не втрачати час і розгорнути кластер Spark у них з використанням сервісу ElasticMapReduce. Тут майже все буде працювати з коробки. Spark інтегрований в Yarn-кластер, є купа софту, є подглючивающий моніторинг, можна додавати машини, масштабувати HDFS, змінювати розмір кластера, збільшувати і зменшувати кількість завдань за рахунок spot-машин. Spot-машини в Amazon коштують в 5-10 разів дешевше. Ми їх використовуємо завжди, тому що це зручно, дешево і швидко.

Spark в EMR професійно інтегрований з S3. Це правильно, адже саме там ви швидше за все будете зберігати файли в Amazon. Ми порівнювали зберігання великих файлів в S3 і HDFS, і виявилося, що швидкість доступу приблизно однакова. Тоді навіщо зв'язуватися з HDFS, мучитися з кластерною файловою системою, якщо є готовий сервіс. Також в Elastic MapReduce до потрухів Sparlk/Hadoop можна прокинути через ssh-туннелінг веб-адмінки і звикнути до них (хоча я так і не звик).

Вартість Amazon EMR
Виходить трохи дорожче, ніж звичайні машини, різниця близько 10%. При цьому ви отримуєте «в комплекті» багато готового, правда трохи глючащего софта (Hue глючить більше всіх) і можливість масштабувати кластер. При цьому вам навіть адмін не потрібен — ви, як розробники, там царі і боги.
Типи машин
Машини тут бувають трьох типів:
• Master-машини, які контролюють взагалі весь кластер. На них встановлено Spark Master.
• Core-машини, на яких розгорнуто файлова система — HDFS. Їх може бути кілька штук. Правда, рекомендується збільшувати кількість core-машин, а не зменшувати, інакше втрачаються дані.
• Для решти використовуються task-машини. Це звичайні Spark-сервери, на яких працюють воркеры. Кількість spot-машин можна вільно змінювати, створюючи парк хоч із сотень машин.
Софт
• Spark. У попередніх версіях образів Amazon поки не підтримується spark.dynamicAllocation.enabled, так що ви повинні самі говорити йому, скільки потрібно машин для виконання завдання. Якщо кластер частково простоює, то Spark не займе залишилися машини для виконання. Ви повинні жорстко прописати, скільки машин йому потрібно. Це незручно. Але починаючи з AMI 4 ця функція вже працює.
• Hadoop/Yarn/HDFS. У Yarn-кластери, як і в Oracle, використовується безліч налаштувань, і, по-хорошому, потрібен адмін, який в цьому дуже добре розбирається. Але, незважаючи на біль, Hadoop-кластери впевнено вирішують свої завдання. Найбільше мені не подобається в Yarn і Hadoop те, що там бардак з логированием. В логи пишеться абсолютно все, налаштування рівнів логування розкидані по різних частинах кластерних потрухів і тому їх кількість дуже швидко розростається. І нормального вирішення цієї проблеми немає. Непогано було б повчитися у старих добрих мешканців unix – наприклад у mysql, apache.
• Ganglia. Це time-series софт, який будує графіки з різних метрик: навантаження, кількість завдань і т. д. Допомагає отримати уявлення про стан кластера. Зручна річ, але є недоліки – «убиті» спот машини продовжують висіти і захаращувати графіки.
• Hive. Це підтримка команд SQL, яка працює на файлах у HDFS і S3. Непоганий інструмент, але іноді його можливостей не вистачає. Використовуємо. Але коли потрібно більше — заходимо в Spark і безпосередньо вирішуємо завдання по реляційної алгебри.
• Pig. Ми його не використовуємо, тому дати якусь оцінку важко.
• Hbase. Варіант NoSQL, поки не використовуємо.
• Impala. Дуже цікава річ, про яку можна написати окремий пост.Поки справляє враження сирого софту. Так що використовуйте на свій страх і ризик.
• Hue. Це адмінка до «бигдате», написана на Python. Її GUI дозволяє об'єднати разом і Impala, і Hbase, і Pig, і Hive. Тобто можна зробити свій куточок аналітика в інтернеті :-) Я ним користувався тиждень, він став глючити, зависати, потім перестав відкриватися взагалі — загалом, недороблений софт

Основні проблеми Spark, з якими ми зустрічалися
Падіння по пам'яті
Що таке Map? Ми беремо те, що, раскидываем на ключі, а їх вже раскидываем по кластеру. Тут нічого не повинно впасти — алгоритмічно.
Що таке Reduce? Коли в один worker збираються згруповані по одному ключу дані. Якщо добре запрограмувати, то можна порційно передавати в reducer всі значення в межах одного ключа і нічого не буде падати. Але на практиці виявилося, що Spark може впасти в різних точках – то буфера не вистачило для серіалізації, то воркеру пам'яті не вистачило. І, на мій погляд, це основна проблема. Але все ж можна акуратно прилаштуватися. У нас Spark зараз не падає, хоча досягли ми цього за допомогою магії.

Обов'язково потрібно поставити розумне Executor Memory: --executor-memory 20G, --conf spark.kryoserializer.buffer.mb=256, --conf spark.default.parallelism=128, --conf spark.storage.memoryFraction=0.1
KryoSerializer дозволяє стискати об'єкти (spark.serializer org.apache.spark.serializer.KryoSerializer). Без цього вони споживають набагато більше пам'яті. Також не рекомендую зменшувати значення константи spark.default.parallelism=128, інакше може часто падати по пам'яті. Що стосується memoryFraction, то ми не використовуємо кешування.

Вивантаження результатів
Припустимо, вам потрібно вивантажити з Spark дані в модель. Якщо обсяг великий, то це буде виконуватися дуже довго.
• Завдяки --driver-memory 10G ви розумієте, що завантажуєте з драйвера.
• При використанні Colleсt() весь результат збирається в пам'ять в драйвері і він може впасти. Тому рекомендую використовувати toLocalIterator(). На жаль, його продуктивність дуже низька. Тому нам довелося написати код для складання партіцій. Кому цікаво, розповім детальніше.

Логування
Цей код — єдине, що допомогло нам впоратися з проблемою логування:
export SPARK_LOCAL_DIRS="/mnt/spark,/mnt2/spark"
export SPARK_WORKER_DIR="/mnt/spark_worker"
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=1800 -Dspark.worker.cleanup.appDataTtl=259200"
#Worker executor stdout/stderr logs
spark.executor.logs.rolling.maxRetainedFiles 2
spark.executor.logs.rolling.time strategy
spark.executor.logs.rolling.time.interval daily


Висновок
Сподіваюся було і корисно, і цікаво. Все більше і активніше в наше життя рулять паралельні алгоритми на MapReduce. Їх мало, їх шукають, але іншого виходу схоже немає (ну може щось вийде швидше порахувати на Apache Giraph і TensorFlow і через парадигму Task-Parallel). Платформа, що стала класикою – Hadoop MapReduce, поступається місце функціонально написаної і на сучасному мовою і математиками платформі Apache Spark. Скоріше всього ви будете змушені почати розбиратися, хоча б на рівні термінів, у мешканців Hadoop-зоопарку: Hive, Pig, HDFS, Presto, Impala. Але постійно вчитися – наше все і щоб випереджати конкурентів потрібно знати більше, писати швидше і думати – яскравіше. Всім удачі і з наступаючим Новим Роком!

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.