Введення в Apache Spark

Привіт, хабр!

image

Минулого разу ми розглянули чудовий інструмент Vowpal Wabbit, який буває корисний у випадках, коли доводиться навчатися на вибірках, що не поміщаються в оперативну пам'ять. Нагадаємо, що особливістю цього інструменту є те, що він дозволяє будувати в першу чергу лінійні моделі (які, до речі, мають хорошу узагальнюючу здатність), а висока якість алгоритмів досягається за рахунок відбору та генерації ознак, регуляризації та інших додаткових прийомів. Сьогодні розглянемо інструмент, який більш популярний і призначений для обробки великих обсягів даних Apache Spark.

Не будемо вдаватися в подробиці історії виникнення даного інструменту, а також його внутрішнього устрою. Зосередимося на практичних речах. У цій статті ми розглянемо базові операції і основні речі, які можна робити в Spark'е, а в наступний раз розглянемо докладніше бібліотеку MlLib машинного навчання, а також GraphX для обробки графів (автор даного поста в основному для цього і використовують даний інструмент — це як раз той випадок, коли найчастіше граф необхідно тримати в оперативній пам'яті на кластері, у той час як для машинного навчання дуже часто досить Vowpal Wabbit'а). У цьому мануалі не буде багато коду, т. к. розглядаються основні поняття і філософія Spark'а. В наступних статтях (про MlLib і GraphX) ми візьмемо який-небудь датасет і докладніше розглянемо Spark на практиці.

Відразу скажемо, що нативно Spark підтримує Scala, Python та Java. Приклади будемо розглядати на Python, т. к. дуже зручно працювати безпосередньо у IPython Notebook, вивантажуючи невелику частину даних з кластера і обробляючи, наприклад, пакетом Pandas — виходить досить зручна зв'язка

Отже, почнемо з того, що основним поняттям у Spark'е є RDD (Resilient Distributed Dataset), який представляє собою набір даних, над яким можна робити перетворення двох типів (і, відповідно, вся робота з цими структурами полягає в послідовності цих двох дій).
image

Трансформації
Результатом застосування цієї операції до RDD є новий RDD. Як правило, це операції, які яким-небудь чином перетворюють елементи даного датасета. Ось неповний найпоширеніших перетворень, кожне з яких повертає новий датасет (RDD):

.map(function) — застосовує функцію function до кожного елементу датасета

.filter(function) — повертає всі елементи датасета, на яких функція function повернула справжнє значення

.distinct([numTasks]) — повертає датасет, який містить унікальні елементи вихідного датасета

Також варто зазначити про операції над множинами, зміст яких зрозумілий з назв:

.union(otherDataset)

.intersection(otherDataset)

.cartesian(otherDataset) — новий датасет містить у собі всілякі пари (A,B), де перший елемент належить вихідного датасету, а другий — датасету-аргументу

Дії
Дії застосовуються тоді, коли необхідно матеріалізувати результат — як правило, зберегти дані на диск, або вивести частину даних в консоль. Ось список найбільш поширених дій, які можна застосовувати над RDD:

.saveAsTextFile(path) — зберігає дані в текстовий файл (hdfs, на локальну машину або будь-яку іншу підтримувану файлову систему — повний список можна подивитися в документації)

.collect() — повертає елементи датасета у вигляді масиву. Як правило, це застосовується у випадках, коли даних в датасете вже мало (застосовані різні фільтри і перетворення) — і необхідна візуалізація, або додатковий аналіз даних, наприклад засобами пакету Pandas

.take(n) — повертає у вигляді масиву перші n елементів датасета

.count() — повертає кількість елементів у датасете

.reduce(function) — знайома операція для тих, хто знайомий з MapReduce. З механізму цієї операції випливає, що функція function (яка приймає на вхід 2 аргумент повертає одне значення) повинна бути обов'язково комутативність і асоціативної

Це засади, які необхідно знати при роботі з інструментом. Тепер займемося трохи практикою і покажемо, як завантажувати дані в Spark і робити з ними прості обчислення

При запуску Spark, перше, що необхідно зробити — це створити SparkContext (якщо говорити простими словами — це об'єкт, який відповідає за реалізацію більш низькорівневих операцій з кластером — докладніше див. документацію), який при запуску Spark-Shell створюється автоматично і доступний відразу (об'єкт sc)

Завантаження даних
Завантажувати дані в Spark можна двома шляхами:

а). Безпосередньо з локальної програми за допомогою функції .parallelize(data)

localData = [5,7,1,12,10,25]
ourFirstRDD = sc.parallelize(localData)

б). З підтримуваних сховищ (наприклад, hdfs) з допомогою функції .textFile(path)

ourSecondRDD = sc.textFile("path to some data on the cluster")

У цьому пункті важливо відзначити одну особливість зберігання даних в Spark'e і в теж час саму корисну функцію .cache() (почасти завдяки якій Spark став так популярний), яка дозволяє закешувати дані в оперативній пам'яті (з урахуванням доступності останньої). Це дозволяє проводити повторне обчислення в оперативній пам'яті, тим самим позбувшись від IO-overhead'а. Це особливо важливо в контексті машинного навчання та обчислень на графах, т. до. більшість алгоритмів повторне — починаючи від градієнтних методів, закінчуючи такими алгоритмами, як PageRank

Робота з даними
Після завантаження даних в RDD ми можемо робити над ним різні трансформації та дії, про які говорилося вище. Наприклад:

Подивимося перші кілька елементів:

for item in ourRDD.top(10): 
print item

Або відразу завантажимо ці елементи в Pandas і будемо працювати з DataFrame'ом:

import pandas as pd
pd.DataFrame(ourRDD.map(lambda x: x.split(";")[:]).top(10))

Взагалі, як видно, Spark настільки зручний, що далі, напевно немає сенсу писати різні приклади, а можна просто залишити цю вправу читачеві — багато обчислення пишуться буквально кілька рядків

Наостанок, покажемо лише приклад трансформації, а саме, обчислимо максимальний і мінімальний елементи нашого датасета. Як легко здогадатися, зробити це можна, наприклад, за допомогою функції .reduce():

localData = [5,7,1,12,10,25]
ourRDD = sc.parallelize(localData)
print ourRDD.reduce(max)
print ourRDD.reduce(min)

Отже, ми розглянули основні поняття, необхідні для роботи з інструментом. Ми не розглядали роботу з SQL, роботу з парами <ключ, значення> (що робиться легко — для цього достатньо застосувати до RDD, наприклад, фільтр, щоб виділити, ключ, а далі — вже легко користуватися вбудованими функціями, на зразок sortByKey, countByKey, join і ін) — читачеві пропонується ознайомитися з цим самостійно, а при виникненні питань — написати в коментарі. Як вже було зазначено, наступного разу ми розглянемо докладно бібліотеку MlLib і окремо — GraphX

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.