Розробляємо систему real-time fulltext-пошуку по error-логів на основі ClickHouse від Яндекса

У цій статті я розповім про те, як розробити систему для індексування та повнотекстового пошуку error-логів (або будь-яких інших логів) на основі СУБД від Яндекса під назвою ClickHouse. Про саму базу Яндекс писав на Хабре спочатку коли база була закритою, а потім коли вони її заопенсорсили. База даних в першу чергу призначена для аналітики і для реалізації сервісу Яндекс.Метрика, але може використовуватися для чого завгодно, якщо вам підходить завантажувати дані пачками, видаляти їх теж величезними стосами і ніколи не оновлювати окремі рядки.

Що ми будемо робити
Ми будемо реалізовувати систему для індексування і пошуку по error-логів. При цьому, вважається, що самі логи ви вже зуміли доставити на центральний сервер (або кілька серверів) і вже засунули самі тексти повідомлень в базу, тобто у вас вже є таблиця в якій-небудь базі даних приблизно наступного вигляду:

CREATE TABLE Messages (
message_id BIGINT PRIMARY KEY AUTO_INCREMENT,
created_ts DATETIME,
message_text BLOB
)


Ми навчимося швидко віддавати результати пошуку за таким ловга (тобто, завжди відсортовані по часу) і індексувати його в режимі реального часу.


Чому не ElasticSearch / Sphinx / MySQL / другое_решение?
Мені здається цікавим подивитися, що з себе представляє ClickHouse, і які завдання з його допомогою можна вирішувати. Метою статті є дати людям огляд і поживу для роздумів, ніж дати готове рішення. Elastic, Sphinx та інші являють собою готові движки для пошуку, тоді як ClickHouse є базою даних загального призначення, з якої можна зліпити все, що завгодно. Також, у мене є думка, що система пошуку, представлена в статті на основі ClickHouse буде справлятися із завданням пошуку по логам краще, ніж Sphinx, і при цьому не потрібно буде використовувати 2 види індексів (реалтайм і звичайний). Ваш досвід може відрізнятися, тому рекомендую спочатку спробувати зробити прототип перед тим, як впроваджувати таку систему в продакшен.

Установка сервера
Доручіть завдання встановлення ClickHouse (github) системного адміністратора або поставте його самі з докера, якщо ви не хочете нічого вирішувати, чи вам просто лінь. Якщо будете збирати самі з вихідних кодів, вам знадобиться до 30 гб місця, майте це на увазі.

Установка клієнта
Якщо у вас в системі чомусь немає curl або php, встановіть їх. Подальші приклади будуть користуватися curl в якості API бази і PHP для написання системи індексації та пошуку.

Готуємо структури даних для індексу
Як правило, структури для повнотекстового пошуку в движках для пошуку досить прості. Структура називається Inverted Index, і ми її реалізуємо, у дещо спрощеному вигляді. Ми будемо користуватися движком «за замовчуванням», рекомендованим для даних, що мають як первинний ключ, так і дату — MergeTree:

CREATE TABLE FT (
EventDate Date,
word_id UInt32,
message_id UInt64
) ENGINE=MergeTree(EventDate, (word_id, message_id), 8192);


Щоб створити таблицю в базі, можна скористатися наступною командою:

$ cat create.sql | curl 'http:/hostname:8123/?query=' --data-binary @-

У цій команді у файлі create.sql повинен лежати запит, який потрібно виконати, а hostname — це хост з піднятим ClickHouse, 8123 — дефолтний порт.

У зазначеній вище структурі word_id — це id слова у словнику (який ми створимо пізніше, у словнику зберігається відповідність word_text => word_id), а message_id — це id відповідного запису в таблиці з логами (аналог document_id для Sphinx).

Параметри для MergeTree движка: перше поле EventDate означає ім'я колонки з датою події, друга колонка (word_id, message_id) визначає первинний ключ (по суті, звичайний індекс) і 8192 — це налаштування, що впливає на гранулярність індексу, ми її залишимо за замовчуванням.

