Spotify: міграція підсистеми подій в Google Cloud (частина 1)

Всякий раз, коли користувач виконує дію в клієнті Spotify – таке як, наприклад, прослуховування пісні або пошук виконавця – невелика кількість інформації, подія, відправляється на наші сервера. Доставка подій, процес безпечної та надійної транспортування інформації від клієнтів по всьому світу до нашої центральної системи обробки, — цікава задача. У серії цих статей ми розглянемо деякі рішення, які реалізували в цій області. Якщо бути більш точними, то ми розглянемо архітектуру нашої нової системи доставки подій і розповімо, чому вирішили розгорнути її в Google Cloud.

У цій першій статті ми пояснимо, як працює наша поточна система доставки подій і розповімо про деяких уроках, які отримали в ході роботи з нею. В наступній – розглянемо створення нової системи і те, чому ми вибрали Cloud Pub/Sub в якості транспортного механізму для всіх подій. У третій, і останній, статті ми пояснимо, як працюємо з усіма подіями з допомогою DataFlow, і наскільки швидко все це відбувається.

image

Події, що поширюються через нашу систему доставки, мають безліч застосувань. Велика частина наших рішень в продуктовому дизайні заснована на результатах A/B тестів, а вони, в свою чергу, повинні спиратися на великі і точні дані. Плейлист Discover Weekly, запущений в 2015, швидко став однією з найбільш використовуваних функцій в Spotify. Він створюється на основі даних про програванні музики. Year in music, Spotify Party і безліч інших функцій Spotify також засновані на даних. Крім того, дані Spotify служать одним з джерел для складання топів Billboard.

Наша система доставки повідомлень є однією з основних частин інфраструктури даних Spotify. Ключова вимога до неї – доставка всіх даних з передбачуваною затримкою і доступність для наших розробників через добре описаний інтерфейс. Дані про використання можна описати як набір структурованих подій, сформований в певний момент часу як відповідь на певні заздалегідь визначені дії.

Більшість подій, що використовуються в Spotify, безпосередньо генеруються клієнтами Spotify як відповідь на певні дії користувачів. Всякий раз, як в клієнті Spotify відбувається подія, інформація про нього відправляється на один з шлюзів Spotify, який записує його в системний лог. Там йому присвоюється тимчасова відмітка, яка використовується в системі доставки повідомлень. Для того, щоб гарантувати певну затримку і закінченість доставки повідомлення, було вирішено для події використовувати мітку лода (syslog timestamp), а не клієнта, так як у нас немає контролю над подією до того, як воно потрапило на наші сервера.

У разі Spotify, всі дані треба доставити в центральний Hadoop кластер. Сервера Spotify, на яких ми збираємо дані, розташовані в кількох дата-центрах на двох континентах. Смуга пропускання між нашими дата-центрами є дефіцитним ресурсом і необхідно відноситися до передачі даних з особливою ретельністю.

Інтерфейс даних визначається місцем розташування даних в Hadoop і форматом, в якому вони зберігаються. Всі дані, які доставляються нашим сервісом, записуються в Avro форматі в HDFS. Отримані дані розбиті на розділи (партіціі) по 60 хвилин (час). Це пережиток минулого, коли перша система доставки повідомлень була заснована на scp команді і щогодинному копіюванні syslog файлів з усіх серверів на Hadoop. Оскільки всі процеси обробки даних сьогодні в Spotify зав'язані на погодинних даних, цей інтерфейс залишиться з нами і в осяжному майбутньому.

Більшість процесів роботи з даними в Spotify читають дані з годинної складання лише один раз. Вихідні значення одних процесів можуть служити вхідними даними для інших, формуючи, таким чином, довгі ланцюжки перетворень. Після того, як процес обробив дані за годину, він вже не проводить ніяких перевірок у цьому вихідному годині на зміни. Якщо дані змінилися, єдиний спосіб відтворити далі ці зміни — вручну перезапустити всі відповідні завдання (і відносяться до них завдання) для даного конкретного інтервалу (години). Це дорогий і трудомісткий процес, ось чому ми висуваємо такі вимоги до служби доставки повідомлень і після надання годинного сету вже не можемо доповнювати в ньому ніякі дані. Ця проблема, відома як проблема повноти даних, протиставляється вимогу мінімальної затримки при обробці даних. Цікава точка зору на проблему повноти даних викладена в доповіді Dataflow від Google.

Первісна система доставки повідомлень
Системна архітектура
Наша первісна система доставки повідомлень була побудована поверх Kafka 0.7.

image

В ній система доставки подій вибудувана навколо абстракції погодинних файлів. Вона призначена для потокової передачі файлів логів, які містять події, від сервісних машин до HDFS. Після того, як всі лог файли за певний годину передані на HDFS, вони перетворюються з тексту з табуляціями у формат Avro.

Коли система створювалася, одного з відсутніх функцій Kafka 0.7 була здатність кластера Kafka Broker працювати надійним постійним сховищем. Це вплинуло на прийняття важливого проектного рішення – не підтримувати постійні стану між виробником даних, Kafka Syslog Producer і Hadoop. Подія вважається надійно збережені тільки тоді, коли воно записано у файл на HDFS.

