Робота з Aerospike на scala за допомогою магії макросів

Робота з Aerospike на scala за допомогою магії макросів
N|Solid
У нашому відділі бигдаты частину даних зберігається в Aerospike. Споживачів досить багато, серед них два додатки, написаних на Scala, взаємодія з базою в яких буде розширено у зв'язку з постійно зростаючими вимогами бізнесу. Єдиним пристойним драйвером для нас був джавовый клієнт, згаданий на сайті самої бази даних aerospike.com http://www.aearospike.com/docs/client/java). Конвертація скаловых типів даних (а особливо ієрархічних) у відповідні аэроспайковские типи призводить до великої кількості бойлерплейта. Щоб цього уникнути, необхідний більш зручний, а заодно і типобезопасный інтерфейс.
Інженери не люблять писати багато разів один і той же код і намагаються спростити і оптимізувати всі повторювані дії. Такі задачі часто вирішує кодогенерация. Тому ми вирішили написати свою бібліотеку для роботи з Aerospike, користуючись проекти.

Трохи про Aerospike

Aerospike — це розподілена schema-less key-value база даних, що працює за принципом хеш-таблиці. Вона активно використовується в нашому банку для побудови розподілених кешей і для завдань, що вимагають низького часу відгуку. База легко встановлюється і без проблем адмініструється, що спрощує її запровадження та підтримку.
Про моделі зберігання: параметри namespace і setName пов'язані з ключами записів, а самі дані зберігають у так званих бинах. Значення можуть бути різних типів:
Integers, Strings, Bytes, Doubles, Lists, Maps, Sorted Maps, GeoJSON
. Цікаво, що тип біна не є фіксованим і, записавши, скажімо,
Integer
, можна замінити його на будь-який інший. Драйвери, написані для цієї бази, володіють неабиякою кількістю коду для серіалізації значень зовнішньої моделі у внутрішню.

Про створення DSL

Розглянемо на простих прикладах процес проектування нашого DSL, чому ми вирішили використовувати макроси, і що з цього всього вийшло.
В умовах обмеженого часу (взаємодія з базою тільки мала частина проекту) складно написати цілком клієнт з реалізацією протоколу. До того ж це вимагало б більше зусиль у підтримці. Тому ми зупинилися на створенні обгортки для вже існуючого клієнта.
Розглянемо на прикладах.
В якості базису використаний Aerospike Java Client версії 3.3.1 (його можна знайти на сайті www.aerospike.com, вихідні коди є на Гітхабі), чимала частина методів в якому оперує з ключами і бинами з пакету
com.aerospike.client
. Java Client підтримує роботу з базою як у синхронному, так і в асинхронному режимі. Ми використовуємо асинхронний
com.aerospike.client.async.AsyncClient
. Найпростіший спосіб створити:
val client = new AsyncClient(new AsyncClientPolicy, hosts.map(new Host(_, port)): _*)

де
hosts
— це
List[String]
, що містить хости вашої бази, а
port
— порт типу
Int
(по дефолту 3000).
Якщо при створенні клієнта передати невалідні значення хостів або невірний порт драйвер викине помилку, тому що при виклику перевіряє з'єднання:
scala> new AsyncClient(new AsyncClientPolicy, List().map(new Host(_, port)): _*)
com.aerospike.client.AerospikeException$Connection: Error Code 11: Failed to connect to host(s):

Таблиця відповідностей типів DSL, Java CLient і базі даних


| Scala | Java Client | Aerospike |
|-------------- |-------------- |----------- |
| Int | IntegerValue | Integer |
| Long | LongValue | Integer |
| String | StringValue | String |
| Boolean | BooleanValue | Integer |
| Float | FloatValue | Double |
| Double | DoubleValue | Double |
| Seq | ListValue | List |
| Map | MapValue | Map |
| Char | StringValue | String |
| Short | IntegerValue | Integer |
| Byte | IntegerValue | Integer |
| HList | MapValue | Map |
| case class T | MapValue | Map |

