Аналіз даних на Scala. Вважаємо кореляцію 21-го століття


Дуже важливо вибрати правильний інструмент для аналізу даних. На форумах Kaggle.com, де проводяться міжнародні змагання з Data Science, часто запитують, який інструмент краще. Перші рядки популярності займають R і Python. У статті ми розповімо про альтернативний стек технологій аналізу даних, зроблений на основі мови програмування Scala і платформи розподілених обчислень Spark.

Як ми дійшли до цього? Retail Rocket ми багато займаємося машинним навчанням на дуже великих масивах даних. Раніше для розробки прототипів ми використовували зв'язку IPython + Pyhs2 (hive драйвер для Python) + Pandas + Sklearn. В кінці літа 2014 року прийняли принципове рішення перейти на Spark, так як експерименти показали, що ми отримаємо 3-4 кратне підвищення продуктивності на тому ж парку серверів.

Ще один плюс — ми можемо використовувати одну мову програмування для моделювання та коду, який буде працювати на бойових серверах. Для нас це було великою перевагою, так як до цього ми використовували 4 мови одночасно: Hive, Pig, Java, Python, для невеликої команди це серйозна проблема.

Spark добре підтримує роботу з Python/Scala/Java через API. Ми вирішили вибрати Scala, так як саме на ньому написаний Spark, тобто можна аналізувати його вихідний код і при необхідності виправляти помилки, плюс — це JVM, на якому крутиться весь Hadoop. Аналіз форумів з мов програмування під Spark звів до наступного:

Scala:
+ функціональний;
+ рідний для Spark;
+ працює на JVM, а значить рідна для Hadoop;
+ сувора статична типізація;
— досить складний вхід, але код читабельний.

Python:
+ популярний;
+ простий;
— динамічна типізація;
— продуктивність гірше, ніж у Scala.

Java:
+ популярність;
+ рідний для Hadoop;
— занадто багато коду.

Більш докладно щодо вибору мови програмування для Spark можна прочитати тут.

Повинен сказати, що вибір дався не просто, так як Scala ніхто в команді на той момент не знав.
Відомий факт: щоб навчитися добре спілкуватися на мові, потрібно зануритися в мовне середовище і використовувати його як можна частіше. Тому для моделювання і швидкого аналізу даних ми відмовилися від питоновского стека на користь Scala.

В першу чергу потрібно було знайти заміну IPython, варіанти були такі:
1) Zeppelin — an IPython-like for notebook Spark;
2) ISpark;
3) Spark Notebook;
4) Spark IPython Notebook від IBM.

Поки що вибір упав на ISpark, так як він простий, — це IPython для Scala/Spark, до нього відносно легко вдалося прикрутити графіки HighCharts і R. І у нас не виникло проблем з підключенням його до Yarn-кластеру.

Наша розповідь про середовище аналізу даних на Scala складається з трьох частин:
1) Нескладне завдання на Scala в ISpark, яка буде виконуватися локально на Spark.
2) Налаштування та установка компонент для роботи в ISpark.
3) Пишемо Machine Learning завдання на Scala, використовуючи бібліотеки R.
І якщо ця стаття буде популярною, я напишу дві інші. ;)

Завдання
Давайте спробуємо відповісти на питання: чи залежить середній чек покупки в інтернет-магазині від статичних параметрів клієнта, які включають в себе населений пункт, тип браузера (мобільний/Desktop), операційну систему і версію браузера? Зробити це можна за допомогою «Взаємної інформації» (Mutual Information).

Retail Rocket ми багато де використовуємо ентропію для наших рекомендаційних алгоритмів і аналізу: класичну формулу Шеннона, розбіжність Кульбака-Лейблера, взаємну інформацію. Ми навіть подали заявку на доповідь на конференцію RecSys на цю тему. Цим заходам присвячений окремий, хоч і невеликий розділ у відомому підручнику з машинного навчання Мерфі.

Проведемо аналіз на реальних даних Retail Rocket. Попередньо я скопіював вибірку з нашої кластера до себе на комп'ютер у вигляді файлу csv.

Завантаження даних
Тут ми використовуємо ISpark і Spark, запущений у локальному режимі, тобто всі обчислення відбуваються локально, розподіл йде по ядрам. Власне в коментарях все написано. Найголовніше, що на виході ми отримуємо RDD (структура даних Spark), яка являє собою колекцію кейс-класів типу Row, який визначений в коді. Це дозволить звертатися до полів через ".", наприклад _.categoryId.

На вході:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.tribbloid.ispark.display.dsl._
import scala.util.Try

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Оголошуємо CASE class, він нам знадобиться для dataframe
case class Row(categoryId: Long, orderId: String ,cityId: String, osName: String,
osFamily: String, uaType: String, uaName: String,aov: Double)

