Ліпимо микросервис

Підкинули завдання зробити микросервис, який отримує дані від RabbitMQ, обробляє і відправляє дані далі по етапу в RabbitMQ. Після відправки завдання, я подивився на те що повчитися. Виявилося, що цей набір компонентів можна використовувати для швидкого прототипування pipeline архітектури
Використовувані компоненти:
Для прикладу буду робити микросервис для видачі рейтингу гравців. Від ядра системи в микросервис приходять наступні повідомлення:
  • player_registered(id,name);
  • player_renamed(id,name);
  • player_won(id points).
Сервіс разів на хвилину повинен відсилати повідомлення з вмістом рейтингу.Рейтинг сортується за набраними очками за календарний тиждень.
REACT-CPP
REACT-CPP — це обгортка над libev на C++11. Ця бібліотека потрібна для організації циклу обробка подій(event loop).
Оскільки крім роботи з сокетом потрібні таймери і обробники unix сигналів.
class Application
{
public:

Application();
~Application();

using IntervalWatcherPtr = std::shared_ptr<React::IntervalWatcher>;

void run();
void shutdown();
//...

private:

bool onMinute();
//... 

private:

React::MainLoop m_loop;
IntervalWatcherPtr m_minuteTimer;
//...
};

void Application::run()
{
m_minuteTimer = m_loop.onInterval(5.0, 60.0, std::bind(&Application::onMinute, this));

m_loop.onSignal(SIGTERM, [this]() -> bool
{
shutdown();
return false;
});

m_loop.onSignal(SIGUSR1, [this]()->bool{
cleanRating();
return true;
});

//...
m_loop.run();
}

bool Application::onMinute()
{
calculateRating();
sendRating();
return true;
}

Тут створюю таймер який стартує через 5 секунд і який буде викликати обробник кожні 60 секунд.
Будь-який пристойний демон/сервіс повинен мати обробник SIGTERM, що б з поза попросити його коректно завершиться.
Що стосується обробника SIGUSR1 тут можна самостійно обчислювати початок/кінець тижня через Boost.Date_Time, але мені тупо лінь, коли в GNU/Linux є cron+pkill.
AMQP-CPP
З тих пір, як опублікував RabbitMQ tutorials на C++ AMQP-CPP обзавелася реалізацією обробника на libev і libuv.
Підключення та обробка повідомлення:
void Application::createChannel(AMQP::TcpConnection &connection)
{
m_channel = std::make_unique<AMQP::TcpChannel>(&connection);

m_channel->declareQueue(m_cfg.source().name, AMQP::durable)
.onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount)
{
LOG(INFO) << "Declared queue "
<< name
<< ", message count: "
<< messagecount;

m_channel->consume(m_cfg.source().name)
.onReceived([&](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
onMessage(message, deliveryTag, redelivered);
})
.onError([](const char *message)
{
LOG(ERROR) << "Error consume:" << message;
APP->shutdown();
});
})
.onError([&](const char *message)
{
LOG(ERROR) << "Error declare queue:" << message;
shutdown();
});
}

void Application::onMessage(const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
parseMessage(message);
m_channel->ack(deliveryTag);
}

Публікація повідомлення:
AMQP::Envelope env(s.GetString());

m_channel->publish("", m_cfg.destination().name, env);

LevelDB
Може знадобитися локальне сховище даних. Взяв LelevDB, я про нього писав в Використання LevelDB. Зробив лише невелику RAII обгортку:
Код обгортки
class DataBase
{
public:

DataBase();

bool open(const std::string &path2base, bool compression = true);

bool put(const std::string &key, const ByteArray &value, bool sync = false);
ByteArray get(const std::string &key);

Snapshot snapshot();

Iterator iterator();

private:

std::shared_ptr<leveldb::DB> m_backend;
};

class Snapshot
{
public:

Snapshot();

~Snapshot();

ByteArray get(const std::string &key);

Iterator iterator();

private:

Snapshot(const std::weak_ptr<leveldb::DB> &backend, const leveldb::Snapshot *snapshot);

private:

friend class DataBase;

std::weak_ptr<leveldb::DB> m_backend;
const leveldb::Snapshot *m_shapshot;
};

