Масштабування ClickHouse, управління міграціями та надсилання запитів з PHP в кластер

У попередньої статті ми поділилися своїм досвідом впровадження та використання СУБД ClickHouse у компанії СМИ2. У поточній статті ми торкнемося питання масштабування, які виникають із збільшенням обсягу аналізованих даних і зростанням навантаження, коли дані вже не можуть зберігатися і оброблятися в рамках одного фізичного сервера. Також ми розповімо про розробленому нами інструменті для міграції DDL-запитів в ClickHouse-кластер.
Два шарда по дві репліки

ClickHouse спеціально проектувався для роботи в кластерах, розташованих в різних дата-центрах. Масштабується СУБД лінійно до сотень вузлів. Так, наприклад, Яндекс.Метрика на момент написання статті — це кластер з більш ніж 400 вузлів.
ClickHouse надає шардирование і реплікацію "з коробки", вони можуть гнучко настроюватися окремо для кожної таблиці. Для забезпечення реплицирования потрібно Apache ZooKeeper (рекомендується використовувати версію 3.4.5+). Для більш високої надійності ми використовуємо ZK-кластер (ансамбль) з 5 вузлів. Слід вибирати непарне число ZK-сайтів (наприклад, 3 або 5), щоб забезпечити кворум. Також відзначимо, що ZK не використовується в операціях SELECT, а застосовується, наприклад, в ALTER-запитів для змін стовпців, зберігаючи інструкції для кожної з реплік.
Шардирование
Шардирование в ClickHouse дозволяє записувати і зберігати порції даних в кластері распределенно і обробляти (читати) дані паралельно на всіх вузлах кластера, збільшуючи throughput і зменшуючи latency. Наприклад, у запитах з GROUP BY ClickHouse виконає агрегування на віддалених вузлах і передасть сайту-ініціатору запиту проміжні стани агрегатних функцій, де вони будуть доагрегированы.
Для шардирования використовується спеціальний движок Distributed, який не зберігає дані, а делегує SELECT-запити на шардированные таблиці (таблиці, що містять порції даних) з подальшою обробкою отриманих даних. Запис даних у шарды може виконуватися в двох режимах: 1) через Distributed-таблицю і необов'язковий ключ шардирования або 2) безпосередньо в шардированные таблиці, з яких далі дані будуть читатися через Distributed-таблицю. Розглянемо ці режими більш докладно.
У першому режимі дані записуються в Distributed-таблицю по ключу шардирования. У найпростішому випадку ключем шардирования може бути випадкове число, тобто результат виклику функції rand(). Однак в якості ключа шардирования рекомендується брати значення хеш-функції від поля в таблиці, яка дозволить, з одного боку, локалізувати невеликі набори даних на одному шарде, а з іншого — забезпечить досить рівний розподіл таких наборів з різних шардам в кластері. Наприклад, ідентифікатор сесії (sess_id) користувача дозволить локалізувати покази сторінок одному користувачеві на одному шарде, при цьому сесії різних користувачів будуть розподілені рівномірно по всьому шардам в кластері (за умови, що значення поля sess_id будуть мати гарний розподіл). Ключ шардирования може бути також числовим або складеним. В цьому випадку можна використовувати вбудовану хеширующую функцію cityHash64. У цьому режимі дані, записувані на один з вузлів кластера, по ключу шардирования будуть перенаправлятися на потрібні шарды автоматично, збільшуючи, однак, при цьому трафік.
Більш складний спосіб полягає в тому, щоб поза ClickHouse обчислювати потрібний шард і виконувати запис безпосередньо в шардированную таблицю. Складність тут обумовлена тим, що потрібно знати набір доступних вузлів-шардов. Однак у цьому разі запис стає більш ефективним, і механізм шардирования (визначення потрібного шарда) може бути більш гнучким.
Реплікація
ClickHouse підтримує реплікацію даних, забезпечуючи цілісність даних на репліках. Для реплікації даних використовуються спеціальні механізми MergeTree-сімейства:
  • ReplicatedMergeTree
  • ReplicatedCollapsingMergeTree
  • ReplicatedAggregatingMergeTree
  • ReplicatedSummingMergeTree
