Горизонтальне масштабування websocket-ів на Ruby

Не так давно вийшла стаття, в якій автор описував свій framework для написання додатків з використанням Ruby, Sinatra і websoсket. Але в тому рішенні не було порушено питання горизонтального масштабування. Так при підключенні до одного з вузлів, користувачі можуть отримувати повідомлення/дані тільки про події/змінах, викликаних користувачами цього сайту, а при змінах, внесених через інший, вони не дізнаються. Для рішення даної задачі необхідно організувати загальну шину даних. Розглядати дану задачу буду в контексті обміну повідомленнями клієнт-клієнт.

Шина даних

Вимоги, які будемо пред'являти до шині наступні:
  • простота робота;
  • передача в «реальному часі»;
  • продуктивність.
Організувати шину можна через сховище з періодичним опитуванням, або через сервер черг.
Перший варіант не задовольняє і другій умові, оскільки затримка в передачі буде дорівнює періоду опитування сховища. Зменшення періоду призведе до зростання навантаження на нього. Тому цей варіант відкидаємо відразу.

Другий варіант підходить найкраще. У цьому випадку можна скористатися спеціалізованими рішеннями на подобі RabbitMQ, ActiveMQ. Обидва цих продукту представляють із себе серйозні рішення, з безліччю функцій, хорошим масштабуванням. Можна використовувати і їх, але потрібно оцінити, чи не буде це гарматою по горобцях. Крім подібних рішень функціонал черг надає і Redis, в добавок отримуємо key-value сховище, яке нам теж знадобиться.

Redis надає найпростіший механізм Pub-Sub, якого достатньо для нашої задачі. Він досить швидкий, простий в роботі і має малі затримки при передачі.

Рішення

Наша система буде мати наступну схему.



Повідомлення між користувачами одного вузла передаються безпосередньо, а сполучення між вузлами через шину.
Для цього:
  1. вузол генерує унікальне ім'я;
  2. підписується за нього на повідомлення в Redis;
  3. всі клієнти, що підключені до цього сайту записують пару ключ-значення в вигляді ідентифікатора клієнта і ідентифікатора вузла, до якого він підключений;
  4. при відправці повідомлення іншому клієнту, дізнаємося ім'я вузла і передаємо повідомлення у його чергу для обробки.

А тепер реалізуємо

В якості бібліотеки для websocket обраний faye-websocket-ruby. Для роботи з Redis стандартний гем redis (hiredis) + код приклад для PubSub через EventMachine, так як реалізація гема працює в блокувальному режимі, а при роботі в одному потоці з web-сервером це не допустимо.

module App
class << self
def configuration
yield(config) if block_given?
config.sessions = Metriks.counter('total_sessions')
config.active = Metriks.counter('active_sessions')
end 
def config 
@config ||= OpenStruct.new( redis: nil, root: nil )
end 
def id
@instance_id ||= SecureRandom.hex
end
def logger
@logger ||= Logger.new $stderr
end

def register
config.redis.multi do
config.redis.set "node_#{App.id}", true
config.redis.expire "node_#{App.id}", 60*10
end if config.redis

EM.next_tick do 
config.sub = PubSub.connect
config.sub.subscribe App.id do |type, channel, message|
case type
when 'message'
begin
json = Oj.load(message, mode: :compat)
WS::Base.remote_messsage json
rescue => ex
App.logger.error "ERROR: #{message.class} #{message} #{ex.to_s}"
end
else
App.logger.debug "(#{type}) #{channel}:: #{message}"
end
end
@pingpong = EM.add_periodic_timer(30) do
App.config.redis.expire "node_#{App.id}", 60
end
end
rescue
config.redis = nil
end
end
end

Основна робота цього модуля полягає в методі register, який реєструє себе на шині і чекає вхідні повідомлення. Для моніторингу створюється ключ виду node_%node_id% c TTL в 60 секунд і періодом оновлення 30 секунд, на випадок якщо вузол відвалиться. Таким чином можна завжди дізнатися скільки вузлів зараз знаходиться в мережі і їх імена.

module WS
class Base
NEXT_RACK= [404, {}, []].freeze
def self.call(*args)
instance.call(*args)
end
def self.instance
@instance ||= self.new
end
def self.remote_messsage(json)
user = User.get json['from']
instance.send :process, user, json if user
rescue => ex
user.error( { error: ex.to_s } )
end
def initialize
@ws_cache = {}
end
def call(env)
return NEXT_RACK unless Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, ['xmpp'], ping: 5)
user = User.register(ws)
ws.onmessage = lambda do |event|
json = Oj.load(event.data, mode: :compat)
process(user, json )
end
ws.onclose = lambda do |event|
App.logger.info [:close, event.code, event.reason]
user.unregister
user = nil
end
ws.rack_response
rescue WS::User::NotUnique => ex
ws.send Oj.dump({ action: :error, data: { error: 'not unique session' } })
ws.close
ws.rack_response
end
private

def process(user, json)
action = json['action'].to_s
data = json['data']
return App.logger.info([:message, 'Empty action']) if action.empty?
return App.logger.info([:message, "Unknown action #{json['action']}"]) unless user.respond_to? "on_#{action}"
user.send "on_#{action}", data
rescue => ex
user.error({ error: ex.to_s })
puts ex.to_s
puts ex.backtrace
end
end
end

