Маленький код для великих даних або Spark за 3 дні

Нехай Жираф був не прав,
Але винен не Жираф,
А той, хто крикнув з гілок:
«Жираф великий — йому видніше!» ©


Треба було оперативно розібратися з технологією Spark заточену для використання Big Data. У процесі з'ясування активно використовував habrahabr, так що спробую повернути інформаційний довжок, поділившись досвідом.

А саме: встановленням системи з нуля, налаштуванням і власне програмуванням коду вирішального завдання обробки даних для створення моделі для обчислення ймовірність банкрутства клієнта банку за набором таких ознак як сума кредиту, ставка і т. д.

Великих даних начебто повинно бути багато, але чому-то не просто знайти те злачне місце, де їх все мацають. Спочатку спробував варіант з ambari, але на моїй Window7 валилися помилки налаштувань мережевого моста. У результаті вибрав варіант з преднастроенной віртуальною машиною від Cloudera (CDH). Просто встановлюємо VirtualBox, запускаємо завантажений файл, вказуємо основні параметри (пам'ять, місце) і через 5 хвилин високоповажний джин Hadoop жадає ваших вказівок.

Кілька слів, чому саме Spark. Наскільки я розумію, ключові відмінності від початкової MapReduce в тому, що дані утримуються в пам'яті, замість скидання на диск, що дає прискорення у багато разів. Але, мабуть, більш важливі реалізації цілого ряду статистичних функцій і зручним інтерфейсом для завантаження/обробки даних.

Далі власне код для вирішення наступного завдання. Є реально великі дані (бо рука дуже втомлюється скролити ці 2000 рядків) у форматі:



Є припущення, що дефолт якось пов'язаний з іншими параметрами (крім першого, до шановним Ивановым1...N претензій немає) і треба побудувати модель лінійної регресії. Перш ніж почати, варто зауважити, що це мій перший код на Java, сам я працюю аналітиком і взагалі це мій перший запуск Eclipse, налаштування Maven і т. д. Так що не варто чекати вишуканих чудес, нижче розв'язання задачі в лоб тим способом, який чомусь заробив. Поїхали:

1. Створюємо Spark сесію. Важливий момент – це все працює тільки з версії 2.0.0, тоді як у постачанні CDH йде v1.6. Так що потрібно зробити апгрейд, інакше буде виняток при запуску.

SparkSession ss = SparkSession
.builder()
.appName("Bankrupticy analyser")
.getOrCreate();

2. Завантажуємо дані у спеціальний тип JavaRDD. По суті це приблизно як List в C#, принаймні я так це собі пояснив. Бібліотека вміє читати багато чого, але для початку зійде звичайний csv файл.

JavaRDD<Client> peopleRDD = ss.read()
.textFile(filename)
.javaRDD()
.map(new Function<String, Client>() 
{
public call Client(String line) throws Exception
{
String[] parts = line.split(","); // Роздільник
Client client = new Client(); 
client.setName(parts[0]); // Парсим поля (ПІБ в першій колонці)
client.setYearOfBirth(Double.parseDouble(parts[1])); 
client.setAmount(Double.parseDouble(parts[2]));
client.setTerm(Double.parseDouble(parts[3]));
client.setRate(Double.parseDouble(parts[4]));
client.setPaid(Double.parseDouble(parts[5]));
client.setStatus(Double.parseDouble(parts[6])); // Тут ознака банкрутства (1 - банкрут, 0 – поки ще платить)
return client;
}
});

Де Client це звичайний клас з нашими атрибутами (можна знайти у файлі проекту, за посиланням у кінці посту).

3. Створюємо датасет, який необхідний для нормалізації даних. Без нормалізації розрахунок моделі лінійної регресії методом градієнтного спуску не прокотить. Спочатку намагався прикрутити StandardScalerModel: fit -> transform але виникли проблеми типами даних, здається з-за різниці версій. Загалом, поки обійшовся обхідним рішенням, а саме через селект до даних, виконуючи нормалізацію прямо в ньому:


Dataset<Row> clientDF = ss.createDataFrame(peopleRDD, Client.class);
clientDF.createOrReplaceTempView("client"); 

Dataset<Row> scaledData = ss.sql(
"SELECT name, (minYearOfBirth - yearOfBirth) / (minYearOfBirth - maxYearOfBirth),"
+ "(minAmount - amount) / (minAmount - maxAmount),"
+ "(minTerm - term) / (minTerm - maxTerm),"
+ "(minRate - rate) / (minRate - maxRate),"
+ "(minPaid - paid) / (minPaid - maxPaid),"
+ "(minStatus - status) / (minStatus - maxStatus) "
+ "FROM client CROSS JOIN "
+ "(SELECT min(yearOfBirth) AS minYearOfBirth, max(yearOfBirth) AS maxYearOfBirth,"
+ "min(amount) AS minAmount, max(amount) AS maxAmount,"
+ "min(term) AS minTerm , max(term) AS maxTerm,"
+ "min(rate) AS minRate, max(rate) AS maxRate,"
+ "min(paid) AS minPaid, max(paid) AS maxPaid,"
+ "min(status) AS minStatus, max(status) AS maxStatus "
+ "FROM client)").cache();

4. Модель приймає дані у форматі JavaRDD в які віддамо ПІБ клієнта. Це норм для гарного відображення для тестового варіанту, в житті звичайно так не варто робити, хоча взагалі подібне може знадобиться для інших цілей.

JavaRDD<Row> rowData = scaledData.javaRDD(); // Dataset to JavaRDD

JavaRDD<Tuple2<String,LabeledPoint>> parsedData = rowData.map(
new Function<Row, Tuple2<String,LabeledPoint>>() 
{
public Tuple2<String,LabeledPoint> call(Row row) 
{
int last = row.length();

String cname = row.getString(0); // Перший елемент - ПІБ
double label = row.getDouble(last - 1); // Останній – ознака дефолту
double[] v = new double[last];

for (int i = 1; i < last - 1; i++) // Посередині незалежні змінні
v[i] = row.getDouble(i);

v[last - 1] = 1; // +intercept
return new Tuple2<String, LabeledPoint>
(cname, new LabeledPoint(label, Vectors.dense(v)));
}
});

5. Виділимо дані LabeledPoint для моделі:

JavaRDD<LabeledPoint> parsedDataToTrain = parsedData.map(
new Function<Tuple2<String,LabeledPoint>, LabeledPoint>() 
{
public LabeledPoint call(Tuple2<String,LabeledPoint> namedTuple) 
{
return namedTuple._2(); // 2 означає другий елемент у складі <String,LabeledPoint>
}
}); 
parsedData.cache();

6. Створюємо власне модель:

int numIterations = 200; 
double stepSize = 2; 
final LinearRegressionModel model 
= LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedDataToTrain), numIterations, stepSize); 

