Централізовані логи для додатків з допомогою зв'язки heka+elasticsearch+kibana

У статті описана налаштування центрального логування для різних типів додатків (Python, Java (java.util.logging), Go, bash) з допомогою досить нового проекту Heka.

Heka розробляється в Mozilla та написана Go. Саме тому я використовую його замість logstash, який має подібні можливості.

Heka заснований на плагінах, які мають п'ять типів:
  1. Вхідні — якимось чином беруть дані (слухає порти, читають файли та ін);
  2. Декодери — обробляють вхідні запити і переводять їх у внутрішні структури для повідомлень;
  3. Фільтри — роблять якісь дії з повідомленнями;
  4. Encoders (неясно, як переводити) — кодують внутрішні повідомлення у формати, які вирушають через вихідні плагіни;
  5. Вихідні відправляють куди-небудь дані.
Наприклад, у разі Java-додатків вхідним плагіном є LogstreamerInput, який дивиться за змінами у файлі логів. Нові рядки в балці обробляються декодером PayloadRegexDecoder (по заданому формату) і далі відправляються в elasticsearch через вихідний плагін ElasticSearchOutput. Вихідний плагін в свою чергу кодує повідомлення з внутрішньої структури в формат elasticsearch через ESJsonEncoder.

Установка Heka
Всі способи описані установки на сайтіhttp://hekad.readthedocs.org/en/v0.8.2/installing.html#binaries). Але найпростіше завантажити готовий бінарний пакет під свою систему зі сторінки https://github.com/mozilla-services/heka/releases.

Оскільки у мене використовуються сервера під ubuntu, то і опис буде для цієї системи. У цьому випадку установка зводиться до встановлення самого deb пакети, налаштування файлу конфігурації /etc/hekad.toml і додавання в сервіси upstart.

У базове налаштування /etc/hekad.toml в мене входить налаштування кількості процесів (я ставлю рівним кількості ядер), інструментів (у якому можна подивитися які включені плагіни) і udp сервер на 5565 порту, який очікує повідомлення за протоколом google protobuf (використовується для python і go додатків):

maxprocs = 4

[Dashboard]
type = "DashboardOutput"
address = ":4352"
ticker_interval = 15

[UdpInput]
address = "127.0.0.1:5565"
parser_type = "message.proto"
decoder = "ProtobufDecoder"

Конфігурація для upstart /etc/init/hekad.conf:

start on runlevel [2345]
respawn
exec /usr/bin/hekad-config=/etc/hekad.toml


Логування Python додатків
Тут використовується бібліотека https://github.com/kalail/heka-py і спеціальний хендлер для модуля logging. Код:

import logging
from traceback import format_exception

try:
import heka
HEKA_LEVELS = {
logging.CRITICAL: heka.severity.CRITICAL,
logging.ERROR: heka.severity.ERROR,
logging.WARNING: heka.severity.WARNING,
logging.INFO: heka.severity.INFORMATIONAL,
logging.DEBUG: heka.severity.DEBUG,
logging.NOTSET: heka.severity.NOTICE,
}
except ImportError:
heka = None


class HekaHandler(logging.Handler):
_notified = None
conn = None
host = '127.0.0.1:5565'

def __init__(self, name, host=None):
if host is not None:
self.host = host

self.name = name
super(HekaHandler, self).__init__()

def emit(self, record):
if heka is None:
return

fields = {
'Message': record.getMessage(),
'LineNo': record.lineno,
'Filename': record.filename,
'Logger': record.name,
'Pid': record.process,
'Exception': ",
'Traceback': ",
}

if record.exc_info:
trace = format_exception(*record.exc_info)
fields['Exception'] = trace[-1].strip()
fields['Traceback'] = ".join(trace[:-1]).strip()

msg = heka.Message(
type=self.name,
severity=HEKA_LEVELS[record.levelno],
fields=fields,
)

try:
if self.conn is None:
self.__class__.conn = heka.HekaConnection(self.host)

self.conn.send_message(msg)
except:
if self.__class__._notified is None:
print "Sending HEKA message failed"
self.__class__._notified = True

За замовчуванням хендлер очікує, що Heka слухає на порту 5565.

Логування Go додатків
Для логування я форкнул бібліотеку для логування https://github.com/ivpusic/golog і додав туди можливість відправки повідомлень в Heka. Результат розташований тут: https://github.com/ildus/golog

Використання:

import "github.com/ildus/golog"
import "github.com/ildus/golog/appenders"
...
logger := golog.Default
logger.Enable(appenders.Heka(golog.Conf{
"addr": "127.0.0.1",
"proto": "udp",
"env_version": "2",
"message_type": "myserver.log",
}))
...
logger.Debug("some message")


Логування Java додатків
Для Java додатків використовується вхідний плагін типу LogstreamerInput із спеціальним regexp декодером. Він читає логи віджети з файлів, які повинні бути записані в певному форматі.

Конфігурація для heka, що відповідає за читання і декодування логів:

[JLogs]
type = "LogstreamerInput"
log_directory = "/some/path/to/logs"
file_match = 'app_(?P<Seq>\d+\.\d+)\.log'
decoder = "JDecoder"
priority = ["Seq"]