MergeTree сортує дані по первинному ключу і розбиває їх за датою, тому пошук певного дня і конкретного слова з сортуванням по message_id повинен бути досить швидким.

Створюємо структури для словника
Для того, щоб заповнити цей індекс, нам потрібна структура типу «словник», яка потрібна для того, щоб зберігати числа ClickHouse замість рядків. Словник можна створити в базі даних, і якщо це MySQL, то структура буде виглядати наступним чином:

CREATE TABLE Words (
id int(11) unsigned NOT NULL AUTO_INCREMENT,
word varchar(150) COLLATE ascii_bin NOT NULL DEFAULT ",
PRIMARY KEY (id),
UNIQUE KEY word (word)
) ENGINE=InnoDB DEFAULT CHARSET=ascii COLLATE=ascii_bin;

Зверніть увагу на ASCII-порівняння, це дозволяє сильно збільшити продуктивність текстових індексів у випадку, коли всі слова англійською мовою. Якщо у вас не всі логи англійською мовою, рекомендую переглянути свої погляди порівняння можна залишити за замовчуванням (utf8_unicode_ci).

Процес індексування
Для того, щоб керувати процесом індексації і щоб ініціювати початкове індексування, можна завести окрему таблицю MySQL з чергою для повідомлень, які ми ще не проіндексували:

CREATE TABLE IndexQueue (
message_id bigint(20) unsigned NOT NULL DEFAULT '0',
shard_id int(11) NOT NULL,
PRIMARY KEY (shard_id,message_id)
);


Щоб наповнити цю таблицю в перший раз, можна використовувати наступний запит:

INSERT IGNORE INTO IndexQueue (message_id, shard_id) SELECT message_id, message_id % 4 Messages FROM


Тут 4 — це кількість потоків індексатора, які ми будемо використовувати. На PHP7 код з прикладу нижче дає продуктивність приблизно 3,5 мб/сек на один процес, в 4 потоки відповідно виходить 14 мб/сек. Якщо ви пишете більше error-логів, ніж 14 мб/сек, то можливо вам потрібно терміново лагодити ваш продакшен і вам не до того, що повнотекстовий пошук трохи відстає :).

Алгоритм індексування буде наступним:
  1. Подивитись записи в черзі (IndexQueue) для зазначеного шарда
  2. Вибрати пачку записів і виділити в кожному повідомленні слова і скласти в масив $index виду message_id => array(word1, ..., wordN)
  3. Для кожного знайти слова відповідний word_id в словнику, і якщо такого слова ще немає, то додати
  4. Вставити в індекс ClickHouse записи за всіма словами усіх повідомлень


Нижче наведено трохи спрощений код для розбору черги та індексації, вам доведеться доопрацювати самостійно, якщо ви хочете його використовувати в себе:

Спрощена реалізація індексування на PHP
const CH_HOST = '<hostname>:8123';
const MAX_WORD_LEN = 150; // повинно відповідати тому, що в таблиці Words
$mysqli = mysql_connect(...); // коннект до бази
$limit = 10000; // максимальний розмір пачки повідомлень при індексації
$shard_id = intval($argv[1] ?? 0); // номер шарда (вказується першим аргументом скрипту, якщо не вказано, то буде 0)
echo "Indexing shard $shard_id\n";