Проблема з надійним існуванням події тільки всередині Hadoop полягає в тому, що кластер Hadoop стає єдиною точкою збою системи доставки повідомлень. Якщо Hadoop вийде з ладу, то вся система доставки зупиниться. Щоб справитися з цим, ми повинні переконатися в тому, що у нас є достатньо дискового простору на всіх сервісах, з яких ми збираємо події. Коли Hadoop повернеться в лад, нам потрібно «наздогнати» його стан, передавши всі дані настільки швидко, наскільки це можливо. Час відновлення в основному обмежується пропускною здатністю, яку ми можемо задіяти між нашими дата-центрами.

Продюсер (Producer) – це демон, який запущено на кожному хості, з якого ми хочемо відправляти події в Hadoop. Він відстежує лог-файли і відсилає пакети логів у Kafka Syslog Consumer. Producer нічого не знає від типі події або властивості, які можуть у нього бути. З його точки зору, подія це просто набір рядків у файлі і всі рядки перенаправляються в однаковий канал. Це означає, що події всіх типів, що містяться в одному лог-файлі, також передаються через один канал. У такій системі, топіки Kafka використовуються в якості каналів для передачі подій. Після того, як Продюсер відправляє логи до Споживача (Consumer), йому треба дочекатися підтвердження (ACK), що Consumer успішно зберіг рядка лода в HDFS. Тільки після того, як продюсер отримує ACK для відправлених логів, він вважає, що вони надійно збережені і переходить до передачі інших записів.

Для подій, щоб потрапити від Producer до Consumer, потрібно пройти Kafka Brokers і потім Kafka Groupers. Kafka Brokers — стандартний компонент Kafka, а Kafka Groupers — це компонент, написаний нами. Groupers обробляє всі потоки подій від локальних дата-центрів і потім публікує їх знову стислими, ефективно згрупованими в одному топіку, яка потім витягується Consumer.

Завдання Extract, Transform and Load (ETL) використовується для перетворення даних з простого формату з поділом табуляцією в Avro формат. Цей процес — звичайна Hadoop MapReduce робота, запроваджена з використанням фреймворку Crunch, який працює з погодинними наборами. Перш, ніж почати роботу з певним часом, йому треба переконатися, що всі файли повністю передані.

Всі Продюсери постійно відправляють контрольні мітки, які можу містити end-of-file маркери. Ці маркери відправляються лише один раз, коли Producer прийшов до висновку, що весь файл був надійно збережено на Hadoop. Монітор стану (або «живучості») постійно опитує наші системи виявлення сервісів в усіх дата-центрах, які сервіс-машини працювали у визначений час. Щоб перевірити, чи всі файли були остаточно передані за цей час, ETL порівнює інформацію про сервери, від яких йому варто очікувати дані, з end-of-file маркерами. Якщо ETL визначає розбіжність і неповну передачу даних, то він затримує обробку даних для безперечно години.

Для того, щоб мати можливість максимально використовувати наявні маперы і редюсеры, ETL, який є звичайною задачею Hadoop MapReduce, треба знати, як шардить вхідні дані. Маперы і редюсеры розраховуються на основі розміру вхідних даних. Оптимальний шардінг розраховується на основі кількості подій, безперервно надходять від Consumer-ів.

Уроки
Однією з основних проблем, пов'язаних з такої конструкції є те, що локальні Продюсери повинні переконатися в тому, що дані збереглися в HDFS в центральній локації до того, як їх можна вважати надійно доставленими. Це означає, що Producer сервера на західному узбережжі США повинен знати, коли дані запишуться на диск в Лондоні. Велику частину часу це працює просто чудово, але якщо передача даних сповільниться, то це викличе затримки в доставці, від яких потім буде важко позбутися.

Порівняйте це з варіантів, коли точка обслуговування знаходиться в локальному дата-центрі. Це спрощує конструкцію Продюсера, так як, зазвичай, мережа між хостами в центрі обробки даних дуже надійна.

Абстрагуючись від проблем, ми були цілком задоволені системою, яка може надійно доставляти більше 700,000 подій в секунду зі всього світу. Редизайн системи також дав нам можливість поліпшити процес розробки програмного забезпечення.

Відправляючи всі події разом через один канал, ми втрачали гнучкість управління потоками подій з різною якістю обслуговування (QoS). Це також обмежувало роботу в реальному часі, так як будь-який процес, що працює в реальному часі, повинен був передавати свої дані через єдиний канал, в якому йде весь потік, і виводити з нього тільки потрібне.

Передача неструктурованих даних додає непотрібну затримку, так як вимагає додаткового ETL-перетворення. В даний час, ETL робота додає близько 30 хвилин затримки в доставку події. Якщо б дані пересилалися в форматі Avro, то вони відразу ж були доступні при запису на HDFS.

Необхідність відправникові відслідковувати завершення години теж викликало проблеми. Наприклад, якщо машина помирає, то не може послати повідомлення про кінець файлу. Якщо end-of-file маркер втрачається, то ми будемо чекати вічно, до тих пір, поки цей процес не перервуть вручну. По мірі зростання числа машин, ця проблема стає все більш актуальною.

Наступні кроки
Кількість доставлених повідомлень в Spotify постійно збільшується. В результаті підвищених навантажень, ми стали відчувати все більше проблем. З часом, кількість відключень стало турбувати нас. Ми зрозуміли, що ні ми, ні система, більше не в змозі справлятися з підвищеним навантаженням. Як раз в наступної статті розповімо про те, як вирішили змінити нашу систему.

Кількість повідомлень, оброблюваних нашою системою, в певний момент часу.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.