Сервер черг


 
У процесі росту в багатьох проектах з'являється необхідність вирішення низки завдань, пов'язаних з чергами. Часто черги повідомлень використовують як сполучна ланка між різними внутрішніми підсистемами. Кілька класичних прикладів:
 
     
  • відкладена обробка даних користувача;
  •  
  • передача статистики;
  •  
  • згладжування навантаження на відносно повільні системи;
  •  
  • виконання періодичних завдань.
  •  
 
Існує кілька підходів до організації черг:
 
     
  • використовувати реляційні бази даних;
  •  
  • застосувати існуючі рішення (RabbitMQ тощо);
  •  
  • написати свій велосипед.
  •  
 
«Мій Світ» якийсь час використовував черги в реляційній базі, але із зростанням проекту почалися проблеми з продуктивністю. Ми встали перед вибором: застосувати існуючі рішення або розробити свою систему.
 
Для початку ми оцінили перший варіант. Існує безліч серверів черг під різні завдання. В основному ми розглядали найбільш відомий, RabbitMQ, що по праву вважається лідером серед подібних систем з відкритим вихідним кодом.
 
Кілька достоїнств:
 
     
  • непогана продуктивність, достатня для великої кількості завдань;
  •  
  • багатий функціонал, що дозволяє реалізувати майже будь-яку логіку роботи з чергами;
  •  
  • підсистема витіснення повідомлень на диск, яка дозволяє працювати на серверах з недостатнім обсягом оперативної пам'яті, нехай і з втратою продуктивності;
  •  
  • можливість розробляти плагіни для розширення функціональності.
  •  
 
Але в будь-якій системі вистачає і недоліків. Ось деякі з тих, які вплинули на наш вибір на користь розробки свого рішення:
 
     
  • Недостатня продуктивність. Нам важлива інформація, що передається через черги, тому ми хочемо всі зміни в сервері черг писати на диск у вигляді бінарних логів. З включенням запису повідомлень на диск продуктивність RabbitMQ падає до дуже низьких значень (<10 000 вставок подій в секунду).
  •  
  • Занадто швидке падіння продуктивності зі збільшенням розміру повідомлення, навіть без включення запису на диск.
  •  
  • Відсутність таймаутів на обробку повідомлення. Якщо обробник черги завис через помилку в коді або ще з якоїсь причини, то повідомлення не буде передано іншому оброблювачу, поки що завис НЕ обірве з'єднання.
  •  
  • Нестабільне час відповіді на запити, особливо на великих чергах (> 10 000 000 повідомлень).
  •  
  • Той факт, що сервер визначає, кому з обробників віддавати події і скільки їх віддавати. У мережі чимало статей підганяти параметрів під конкретну задачу. Нас це не влаштовувало, хотілося винести логіку управління ресурсами обробників за межі сервера. Забігаючи вперед, скажу — це виявилося хорошою ідеєю. Ми побудували над серверами черг велику інфраструктуру, що відстежує пріоритети черг, кількість подій в чергах і вільні ресурси на серверах з обработчиками подій. Це дозволяє динамічно породжувати і вбивати обробники черг, а також налаштовувати ліміти на допустиму безліч подій, одержуваних з сервера черг за один запит (batch processing).
  •  
  • Досить великий обсяг службових даних на кожне повідомлення. З незрозумілих (для мене) причин обсяг займаної пам'яті на одне і те ж кількість повідомлень може помітно відрізнятися від запуску до запуску.
  •  
 
Отже, проаналізувавши плюси і мінуси цього варіанту, після деяких роздумів ми вибрали розробку власної системи. Одним з основних аргументів стало те, що в 2009 році (коли відбувся перший реліз власного сервера черг) існуючі рішення працювали під навантаженням не дуже стабільно. Зараз багато що виправлено і вдосконалено, але аргументів на користь свого рішення все ще вистачає.
 
Усвідомивши, що нам потрібно, підготували ТЗ і визначили вимоги, пропоновані до нашого сервера черг:
 
     
  • Будь-яке повідомлення має видаватися споживачам тільки після спрацьовування деякої умови. В якості умови вибрано встановлюване клієнтом час, після якого подія вважається активним і може бути отримано обробником відповідної черги.
  •  
  • Зберігати всі зміни (Persistence) подій на диск, у разі програмних або апаратних збоїв відновлюватися з усіма даними.
  •  
  • Забезпечити можливість задати порядок видачі подій обробникам (сортування за часом активації події).
  •  
  • Чи не обманювати клієнта, відповідати OK на вставці тільки після запису даних на диск.
  •  
  • Забезпечувати стабільно низьку затримку при роботі з чергою до 100 000 000 подій.
  •  
  • Працювати з подіями різного розміру від 1 байта до декількох мегабайтів.
  •  
  • Не менш 15000 вставок в секунду.
  •  
  • Продуктивність не повинна падати при роботі як мінімум з 1000 виготовлювачів подій і 1000 споживачів.
  •  
  • Забезпечити відмовостійкість (хоча б часткову) у разі апаратного збою дисків і втрати \ псування даних. При запуску сервера важливо вміти визначати коректні дані і відкидати биті запису.
  •  
 
 Архітектура
 
 