Реплікація часто застосовується разом з шардированием. Наприклад, кластер з 6 вузлів може містити 3 шарда по 2 репліки. Слід зазначити, що реплікація не залежить від механізмів шардирования і працює на рівні окремих таблиць.
Запис даних може виконуватися в будь-яку з таблиць-реплік, ClickHouse виконує автоматичну синхронізацію даних між усіма репліками.
Приклади конфігурації ClickHouse-кластера
В якості прикладів будемо розглядати різні конфігурації для чотирьох вузлів:
ch63.smi2, ch64.smi2, ch65.smi2, ch66.smi2
. Налаштування містяться в конфігураційному файлі /etc/clickhouse-server/config.xml.
Один шард і чотири репліки
Один шард і чотири репліки
<remote_servers>
<!-- One shard, four replicas -->
<repikator>
<shard>
<!-- replica 01_01 -->
<replica>
<host>ch63.smi2</host>
</replica>

<!-- replica 01_02 -->
<replica>
<host>ch64.smi2</host>
</replica>

<!-- replica 01_03 -->
<replica>
<host>ch65.smi2</host>
</replica>

<!-- replica 01_04 -->
<replica>
<host>ch66.smi2</host>
</replica>
</shard>
</repikator>
</remote_servers>

Приклад схеми створення таблиці:
Схема
Приклад SQL-запиту створення таблиці для вказаної конфігурації:
CREATE DATABASE IF NOT EXISTS dbrepikator
;

CREATE TABLE IF NOT EXISTS dbrepikator.anysumming_repl_sharded (
event_date Date DEFAULT toDate(event_time),
event_time DateTime DEFAULT now(),
body_id Int32
views Int32
) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{repikator_replica}/dbrepikator/anysumming_repl_sharded', '{replica}', event_date, (event_date, event_time, body_id), 8192)
;

CREATE TABLE IF NOT EXISTS dbrepikator.anysumming_repl AS dbrepikator.anysumming_repl_sharded
ENGINE = Distributed( repikator, dbrepikator, anysumming_repl_sharded , rand() )

Перевага даної конфігурації:
  • Найбільш надійний спосіб зберігання даних.
Недоліки:
  • Для більшості завдань буде зберігатися надмірна кількість копій даних.
  • Оскільки в даній конфігурації тільки 1 шард, SELECT-запит не може виконуватися паралельно на різних вузлах.
  • Потрібні додаткові ресурси на багаторазове реплицирование даних між всіма вузлами.
Чотири шарда по одній репліці
Чотири шарда по одній репліці
<remote_servers>
<!-- Four shards, one replica -->
<sharovara>
<!-- shard 01 -->
<shard>
<!-- replica 01_01 -->
<replica>
<host>ch63.smi2</host>
</replica>
</shard>

<!-- shard 02 -->
<shard>
<!-- replica 02_01 -->
<replica>
<host>ch64.smi2</host>
</replica>
</shard>

<!-- shard 03 -->
<shard>
<!-- replica 03_01 -->
<replica>
<host>ch65.smi2</host>
</replica>
</shard>

<!-- shard 04 -->
<shard>
<!-- replica 04_01 -->
<replica>
<host>ch66.smi2</host>
</replica>
</shard>
</sharovara>
</remote_servers>

Приклад SQL-запиту створення таблиці для вказаної конфігурації:
CREATE DATABASE IF NOT EXISTS testshara 
;
CREATE TABLE IF NOT EXISTS testshara.anysumming_sharded (
event_date Date DEFAULT toDate(event_time),
event_time DateTime DEFAULT now(),
body_id Int32
views Int32
) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{sharovara_replica}/sharovara/anysumming_sharded_sharded', '{replica}', event_date, (event_date, event_time, body_id), 8192)
;
CREATE TABLE IF NOT EXISTS testshara.anysumming AS testshara.anysumming_sharded
ENGINE = Distributed( sharovara, testshara, anysumming_sharded , rand() )