// читаємо файл в змінну val з допомогою sc (Spark Context), оголошує його Ipython заздалегідь 
val aov = sc.textFile("file:///Users/rzykov/Downloads/AOVC.csv")


// парсим поля
val dataAov = aov.flatMap { line => Try { line.split(",") match {
case Array(categoryId, orderId, cityId, osName, osFamily, uaType, uaName, aov) =>
Row(categoryId.toLong + 100, orderId, cityId, osName, osFamily, osFamily, uaType, aov.toDouble)
} }.toOption }

На виході:
MapPartitionsRDD[4] at map at <console>:28

Тепер подивимося на самі дані:

У цій рядку використовується новий тип даних DataFrame, доданий до Spark у версії 1.3.0, він дуже схожий на аналогічну структуру в бібліотеці pandas в Python. toDf підхоплює наш кейс-клас Row, завдяки чому отримує назви полів та їх типи.

Для подальшого аналізу потрібно вибрати якусь одну категорію бажано з великою кількістю даних. Для цього потрібно отримати список найбільш популярних категорій.

На вході:
//Найбільш популярна категорія
dataAov.map { x => x.categoryId } // вибираємо поле categoryId
.countByValue() // розраховуємо частоту появи кожної categoryId
.toSeq
.sortBy( - _._2) // робимо сортування по частоті за спаданням
.take(10) // беремо ТОП 10 записів

На виході ми отримали масив кортежів (tuple) у форматі (categoryId, частота):
ArrayBuffer((314,3068), (132,2229), (128,1770), (270,1483), (139,1379), (107,1366), (177,1311), (226,1268), (103,1259), (127,1204))

Для подальшої роботи я вирішив вибрати 128-ю категорію.

Підготуємо дані: відфільтруємо типи операційних систем, щоб не засмічувати графіки сміттям.

На вході:
val interestedBrowsers = List("Android", "OS X", "iOS", "Linux", "Windows")
val osAov = dataAov.filter(x => interestedBrowsers.contains(x.osFamily)) //залишаємо тільки потрібні ОС
.filter(_.categoryId == 128) // фільтруємо категорії
.map(x => (x.osFamily, (x.aov, 1.0))) // потрібно для розрахунку середнього чека
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.map{ case(osFamily, (revenue, orders)) => (osFamily, revenue/orders) }
.collect()

На виході масив кортежів (tuple) у форматі OS, середній чек:
Array((OS X,4859.827586206897), (Linux,3730.4347826086955), (iOS,3964.6153846153848), (Android,3670.8474576271187), (Windows,3261.030993042378))

Хочеться візуалізації, давайте зробимо це в HighCharts:

Теоретично можна використовувати будь-які графіки HighCharts, якщо вони підтримуються Wisp. Всі графіки інтерактивні.

Спробуємо зробити те ж саме, але через R.
Запускаємо R клієнт:
import org.ddahl.rscala._
import ru.retailrocket.ispark._

def connect() = RClient("R", false)
@transient
val r = connect()

Будуємо сам графік:

Так можна будувати будь-які графіки R прямо в блокноті IPython.

Взаємна інформація
На графіках видно, що залежність є, але підтвердять нам цей висновок метрики? Існує безліч способів це зробити. В нашому випадку ми використовуємо взаємну інформацію (Mutual Information) між величинами в таблиці. Вона вимірює взаємну залежність між розподілами двох випадкових (дискретних) величин.

Для дискретних розподілів вона розраховується за формулою:



Але нас цікавить більш практична метрика: Maximal Information Coefficient (MIC), для розрахунку якої для безперервних змінних доводиться йти на хитрості. Ось як звучить визначення цього параметра.

Нехай D = (x, y) — це набір з n впорядкованих пар елементів випадкових величин X і Y. Це двовимірне простір розбивається X і Y сітками, групуючи значення x та y на X і Y розбиття відповідно (згадайте гістограми!).



де B(n) — це розмір сітки, I∗(D, X, Y ) — це взаємна інформація з розбиття X і Y. У знаменнику вказаний логарифм, який служить для нормалізації MIC в значення відрізка [0, 1]. MIC приймає безперервні значення на відрізку [0,1]: для крайніх значень дорівнює 1, якщо залежність є, 0 — якщо її немає. Що можна ще почитати по цій темі перераховано в кінці статті, в списку літератури.

У книзі MIC (взаємна інформація) названа кореляцією 21-го століття. І ось чому! На графіку нижче наведено 6 залежностей (графіки З — H). Для них були обчислені кореляція Пірсона і MIC, вони відзначені відповідними літерами на графіку ліворуч. Як ми бачимо, кореляція Пірсона практично дорівнює нулю, в той час як MIC показує залежність (графіки F, G, E).

Першоджерело: people.cs.ubc.ca

