Пишемо власний шлюз для Thrift API

Микросервисы, як не крути, — наше все. Можна опиратися SOAP 2.0 скільки завгодно довго, але рано чи пізно або вони прийдуть за тобою і звернуть в свою віру, чи ти прийдеш до них сам і попросиш хрестити себе вогнем і мечем. Як і у будь-якого архітектурного рішення, у микросервисов є свої мінуси. Одним з них є необхідність в кожен микросервис включати якусь логіку за авторизації запитів від зовнішніх систем або інших микросервисов. Ця логіка може бути безпосередньо «зашита» всередині микросервиса (і не важливо, що це окрема бібліотека), делегована іншій микросервису, а може бути оголошена декларативно. Що значить декларативно? Наприклад, можна домовитися, що кожен микросервис приходить особливий HTTP-заголовок, або якась структура даних, у якій є інформація про користувача, який робить запит. І даними в цій структурі необхідно однозначно довіряти. У всіх трьох варіантів є свої недоліки, але в рамках статті ми розберемо останній. Для його реалізації зазвичай використовується шаблон проектування API Gateway:
image

Під катом всі труднощі реалізації шаблону в умовах бінарного протоколу передачі даних.

У загальному випадку API Gateway обмежує кількість запитів до внутрішніх сервісів, авторизує запити клієнтів, виробляє логування та аудит, розподіляє запити між клієнтами та перетворює дані, якщо це потрібно. В якості прикладу може бути використаний звичайний nginx. Розглянемо функцію авторизації запитів користувачів. Якщо використовується HTTP-протокол, то загальноприйнятою практикою вважається додавання якогось токена (не важливо як ми його отримали) в заголовок Authorization:

Authorization: Bearer <some token> 

На стороні API Gateway цей заголовок якимось чином перевіряється і обмінюється на інший заголовок, що містить певне знання про користувача, якому токен був виписаний, наприклад, його ідентифікатор, і вже його можна прокинути внутрішнім сервісів:

Customer: <id> 

Все здається простим і зрозумілим, але біда в тому, що Apache Thrift складається з декількох частин:

+-------------------------------------------+
| Server |
| (single-threaded, event-driven etc) |
+-------------------------------------------+
| Processor |
| (compiler generated) |
+-------------------------------------------+
| Protocol |
| (JSON, compact, binary etc) |
+-------------------------------------------+
| Transport |
| (raw TCP, HTTP etc) |
+-------------------------------------------+

У загальному випадку ми не можемо зав'язатися на протокол або транспорт. Звичайно, можна вибрати щось одне, все домовитися, що ми використовуємо тільки HTTP, але це обмежує можливості щодо заміни транспорту і змушує робити якісь зовнішні обробники/фільтри вже всередині самих Thrift-сервісів (адже для них те http-заголовки не є нативними).

Залишається використовувати можливості самого протоколу, щоб у процесі проходження запиту через шлюз API зовнішній авторизаційний токен підмінявся на внутрішній.

Convention over configuration

Отже, нехай у нас є наступний внутрішній сервіс:

service InternalTestService { 
SomeReturnData getSomeData(
1: UserData userData,
2: RequestData requestData
) throws (1: SomeException e);
}

UserData — це якісь відомості про користувача, від імені якого викликається сервіс, щоб останній міг зрозуміти, а чиї дані тягнути. Зрозуміло, що такий сервіс виставляти назовні не можна. А яку можна? Наприклад такий:

service ExternalTestService { 
SomeReturnData getSomeData(
1: AuthToken authData,
2: RequestData requestData
) throws (1: SomeException e, 99: UnauthorizedException ue);
}

Вся різниця між сервісами — це їх перший аргумент і виключення, яке ініціюється в разі проблем з авторизацією запиту (я сподіваюся, що 98 власних винятків вистачить всім). Наше завдання на рівні шлюзу перевірити авторизаційний токен і замінити його на інформацію про користувача.

Кішочкі

