Як Discord обробляє понад 1 000 000 push-запитів в хвилину за допомогою Elixir GenStage


Olympus

Olympus відчув небувалий зростання. Щоб впоратися з ним, нашу відділу розробки дісталася приємна проблема — шукати спосіб масштабування сервісів бекенду.

У цій справі ми добилися великого успіху з допомогою однієї технології, яка називається Elixir GenStage.

Ідеальний шторм: Overwatch і Pokémon GO
Цього літа наша система мобільних push-повідомлень стала скрипіти від навантаження. Чат /r/Overwatch перевалив за 25 000 одночасних користувачів, а чат-групи Pokémon GO виникали повсюдно, так що раптові сплески потоку повідомлень стали серйозною проблемою.

Сплески потоку повідомлень гальмують всю систему push-повідомлень, а іноді кладуть її. Push-повідомлення або приходять пізно, або не приходять зовсім.

GenStage йде на допомогу
Після невеликого розслідування ми з'ясували, що основним пляшковим горлечком була відсилання push-повідомлень у сервіс Google Firebase Cloud Messaging.

Ми зрозуміли, що можемо негайно поліпшити пропускну спроможність, якщо відправляти push-запити до Firebase за XMPP, а не по HTTP.

Firebase XMPP трохи складніше, ніж HTTP. Firebase вимагає, щоб у кожного XMPP-з'єднання в кожен момент часу було не більше 100 запитів у черзі. Якщо від вас полетіла 100 запитів, то слід почекати, поки Firebase підтвердить отримання запиту, перш ніж відправити наступний.

Оскільки в чергу допускаються тільки 100 запитів в кожен момент часу, нам довелося спроектувати нову систему, щоб XMPP-з'єднання не переповнювалися під час сплесків потоку запитів.

На перший погляд здалося, що GenStage буде ідеальним рішенням проблеми.

GenStage
Що таке GenStage?

GenStage — це новий режим Elixir для обміну подіями під зворотним тиском між процесами Elixir. [0]
Що це означає насправді? По суті, цей режим надає вам необхідні інструменти, щоб ні одна частина системи не перевантажувалася.

На практиці, система з режимами GenStage зазвичай має кілька етапів.

Етапи (stages) — це кроки обчислень, які відправляють і/або отримують дані від інших етапів.

Коли етап відправляє дані, він виступає в якості виробника. Коли отримує дані, то в якості споживача. Етапи можуть грати ролі одночасно і виробника, і споживача.

Крім призначення ролей виробника і споживача, етап можна призначити «джерелом» (source), якщо він тільки виробляє елементи, або призначити «стоком» (sink), якщо він їх тільки споживає. [1]
Підхід


Ми розділили систему на два етапи GenStage. Одне джерело і один стік.

  • Етап 1 — Push Collector. Це виробник, який одержує push-запити. Зараз у нас один процес Erlang для Push Collector на одну машину.

  • Етап 2 — Pusher. Це споживач, який вимагає push-запити від Push Collector і відправлю їх до Firebase. Він запитує тільки по 100 запитів за раз, щоб не перевищити ліміт Firebase на кількість одночасних запитів. Процесів типу Pusher (теж на Erlang) багато на кожній машині.
Зворотне тиск і скидання навантаження з допомогою GenStage
У GenStage є дві ключові функції, які допомагають нам під час сплеску запитів: зворотне тиск (back pressure) і скидання навантаження (load-shedding).

Зворотне тиск
Pusher використовує функціональність GenStage, щоб запитати у Push Collector'а максимальна кількість запитів, які Pusher може обробити. Це гарантує верхню межу за кількістю push-запитів, які перебувають в очікуванні. Коли Firebase підтверджує запит, тоді Pusher вимагає ще від Push Collector'а.

Pusher знає точну кількість запитів, яке може витримати з'єднання Firebase XMPP, і ніколи не вимагає зайвого. А Push Collector ніколи не надсилає запит у бік Pusher, якщо той не попросив.

Скидання навантаження
Оскільки Pusher'и надають зворотне тиск на Push Collector, то з'являється потенційний пляшкове горлечко у Push Collector. Супер-дупер потужні сплески можуть його перевантажити.

У GenStage є інша вбудована функція для таких ситуацій: буферизовані події.

У Push Collector ми визначаємо, скільки push-запитів поміщати в буфер. В нормальному стані буфер порожній, але один раз в місяць разі настання катастрофічних подій він доводиться дуже до речі.

Якщо через систему проходить ну вже дуже багато подій і буфер заповнюється, тоді Push Collector скидає вхідні push-запити. Це відбувається само собою, просто за рахунок вказівки опції
buffer_size
у функції
init
Push Collector'а.

З цими двома функціями ми здатні справлятися зі сплесками push-повідомлень.