Перевага даної конфігурації:
  • Оскільки в даній конфігурації 4 шарда, SELECT-запит може виконуватися паралельно на всіх вузлах кластера.
Недолік:
  • Найменш надійний спосіб зберігання даних (втрата вузла призводить до втрати порції даних).
Два шарда по дві репліки
Два шарда по дві репліки
<remote_servers>
<!-- Two shards, two replica -->
<pulse>
<!-- shard 01 -->
<shard>
<!-- replica 01_01 -->
<replica>
<host>ch63.smi2</host>
</replica>

<!-- replica 01_02 -->
<replica>
<host>ch64.smi2</host>
</replica>
</shard>

<!-- shard 02 -->
<shard>
<!-- replica 02_01 -->
<replica>
<host>ch65.smi2</host>
</replica>

<!-- replica 02_02 -->
<replica>
<host>ch66.smi2</host>
</replica>
</shard>
</pulse>
</remote_servers>

Приклад SQL-запиту створення таблиці для вказаної конфігурації:
CREATE DATABASE IF NOT EXISTS dbpulse 
;

CREATE TABLE IF NOT EXISTS dbpulse.normal_summing_sharded (
event_date Date DEFAULT toDate(event_time),
event_time DateTime DEFAULT now(),
body_id Int32
views Int32
) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{pulse_replica}/pulse/normal_summing_sharded', '{replica}', event_date, (event_date, event_time, body_id), 8192)
;

CREATE TABLE IF NOT EXISTS dbpulse.normal_summing AS dbpulse.normal_summing_sharded
ENGINE = Distributed( pulse, dbpulse, normal_summing_sharded , rand() )

Дана конфігурація втілює кращі якості першого і другого прикладів:
  • Оскільки в даній конфігурації 2 шарда, SELECT-запит може виконуватися паралельно на кожному з шардов в кластері.
  • Щодо надійний спосіб зберігання даних (втрата одного вузла кластера не призводить до втрати порції даних).
Приклад конфігурації кластерів в ansible
Конфігурація кластерів ansible може виглядати наступним чином:
name: "pulse"
shards:
- { name: "01", replicas: ["ch63.smi2", "ch64.smi2"]}
- { name: "02", replicas: ["ch65.smi2", "ch66.smi2"]}
- name: "sharovara"
shards:
- { name: "01", replicas: ["ch63.smi2"]}
- { name: "02", replicas: ["ch64.smi2"]}
- { name: "03", replicas: ["ch65.smi2"]}
- { name: "04", replicas: ["ch66.smi2"]}
- name: "repikator"
shards:
- { name: "01", replicas: ["ch63.smi2", "ch64.smi2","ch65.smi2", "ch66.smi2"]}

PHP-драйвер для роботи з ClickHouse-кластером
У попередньої статті ми вже розповідали про нашому open-source PHP-драйвері для ClickHouse.
Коли кількість вузлів стає великим, управління кластером стає незручним. Тому ми розробили простий і функціональний інструмент для міграції DDL-запитів в ClickHouse-кластер. Далі ми коротко опишемо на прикладах його можливості.
Для підключення до кластера використовується клас
ClickHouseDB\Cluster
:
$cl = new ClickHouseDB\Cluster(
['host'=>'allclickhouse.smi2','port'=>'8123','username'=>'x','password'=>'x']
);

В DNS
allclickhouse.smi2
перераховані IP-адреси всіх вузлів:
ch63.smi2, ch64.smi2, ch65.smi2, ch66.smi2
, що дозволяє використовувати механізм Round-robin DNS.
Драйвер виконує підключення до кластеру і відправляє ping-запити на кожен вузол, перерахований в DNS.
Установка максимального часу підключення до всіх вузлів кластера налаштовується наступним чином:

$cl->setScanTimeOut(2.5); // 2500 ms

Перевірка стану реплік кластера виконується так:
if (!$cl->isReplicasIsOk())
{
throw new Exception('Replica state is bad , error='.$cl->getError());
}

Стан ClickHouse-кластера перевіряється наступним чином:
  • Перевіряються з'єднання з усіма вузлами кластера, перерахованими в DNS.
  • На кожен вузол відправляється SQL-запит, який дозволяє визначити стан всіх реплік ClickHouse-кластера.
Швидкість виконання запиту може бути збільшена, якщо не вичитувати значення стовпців
log_max_index, log_pointer, total_replicas, active_replicas
, при отриманні даних з яких виконуються запити на ZK-кластер.
Для полегшеної перевірки в драйвері необхідно встановити спеціальний прапор:

$cl->setSoftCheck(true);

Отримання списку всіх доступних кластерів робиться наступним чином:
print_r($cl->getClusterList());
// result
// [0] => pulse
// [1] => repikator
// [2] => sharovara

Наприклад, отримати конфігурацію кластерів, які були описані вище, можна так:

foreach (['pulse','repikator','sharovara'] as $name)
{
print_r($cl->getClusterNodes($name));
echo "> $name , count shard = ".$cl->getClusterCountShard($name)." ; count replica = ".$cl->getClusterCountReplica($name)."\n";
}

//Результат:
//> pulse , count shard = 2 ; count replica = 2
//> repikator , count shard = 1 ; count replica = 4
//> sharovara , count shard = 4 ; count replica = 1

Отримання списку вузлів за назвою кластера або з шардированных таблиць:

$nodes=$cl->getNodesByTable('sharovara.body_views_sharded');

$nodes=$cl->getClusterNodes('sharovara');

Отримання розміру таблиці або розмірів всіх таблиць через відправку запиту на кожен вузол кластера:
foreach ($nodes as $node)
{
echo "$node > \n";
print_r($cl->client($node)->tableSize('test_sharded'));
print_r($cl->client($node)->tablesSize());
}

// Спрощений варіант використання
$cl->getSizeTable('dbName.tableName');

Отримання списку таблиць кластера:
$cl->getTables()

Визначення лідера в кластері:

$cl->getMasterNodeForTable('dbName.tableName') // Лідер має встановлений прапор is_leader=1