З таблиці видно, що попереду досить багато однотипних перетворень. Писати все це руками щоразу бажання немає.
Перша думка була про рефлекшен, але рантаймовый варіант нас не влаштовує — це довго і дорого. Залишається варіант з компайл-тайм рефлекшеном, який дозволить згенерувати конвертери і отримувати повідомлення про помилки ще на стадії компіляції.
В методах інтерфейсу нашої DSL для будь-яких дій з базою ми будемо передавати тільки конкретні значення ключів і бинов, а всі перетворення за нас зроблять макроси. Основна ідея була в тому, щоб позбутися від бойлерплейта і убезпечити користувача від досконального вивчення внутрішньої структури даних самого Aerospike. Ми попередньо описали найбільш оптимальний варіант зберігання, спираючись на тип переданого для запису значення.
Розглянемо на прикладі однієї з найбільш поширених операцій з Aerospike — додавання запису з подальшим її читанням по ключу. Будемо використовувати метод
Put
. Для початку нам потрібні функції перетворення значень певних типів у внутрішні моделі драйвера: ключів
com.aerospike.client.Key
, а бинов
com.aerospike.client.Bin
.
Нехай ключ буде
String
, а записувати в різних сервісах будемо бины типів
String, Int, Boolean
.
Напишемо функцію перетворення ключа:
import com.aerospike.client.Key
def createStringKey(namespace: String, setName: String value: String): Key =
new Key(namespace, setName, new StringValue(value))

і бинов відповідно:
import com.aerospike.client.Value.{IntegerValue, StringValue, BooleanValue}

def createStringBin(name: String value: String): Bin = new Bin(name, new StringValue(value))
def createIntBin(name: String value: Int): Bin = new Bin(name, new IntegerValue(value))
def createBooleanBin(name: String value: Boolean): Bin = new Bin(name, new BooleanValue(value))

Сигнатура потрібного нам методу в бібліотеці на java (кілька варіантів, ми беремо з найменшою кількістю параметрів):
public void put(WritePolicy policy, Key key, Bin... bins) throws AerospikeException;

Значить, дзвінки з використанням цієї бібліотеки будуть виглядати так:
import com.aerospike.client.policy.WritePolicy

client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue1"),
Seq(createStringBin("binName1", "binValue1"), createStringBin("binName2", "binValue2")): _*)
client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue2"),
Seq(createIntBin("binName1", 2), createIntBin("binName2", 4)): _*)
client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue3"),
Seq(createBooleanBin("binName1", true), createBooleanBin("binName2", false)): _*)

Не надто симпатично, правда? Спробуємо спростити:
def createKey[T](ns: String, sn: String value: T): Key = {
val key = value match {
case s: String => new StringValue(s)
case i: Int => new IntegerValue(i)
case b: Boolean => new BooleanValue(b)
case _ => throw new Exception("Not implemented")
}
new Key(ns, sn, key)
}

def createBin[T](name: String value: T): Bin = {
value match {
case s: String => new Bin(name, new StringValue(s))
case i: Int => new Bin(name, new IntegerValue(i))
case b: Boolean => new Bin(name, new BooleanValue(b))
case _ => throw new Exception("Not implemented")
}
}

def putValues[K, B](client: AsyncClient, namespace: String, setName:String,
keyValue: K, bins: Seq[(String, B)])(implicit wPolicy: WritePolicy): Unit = {
client.put(wPolicy, createKey(namespace, setName, keyValue), bins.map(b => createBin(b._1, b._2)): _*)
}

Тепер треба позбутися функцій
createKey
та
createBin
, додамо магії имплиситов.
Нам знадобляться службові об'єкти, які будуть на основі типів вхідних даних генерувати відповідні моделі використовуваного драйвера:
KeyWrapper: [K => Key]
BinWrapper: [B => Bin]

Тепер можна зібрати всю логіку в один метод:
case class SingleBin[B](name: String value: B)

def putValues[K, B](client: AsyncClient, key: K, value: SingleBin[B])(implicit kC: KeyWrapper[K],
bC: BinWrapper[B], wPolicy: WritePolicy): Unit = client.put(wPolicy, kC(key), bC(value))

де
WritePolicy
— об'єкт контейнер, що містить різні параметри запису. Ми будемо користуватися дефолтними, створюючи його так
scala new WritePolicy
.
Очевидно, що самим банальним варіантом буде описати створення врапперов всіх типів. Але навіщо це робити, коли ми знаємо, як саме буде створюватися кожен з инстансов? Ось тут нам знадобляться макроси.
Найпростіший варіант — описати створення того чи іншого типу конвертера за допомогою квазиквот. Почнемо з ключів:
trait KeyWrapper[KT] {

val namespace: String = ""
val setName: String = ""

def apply(k: KT): Key

def toValue(v: KT): Value = v match {
case b: Int => new IntegerValue(b)
case b: String => new StringValue(b)
case b: Boolean => new BooleanValue(b)
case _ => throw new Exception("not implemented")
}
}