7. І власне основна робота + результат:

final NumberFormat nf = NumberFormat.getInstance(); // Для краси виводу чисел
nf.setMaximumFractionDigits(2);

JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
new Function<Tuple2<String,LabeledPoint>, Tuple2<Double, Double>>() 
{
public Tuple2<Double, Double> call(Tuple2<String,LabeledPoint> namedTuple) 
{
double prediction = model.predict(namedTuple._2().features()); // Розрахунок залежної змінної для набору ознак даного клієнта

System.out.println(namedTuple._1() + " got the score " + nf.format(prediction) 
+ ". The real status is " + nf.format(namedTuple._2().label()));

return new Tuple2<Double, Double>(prediction, namedTuple._2().label());
}
});

8. І порахуємо середній квадрат помилки (п. 7):

double MSE = new JavaDoubleRDD(valuesAndPreds.map(
new Function<Tuple2<Double, Double>, Object>() 
{
public Object call(Tuple2<Double, Double> pair) 
{
return Math.pow(pair._1() - pair._2(), 2.0);
}
}).rdd()).mean();

В даному випадку висновок буде виглядати так:

Иванов1983 got the score 0.57. The real status is 1
Иванов1984 got the score 0.54. The real status is 1
Иванов1985 got the score -0.08. The real status is 0
Иванов1986 got the score 0.33. The real status is 1
Иванов1987 got the score 0.78. The real status is 1
Иванов1988 got the score 0.63. The real status is 1
Иванов1989 got the score 0.63. The real status is 1
Иванов1990 got the score 0.03. The real status is 0
Иванов1991 got the score 0.57. The real status is 1
Иванов1992 got the score 0.26. The real status is 0
Иванов1993 got the score 0.07. The real status is 0
Иванов1994 got the score 0.17. The real status is 0
Иванов1995 got the score 0.83. The real status is 1
Иванов1996 got the score 0.31. The real status is 0
Иванов1997 got the score 0.48. The real status is 0
Иванов1998 got the score 0.16. The real status is 0
Иванов1999 got the score 0.36. The real status is 0
Иванов2000 got the score -0.04. The real status is 0
16/11/21 21:36:40 INFO Executor: Finished task 0.0 in stage 176.0 (TID 176). 3194 bytes result sent to driver
16/11/21 21:36:40 INFO TaskSetManager: Finished task 0.0 in stage 176.0 (TID 176) in 432 ms on localhost (1/1)
16/11/21 21:36:40 INFO TaskSchedulerImpl: Removed TaskSet 176.0, whose tasks have all completed, pool from
16/11/21 21:36:40 INFO DAGScheduler: ResultStage 176 (mean at App.java:242) finished in 0.433 s
16/11/21 21:36:40 INFO DAGScheduler: Job 175 finished: mean at App.java:242, took 0.452851 s
Training Error = 0.11655428630639536