У таблиці нижче наведено ряд показників, які були вирахувані на різних залежностях: випадковою, лінійної, кубічної і т. д. З таблиці видно, що MIC веде себе дуже добре, виявляючи нелінійні залежності:


Ще один цікавий графік ілюструє вплив шумів на MIC:


У нашому випадку ми маємо справу з розрахунком MIC, коли змінна Aov у нас безперервна, а всі інші дискретні з невпорядкованими значеннями, наприклад тип браузера. Для коректного розрахунку MIC знадобиться значення змінної Aov. Ми скористаємося готовим рішенням з сайту exploredata.net. Є з цим рішенням одна проблема: вона вважає, що обидві змінні неперервні і виражені у значеннях Float. Тому нам доведеться обдурити код, кодуючи значення дискретних величин у Float і випадково змінюючи порядок цих величин. Для цього доведеться зробити багато ітерацій з випадковим порядком (ми зробимо 100), а в якості результату візьмемо максимальне значення MIC.
import data.VarPairData
import mine.core.MineParameters
import analysis.Analysis
import analysis.results.BriefResult
import scala.util.Random 

//Кодуємо дискретну величину, випадково змінюючи порядок "кодів"
def encode(col: Array[String]): Array[Double] = {

val ns = scala.util.Random.shuffle(1 to col.toSet.size)
val encMap = col.toSet.zip(ns).toMap
col.map{encMap(_).toDouble}
}

// функція обчислення MIC
def mic(x: Array[Double], y: Array[Double]) = {
val data = new VarPairData(x.map(_.toFloat), y.map(_.toFloat))
val params = new MineParameters(0.6.toFloat, 15, 0, null)

val res = Analysis.getResult(classOf[BriefResult], data, params)
res.getMIC
}

//у разі дискретної величини робимо багато ітерацій і беремо максимум
def micMax(x: Array[Double], y: Array[Double], n: Int = 100) = 
(for{ i <- 1 to 100} yield mic(x, y)).max 

Ну ось ми близькі до фіналу, тепер здійснимо сам розрахунок:
val aov = dataAov.filter(x => interestedBrowsers.contains(x.osFamily)) //залишаємо тільки потрібні ОС
.filter(_.categoryId == 128) // фільтруємо категорії

//osFamily
var aovMic = aov.map(x => (x.osFamily, x.aov)).collect()
println("osFamily MIC =" + micMax(encode(aovMic.map(_._1)), aovMic.map(_._2)))

//orderId

aovMic = aov.map(x => (x.orderId, x.aov)).collect()
println("orderId MIC =" + micMax(encode(aovMic.map(_._1)), aovMic.map(_._2)))

//cityId
aovMic = aov.map(x => (x.cityId, x.aov)).collect()
println("cityId MIC =" + micMax(encode(aovMic.map(_._1)), aovMic.map(_._2)))

//uaName
aovMic = aov.map(x => (x.uaName, x.aov)).collect()
println("uaName MIC =" + mic(encode(aovMic.map(_._1)), aovMic.map(_._2)))

//aov
println("aov MIC =" + micMax(aovMic.map(_._2), aovMic.map(_._2)))

//random
println("random MIC =" + mic(aovMic.map(_ => math.random*100.0), aovMic.map(_._2)))

На виході:
osFamily MIC =0.06658
orderId MIC =0.10074
cityId MIC =0.07281
aov MIC =0.99999
uaName MIC =0.05297
random MIC =0.10599

Для експерименту я додав випадкову величину з рівномірним розподілом і сам AOV.
Як ми бачимо, практично всі MIC виявилися нижче випадкової величини (random MIC), що можна вважати «умовним» порогом прийняття рішення. Aov MIC практично дорівнює одиниці, що природно, так як кореляція самої до себе дорівнює 1.

Виникає цікаве питання: чому ми на графіках бачимо залежність, а MIC нульовий? Можна придумати безліч гіпотез, але швидше за все для випадку os Family все досить просто — кількість машин з Windows набагато перевищує кількість інших:


Висновок
Сподіваюся, що Scala отримає свою популярність серед аналітиків даних (Data Scientists). Це дуже зручно, так як є можливість працювати зі стандартним IPython notebook + отримати всі можливості Spark. Цей код може спокійно працювати з терабайтними масивами даних, для цього потрібно просто змінити рядок конфігурації в ISpark, вказавши URI вашого кластера.

До речі, у нас відкриті вакансії по цьому напрямку:

<a href=»retailrocket.ua/vakansii/#research-analyst>Research Analyst


Корисні посилання:
Наукова стаття, на базі якої розроблявся MIC.
Замітка на KDnuggets про взаємну інформацію (є відео).
Бібліотека C для розрахунку MIC з обгортками для Python і MATLAB/OCTAVE.
Сайт автора наукової статті, який розробив MIC (на сайті є модуль для R і бібліотека на Java).

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.