object KeyWrapper {

implicit def materialize[T](implicit dbc: DBCredentials): KeyWrapper[T] = macro impl[T]

def impl[T: c.WeakTypeTag](c: Context)(dbc: c.Expr[DBCredentials]): c.Expr[KeyWrapper[T]] = {
import c.universe._
val tpe = weakTypeOf[T]

val ns = reify(dbc.splice.namespace)
val sn = reify(dbc.splice.setname)

val imports =
q"""
import com.aerospike.client.{Key, Value}
import collection.JavaConversions._
import com.aerospike.client.Value._
import scala.collection.immutable.Seq
import ru.tinkoff.aerospikescala.domain.ByteSegment
import scala.util.{Failure, Success, Try}
"""

c.Expr[KeyWrapper[T]] {
q"""
$imports
new KeyWrapper[$tpe] {
override val namespace = $ns
override val setName = $sn
def apply(k: $tpe): Key = new Key(namespace, setName, toValue(k))
}
"""
}
}
}

де
DBCredentials
містить namespace і setName.
Таким чином ми можемо описати метод для сервісу, при компіляції якого будуть самостійно генеруватися конвертери.
N|Solid
З бинами у нас ситуація дещо складніша. Необхідно діставати значення, збережені в базі, попередньо перетворені у внутрішній формат Aerospike. Для цього скористаємося найпростішим з методів драйвера:
public Record get(Policy policy, Key key) throws AerospikeException;

де обчислене значення:
public Record(
Map<String,Object> bins,
int generation,
int expiration
)

а необхідні нам дані лежать в
Map<String,Object> bins
. Тут виникає проблема (див. таблицю відповідностей). Так як наша мета — генерувати конвертери на етапі компіляції і забезпечити на виході значення типу, ідентичного записаним раніше, нам треба передбачити, як саме описати функцію, достающую потрібне нам вэлью з бази. Крім іншого типи, які ми отримуємо в
bins
з пакету
java.util
— отже, нам знадобляться конвертери з відповідних пакетів
scala.collection
.
Тепер напишемо конвертер для бинов:
trait BinWrapper[BT] {

import com.aerospike.client.Value._
import com.aerospike.client.{Bin, Record, Value}
import scala.collection.JavaConversions._
import scala.collection.immutable.Map
import scala.reflect.runtime.universe._

type Singleton = SingleBin[BT]
type Out = (Map[String, Option[BT]], Int, Int)

def apply(one: Singleton): Bin = {
if (one.name.length > 14) throw new IllegalArgumentException("Current limit for bean name is 14 characters")
else new Bin(one.name, toValue(one.value))
}

def toValue(v: BT): Value = v match {
case b: Int => new IntegerValue(b)
case b: String => new StringValue(b)
case b: Boolean => new BooleanValue(b)
case _ => throw new Exception("not implemented")
}

def apply(r: Record): Out = {
val outValue: Map[String, Option[BT]] = {
val jMap = r.bins.view collect {
case (name, bt: Any) => name -> fetch(bt)
}
jMap.toMap
}
if (outValue.values.isEmpty && r.bins.nonEmpty) throw new ClassCastException(
s"Failed to cast ${weakTypeOf[BT]}. Please, implement fetch function in BinWrapper")
else (outValue, r.generation, r.expiration)
}

def fetch(any: Any): Option[BT]
}

Метод
apply
приймає як параметр
Record
— тут можна узагальнити всі до моменту розбору безпосередньо типу значення. Реалізацію цього методу простіше написати на макроси:
object BinWrapper {

implicit def materialize[T]: BinWrapper[T] = macro materializeImpl[T]

def materializeImpl[T: c.WeakTypeTag](c: blackbox.Context): c.Expr[BinWrapper[T]] = {
import c.universe._
val tpe = weakTypeOf[T]
val singleton = weakTypeOf[SingleBin[T]]
val out = weakTypeOf[(Map[String, Option[T]], Int, Int)]
val tpeSt = q"${tpe.toString}"

val fetchValue = tpe match {
case t if t =:= weakTypeOf[String] => q"""override def fetch(any: Any): Option[$tpe] = match any {
case v: String => Option(v)
case oth => scala.util.Try(oth.toString).toOption
} """
case t if t =:= weakTypeOf[Boolean] => q"""override def fetch(any: Any): Option[$tpe] = match any {
case v: java.lang.Long => Option(v == 1)
case _ => None
} """
case t if t =:= weakTypeOf[Int] => q"""override def fetch(any: Any): Option[$tpe] = match any {
case v: java.lang.Long => Option(v.toInt)
case oth => scala.util.Try(oth.toString.toInt).toOption
} """
case t if t.toString.contains("HNil") || t.toString.contains("HList") =>
q"""override def fetch(any: Any): Option[$tpe] = match any {
case m: java.util.HashMap[Any, Any] =>
val newList = castHListElements(m.asScala.values.toList, $tpeSt)
newList.toHList[$tpe]
case oth => None
} """
case _ => q""""""
}

val imports =
q"""
import java.util.{List => JList, Map => JMap}
import com.aerospike.client.{Bin, Record, Value}
import com.aerospike.client.Value.{BlobValue, ListValue, MapValue, ValueArray}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import shapeless.{HList, _}
import shapeless.HList.hlistOps
import syntax.std.traversable._
....
"""

c.Expr[BinWrapper[T]] {
q"""
$imports

new BinWrapper[$tpe] {
override def apply(one: $singleton): Bin = {
if (one.name.length > 14) throw new IllegalArgumentException("Current limit for bean name is 14 characters")
else new Bin(one.name, toValue(one.value))
}
override def apply(r: Record): $out = {
val outValue: Map[String, Option[$tpe]] = {
val jMap = r.bins.view collect {
case (name, bt: Any) =>
val res = fetch(bt)
if (res.isEmpty && r.bins.nonEmpty) throwClassCast($tpeSt) else name -> res
}
jMap.toMap
}

(outValue, r.generation, r.expiration)
}
$fetchValue
}

"""
}
}
}