Тепер має сенс порівняти його з аналітичним рішенням в ексель:



Як бачимо, результат дуже близький, модель вийшла годна, можна нацьковувати на тестову вибірку. Код проекту з вихідними даними можна скачати тут.

У цілому хочеться зазначити, що ажіотаж навколо великих даних представляється досить надмірним (велиииким таким). Більш цінним мені здається швидше не обсяг, а те, як саме обробляти ці дані. Тобто якась комбінація TF-IDF — нейромережа — ALS може дати чудовий результат при можливості творчо попрацювати і на обмеженому обсязі. Проблема мабуть в тому, що менеджери можуть вибивати бюджети під магічні слова Big Data, а витратити ресурс на просто дослідницькі мети вимагає занадто довгостроковий горизонт планування для звичайної компанії.

Для розуміння цієї думки уточню, зоопарк екосистеми Hadoop (Hive, Pig, Impala і т. д.) шикарний. Я сам займаюся розробкою розподіленої системи обчислень на нейромережах (одночасне виконання багатопоточних додатків на декількох серверах з синхронізацією і агрегацією результатів) для макроекономічного моделювання і приблизно розумію скільки граблів лежить на цьому шляху. Так, є завдання, де альтернатив цим технологіям немає — наприклад примітивна, але потокове онлайн обробка диких обсягів даних (умовно кажучи якийсь аналіз трафіку стільникових абонентів Москви). Тут Storm або Spark Streaming можуть створити диво.

Але якщо у нас є масив даних по мільйону клієнтів за рік, то вибірка кожного 10-го (або навіть 100-го) випадковим чином для побудови моделі якогось скорингу дасть практично той же результат, що і повний масив. Іншими словами, замість королеви балу Data mining стала падчеркою, хоча швидше за все це тимчасово. Ажіотаж спаде, але експериментальні підходи обкатывающиеся зараз на Hadoop-кластерах поширяться і ті, хто першими усвідомлює перспективи дослідження «маленьких» даних виявиться в дамках.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.