Сервер черг реалізований на C модулем до нашого першого мережного фреймворку для побудови сховищ, з якого виріс і Tarantool. Це однопотоковий асинхронний сервер, який використовує libev для організації event loop'а. Всі запити обробляються по простому бінарним протоколу на основі IPROTO.
 
 WAL процес
Всі зміни пишуться на диск у вигляді бінарних логів за допомогою окремого WAL процесу. Ідеологічно все дуже схоже на Tarantool, позначаються спільні корені. Кожен запис підписується за допомогою crc32, щоб в процесі завантаження перевірити коректність зчитувальних даних. Сервер черг, мабуть, більше за всіх наших модулів взаємодіє з WAL процесом, так як практично всі команди, в тому числі і видача повідомлень споживачам, модифікують стан сервера, і їх необхідно писати на диск.
 
 Dumper
Час від часу породжується процес, який зберігає повний образ поточного стану користувача даних і необхідну службову інформацію на диск. За великим рахунком Dumper непотрібен, але дозволяє прискорити підйом сервера після перезапуску, так як досить прочитати останній snapshot і застосувати тільки ті бінарні логи, що зроблені після запису образу даних.
 
 Say Logger
Останній процес відповідає за запис текстових логів на диск. Часто логи повністю відключають на бойових серверах через погіршення продуктивності; ми постаралися уникнути цієї долі. Для цього породжується окремий процес, в якому виконується зовнішній логер, наприклад cronolog . Спілкування реалізовано за допомогою socket'ов таким чином, що ми можемо працювати в одному з двох режимів:
 
     
  • чекати запис на диск. Затримки запису логів погіршать загальну продуктивність.
  •  
  • ігнорувати переповнення черги повідомлень до процесу логера. Це призведе до втрати деяких записів, але дозволить не залежати від продуктивності диска з текстовими логами.
  •  
 
 Занурюємося далі
 
 
Всі події в черзі перебувають в одному з трьох станів:
 
     
  • Неактивне. Повідомлення прийнято сервером повідомлень, але його не можна віддавати обробникам черг до настання часу активації.
  •  
  • Активно. Час активації події настало і подія може бути видано обробникам черг.
  •  
  • Заблоковано. Подія вже видано і очікує підтвердження про обробку. Може бути видано повторно, якщо через X секунд не прийде команда на видалення події.
  •  
 
У кожної черги організовані індекси під події в кожному з трьох станів і ще один центральний індекс по всіх подіях черги.
 
Сервер черг працює з двома типами черг. Логіка розрізняється в політиці видачі ID-повідомлень: або ID видає сервер, або клієнт. Наявність ідентифікатора у всіх повідомлень дозволяє реалізувати розширену логіку роботи з чергами. Крім вставки, отримання та видалення, підтримуються команди зміни даних або часу активації повідомлення. Це дозволяє організувати перестановку повідомлень і зміна статусу обробки в рамках однієї черги. Якщо у вас є періодичні дії, необов'язково видаляти подія після його обробки — досить переставити час активації на потрібну кількість хвилин \ годин \ днів.
 
