Обробка 1 мільйона запитів на хвилину c Go

Переклад статті головного архітектора компанії Symantec про те, як вони досягли обробки 1 мільйона запитів на хвилину всього на 4 серверах.

У нас в Malwarebytes ми переживаємо постійне зростання і з тих пір, як я приєднався до компанії близько року тому в Кремнієвій Долині, однією з моїх основних обов'язків було проектування і розробка архітектур декількох систем для розвитку динамічної компанії і всієї необхідної інфраструктури для підтримки продукту, який використовують мільйони людей щодня. Я працював в індустрії антивірусів більше 12 років в різних компаніях, і знаю, наскільки складними виходять в результаті ці системи, із-за колосальних обсягів даних, з якими доводиться мати справу щодня.

Що цікаво, так це те, що останні 9 років або близько того, вся розробка веб-бекендов, з якою я стикався, здійснювалася на Ruby on Rails. Не зрозумійте мене неправильно, я люблю Ruby on Rails і я вірю, що це чудова середовище, але через деякий час ви звикаєте мислити про розробку систем в стилі Ruby, і ви забуваєте, наскільки ефективною і простий ваша архітектура могла б бути, якби ви задіяли мультипоточность, паралелізм, швидке виконання та ефективне використання пам'яті. Багато років я писав на C/C++, Delphi і C#, і я почав усвідомлювати наскільки менш складними речі можуть бути, якщо ви вибрали правильний інструмент для справи.

Як Головний Архітектор, я не любитель холиваров про мови і фреймворках, які так популярні в мережі. Я вірю, що ефективність, продуктивність і поддерживаемость коду залежать в основному від того, наскільки простим ви зможете побудувати ваше рішення.

Проблема

Працюючи над однією з частин нашої системи збору анонімної телеметрії і аналітики, перед нами стояло завдання обробляти величезну кількість POST-запитів від мільйонів клієнтів. Веб-обробник повинен був отримувати JSON-документ, який може містити колекцію даних (payload), які, в свою чергу потрібно зберегти на Amazon S3, щоб наші map-reduce системи пізніше обробили ці дані.

Традиційно ми б подивилися в бік worker-уровневной (worker-tier) архітектури, і використовували такі речі, як:
  • Sidekiq
  • Resque
  • DelayedJob
  • Elasticbeanstalk Worker Tier
  • RabbitMQ
  • і так далі
І встановили б 2 різних кластера, один для веб фронтенда, і інший для воркеров (workers), щоб можна було масштабувати фонові завдання.

Але з самого початку наша команда знала, що ми повинні написати це на Go, оскільки на етапі обговорення ми вже розуміли, що ця система повинна буде справлятися з величезним трафіком. Я використовував Go близько 2 років, і ми розробили декілька систем на ньому, але жодна з них не працювала поки з такими навантаженнями.

Ми почали зі створення кількох структур, щоб описати дані запиту, які будуть прийматися в POST-запити, і метод для завантаження їх на наш S3-бакет.

type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}

Payload type struct {
// [redacted]
}

func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name in collision
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}

// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}


Рішення «в лоб» з допомогою Go-рутин

Спочатку ми взяли найпростіше наївне рішення POST-обробника, просто намагаючись розпаралелити обробку з допомогою простої go-рутини:
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- don't DO THIS
}

w.WriteHeader(http.StatusOK)
}

Для середніх навантажень, такий підхід буде працювати для більшості людей, але він швидко показав себе неефективним в більшому масштабі. Ми очікували, що запитів буде багато, але, коли ми викотили першу версію в продакшн, то зрозуміли, що помилилися на порядки. Ми недооцінили кількість трафіку.

Підхід вище поганий з кількох причин. В ньому нема способу контролювати, скільки горутин ми запускаємо. І оскільки ми отримували 1 мільйон POST-запитів в хвилину, цей код, звичайно, швидко падав та крашился.

Пробуємо знову

Ми повинні були знайти інший шлях. З самого початку ми обговорювали, що нам потрібно зменшити час обробки запиту до мінімуму і важкі завдання робити в тлі. Зрозуміло, це те, як ви повинні це робити в світі Ruby on Rails, інакше у вас заблокуються всі доступні веб-обробники, і неважливо, чи використовуєте ви puma, unicorn або passenger (Тільки давайте не обговорювати тут JRuby, будь ласка). Значить ми повинні були б використовувати загальноприйняті рішення для таких завдань, такі як Resque, Sidekiq, SQS, і т. д… Цей список великий, так як існує маса способів розв'язати нашу задачу.