Даний клас відповідає за встановлення з'єднання та обробку повідомлень. У методі call створюється новий клієнт і вішаються обробники. Метод класу remote_messsage використовується для прийому зовнішніх повідомлень (шини). Метод process — єдина точка для повідомлень, що прийшли безпосередньо від клієнта і для повідомлень, що прийшли по шині.
Клієнти
module WS
class User
include UserBehavior
attr_reader :id
class Error < StandardError; end
class RoomFull < Error; end
class NotFound < Error
attr_reader :id
def initialize(id); @id = id end
def to_s; "User '@#{id}' not found" end
end
class NotUnique < Error; end

class << self
def cache
@ws_cache ||= {}
end

def get(id)
fail NotFound.new(id) if id.to_s.empty?
@ws_cache.fetch(id)
rescue KeyError
WS::RemoteUser.new(id)
end

def register(ws)
self.new(ws)
end

def unregister(ws)
url = URI.parse(ws.url)
id = url.path.split('/').last
get(id).unregister
end
end

def initialize(ws)
@ws = ws
register

@pingpong = EM.add_periodic_timer(5) do
@ws.ping(") do
App.config.redis.expire @id, 15 if App.config.redis
end
end
end

def unregister
on_close if respond_to? :on_close
App.config.active.decrement
App.config.redis.del @id if App.config.redis
User.cache.delete(@id)
@pingpong.cancel
@pingpong = nil
@ws = nil
@id = nil
end

def send_client(from, action, data)
return unless @ws
data = Oj.dump({ from: from.id action: action.to_s, data: data }, mode: :compat)
@ws.send(data)
end

private
def register
url = URI.parse(@ws.url)
@id = url.path.split('/').last
if App.config.redis
App.config.redis.multi do
App.config.redis.set @id, App.id
App.config.redis.expire @id, 15
end
App.config.sessions.increment
App.config.active.increment
end
User.cache[@id] = self
App.logger.info [:open, @ws.url @ws.version, @ws.protocol]
on_register if respond_to? :on_close
self
end
end

class RemoteUser
include UserBehavior
attr_reader :id
attr_reader :node
def initialize(id)
@id = id.to_s
fail WS::User::NotFound.new(id) if @id.empty?
@node = App.config.redis.get(@id).to_s
fail WS::User::NotFound.new(id) if @node.empty?
end
def send_client(from, action, data)
return if node.to_s.empty?
App.logger.info ['REMOTE', self.id from.id action]
data = Oj.dump({ from: from.id action: action.to_s, data: data }, mode: :compat)
App.config.redis.publish node, data
end
end
end


Метод register реєструє користувача в сховище, зіставляючи його ID з ID вузла куди він підключений і кешує його в вашому списку. Метод unregister навпроти прибирає всі записи про клієнта і видаляє таймер. Таймер використовується для періодичної перевірки стан клієнта та оновлення TTL для його запису, щоб у Redis не було мертвих душ.
ID клієнта виходить з URL яким був запит на підключення. Він має формат ws://%hostname%/ws/%user_id% user_id випадково згенерований уникальаня послідовність.

Метод send_client відправляє дані вже самому клієнтові.

Окреме місце займає метод класу get. Даний метод повертає по ID екземпляр класу WS::User або якщо користувач не знайдений в локальному кеші створює екземпляр класу WS::RemoteUser. При його створенні перевіряється чи є такий ID в сховище і якомусь вузлу він належить. Якщо ID не знайде кидається виняток.

Клас WS::RemoteUser на відміну від WS::User має тільки один метод send_client, який пересилає сформовані повідомлення через шину на потрібний вузол.

Таким чином, неважливо де знаходиться клієнт виклик методу send_client доставить дані до адресата.

module UserBehavior
module ClassMethods
def register_action(action, params = {})
return App.logger.info ['register_action', "Method #{action} already defined"] if respond_to? action

block = lambda do |*args |
if block_given?
data from = yield(self, *args)
send_client from || self, action, data
else
send_client self, action, args.first
end
end

define_method action, &block
define_method "on_#{action}" do |data|
self.send action, data
end if params[:passthrough]

end
end

def self.included(base)
base.instance_exec do
extend ClassMethods
register_action :message do |user from, text|
[{ to: user.id text: text }, from]
end

register_action :error, passthrough: true
end
end

def on_message(data)
App.logger.info ['MESSAGE', id data.to_s]

to_user_id = data['to']
to_user = WS::User.get(to_user_id)
to_user.message self, data['text']

rescue WS::User::NotFound => ex
error({ error: ex.to_s })
end
end

Обробка самих подій винесена в окремий модуль UserBehavior, який розширює попередні два класи методами для реакції на повідомлення. Кожне повідомлення має поля FROM, ACTION DATA. Перше ідентифікує від кого прийшло, друге визначає метод, а третя супутні дані. Так для ACTION зі значенням «message» буде викликаний метод on_message, який буде передано значення поля DATA.

Використовуючи такий підхід вийшло реалізувати прозору передачу повідомлень між підключеними клієнтами, при цьому не важливо знаходяться вони на одному вузлі або на різних. Для тестування запускав кілька екземплярів на різних портах, повідомлення коректно відправлялися і виходили.

Для бажаючих спробувати, код робочого додатки виклав на github. Запускається просто, через rackup

PS

Дане рішення не є закінченим, думаю є куди його поліпшити і прибрати зайве, але як відправна точка цілком згодиться.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.