R і Spark

imageSpark – проект Apache, призначений для кластерних обчислень, являє собою швидку і універсальну середу для обробки даних, в тому числі і для машинного навчання. Spark має API R(пакет SparkR), який входить в сам дистрибутив Spark. Але, крім роботи з даними API, є ще два альтернативних способи роботи з Spark R. Отже, ми маємо три різні способи взаємодії з кластером Spark. В цьому пості наводитися огляд основних можливостей кожного із способів, а також, використовуючи один з варіантів, побудуємо найпростішу модель машинного навчання на невеликому обсязі текстових файлів (3,5 ГБ, 14 млн. рядків) на кластері Spark розгорнутого Azure HDInsight.

Огляд засобів взаємодії з Spark

Крім офіційного пакету SparkR, можливості в машинному навчанні якого слабкі (у версії 1.6.2 всього одна модель, у версії 2.0.0 їх чотири), є ще два варіанти доступу до Spark.
Перший варіант — це використання продукту від Microsoft — Microsoft R Server for Hadoop, який нещодавно була інтегрована підтримка Spark. Використовуючи даний продукт, можна обчислення за одним і тим же функцій R, в контексті локальних обчислень, Hadoop (map-reduce) або Spark. Крім локальної установки R і доступу до кластеру Spark, служба Microsoft Azure HDInsight дозволяє розгорнути готові кластери, і, крім звичайного кластера Spark, є можливість розгорнути кластер R Server on Spark. Даний сервіс являє собою кластер Spark з передвстановленим R server for Hadoop на додатковому, прикордонному вузлі, що дозволяє відразу обчислення, як локально на даному сервері, так і перемикатися на контекст Spark або Hadoop. Використання даного продукту досить добре описано в офіційній документації до HDInsight на сайті Microsoft.
Другий варіант -це використання нового пакету sparklyr, який поки знаходиться в стадії розробки. Цей продукт розробляється під егідою RStudio — компанії, під крилом якої випущені одні з найбільш корисних і необхідних пакетівknitr, ggplot2, tidyr, lubridate, dplyr та інші, тому цей пакет може стати ще одним лідером. Поки даний пакет слабо документований, так як ще офіційно не випущений.
На основі документації і експериментів з кожним з цих способів роботи з Spark, підготував наступну таблицю (Табл. 1) з узагальненими функціональними можливостями кожного із способів (також додав SparkR 2.0.0, в якому можливостей стало трохи більше).
image
Таблиця 1. Огляд можливостей різних способів взаємодії з Spark

