Використовуємо Apache Spark як SQL Engine



Привіт, Хабр! Ми, Wrike, щодня стикаємося з потоком даних від сотень тисяч користувачів. Всі ці відомості необхідно зберігати, обробляти та отримувати з них цінність. Впоратися з цим колосальним обсягом даних нам допомагає Apache Spark.

Ми не будемо робити введення в Spark або описувати його позитивні і негативні сторони. Про це ви можете почитати тут, тут або офіційній документації. У даній статті ми робимо наголос на бібліотеку Spark SQL та її практичне застосування для аналізу великих даних.

SQL? Мені не здалося?

Історично склалося, що відділ аналітики практично будь IT-компанії будувався на базі фахівців, які добре володіють і тонкощами бізнесу, і SQL. Робота BI або аналітичного відділу практично ніколи не обходиться без ETL. Він, у свою чергу, найчастіше працює з джерелами даних, до яких простіше всього звертатися за допомогою SQL.

Wrike не виняток. Тривалий час основним джерелом даних для нас були шарды нашої бази даних у поєднанні з ETL і Google Analytics, поки ми не зіткнулися із завданням аналізу поведінки користувачів на підставі серверних логів.

Одним з рішень такої проблеми може бути оренду програмістів, які будуть писати Map-Reduce для Hadoop і забезпечувати даними прийняття рішень в компанії. Навіщо це робити, якщо у нас вже є ціла група кваліфікованих фахівців, які добре володіють SQL і розбираються в тонкощах бізнесу? Альтернативним рішенням може бути складування всього в реляційну БД. У цьому випадку основним головним болем стане підтримка схеми як ваших таблиць, так і вхідних логів. Про продуктивність СУБД з таблицями на кілька сотень мільйонів записів, думаємо, можна навіть не говорити.

Рішенням для нас став Spark SQL.

Ok, що далі?

Основний абстракцій Spark SQL, на відміну від Spark RDD, є DataFrame.

DataFrame — це розподілена колекція даних, організована в вигляді іменованих колонок. DataFrame концептуально схожий на таблицю в базі даних, data frame R або Python Pandas, але, звичайно ж, оптимізований для розподілених обчислень.

Ініціалізувати DataFrame можна на базі безлічі джерел даних: структурованих або слабо-структурованих файлів, таких як JSON і Parquet, звичайних баз даних за допомогою JDBC/ODBC і багатьма іншими способами через коннектори сторонніх розробників (наприклад, Cassandra).

DataFrame API доступні з Scala, Java, Python і R. А з точки зору SQL звертатися до них як до звичайних SQL таблиць з повною підтримкою всіх можливостей діалекту Hive. Spark SQL реалізує інтерфейс Hive, тому ви можете підмінити свій Hive на Spark SQL без переписування системи. Якщо ви раніше не працювали з Hive але добре знайомі з SQL, тоді, швидше за все, вам не потрібно вивчати що-небудь додатково.

Я можу підключитися до Spark SQL за допомогою %my-favorite-software%?

Якщо ваше улюблене ЗА підтримує використання довільних JDBC-конекторів, тоді відповідь — так. Нам подобається DBeaver, а нашим розробникам — IntelliJ IDEA. І вони обидві чудово підключаються до Thrift Server.

Thrift Server є частиною стандартної установки Spark SQL, який перетворює Spark у постачальника даних. Підняти його дуже просто:

./sbin/start-thriftserver.sh


Thrift JDBC/ODBC сервер повністю сумісний з HiveServer2 і може прозоро замінити його собою.

Ось так, наприклад, виглядає вікно підключення DBeaver до SparkSQL:



Хочу різні постачальники даних в одному запиті

Легко. Spark SQL частково розширює діалект Hive таким чином, що ви можете формувати джерела даних прямо за допомогою SQL.

Давайте створимо таблицю» на базі логів в json-форматі:

CREATE TEMPORARY TABLE table_form_json 
USING org.apache.spark.sql.json 
OPTIONS (path '/mnt/ssd1/logs/2015/09/*/*-client.json.log.gz') 


Зверніть увагу, що ми використовуємо не просто один файл, а по масці отримуємо дані, доступні за місяць.

Зробимо те ж саме, але з нашої PostgreSQL базою. В якості даних візьмемо не всю таблицю, а тільки результат конкретного запиту:

CREATE TEMPORARY TABLE table_from_jdbc 
USING org.apache.spark.sql.jdbc 
OPTIONS ( 
url "jdbc:postgresql://localhost/mydb?user=[username]&password=[password]&ssl=true", 
dbtable "(SELECT * FROM profiles where profile_id = 5) tmp" 
) 


Тепер абсолютно вільно ми можемо виконати запит із JOIN'м, а Spark SQL Engine зробить всю роботу за нас:

SELECT * FROM table_form_json tjson JOIN table_from_jdbc tjdbc ON tjson.userid = tjdbc.user_id;


Комбінувати джерела даних можливо в довільному порядку. У себе під Wrike ми використовуємо PostgreSQL бази, json-логи і parquet-файли.



Що-небудь ще?

Якщо вам, як і нам, цікаво не тільки використовувати Spark, але й розуміти, як він влаштований під капотом, ми рекомендуємо звернути увагу на наступні публікації:



Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.