На жаль документації по Thrift-кіт наплакав. Майже всі гайди, включаючи, мабуть, кращий з них, які не стосуються питань внутрішнього устрою тих чи інших протоколів. І це зрозуміло. В 99% випадків, лізти всередину протоколу розробнику не доведеться, але нам потрібно.

Є три найбільш популярних протоколу:

  • Binary — просто бінарний протокол даних рядка, наприклад, передаються як є в UTF-8)
  • Compact — той же бінарний тільки компактний
  • JSON — дуже своєрідний JSON
Кожен з представлених протоколів має свою реалізацію, приховану за одним і тим же API. Якщо розглянути бінарний протокол, то для нашого сервісу він буде з точки зору API виглядати так:
image

TMessage — про метаінформація для повідомленні. Складається з імені методу, типу і порядкого номери методу в сервісі. Тип повідомлення може бути таким:

  • CALL = 1 — вхідне повідомлення
  • REPLY = 2 — відповідь
  • EXCEPTION = 3 — в процесі виконання сталася помилка
  • ONEWAY = 4 — повідомлення не потребує відповіді
Все що не TMessage — корисна інформація, яка обгорнута в структуру вхідного повідомлення.
Всі представлені протоколи читають прийшов байтовий масив даних послідовно і зберігають його поточний індекс, щоб продовжити читання з потрібного місця.

Тому наш алгоритм повинен бути наступним:

  1. Прочитати TMessage
  2. Прочитати початок загальної структури повідомлення
  3. Прочитати метаінформацію про першому полі в повідомленні
  4. Запам'ятати поточну позицію в байтовому масив
  5. Прочитати інформацію про токені
  6. Запам'ятати поточну позицію в байтовому масиві
  7. Обміняти токен на дані про користувача
  8. Сериализации дані про користувача
  9. Сформувати новий бінарний масив з трьох частин:
    • Від початку вихідного повідомлення до індексу з пункту 4
    • Байтовий масив структури даних про користувача

    • Від індексу з пункту 6 до кінця оригінального повідомлення


Пишемо тест

Без тестування в розвідку не ходимо, тим більше що у випадку з бінарним протоколом це найпростіший спосіб перевірити працездатність вашого коду. Для тесту нам знадобиться наступні thrift-сервіси:
Заголовок спойлера
namespace java ru.aatarasoff.thrift

exception SomeException {
1: string code
}

exception UnauthorizedException {
1: string reason
}

service ExternalTestService {
SomeReturnData getSomeData(
1: AuthToken authData,
2: RequestData requestData
) throws (1: SomeException e, 99: UnauthorizedException ue);
}

service InternalTestService {
SomeReturnData getSomeData(
1: UserData userData,
2: RequestData requestData
) throws (1: SomeException e);
}

struct SomeReturnData {
1: string someStringField,
2: i32 someIntField
}

struct RequestData {
1: string someStringField,
2: i32 someIntField
}

struct AuthToken {
1: string token,
2: i32 checksum
}

struct UserData {
1: string id
}

Створимо і заповнимо зовнішній сервіс тестовими даними:

TMemoryBuffer externalServiceBuffer = new TMemoryBufferWithLength(1024);

ExternalTestService.Client externalServiceClient 
= new ExternalTestService.Client(protocolFactory.getProtocol(externalServiceBuffer));

externalServiceClient.send_getSomeData( 
new AuthToken().setToken("sometoken").setChecksum(128),
new RequestData().setSomeStringField("somevalue").setSomeIntField(8)
);

TMemoryBufferWithLength — спеціально створений клас, який усуває фатальний для нас недолік транспорту TMemoryBuffer. Останній не вміє віддавати справжню довжину повідомлення. Замість цього можна отримати довжину всього буфера, яка зазвичай більше довжини повідомлення оскільки частина байтового масиву резервується під майбутні дані.

Метод send_getSomeData серіалізует повідомлення в наш буфер.

Аналогічні дії зробимо і з внутрішнім сервісом:

internalServiceClient.send_getSomeData( 
new UserData().setId("user1"),
new RequestData().setSomeStringField("somevalue").setSomeIntField(8)
);

Отримаємо байтовий масив нашого повідомлення:

byte[] externalServiceMessage = Arrays.copyOf( 
externalServiceBuffer.getArray(),
externalServiceBuffer.length()
);

Введемо клас, який буде транслювати наше повідомлення з подання для зовнішнього сервісу подання для внутрішнього: MessageTransalator.

public MessageTransalator(TProtocolFactory protocolFactory, AuthTokenExchanger authTokenExchanger) { 
this.protocolFactory = protocolFactory;
this.authTokenExchanger = authTokenExchanger;
}

public byte[] process(byte[] thriftBody) throws TException { 
//some actions
}

Реалізація обміну токена (AuthTokenExchanger) може бути різною в різних проектах, тому зробимо окремий інтерфейс:

public interface AuthTokenExchanger<T extends TBase, U extends TBase> { 
T createEmptyAuthToken();
U process(T authToken) throws TException;
}

createEmptyAuthToken повинен повернути якийсь об'єкт, який представляє порожній маркер, заповнений MessageTransalator-му. У методі process потрібно реалізувати обмін авторизаційного сертифіката на дані про користувача. Для нашого тесту використовуємо просту реалізація:

@Override
public AuthToken createEmptyAuthToken() { 
return new AuthToken();
}

@Override
public UserData process(AuthToken authToken) { 
if ("sometoken".equals(authToken.getToken())) {
return new UserData().setId("user1");
}
throw new RuntimeException("token is invalid");
}

Пишемо перевірку:

assert.assertTrue( 
"Translated external message must be the same as internal message",
Arrays.equals(
new MessageTransalator(
protocolFactory, 
new AuthTokenExchanger<AuthToken, UserData>() {}
).process(externalServiceMessage),
internalServiceMessage
)
)

Запускаємо тести, і нічого не працює. І це добре!

Зелений світло

Реалізуємо метод process згідно з алгоритмом:

TProtocol protocol = createProtocol(thriftBody);

int startPosition = findStartPosition(protocol);

TBase userData = authTokenExchanger.process( 
extractAuthToken(protocol, authTokenExchanger.createEmptyAuthToken())
);

int endPosition = findEndPosition(protocol);

return ArrayUtils.addAll( 
ArrayUtils.addAll(
getSkippedPart(protocol, startPosition),
serializeUserData(protocolFactory, userData)
),
getAfterTokenPart(protocol, endPosition, thriftBody.length)
);

В якості протоколу використовуємо TMemoryInputTransport, який дозволяє читати повідомлення безпосередньо з переданого в нього байтового масиву.

private TProtocol createProtocol(byte[] thriftBody) { 
return protocolFactory.getProtocol(new TMemoryInputTransport(thriftBody));
}

Реалізуємо знаходження меж сертифіката в байтовому масиві:

private int findStartPosition(TProtocol protocol) throws TException { 
skipMessageInfo(protocol); //пропускаємо TMessage
skipToFirstFieldData(protocol); //шукаємо початок даних в першому полі
return protocol.getTransport().getBufferPosition();
}

private int findEndPosition(TProtocol protocol) throws TException { 
return protocol.getTransport().getBufferPosition();
}

private void skipToFirstFieldData(TProtocol protocol) throws TException { 
protocol.readStructBegin();
protocol.readFieldBegin();
}

private void skipMessageInfo(TProtocol protocol) throws TException { 
protocol.readMessageBegin();
}

Сериализуем користувальницькі дані:

TMemoryBufferWithLength memoryBuffer = new TMemoryBufferWithLength(1024); 
TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);

userData.write(protocol);

return Arrays.copyOf(memoryBuffer.getArray(), memoryBuffer.length()); 

Запускаємо тести, і…

Включаємо Шерлока

Отже, тести для Binary і Compact проходять, але JSON чинить опір. Що ж не так? Йдемо в дебаг і дивимося, які ж масиви ми порівнюємо:

//JSON звичайної людини
[1,"getSomeData",1,1,{"1":{"rec":{"1":{"str":"user1"}}},"2":{"rec":{"1":{"str":"somevalue"},"2":{"i32":8}}}}]

//JSON курця 
[1,"getSomeData",1,1,{"1":{"rec"{"1":{"str":"user1"}}},"2":{"rec":{"1":{"str":"somevalue"},"2":{"i32":8}}}}]

Не помітили різниці? А вона є. Після першого «rec» не вистачає двокрапки. API використовуємо один і той же, а результат різний. Розгадка прийшла тільки після уважного читання коду класу TJSONProtocol. Протокол містить контекст, який зберігає різні розділювачі в стеку, коли обходить JSON-структуру для читання або запису.

TJSONProtocol.JSONBaseContext context_ = new TJSONProtocol.JSONBaseContext(); 

При читанні структури, прочитується і символ ":", а от назад він не повертається, тому що контексту в самому об'єкті немає.

Вставляємо костиль в метод seriaizeUserData:

if (protocol instanceof TJSONProtocol) { 
memoryBuffer.write(COLON, 0, 1); //додаємо ":"
}

Запускаємо тести, і тепер то все ок.

Викид винятків

Ми вже близько до фінішної межі. Ок, згадаємо, що ми повинні викинути виняток у разі, якщо авторизація запиту пройшла успішно:

service ExternalTestService { 
SomeReturnData getSomeData(
1: AuthToken authData,
2: RequestData requestData
) throws (1: SomeException e, 99: UnauthorizedException ue);
}

Зробимо обробку виключень в окремому методі processError.

public byte[] processError(TException exception) throws Exception 

У Thrift-е є кілька типів виключень, які можуть виникнути в результаті виклику сервісу:
  1. TApplicationException — виняток рівня програми
  2. TProtocolException — виняток, пов'язане з протоколом
  3. TTransportException — виняток, пов'язаний з передаванням повідомлення
  4. TException — базове виняток, від якого успадковуються всі інші типи
  5. YourException extends TException — будь-яке виключення, яке було оголошено в DSL
Цікава деталь. Передати у відповідному повідомленні клієнту можна або TApplicationException, або клієнтська кастомное, в нашому випадку це UnauthorizedException. Тому ми повинні обернути будь-які помилки або у TApplicationException або UnauthorizedException.

public byte[] processError(TException exception) throws Exception {
TMemoryBufferWithLength memoryBuffer = new TMemoryBufferWithLength(1024);

TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);

try {
throw exception;
} catch (TApplicationException e) {
writeTApplicationException(e, protocol);
} catch (TProtocolException e) {
writeTApplicationException(createApplicationException(e), protocol);
} catch (TTransportException e) {
writeTApplicationException(createApplicationException(e), protocol);
} catch (TException e) {
if (TException.class.equals(e.getClass())) {
writeTApplicationException(createApplicationException(e), protocol);
} else {
writeUserDefinedException(exception, protocol);
}
}

return Arrays.copyOf(memoryBuffer.getArray(), memoryBuffer.length());
}

Реалізація запису TApplicationException у відповідь пакет даних досить проста:

private void writeTApplicationException(TApplicationException exception, TProtocol protocol) throws TException {
protocol.writeMessageBegin(new TMessage(this.methodName, TMessageType.EXCEPTION, this.seqid));
exception.write(protocol);
protocol.writeMessageEnd();
}

private TApplicationException createApplicationException(TException e){
return new TApplicationException(TApplicationException.INTERNAL_ERROR, e.getMessage());
}

Згідно з протоколом, кожне повідомлення має свій ідентифікатор послідовності та ім'я викликається методу, які необхідно повернути назад клієнтові. Для цього потрібно додати нові поля: seqid та methodName у наш клас MessageTranslator, які заповнюються при читанні початку повідомлення. З-за цього наш клас перестає бути потокобезопасным.