Як видно з таблиці, немає жодного засоби повною мірою реалізують необхідні потреби «з коробки», але пакет sparklyr вигідно відрізняється відSparkR R Server. Основні його достоїнства – читання csv,json, parquet файли hdfs. Повністю сумісний з dplyr синтаксис маніпулювання даними – включає в себе операції фільтрації, вибору колонок, агрегуючі функції, можливості виконувати злиття даних, модифікацію імена колонок і багато іншого. На відміну від SparkR або R server for Hadoop, де деякі з цих завдань або не виконуються, або виконуються дуже незручно (R server for Hadoop злиття даних для об'єктів немає зовсім, воно підтримується тільки для вбудованого типу даних xdf). Ще однією перевагою пакета є можливість написання функцій для запуску методів Java безпосередньо з R коду.
Приклад
count_lines <- function(sc, file) {
spark_context(sc) %>% 
invoke("textFile", file, 1L) %>% 
invoke("count")
}
count_lines(sc, "/text.csv")

Завдяки цьому, можна реалізувати відсутню функціональність пакету, використовуючи існуючі методи java Spark або реалізувавши їх самостійно.
І, зрозуміло, кількість моделей машинного навчання значно більше, ніж у SparkR (навіть у версії 2.0) і R server for Hadoop. Тому зупинимо свій вибір на даному пакеті, як найбільш перспективний і зручний у використанні. Кластер Spark був розгорнутий, при використанні Azure HDInsight хмарної служби пропонує розгортання 5 типів кластерів (HBase, Storm, Hadoop, Spark, R Server on Spark), в різних конфігураціях при мінімальних зусиллях.

Використовувані ресурси

  • Кластер HDInsight Apache Spark 1.6 Linux (розгортання кластера докладно описано в документації Microsoft Azure)
  • R 3.3.2 инсталлированный на головний вузол
  • RStudio preview редакції (дод. можливості для sparklyr), инсталлированный також на головний вузол
  • Putty клієнт, для встановлення сесії з головним вузлом кластера тунелювання порту RStudio на порт локального хоста (налаштування RStudio та його тунелювання описана в документації Microsoft Azure)


Налаштування середовища

Спочатку розгортаємо кластер Spark — я вибрав конфігурацію з 2 головними вузлами D12v2 і 4 робочими вузлами D12v2. (D12v2: 4 ядра/28 ГБ ОЗУ, 200 ГБ диск, ця конфігурація не зовсім оптимальна, але для демонстрації синтаксис sparklyr підходить). Опис розгортання різних типів кластером і роботи з ними описано у документації на HDInsight. Після успішного розгортання кластера, використовуючи підключення по SSH до робочого вузла, встановлюємо туди R і RStudio, з необхідними залежностями. RStudio бажано використовувати preview редакції, так як у неї з'явилися додаткові можливості для пакета sparklyr – додаткове вікно, в якому відображаються вихідні датафреймы в Spark, і можливість переглянути їх властивості або їх самих. Після установки R, R-Studio, переустанавливаем з'єднання, використовуючи тунелювання на localhost:8787.
Отже, тепер у браузері за адресою localhost:8787 ми підключаємося до RStudio і продовжуємо працювати.

Підготовка даних

Весь код даної задачі наведено в кінці цього поста.
Для даної тестової задачі, будемо використовувати csv файли NYC Taxi датасета, розташовані за адресою NYC Taxi Trips. Дані являють собою інформації про поїздки на таксі та їх оплаті. Для цілей ознайомлення, обмежимося одним місяцем. Побудова моделі на тому ж повному наборі даних, але використовуючи R Server for Hadoop (у контексті Hadoop), описаний у статті: Exploring NYC Taxi Data with Microsoft R Server and HDInsight. Але там читання файлів, вся передобробка — фільтрація даних, злиття таблиць було виконано в Hive, і в R Server лише будували модель, тут же все зроблено на звичайному R використовуючи sparklyr.
Перемістивши обидва файлу hdfs кластера Spark, і використовуючи функцію sparklyr, читаємо дані файли.

Маніпуляція даними

Файли по поїздкам і тарифами пов'язані з ключем — стовпцях "medallion", "hack_licence" і "pickup_datetime", тому виконаємо приєднання зліва до датафрейму data, датафрейма fare. Після об'єднання даних і маніпуляцій, зберігаємо датафрейм у форматі parquet. Перш ніж будувати модель, подивимося на дані, для цього створимо вибірку з 2000 випадкових спостережень і передамо їх в R, використовуючи collect. На даній малій вибірці, побудували просту діаграму ggplot2 (залежність від плати за проїзд, із зазначенням розміру точки — відстанню маршруту і кольором точки кількістю пасажирів, і розбитою на панель-сітку за типами оплати і оператора таксі) (рис. 1).
image
Малюнок 1 Діаграма зображує основні залежності

На ній видно, що присутня залежність (лінійна, як «стандарт» % від рахунку) розміру чайових від вартості проїзду, більша частина платежів здійснено з використанням кредитної картки (панель CRD) і готівки (панель CSH), і що при оплаті готівкою чайові завжди відсутні (ймовірно, це пояснюється тим, що при оплаті готівкою чайові вже входять у вартість оплати, а при оплаті карткою немає). Тому у вибірці для навчання залишаємо тільки ті поїздки, які оплачувалися кредитною карткою. Об'єднаний датафрейм, використовуючи зручний синтаксис dplyr, і пайпинг magrittr, передаємо далі по ланцюжку: відбір рядків (виключаючи викиди і нелогічні значення) і колонок (залишаючи тільки потрібні для побудови моделі), передаємо фінальний датасет в функцію лінійної регресії. Для тренування моделі використовуємо 70% всіх даних, для тіста, що залишилися 30%. Для даної задачі використовуємо просту лінійну регресію. Залежність, яку ми хочемо виявити, це розмір чайових від параметрів поїздки. Дана модель на цих даних достатньо выроджена і не цілком коректна (є велика кількість чайових рівних 0), але вона проста, покаже інтерпретовані коефіцієнти моделі і дозволить продемонструвати основні можливості sparklyr. У моделі будемо використовувати такі предиктори: vendor_id – ідентифікатор оператора таксі, passenger_count – число пасажирів, trip_time_in_secs – час поїздки,trip_distance — відстань поїздки, payment_type – тип платежу, fare_amount – ціна поїздки, surcharge – збір. В результаті навчання, модель має наступний вигляд:
Call: ml_linear_regression(., response = "tip_amount", features = c("vendor_id", "passenger_count", "trip_time_in_secs", "trip_distance", "fare_amount", "surcharge"))

Deviance Residuals: (approximate):
Min 1Q Median 3Q Max 
-27.55253 -0.33134 0.09786 0.34497 31.35546 

Coefficients:
Estimate Std. Error t value Pr(>|t|) 
(Intercept) 3.2743 e-01 1.4119 e-03 231.9043 < 2e-16 ***
vendor_id_VTS -1.0557 e-01 1.1408 e-03 -92.5423 < 2e-16 ***
passenger_count -1.0542 e-03 4.1838 e-04 -2.5197 0.01175 * 
trip_time_in_secs 1.3197 e-04 2.0299 e-06 65.0140 < 2e-16 ***
trip_distance 1.0787 e-01 4.7152 e-04 228.7767 < 2e-16 ***
fare_amount 1.3266 e-01 1.9204 e-04 690.7842 < 2e-16 ***
surcharge 1.4067 e-01 1.4705 e-03 95.6605 < 2e-16 ***
---
Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 '' 1

R-Squared: 0.6456
Root Mean Squared Error: 1.249

Використовуючи дану модель, передбачаємо значення на тестовій вибірці.

Висновки

У цій статті приведені основні функціональні можливості трьох способів взаємодії з Spark R і наведено приклад, що реалізує читання файлів, їх предобработку, маніпуляції з ними і побудова найпростішої моделі машинного навчання, використовуючи пакет sparklyr.

Вихідний код
devtools::install_github("rstudio/sparklyr")
library(sparklyr)
library(dplyr)
spark_disconnect_all()
sc <- spark_connect(master = "yarn-client")
data_tbl<-spark_read_csv(sc, "data", "taxi/data")
fare_tbl<-spark_read_csv(sc, "fare", "taxi/fare")
fare_tbl <- rename(fare_tbl, 
medallionF = medallion, 
hack_licenseF = hack_license, 
pickup_datetimeF=pickup_datetime)


taxi.join<-data_tbl %>% left_join(fare_tbl, by = c("medallion"="medallionF", 
"hack_license"="hack_licenseF", 
"pickup_datetime"="pickup_datetimeF", 
))
taxi.filtered <- taxi.join %>%
filter(passenger_count > 0 , passenger_count < 8 ,
trip_distance > 0 , trip_distance <= 100 ,
trip_time_in_secs > 10 , trip_time_in_secs <= 7200 ,
tip_amount >= 0 , tip_amount <= 40 ,
fare_amount > 0 , fare_amount <= 200, payment_type=="CRD" ) %>%
select(vendor_id,passenger_count,trip_time_in_secs,trip_distance,
fare_amount,surcharge,tip_amount)%>%
sdf_partition(training = 0.7, test = 0.3, seed = 1234)

spark_write_parquet(taxi.filtered$training, "taxi/parquetTrain")
spark_write_parquet(taxi.filtered$test, "taxi/parquetTest") 

for_plot<-sample_n(taxi.filtered$training,1000)%>%collect()
ggplot(data=for_plot, aes(x=fare_amount, y=tip_amount, color=passenger_count, size=trip_distance))+
geom_point()+facet_grid(vendor_id~payment_type)

model.lm <- taxi.filtered$training %>%
ml_linear_regression(response = "tip_amount", features = c("vendor_id",
"passenger_count",
"trip_time_in_secs",
"trip_distance",
"fare_amount",
"surcharge"))
print(model.lm)
summary(model.lm)

predicted <- predict(model.lm, newdata = taxi.filtered$test)
actual <- (taxi.filtered$test %>%
select(tip_amount) %>%
collect())$tip_amount

data <- data.frame(predicted = predicted,actual = actual)


Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.