Макроси зробили за нас всю роботу — инстансы всіх необхідних конвертерів будуть генеруватися самостійно, виклики методів будуть містити тільки самі значення ключів і бинов.
N|Solid
З
Quasiquotes
працювати легко: поведінка передбачувана, підводних каменів немає. Важливо пам'ятати, що при використанні такого підходу всі бібліотеки, які потрібні в описаних в
Quasiquotes
методи, що повинні бути імпортовані в файл, де використовується макрос. Тому я відразу додала параметр
imports
в обох конвертерах, щоб не копіювати безліч бібліотек в кожному файлі.
Тепер у нас є все, щоб написати сервіс-обгортку:
class SpikeImpl(client: IAsyncClient) {

def putValue[K, B](key: K, value: SingleBin[B])(implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Unit = {
val wPolicy = new WritePolicy
client.put(wPolicy, kC(key), bC(value))
}

def getByKey[K, B](k: K)(implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Option[B] = {
val policy = new Policy
val record = client.get(policy, kC(k))
bC.apply(record)._1.headOption.flatMap(_._2)
}
}

Тепер можна перевірити роботу нашого сервісу:
import shapeless.{HList, _}
import shapeless.HList.hlistOps
import scala.reflect.macros.blackbox._
import scala.language.experimental.macros

object HelloAerospike extends App {

val client = new AsyncClient(new AsyncClientPolicy, hosts.map(new Host(_, port)): _*)
val database = new SpikeImpl(client)
implicit val dbc = DBCredentials("namespace", "setName")

database.putValue("key", SingleBin("binName", 123 :: "strValue" :: true :: HNil))
val hlistBin = database.getByKey[String, Int :: String :: Boolean :: HNil]("key")
.getOrElse(throw new Exception("Failed to get bin value"))
println("hlistBin value = " + hlistBin)

}

Запускаємо і заходимо в базу:
Mac-mini-sanyok-5:~ MarinaSigaeva$ ssh user@host
user@host's password:
Last login: Wed Nov 23 19:41:56 2016 from 1.1.1.1
[user@host ~]$ aql
Aerospike Query Client
Version 3.9.1.2
Copyright 2012-2016 Aerospike. All rights reserved.
aql> select * from namespace.setName
+------------------------------------------+
| binName |
+------------------------------------------+
| MAP('{"0":123, "1":"strValue", "2":1}') |
+------------------------------------------+
1 row in set (0.049 secs)
aql>

Дані записані. Тепер подивимося, що додаток вивело в консоль:
[info] Compiling 1 Scala source to /Users/Marina/Desktop/forks/playground/target/scala-2.11/classes...
[info] Running HelloAerospike
hlistBin value = 123 :: strValue :: true :: HNil
[success] Total time: 0 s, completed 23.11.2016 20:01:44

Для scala розробників рішення може бути більш інтуїтивно зрозумілим, чому java бібліотека. Код поточного DSL викладений на Гітхабі з докладним описом how to і кукбуком, який буде доповнюватися.
У світлі останніх подій (scala 2.12 released) з'явилася завдання для цікавих експериментів scala-meta.
Сподіваюся, що цей досвід буде вам корисний у вирішенні подібних завдань.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.