І нашою другою спробою було створення буферизированного каналу, в якому ми могли помістити чергу завдань, і завантажувати їх на S3, а так як ми можемо контролювати максимальну кількість об'єктів в нашій черзі, і у нас є купа RAM, щоб тримати все в пам'яті, ми вирішили, що буде достатньо просто буферізіровать завдання в каналі черги.
var Queue chan Payload

func init() {
Queue = make(chan Payload, MAX_QUEUE)
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}

А потім, власне, щоб вичитувати завдання з черги і обробляти їх, ми використовували щось подібне цього коду:

func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}

Чесно кажучи, я поняття не маю, про що ми тоді думали. Це, мабуть, було пізно вночі, з купою випитих Red-Bull-ів. Цей підхід не дав нам ніякого виграшу, ми просто обміняли погану конкурентність на буферізірованний канал і це просто відкладало проблему. Наш синхронний обробник черзі завантажував лише одну пачку даних на S3 за одиницю часу, і оскільки частота вхідних запитів була набагато більше можливості обробника завантажувати їх на S3, наш буферізірованний канал дуже швидко досягав свого ліміту і блокував можливість додавати в чергу нові задачі.

Ми мовчки проігнорували проблему і запустили зворотний відлік краху нашої системи. Час відгуку (latency) збільшувалася по наростаючій вже через кілька хвилин після того, як ми задеплоили цю глючную версію.



Найкраще рішення

Ми вирішили використати популярний патерн роботи з каналами в Go, щоб створити дворівневу систему каналів, одна — для роботи з чергою каналів, іншу для контролю за кількістю обробників завдань, працюють з чергою одночасно.

Ідея була в тому, щоб розпаралелити завантаження на S3, контролюючи цей процес, щоб не перенавантажувати машину і не впиратися в помилки з'єднання з S3. Тому ми вибрали Job/Worker патерн. Для тих, хто знайомий з Java, C#, etc, вважайте це Go-способом реалізації Worker Thread-Pool, використовуючи канали.
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
Job type struct {
Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}

Ми змінили наш хендлер запитів так, щоб він створював об'єкт типу Job з даними, і відправляв його в канал JobQueue, щоб далі його підхоплювали обробники завдань.
func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {

// let's create a job with the payload
work := Job{Payload: payload}

// Push the work onto the queue.
JobQueue <- work
}

w.WriteHeader(http.StatusOK)
}

Під час ініціалізації сервера ми створюємо Dispatcher і викликаємо Run() щоб створити пул воркеров (pool of workers) і почати слухати вхідні завдання в JobQueue.
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()

Нижче наведено наш код реалізації диспатчера:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}

go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}

Зауважте, що ми вказуємо кількість фінансових посередників, які будуть запущені і додані в пул. Оскільки ми використовували Amazon Elasticbeanstalk для цього проекту і докеризированное Go-оточення, і завжди намагалися дотримуватися двенадцатифакторной методології, щоб конфігурувати наші системи в продакшені, то ми читаємо ці значення змінних оточення. Таким чином ми можемо контролювати кількість обробників і максимальний розмір черги, щоб швидко подтюнить ці параметри без редеплоя всього кластера.
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

Миттєвий результат

Відразу ж після того, як ми задеплоили останнє рішення, ми побачили, що час відгуку впало до незначних цифр і наша можливість обробляти запити зросла радикально.



Через кілька хвилин після розігріву Elastic Load Balancer-ів, ми побачили, що наше ElasticBeanstalk додаток обробляє близько 1 мільйона запитів на хвилину. У нас є зазвичай кілька годин вранці, коли піки трафіку сягають понад 1 мільйон запитів на хвилину.

Як тільки ми задеплоили новий код, кількість необхідних серверів значно впало, зі 100 до приблизно 20 серверів.



Після того, як ми налаштували наш кластер і налаштування авто-масштабування, ми змогли зменшити їх кількість ще більше — до 4-х EC-c4.large инстансов і Elastic Auto-Scaling запускав новий інстанси, якщо використання CPU перевищувала 90% протягом 5 хвилин.



Висновки

Я глибоко впевнений, що простота завжди перемагає. Ми могли створити складну систему з купою черг, фоновими процесами, складним деплоем, але замість цього ми вирішили скористатися силою авто-масштабування ElasticBeanstalk і ефективністю і простотою підходу до конкурентності, яку Golang дає з коробки.

Не кожен день ви бачите кластер з усього 4х машин, які навіть слабкіше, ніж мій нинішній Macbook Pro, обробні POST-запити, які пишуть на Amazon S3 бакет 1 мільйон разів кожну хвилину.

Завжди є правильний інструмент для завдання. І для тих випадків, коли ваша Ruby on Rails система потребує більш потужному веб-обробника, вийдіть трохи з екосистеми ruby для більш простих, при цьому більш потужних рішень.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.