while ($mysqli->query('SELECT MAX(message_id) FROM IndexQueue WHERE shard_id =' . $shard_id)->fetch_row()[0]) {
$index = "";
$start = microtime(true);

$ids = [];
foreach ($mysqli->query('SELECT message_id FROM IndexQueue WHERE shard_id =' . $shard_id . 'ORDER BY message_id LIMIT' . $limit)->fetch_all() as $row) {
$ids[] = $row[0];
}

if (empty($ids)) {
break;
}

$message_texts = $mysqli->query('SELECT message_id, `message_text` Messages FROM WHERE message_id IN(' . implode(', ', $ids) . ')')->fetch_all(MYSQLI_ASSOC);

$unknown_words = [];
$msg_words = [];

$total_length = 0;

foreach ($message_texts as $msg) {
$msg_id = $msg['message_id'];
$text = $msg['message_text'];

$total_length += strlen($text);

$words = array_unique(
array_filter(
preg_split('/\W+/s', $text),
function($a) {
$len = strlen($a);
return $len >= 2 && $len <= MAX_WORD_LEN;
}
)
);

foreach ($words as $word) {
$unknown_words[$word] = true;
}

$msg_words[$msg_id] = $words;
}

if (!count($message_texts)) {
$mysqli->query('DELETE FROM IndexQueue WHERE shard_id =' . $shard_id . 'AND message_id IN(' . implode(', ', $ids) . ')');
continue;
}

if (!count($unknown_words)) {
var_dump($message_texts);
die("Empty message texts!\n");
}

$words_res = $mysqli->query('SELECT word, id Words FROM WHERE word IN(' . INstr(array_keys($unknown_words)) . ')')->fetch_all(MYSQLI_ASSOC);

$word_ids = [];
foreach ($words_res as $row) {
$word_ids[$row['word']] = $row['id'];
unset($unknown_words[$row['word']]);
}

if (count($unknown_words)) {
echo "Inserting " . count($unknown_words) . " words into dictionary\n";

$values = [];
foreach ($unknown_words as $word => $_) {
$values[] = "('" . $mysqli->escape_string($word) . "')";
}
$mysqli->query('INSERT IGNORE Words INTO (word) VALUES' . implode(',', $values));
$words_res = $mysqli->query('SELECT word, id Words FROM WHERE word IN(' . INstr(array_keys($unknown_words)) . ')')->fetch_all(MYSQLI_ASSOC));

foreach ($words_res as $row) {
$word_ids[$row['word']] = $row['id'];
unset($unknown_words[$row['word']]);
}
}

if (count($unknown_words)) {
die("Could not fill dictionary\n");
}

foreach ($msg_words as $msg_id => $words) {
foreach ($words as $word) {
// тут неявно припускається, що unix timestamp з message_id можна обчислити шляхом відрізання останніх 32 біт
$index .= date('Y-m-d', $msg_id >> 32) . "\t" . $word_ids[$word] . "\t" . $msg_id . "\n";
}
}

$ch = curl_init('http://' . CH_HOST . '/?query=' . rawurlencode('INSERT INTO FT FORMAT TabSeparated'));
curl_setopt($ch, CURLOPT_POSTFIELDS, $index);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);

$res = curl_exec($ch);

if ($res !== "") {
die($res . "\n");
}

$mysqli->query('DELETE FROM IndexQueue WHERE shard_id =' . $shard_id . 'AND message_id IN(' . implode(', ', $ids) . ')');
echo "Speed " . round($total_length / 1024 / (microtime(true) - $start), 2) . " KiB/sec\n";
}

function INstr(array $values) {
global $mysqli;
$res = [];
foreach ($values as $v) $res[] = "'" . $mysqli->escape_string($v) . "'";
return implode(',', $res);
}



Пошук за індексом
Нам не потрібні алгоритми ранжирування при пошуку, якими так багаті Elastic, Sphinx та інші рішення, і нам потрібна просто сортування за датою, тому пошук буде исключательно простий. По суті, щоб знайти що-небудь за запитом «hello world 111», нам потрібно спочатку знайти word_id у словнику (припустимо, що це буде 1, 2 і 3 відповідно) і виконати наступний запит:

SELECT message_id FROM FT
WHERE word_id IN(1, 2, 3)
GROUP BY message_id
HAVING uniq(word_id) = 3
ORDER BY message_id DESC
LIMIT 50