[JDecoder]
type = "PayloadRegexDecoder"
#Parses com.asdf[INFO|main|2014-01-01 3:08:06]: Server started
match_regex = '^(?P<LoggerName>[\w\.]+)\[(?P<Severity>[A-Z]+)\|(?P<Thread>[\w\d\-]+)\|(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
timestamp_layout = "2006-01-02 15:04:05"
timestamp_location = "Europe/Moscow"

[JDecoder.severity_map]
SEVERE = 3
WARNING = 4
INFO = 6
CONFIG = 6
FINE = 6
FINER = 7
FINEST = 7

[JDecoder.message_fields]
Type = "myserver.log"
Message = "%Message%"
Logger = "%LoggerName%"
Thread = "%Thread%"
Payload = ""


У програмі треба змінити Formatter через logging.properties. Приклад logging.властивості:

handlers= java.util.logging.FileHandler java.util.logging.ConsoleHandler

java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern = logs/app_%g.%u.log
java.util.logging.FileHandler.limit = 1024000
java.util.logging.FileHandler.formatter = com.asdf.BriefLogFormatter
java.util.logging.FileHandler.append=tru

java.util.logging.ConsoleHandler.level=ALL
java.util.logging.ConsoleHandler.formatter=com.asdf.BriefLogFormatter


Код BriefLogFormatter:

package com.asdf;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;

public class BriefLogFormatter extends Formatter {
private static final DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final String lineSep = System.getProperty("line.separator");
/**
* A Custom format implementation that is designed for brevity.
*/
public String format(LogRecord record) {
String loggerName = record.getLoggerName();
if(loggerName == null) {
loggerName = "root";
}
StringBuilder output = new StringBuilder()
.append(loggerName)
.append("[")
.append(record.getLevel()).append('|')
.append(Thread.currentThread().getName()).append('|')
.append(format.format(new Date(record.getMillis ())))
.append("]: ")
.append(record.getMessage()).append(' ')
.append(lineSep);
return output.toString();
}
}


Логування скриптів (bash)
Для bash в heka додається вхідний фільтр TcpInput (який слухає на певному порту) і PayloadRegexDecoder для декодування повідомлень. Конфігурація в hekad.toml:

[TcpInput]
address = "127.0.0.1:5566"
parser_type = "regexp"
decoder = "TcpPayloadDecoder"

[TcpPayloadDecoder]
type = "PayloadRegexDecoder"
#Parses space_checker[INFO|2014-01-01 3:08:06]: Need more space on disk /dev/sda6
match_regex = '^(?P<LoggerName>[\w\.\-]+)\[(?P<Hostname>[^\|]+)\|(?P<Severity>[A-Z]+)\|(?P<Timestamp>[\d\-\s:]+)\]: (?P<Message>.*)'
timestamp_layout = "2006-01-02 15:04:05"
timestamp_location = "Europe/Moscow"

[TcpPayloadDecoder.severity_map]
ERROR = 3
WARNING = 4
INFO = 6
ALERT = 1

[TcpPayloadDecoder.message_fields]
Type = "scripts"
Message = "%Message%"
Logger = "%LoggerName%"
Hostname = "%Hostname%"
Payload = "[%Hostname%|%LoggerName%] %Message%"

Для логування написана функція, яка відправляє повідомлення на TCP порт в заданому форматі:

log()
{
if [ "$1" ]; then
echo-e "app1[`hostname`|INFO|`date '+%Y-%m-%d %H:%M:%S"]: $1" | nc 127.0.0.1 5566 || true
echo $1
fi
}

Відправка повідомлення з рівнем INFO з типом app1:

log "test test test"


Відправка записів у elasticsearch
Додається наступна конфігурація в hekad.conf:

[ESJsonEncoder]
index = "heka-%{Type}-%{2006.01.02}"
es_index_from_timestamp = true
type_name = "%{Type}"

[ElasticSearchOutput]
message_matcher = "Type == 'myserver.log' || Type=='scripts' || Type=='nginx.access' || Type=='nginx.error'"
server = "http://<elasticsearch_ip>:9200"
flush_interval = 5000
flush_count = 10
encoder = "ESJsonEncoder"

Тут ми вказуємо, де знаходиться elasticsearch, як повинні формуватися індекси і які типи повідомлень туди відправляти.

Перегляд логів
Для перегляду логів використовується Kibana 4. Вона все ще в беті, але вже цілком робоча. Для установки потрібно завантажити архів зі сторінки http://www.elasticsearch.org/overview/kibana/installation/, розпакувати в будь-яку папку на сервері, вказати розташування elasticsearch сервера у файлі config/kibana.yml (параметр elasticsearch_url).

При першому запуску знадобиться додати індекси у вкладці Settings. Щоб можна було додати шаблон індексу і правильно визначилися поля, необхідно відправити тестове повідомлення з кожного додатка. Потім можна додати шаблон індексу виду heka-*(яке покаже всі повідомлення) або heka-scripts-*, тим самим відокремивши програми один від одного. Далі можна перейти на вкладку Discover і подивитися самі записи.

Висновок
Я показав тільки частина того, що можна логировать з допомогою Heka.
Якщо хтось зацікавився, то можу показати частину Ansible конфігурації, яка автоматично встановлює heka на всі сервери, а на обрані elasticsearch з kibana.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.