Для довільного запису винятку потрібно більше рухів:
private static final String ERROR_STRUCT_NAME = "result";
private static final String ERROR_FIELD_NAME = "exception";
private static final short ERROR_FIELD_POSITION = (short) 99;
private static final String WRITE_METHOD_NAME = "write";

private void writeUserDefinedException(TException exception, TProtocol protocol) throws TException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
TStruct errorStruct = new TStruct(ERROR_STRUCT_NAME);
TField errorField = new TField(ERROR_FIELD_NAME, TType.STRUCT, ERROR_FIELD_POSITION);

protocol.writeMessageBegin(new TMessage(this.methodName, TMessageType.REPLY, this.seqid));
protocol.writeStructBegin(errorStruct);
protocol.writeFieldBegin(errorField);

exception.getClass().getMethod(WRITE_METHOD_NAME, TProtocol.class).invoke(exception, protocol);

protocol.writeFieldEnd();
protocol.writeFieldStop();
protocol.writeStructEnd();
protocol.writeMessageEnd();
}

Тут цікаво те, що для виключення тип зворотного повідомлення не TMessageType.EXCEPTION, TMessageType.REPLY.

Тепер ми вміємо брати вхідне повідомлення, підміняти в ньому токен і коректно віддавати клієнту відповідь, якщо під час перевірки сертифіката сталася помилка.

В бар вривається Spring

Ок, ми зробили препарацію бінарних пакетів. Тепер саме час зробити практичну реалізацію на популярному фреймворку для створення микросервисов. Наприклад, на Spring Boot. Він хороший тим, що, з одного боку, для нього можна знайти вже готові рішення, а з іншого — його просто і зручно кастомизировать анотаціями додаючи нові можливості двома-трьома рядками коду. Для роутінга і обробки запитів HTTP візьмемо Netflix Zuul, який входить в набір розширень Spring Cloud. Схема роботи Zuul-а представлена на наступному зображенні:



Якщо зовсім просто, то Netflix Zuul представляє з себе звичайний сервлет з ланцюжком власних фільтрів, які можуть запускатися динамічно, так і бути включеними в програму. Кожен фільтр додає нове поведінка, і навіть запис HTTP-відповіді теж реалізована фільтром. Існує кілька типів фільтрів, які виконуються послідовно як показано на картинці вище. Усередині кожного типу, фільтри виконуються в порядку, визначеному пріоритетом конкретного фільтру. Підключити Zuul до додатка Spring Boot простіше простого (ну ще залежності додати):

@SpringBootApplication
@EnableZuulProxy
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

Ми хочемо того ж, але для API-шлюзу, щоб ті, хто буде використовувати наше рішення, могли сконцентруватися на бізнес-логікою авторизації своєї програми, а не на перерахованих в статті проблеми. Для цього створимо анотацію @EnableThriftGateway:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(ThriftGatewayConfiguration.class)
public @interface EnableThriftGateway { 
}

Конфігурація ThriftGatewayConfiguration буде містити три біна, які створюються у разі, якщо анотація буде додано до основного класу додатки: Application.

@Configuration
public class ThriftGatewayConfiguration { 
@Bean
@ConditionalOnMissingBean(AuthTokenExchanger.class)
AuthTokenExchanger authTokenExchanger() {
throw new UnsupportedOperationException("You should implement AuthTokenExchanger bean");
}
@Bean
@ConditionalOnMissingBean(TProtocolFactory.class)
TProtocolFactory thriftProtocolFactory() {
return new TBinaryProtocol.Factory();
}
@Bean
public AuthenticationZuulFilter authenticationZuulFilter() {
return new AuthenticationZuulFilter();
}
}