Зверніть увагу на те, що в кожному документі, який ми шукаємо, повинні бути присутні всі слова запиту, тому ми пишемо HAVING uniq(word_id) = 3 (uniq(word_id) — це аналог COUNT(DISTINCT word_id) у звичайних SQL-бази), де 3 — це кількість різних слів у запиті.

Ми припускаємо, що сортування по message_id буде означати сортування за часом. Цього можна досягти, якщо в перші 32 біта message_id записувати UNIX TIMESTAMP події в секундах, а в другу половину — мікросекунди події (якщо є) і випадкові числа.

Результати
Для тестування продуктивності цього рішення, я взяв базу даних error-логів з нашого девел-сервера об'ємом 3 Гб (1,6 млн подій) і проіндексував. Індексатор показав швидкість індексації в 3,5 Мб/сек на один потік, що для мого випадку було більше, ніж достатньо. В даний момент ми використовуємо Sphinx для повнотекстового пошуку по error-логів, тому я можу приблизно порівняти продуктивність цих двох рішень, оскільки вони працюють приблизно в однакових умовах на одному і тому ж залозі. Індексація у Sphinx (принаймні, побудова не realtime-індексу) у кілька разів швидше в розрахунку на одне ядро, але враховуйте, що індексатор сфінкса написаний на C++, а наш — на PHP :).

Щоб обчислити найважчий запит для ClickHouse (і, очевидно, для Sphinx теж), я вирішив знайти найпопулярніші слова в індексі:
$ echo 'SELECT word_id, count() AS cnt FROM FT GROUP BY word_id ORDER BY cnt DESC LIMIT 5' | curl 'http://hostname:8123/?query=' --data-binary @-
5 1669487
187 1253489
183 1217494
159 1216255
182 1199507


Запит зайняв 130 мс при загальній кількості записів в 86 млн, вражає! (на тестовій машині 2 ядра).

Отже, якщо взяти топ-5 і перетворити word_id в нормальні слова, то запит для виконання вийде наступний: «php wwwrun _packages ScriptFramework badoo. Ці слова зустрічаються у нас майже в кожному повідомленні і їх можна спокійно викинути з індексу, але я їх залишив для перевірки продуктивності пошуку.

Виконуємо запит в ClickHouse:
SELECT message_id FROM FT WHERE IN word_id(189, 159, 187, 5, 183) GROUP BY message_id HAVING uniq(word_id) = 5 ORDER BY message_id DESC LIMIT 51


І схожий запит у Sphinx:
SELECT message_id FROM FT WHERE MATCH('php wwwrun _packages ScriptFramework badoo') ORDER BY message_id DESC LIMIT 51


Часи виконання запиту (обидва демона можуть використовувати для виконання запиту обидва ядра, все поміщається в оперативну пам'ять):

ClickHouse: 700 мс
Sphinx: 1500 мс

Враховуючи, що Sphinx вміє ранжувати результати, а наша система немає, час у Sphinx досить непогане. Не забувайте, що за час виконання запиту обидва демона повинні були об'єднати результати для ~6 млн документів (по 1,2 млн документів на слово) і робили це на скромних 2 ядрах. Цілком можливо, що при належній налаштування, часи, зазначені в цьому (трохи синтетичному) тесті, поміняються місцями, але тим не менш, результатами особисто я дуже задоволений і можна сміливо сказати, що для побудови реалтайм-пошуку по логам ClickHouse підходить дуже добре.

Спасибі за те, що прочитали статтю до кінця і сподіваюся, що вона вам сподобалася.

P. S. Я не є співробітником Яндекса і з Яндексом ніяк не пов'язаний, я просто хотів спробувати їх базу даних для реальної задачі :).

Посилання
  1. Сайт ClickHouse
  2. Стаття на Хабре до open-source
  3. Опен-сорс стаття на Хабре
  4. Github
  5. ClickHouse Docker
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.