class Iterator
{
public:

Iterator(std::unique_ptr<leveldb::Iterator> rawIterator);
Iterator(Iterator &&iter);

/*!
* Create empty iterator
*/
Iterator() = default;

~Iterator();

bool isValid() const noexcept;

void next();

void prev();

std::string key();
ByteArray value();

/*!
* Seek to first
*/
void toFirst();

/*!
* Seek to last
*/
void toLast();

Iterator(const Iterator &) = delete;
Iterator &operator=(const Iterator &) = delete;

private:

std::unique_ptr<leveldb::Iterator> m_iterator;
};

LevelDB використовується для збереження/відновлення стану.
void Application::loadFromLocalStorage()
{
auto snapshot = m_localStorage->snapshot();
auto iter = snapshot.iterator();
iter.toFirst();
while (iter.isValid()) {
auto player = new Player(iter.value());
m_id2player[player->id] = player;
m_players.push_back(player);
iter.next();
}
}

void Application::updatePlayerInBD(const Player *player)
{
if (!m_localStorage->put(std::to_string(player->id), player->serialize())) {
LOG(ERROR) << "[" << player->id << ", "
<< player->name
<< "] is not updated in the database";
}
}

Логіка сервісу
Дані надходять у форматі JSON.
Розбирає json використовуючи RapidJSON, шукаю підходящий метод, викликаю потрібний обробник:
void Application::parseMessage(const AMQP::Message &message)
{
/*
* Схемка має вигляд
* {
* "method":"player_registered",
* "params":{
* ...
* }
* }
*/
rapidjson::Document doc;
doc.Parse(message.body(), message.bodySize());

const std::string method = doc["method"].GetString();
auto iter = m_handlers.find(method);
if (iter != m_handlers.end()) {
iter->second(*this, doc["params"]);
}
else {
LOG(WARNING) << "Unknown method:" << method;
}
}

Самі прості методи:
void Application::onPlayerRegistered(const JValue &params)
{
auto obj = params.GetObject();
const uint64_t playerId = obj["id"].GetUint64();
if (!isRegistred(playerId)) {
auto player = new Player;
player->id = playerId;
player->name = obj["name"].GetString();
m_players.push_back(player);
m_id2player[playerId] = player;
updatePlayerInBD(player);
}
}

void Application::onPlayerRenamed(const JValue &params)
{
auto obj = params.GetObject();
const uint64_t playerId = obj["id"].GetUint64();
if (isRegistred(playerId)) {
auto player = m_id2player[playerId];
player->name = obj["name"].GetString();
updatePlayerInBD(player);
}
else {
LOG(WARNING) << "Renaming an unknown user[" << playerId << "]";
}
}

void Application::onPlayerWon(const JValue &params)
{
auto obj = params.GetObject();
const uint64_t playerId = obj["id"].GetUint64();
if (isRegistred(playerId)) {
auto player = m_id2player[playerId];
player->points += obj["points"].GetInt64();
updatePlayerInBD(player);
}
else {
LOG(WARNING) << "Unknown player[" << playerId << "]";
}
}

Раз в хвилину сортуємо гравців і відправляємо рейтинг:
bool Application::onMinute()
{
calculateRating();
sendRating();
return true;
}

void Application::calculateRating()
{
std::sort(m_players.begin(), m_players.end(), [](const Player *a, const Player *b)
{
return a->points > b>points;
});
}

void Application::sendRating()
{
using namespace rapidjson;

StringBuffer s;
Writer<StringBuffer> writer(s);
writer.StartArray();

const size_t count = std::min(m_players.size(), size_t(10));
for (size_t i = 0;
i < count;
++i) {
writer.StartObject();

writer.Key("id");
writer.Uint64(m_players[i]->id);

writer.Key("name");
writer.String(m_players[i]->name.c_str());

writer.Key("points");
writer.Int64(m_players[i]->points);

writer.EndObject();
}

writer.EndArray();
AMQP::Envelope env(s.GetString());

m_channel->publish("", m_cfg.destination().name, env);
}

Весь код доступний на GitHub'e. Исходники бібліотек поставляються разом з сервісом і збираються автоматично на GNU/Linux з gcc.
Підведемо підсумки, що маємо:
  • event loop з таймерами, обробниками сигналів і всіма іншими плюшками libev;
  • робота з RabbitMQ;
  • вбудоване key-value сховище;
  • підтримка json.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.