Код (нарешті, найважливіша частина)
Нижче приклад коду, як ми налаштували етапи Pusher і Push Collector. Для простоти ми прибрали багато фрагментів, що відповідають за обробку відмов, коли втрачається з'єднання, Firebase повертає помилки і т. д.

Ви можете пропустити код, якщо хочете подивитися на результат.

Push Collector (виробник)
push_collector.ex

defmodule GCM.PushCollector do
use GenStage

# Client

def push(pid, push_requests) do
GenServer.cast(pid, {:push, push_requests})
end

# Server

def init(_args) do
# Run as producer and specify the amount max 
# of push requests to buffer.
{:producer, :ok, buffer_size: @max_buffer_size}
end

def handle_cast({:push, push_requests}, state) do
# Dispatch the push_requests as events.
# These will be buffered if there are no consumers ready.
{:noreply, push_requests, state}
end

def handle_demand(_demand, state) do
# Do nothing. Events will be dispatched as-is.
{:noreply, [], state}
end
end


Pusher (споживач)
pusher.ex

defmodule GCM.Pusher do
use GenStage
# The maximum number of requests Firebase allows at once per XMPP connection
@max_demand 100 

defstruct [
:producer,
:producer_from,
:fcm_conn_pid,
:pending_requests,
]

def start_link(producer, fcm_conn_pid, opts \\ []) do
GenStage.start_link(__MODULE__, {producer, fcm_conn_pid}, opts)
end

def init({producer, fcm_conn_pid}) do
state = %__MODULE__{
next_id: 1,
pending_requests: Map.new,
producer: producer,
fcm_conn_pid: fcm_conn_pid,
}
send(self, :init)
# Run as consumer
{:consumer, state}
end

def handle_info(:init, %{producer: producer}=state) do
# Subscribe to Push the Collector
GenStage.async_subscribe(self, to: producer, cancel: :temporary)
{:noreply, [], state}
end

def handle_subscribe(:producer, _opts, from, state) do
# Start demanding requests now that we are subscribed
GenStage.ask(from, @max_demand)
{:manual, %{state | producer_from: from}}
end

def handle_events(push_requests, _from, state) do
# We got some push requests from the Push Collector.
# Let's send them.
state = Enum.reduce(push_requests, state, &do_send/2)
{:noreply, [], state}
end

# Send the message to FCM, track as a request pending
defp do_send(%{fcm_conn_pid: fcm_conn_pid, pending_requests: pending_requests}=state, push_request) do
{message_id, state} = generate_id(state)
xml = PushRequest.to_xml(push_request, message_id)
:ok = FCM.Connection.send(fcm_conn_pid, xml)
pending_requests = Map.put(pending_requests, message_id, push_request)
%{state | pending_requests:pending_requests}
end

# FCM response handling
defp handle_response(%{message_id: message_id}=response, %{pending_requests: pending_requests, producer_from: producer_from}=state) do
{push_request, pending_requests} = Map.pop(pending_requests, message_id)

# Since we finished a request, ask the Push Collector for more.
GenStage.ask(producer_from, 1)

%{state | pending_requests: pending_requests}
end

defp generate_id(%{next_id: next_id}=state) do
{to_string(next_id), %{state | next_id: next_id + 1}}
end
end

Приклад інциденту
Нижче показаний реальний інцидент, з яким зіткнулася система. На верхньому графіку показано кількість push-запитів в секунду, що проходять через систему. На нижньому графіку — кількість push-запитів, поміщених в буфер Push Collector.





Хроніка подій:
  • ~17:47:00 — Система працює в нормальному режимі.
  • ~17:47:30 — До нас починає надходити потік повідомлень. Push Collector трохи задіяв буфер, очікуючи реакції Pusher. Незабаром буфер трохи звільнився.
  • ~17:48:50 — Pusher'и не можуть відправляти повідомлення в Firebase швидше, ніж вони вступають, так що буфер Push Collector'а починає заповнюватися.
  • ~17:50:00 — Буфер Pusher Collector досягає піку і починає скидати деякі запити.
  • ~17:50:50 — Буфер Pusher Collector починає звільнятися і перестає скидати запити.
  • ~17:51:30 — Наплив запитів пішов на спад.
  • ~17:52:30 — Система повністю повернулася в норму.


Успіх Elixir
Ми Olympus дуже задоволені використанням Elixir і Erlang як ключової технології на наших сервісах бекенду. Приємно бачити розширення начебто GenStage, які спираються на непорушні технології Erlang/OTP.

Ми шукаємо сміливих духом, щоб допомогти у вирішенні таких проблем, оскільки Olympus продовжує зростати. Якщо ви любите гри і такого роду завдання змушують ваше серце битися частіше, перегляньте наші вакансії.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.