Анотація ConditionalOnMissingBean запобіжить створення дефолтного біна в тому випадку, якщо в програмі буде оголошено власний бін цього класу. Раніше створений інтерфейс AuthTokenExchanger повинен бути в обов'язковому порядку реалізований розробником конкретного проекту. Ми не можемо, з причин безпеки, зробити яку-небудь дефолтну реалізацію, тому в методі створення біна викидається виняток. Також, потрібно визначити протокол, використовуваний для передачі thrift-повідомлень. За замовчуванням, це TBinaryProtocol, але завжди можна використовувати потрібний для проекту, якщо перевизначити бін створення фабрики протоколу. Але найважливішою частиною конфігурації безумовно є бін AuthenticationZuulFilter, який реалізує бізнес-логіку авторизаційного шару.

public class AuthenticationZuulFilter extends ZuulFilter {
@Override
public String filterType() {
return "pre";
}
@Override
public int filterOrder() {
return 6;
}
@Override
public boolean shouldFilter() {
return true;
}
@Override
public Object run() {
RequestContext ctx = RequestContext.getCurrentContext();
HttpServletRequestWrapper request = (HttpServletRequestWrapper) ctx.getRequest();
//тут ваші дії
return null;
}
}

Після отримання об'єктів контексту і HTTP-запиту, створимо MessageTransalator.

MessageTransalator messageTransalator = new MessageTransalator(protocolFactory, authTokenExchanger);

Позитивний сценарій складається з обробки вхідного пакету даних, запису нового пакета в полі requestEntity контексту запиту, і зазначення нової довжини повідомлення замість оригінального:

byte[] processed = messageTransalator.process(request.getContentData()); 
ctx.set("requestEntity", new ByteArrayInputStream(processed)); 
ctx.setOriginContentLength(processed.length);

Якщо сталася помилка, то її необхідно обробити:

ctx.setSendZuulResponse(false); 
ctx.setResponseDataStream(new ByteArrayInputStream(new byte[]{}));
try { 
ctx.getResponse().getOutputStream().write(messageTransalator.processError(e));
} catch (Exception e1) {
log.error("unexpected error", e1);
ctx.setResponseStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
}

Тут нам довелося застосувати кілька неочевидних трюків для запобігання подальшої обробки запиту і спроби відправити пакет внутрішнього сервісу. По-перше,
ctx.setSendZuulResponse(false)
не дає провести GZIP-стиснення вихідного пакету. Не всі thrift-клієнти здатні вижити після такої перепакування. А по-друге,
ctx.setResponseDataStream(new ByteArrayInputStream(new byte[]{}))
дозволяє використовувати оригінальний фільтр формування вихідного повідомлення, незважаючи на установку в попередньому пункті заборони на передачу даних назад на клієнта.

З'єднуємо всі разом

Створимо нове Spring Boot додаток і додамо в нього дві анотації @EnableZuulProxy та @EnableThriftGateway:

@SpringBootApplication
@EnableZuulProxy
@EnableThriftGateway
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

Реалізуємо просту логіку авторизації:

@Configuration
public class AuthenticationConfiguration {
@Bean
AuthTokenExchanger authTokenExchanger() {
return new AuthTokenExchanger<Token, TName>() {
@Override
public Token createEmptyAuthToken() {
return new Token();
}

@Override
public TName process(Token authToken) throws TException {
if (authToken.getValue().equals("heisours")) {
return new TName("John", "Smith");
}

throw new UnauthorizedException(ErrorCode.WRONG_TOKEN);
}
};
}
}

Як видно, якщо до нас прийшов токен зі значенням heisours, то ми авторизовываем запит, а якщо ні, то викидаємо помилку. Залишається тільки налаштувати Zuul:

zuul:
routes:
greetings:
#шлях в URL, на якому буде спроксирован сервіс
path: /greetings/**
#ідентифікатор сервісу
serviceId: greetings

greetings: 
ribbon: 
listOfServers: localhost:8080 #список серверів, де розгорнуто сервіс greetings

і API Gateway можна використовувати.

Посилання

Базова частина для перетворення бінарних пакетів: https://github.com/aatarasoff/thrift-api-gateway-core
Чарівні анотації для Spring-а: https://github.com/aatarasoff/spring-thrift-api-gateway
Приклади: https://github.com/aatarasoff/spring-thrift-api-gateway/tree/master/examples

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.