Запити, пов'язані, наприклад, з вилученням чи зміною структури, відправляються на сайт з встановленим прапором
is_leader
.
Очищення даних у таблиці в кластері:
$cl->truncateTable('dbName.tableName')`

Інструмент міграції DDL-запитів
Для міграції DDL-запитів до реляційних СУБД в нашій компанії використовується MyBatis Migrations.
Про інструменти міграції на Хабре вже писали:
Для роботи з ClickHouse-кластером нам потрібен аналогічний інструмент.
На момент написання статті ClickHouse має ряд особливостей (обмежень) пов'язаних з DDL-запитами. Цитата:
Реплікуються INSERT, ALTER (див. подробиці в описі запиту ALTER). Реплікуються стислі дані, а не тексти запитів. Запити CREATE, DROP, ATTACH, DETACH, RENAME не реплікуються — тобто, відносяться до одного сервера. Запит CREATE TABLE створює нову реплицируемую таблицю на тому сервері, де виконується запит; а якщо на інших серверах така вона вже є — додає нову репліку. Запит DROP TABLE видаляє репліку, розташовану на тому сервері, де виконується запит. Запит RENAME перейменування таблицю на одній з реплік — тобто, репліковані таблиці на різних репліках можуть називатися по різному.
Команда розробників ClickHouse вже анонсувала роботу в цьому напрямку, але в даний час доводиться вирішувати цю задачу зовнішнім інструментарієм. Ми створили простий прототип інструменту phpMigrationsClickhouse для міграції DDL-запитів в ClickHouse-кластер. І в наших планах — абстрагувати phpMigrationsClickhouse від мови PHP.
Опишемо алгоритм, що використовується зараз в phpMigrationsClickhouse, який може бути реалізований на будь-якому іншому мовою програмування.
На поточний момент інструкція з міграції в phpMigrationsClickhouse складається з:
  • SQL-запитів, які потрібно вжити і відкотити у разі помилки;
  • імені кластера, в якому потрібно виконати SQL-запити.
Створимо PHP-файл, що містить наступний код:
$cluster_name = 'pulse'; 
$mclq = new \ClickHouseDB\Cluster\Migration($cluster_name);
$mclq->setTimeout(100);

Додамо SQL-запити, які потрібно охопити:
$mclq->addSqlUpdate(" CREATE DATABASE IF NOT EXISTS dbpulse "); 
$mclq->addSqlUpdate(" 

CREATE TABLE IF NOT EXISTS dbpulse.normal_summing_sharded (
event_date Date DEFAULT toDate(event_time),
event_time DateTime DEFAULT now(),
body_id Int32
views Int32
) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{pulse_replica}/pulse/normal_summing_sharded', '{replica}', event_date, (event_date, event_time, body_id), 8192)
"); 

Додамо SQL-запити для виконання відкоту в разі помилки:
$mclq->addSqlDowngrade(' DROP TABLE IF EXISTS dbpulse.normal_summing_sharded '); 

$mclq->addSqlDowngrade(' DROP DATABASE IF EXISTS dbpulse '); 

Існує 2 стратегії накочування міграцій:
  • відправка кожного окремого SQL-запиту на один сервер з переходом до наступного SQL-запитом;
  • відправка всіх SQL-запитів на один сервер з переходом до наступного сервера.
При виникненні помилки можливі наступні варіанти:
  • виконання downgrade-запиту на всі сайти, на яких вже були зроблені upgrade-запити;
  • очікування перед відправкою upgrade-запитів на інші сервера;
  • виконання downgrade-запиту на всіх серверах в разі виникнення помилки.
Окреме місце займають помилки, коли не відоме стан кластера:
  • помилка timeout з'єднання;
  • помилка зв'язку з сервером.
Принцип роботи PHP-коду при виконанні міграції наступний:

// Отримання списку IP-адрес вузлів кластера
$node_hosts=$this->getClusterNodes($migration->getClusterName());
// Отримання downgrade-запиту
$sql_down=$migration->getSqlDowngrade();
// Отримання upgrade-запиту
$sql_up=$migration->getSqlUpdate();

// Виконання upgrade-запиту на кожен вузол і, у разі помилки, виконання downgrade-запиту

$need_undo=false;
$undo_ip=[];

foreach ($sql_up as $s_u) {
foreach ($node_hosts as $node) {
// Виконання upgrade-запиту
$state=$this->client($node)->write($s_u);

if ($state->isError()) {
$need_undo = true;
} else {
// OK
}

if ($need_undo) {
// Фіксація вузлів кластера, де сталася помилка 
$undo_ip[$node]=1;
break;
}
}
}

// Перевірка успішності виконання upgrade-запитів на всіх вузлах кластера
if (!$need_undo)
{
return true; // OK
}

У випадку помилки надсилання на всі вузли кластера downgrade-запиту:
foreach ($node_hosts as $node) {
foreach ($sql_down as $s_u) {
try{
$st=$this->client($node)->write($s_u);
} catch (Exception $E) {
// Оповіщення користувача про помилку при виконанні downgrade-запиту

}
}
}

Ми продовжимо цикл матеріалів, присвячених нашому досвіду роботи з ClickHouse.
На завершення статті ми хотіли б провести невеличке опитування.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.