Сервер черг підтримує транзакції. Всі необхідні для відповіді дії (алокація тимчасових даних, перевірка наявності потрібних подій, підготовка буферів з'єднань) виконуються перед записом на диск. При виникненні помилок всі зміни відкочуються. Чи не підтримується виконання кількох команд в рамках однієї транзакції. Механізм транзакцій забезпечує ексклюзивне володіння правом на зміну події. Одночасна спроба двох клієнтів вчинити дії, що змінюють стан якоїсь події, закінчиться поверненням помилки з відповідним кодом другому клієнтові.
 
Для коректної активації подій в сервері реалізована службова команда, яку не можна викликати по мережі — вона активується за таймером. Усередині організована службова чергу по всіх користувальницьким чергам, що відповідає за активацію подій.
 
При отриманні запиту від обробника на нову порцію подій (зазвичай в районі 1000) з певної черги сервер виконує наступні дії:
 
     
  • створюємо транзакцію;
  •  
  • шукаємо зазначену чергу;
  •  
  • блокуємо необхідну кількість повідомлень з вершини індексу активних повідомлень;
  •  
  • формуємо службову запис про блокування обраних подій;
  •  
  • відправляємо службову запис в WAL процес;
  •  
  • отримуємо відповідь від WAL процесу;
  •  
  • застосовуємо транзакцію до даних в пам'яті, переносимо події в чергу заблокованих;
  •  
  • пишемо відповідь клієнту із заздалегідь підготовлених даних;
  •  
  • завершуємо транзакцію, знімаємо внутрішні блокування з усіх подій і чистимо тимчасові дані транзакції;
  •  
  • в разі будь-яких помилок на кожному з етапів відкатуємо транзакцію і повідомляємо клієнтові причину.
  •  
 
Обробка дублювання вставок повідомлень. У разі проблем з мережею або високим завантаженням сервера клієнт може вирішити, що настав таймаут, сервер не обробив повідомлення і потрібно його послати ще раз, а проте сервер до цього часу може вже обробити повідомлення. З чергами, в яких клієнт видає ID, все просто: в даних запиту є ID — перевіряємо, чи є такий у черзі, і, якщо це так, відсилаємо помилку.
 
У чергах з внутрішньої видачею ID ситуація складніша: у сервера немає однозначної ознаки, за яким можна зрозуміти, що поточний запит — це насправді дубль запиту, обробленого кілька секунд тому. «Значить, така ознака треба додати», — вирішили ми і ввели в пакет 2 додаткових поля: RequestID, гарантовано унікальний в рамках одного процесу клієнта, і PID процесу. На сервері черг організований кеш insert'ов по ключу {ClientIP, RequestID, PID}, що дозволяє відстежити дублікати запитів протягом 10-15 хвилин. На практиці цього більш ніж достатньо. Потенційний недолік — метод не працює через NAT, так як у всіх клієнтів виявиться один і той же IP та, відповідно, можливі помилкові спрацьовування.
 
Створення та налагодження черг. Для спрощення конфігурації чергу автоматично створюється з налаштуваннями за замовчуванням при першій спробі вставки повідомлення в зазначену чергу. У конфігурації можуть бути задані настройки конкретної черги, розмір порцій для спроб активації подій, час, після якого подія з черги переходить зі статусу заблоковано в статус активно і т. п.
 
До речі, зауважу, що зараз я б не став робити автоматичне створення черг. Це прижилося і подобається розробникам бізнес-логіки, що використовують черги направо і наліво, але налагодження усієї цієї пишноти відняла немало часу і сил.
Несподівано багато corner case'ов спливло в процесі тестування, заради рідкісних ситуацій довелося написати чимало коду. Основні проблеми виявилися при обробці відкату транзакції події, що породжує нову чергу. Якщо під час створення нової черги і запису події на диск інші клієнти намагаються додати події в ще не створену чергу, доводиться розуміти, що черга в процесі створення. Ситуація ускладнюється, якщо це — черга з внутрішніми ID, в яких сервер сам видає ID повідомлень у відповідь на команди вставки. Всі події в створену чергу блокуються до завершення процесу створення черги, при цьому їм вже призначається ID. Якщо транзакцію, що створює чергу, доводиться відкочувати, необхідно відкотити і всі транзакції залежних подій, які очікують створення черги. Звучить страшно, а в коді ще страшніше.
 
 Підводимо підсумки
 Хороше
 
     
  • Хороші показники продуктивності — не менше ніж 50 000 rps на вставках. Продуктивність залежить виключно від потужності дисків та кількості записів, через яке потрібно викликати системний виклик fdatasync.
  •  
  • Робота з великими чергами. Один час у нас були черги по 170 млн повідомлень на бойовий сервер.
  •  
  • Стабільна робота з нерівномірним навантаженням (в якихось чергах висока інтенсивність, в якихось часто приходить пікове навантаження).
  •  
  • Хороші результати SLAB-аллокатора — і по продуктивності і по фрагментації (зазвичай 90% повідомлень у рамках однієї черги мають однаковий або близький розмір).
  •  
  • Стабільність системи в цілому. Щодня ми обробляємо мільярди повідомлень на безлічі серверів черг. За останні 2-3 роки не було жодного збою з вини програмної частини.
  •  
 
 Погане й завдання на майбутнє
 
     
  • Ряд спадкових проблем, отриманих від використаного мережевого фреймворка. Всі вони повинні піти з перекладом на кодову базу Tarantool.
  •  
  • Шардінг на клієнті.
  •  
  • Обробка дублювання повідомлень — її варто було б переробити. В принципі працює, але проблема з NAT'ом бентежить.
  •  
  • Необхідно виділити створення черг окремою командою.
  •  
  • Іноді хочеться збережених процедур на Lua для розширення можливостей по роботі з чергами. Поки що ні настільки часто, щоб дійшли руки до реалізації.
  •  
  • Всі події завжди в пам'яті. Теоретично добре б витісняти події, які нескоро активуються на диск. А на практиці поки що для нас важливіше стабільне час відповіді на запити до сервера.
  •  
 
 Використання черг в Моєму Світі
 
     
  • Відкладена обробка дій користувача. Не краща ідея примушувати користувача чекати, поки ви збережете його дані в SQL базу або інше сховище (а часто необхідно вносити зміни відразу в кілька систем). Гірше того, в деяких реалізаціях (в основному в маленьких проектах у молодих розробників) дані можуть зовсім не потрапити в сховище, якщо клієнт обірвав з'єднання, не дочекавшись відповіді. Доброю практикою є додавання події про користувальницьких діях в досить швидкий сервер черг, після чого можна відповідати клієнтові, що операція пройшла успішно. Всю іншу роботу надійно і ефективно виконають обробники черги. В якості безкоштовного бонусу отримаєте спрощення коду на frontend-серверах, яким буде достатньо спілкуватися тільки з демоном черг для внесення змін до будь сховища. Знання про бізнес-логікою різних даних можна винести в обробники черг.
  •  
  • Розсилка повідомлень, листів тощо Вам необхідно відіслати велику кількість даних, при цьому не перевантаживши сховища сплеском запитів. Легко! Варіюючи кількість обробників черги, розмазує пікове навантаження до розумного рівня, щоб час обробки клієнтських запитів в ті ж джерела даних не погіршився. І, найголовніше, за допомогою черг легко уникнути дублювання повідомлень. Вкрай неприємно отримати два листи про одне й те ж подію. Для періодичних розсилок досить після обробки повідомлення оновлювати час його активації, а не видаляти: в потрібний час воно знову буде оброблено.
  •  
  • Транспорт для «надійної» статистики. Передача важливих (все, що пов'язано з грошима) даних на агрегатори статистики. Системи агрегації статистики зазвичай вимогливі до ресурсів процесора, і при обробці даних можуть не забезпечувати необхідне для frontend-серверів час відповіді. Ще одна особливість подібних систем — нерівномірне завантаження, зазвичай пов'язана з обробкою даних порціями. Передача статистики через сервери черг дозволить уникнути проблем з нестабільною затримкою і при цьому збереже гарантію доставки.
  •  
  • Угруповання подій. Якщо група подій буде звертатися до одного і того ж набору даних в інших системах, має сенс ставити однаковий час активації, так як навіть якщо встановити час в минулому, події відсортовані за часом активації. Фізичний сенс хитрувань в більш ефективному використанні кешей сховища, в яке підуть запити з обробників повідомлень.
  •  
  • Каскадні черги. Організація кінцевого автомата з декількох черг шляхом перекладання даних між чергами по завершенні чергового етапу обробки. Часто необхідно в процесі обробки повідомлення виконати ряд дій, сильно розрізняються за необхідних ресурсів. У такому випадку рознесення «швидких» і «повільних» дій за різними етапами (чергами) дозволяє ефективно управляти необхідною кількістю ресурсів, варіюючи число обробників для кожної черги. Додатково виграємо у спрощенні коду обробників і пошуку помилок в бізнес логіці. За графіками черг можна зрозуміти, на якому з етапів копляться події і в якому обробнику потрібно шукати проблеми.
  •  
 
У проекті використовуємо клієнти на Perl і C, в інших проектах реалізували клієнти на PHP, Ruby і Java.
 
PS: Спеціально не стали малювати таблички з порівнянням продуктивності з існуючими підсистемами. Не можна (я не знаю систем з підходящими можливостями) порівняти з тим же функціоналом, що ми використовуємо в бою, а без цього вийде ще один тест сферичного коня у вакуумі.
 
PPS: Опис деяких компонентів (адміністративний інтерфейс, локальна репліка тощо) опустили, так як вони реалізовані схожим чином у Tarantool.
 
PPPS: В одній з наступних статей постараємося розповісти про нашу інфраструктурі по роботі з чергами — про те, як управляємо ресурсами, як відстежуємо стан черг, як організований шардінг подій між серверами черг і про багато іншого.
 
Якщо є якісь питання, задавайте в коментарях.

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.