Як подружити Bagri і MongoDB

Приблизно місяць тому, я розповів Хабру про проект Bagri: NoSQL базі даних з відкритим кодом, побудованої поверх розподіленого кешу.

Після досить непоганого відгуку, вирішив написати статтю про те як можна нарощувати функціонал Bagri шляхом написання розширень (extensions) використовуючи вбудований API системи.

image

На даний момент Bagri публікує два API для підключення до зовнішніх систем: DataFormat API і DataStore API.

Перший призначений для парсингу документів у новому форматі і перетворення їх у внутрішній формат системи, а також для зворотного побудови документів у новому форматі з внутрішнього представлення системи.

Другий API служить для завантаження/збереження/видалення документів із зовнішніх систем зберігання. Найчастіше, для підключення до нового джерела документів необхідно реалізувати обидва інтерфейсу.

Я покажу, як реалізувати DataStore connector до MongoDB і використовувати його в якості системи зберігання документів. В даному випадку реалізації DataFormat API не потрібно, оскільки Mongo надає документи у форматі JSON, який підтримується системою.

Відразу хочу зробити кілька зауважень:

  1. Практична користь від такого коннектора? Очевидно, Mongo можна просто використовувати як централізованого сховища документів. Так само він може бути корисний у ситуаціях, описаних у даній статті, коли дані вже зберігаються в Mongo, але її можливостей стало не вистачати для розвитку функціоналу системи;
  2. Я не є знавцем MongoDB, якщо є більш оптимальні способи роботи з нею, я буду радий їх почути;
Отже, почнемо.

DataStore API передбачає реалізацію інтерфейсу com.bagri.xdm.cache.api.DocumentStore:

public interface DocumentStore {

/**
* Lifecycle method. Invoked when the store initialized. 
* 
* @param context the environment context
*/
void init(Map<String, Object> context);

/**
* Lifecycle method. When Invoked parent schema is closing
*/
void close();

/**
* Load document from persistent store
* 
* @param key the document key
* @return XDM Document instance if corresponding document found, null otherwise
*/
Document loadDocument(DocumentKey key);

/**
* Load bunch of documents from persistent store
* 
* @param keys the collection of document keys to load
* @return the map of loaded documents with their keys
*/
Map<DocumentKey, Document> loadAllDocuments(Collection<DocumentKey> keys);

/**
* Load document keys. Can do it in synch or asynch way.
* 
* @return iterator over found document keys
*/
Iterable<DocumentKey> loadAllDocumentKeys();

/**
* Stores document to persistent store.
* 
* @param key the document key
* @param value the XDM document instance
*/
void storeDocument(DocumentKey key, Document value);

/**
* Stores bunch of documents to persistent store
* 
* @param entries the map of document keys and corresponding document instances
*/
void storeAllDocuments(Map<DocumentKey, Document> entries);

/**
* Deletes document from persistent store
* 
* @param key the document key
*/
void deleteDocument(DocumentKey key);

/**
* Deletes bunch o documents from persistent store 
* 
* @param keys the keys identifying documents to be deleted 
*/
void deleteAllDocuments(Collection<DocumentKey> keys);

}

Простіше всього буде наслідувати наш клас, який реалізує інтерфейс DocumentStore, від абстрактного класу DocumentStoreBase, що надає набір допоміжних методів для доступу до контексту системи. Почнемо з обробки параметрів підключення до зовнішньої системи і його ініціалізації.

Для підключення до Mongo нам знадобляться адресу сервера mongod, ім'я бази даних і імена колекцій, з яких ми хочемо завантажувати документи. Визначимо імена для цих параметрів: mongo.db.uri, mongo.db.database, mongo.db.collections. Тоді код ініціалізації підключення до сервера mongo може виглядати таким чином:

public class MongoDBStore extends DocumentStoreBase implements DocumentStore {

private MongoClient client;
private MongoDatabase db;
private Map<String, MongoCollection<org.bson.Document>> clMap = new HashMap<>();

@Override
public void init(Map<String, Object> context) {
super.setContext(context);
String uri = (String) context.get("mongo.db.uri"); 
MongoClientURI mcUri = new MongoClientURI(uri);
client = new MongoClient(mcUri);

String dbName = (String) context.get("mongo.db.database");
db = client.getDatabase(dbName);

String clNames = (String) context.get("mongo.db.collections");
boolean all = ".*".equals(clNames);
List<String> clns = Arrays.asList(clNames.split(","));
for (String clName: db.listCollectionNames()) {
if (all || clns.contains(clName)) {
MongoCollection<org.bson.Document> cln = db.getCollection(clName); 
clMap.put(clName, cln);
}
}
}

@Override
public void close() {
client.close();
}
}

Метод init приймає параметри підключення до сервера mongod з контексту, встановлює з'єднання і кешує об'єкт MongoCollection для кожної колекції, оголошеною для завантаження. Тепер нам потрібно реалізувати метод для завантаження всіх ключів документів.

private String getMappingKey(String id, String cln) {
return id + "::" + cln;
}

private String[] getMappingParts(String keyMap) {
return keyMap.split("::");
}

@Override
public Iterable<DocumentKey> loadAllDocumentKeys() {
SchemaRepository repo = getRepository();
if (repo == null) {
return null;
}

String id;
DocumentKey key;
Map<DocumentKey, String> keyMap = new HashMap<>();
for (MongoCollection<org.bson.Document> cln: clMap.values()) {
String clName = cln.getNamespace().getCollectionName();
// load _ids only
MongoCursor<org.bson.Document> cursor = cln.find().projection(include("_id")).iterator();

while (cursor.hasNext()) {
org.bson.Document doc = cursor.next();
id = doc.get("_id").toString();
// TODO: handle possible duplicates via revisions
key = repo.getFactory().newDocumentKey(id, 0, 1);
keyMap.put(key, getMappingKey(id, clName));
}
}
PopulationManagement popManager = repo.getPopulationManagement();
popManager.setKeyMappings(keyMap);
return keyMap.keySet();
}

І методи завантаження документів, відповідних завантажених ключів:

@Override
public Map<DocumentKey, Document> loadAllDocuments(Collection<DocumentKey> keys) {
Map<DocumentKey, Document> entries = new HashMap<>(keys.size());
for (DocumentKey key: keys) {
Document doc = loadDocument(key);
if (doc != null) {
entries.put(key, doc);
}
}
return entries;
}

@Override
public Document loadDocument(DocumentKey key) {
SchemaRepository repo = getRepository();
Document doc = null;

PopulationManagement popManager = repo.getPopulationManagement(); 
String id = popManager.getKeyMapping(key); 
if (id == null) {
return null;
}
String[] mParts = getMappingParts(id);
Document newDoc = null;

int[] clns = null;
com.bagri.xdm.system.Collection xcl = repo.getSchema().getCollection(mParts[1]);
if (xcl != null) {
clns = new int[] {xcl.getId()};
}
MongoCollection<org.bson.Document> cln = clMap.get(mParts[1]);

Object oid;
Date creDate;
try {
oid = new ObjectId(mParts[0]);
creDate = ((ObjectId) oid).getDate();
} catch (IllegalArgumentException ex) {
oid = mParts[0];
creDate = new Date();
}
org.bson.Document mongoDoc = cln.find(eq("_id", oid)).first();
String content = mongoDoc.toJson(new JsonWriterSettings(true));

try {
DocumentManagementl docMgr = (DocumentManagement) repo.getDocumentManagement(); 
newDoc = docMgr.createDocument(key, mParts[0], content, "JSON", creDate, "owner", 1, clns, true);
} catch (XDMException ex) {
// TODO: error log, but do not stop loading the whole
}
return doc;
}

Для початку цього достатньо, методи storeDocument/storeAllDocuments і deleteDocument/deleteAllDocuments пропоную реалізувати читачеві самостійно. Також прошу врахувати, що приведений вище код призначений тільки в цілях демонстрації процесу реалізації коннектора і не обробляє різні надзвичайні ситуації та можливі додаткові параметри конфігурації. Повний код коннектора можна подивитися і зібрати з репозиторію bagri-extensions.

Тепер нам потрібно зареєструвати DataStore connector і оголосити схему, яка буде його використовувати. Для цього нам необхідно додати конфігурацію коннектора в файл <BAGRI_HOME>/config/config.xml у секцію dataStores:

<dataStore name="mongo">
<version>1</version>
<createdAt>2016-08-01T16:17:20.542+03:00</createdAt>
<createdBy>admin</createdBy>
<description>MongoDB data store</description>
<enabled>true</enabled>
<storeClass>com.bagri.samples.MongoDBStore</storeClass>
<properties>
<entry name="mongo.db.uri">mongodb://localhost:27017</entry>
<entry name="mongo.db.database">test</entry>
<entry name="mongo.db.collections">*</entry>
</properties>
</dataStore>

Ми протестуємо роботу коннектора на прикладі колекції restaurants, розглянутої в багатьох прикладах роботи з MongoDB. Завантажте колекцію тестових документів у Mongo, як показано тут: docs.mongodb.com/getting-started/shell/import-data. Тепер зареєструємо схему для роботи з MongoDB і налаштуємо її на завантаження даних з даної колекції. В тому ж файлі config.xml додамо нову схему в розділ schemas:

Додаємо наступні параметри:

<entry name="xdm.schema.store.enabled">true</entry>
<entry name="xdm.schema.store.type">mongo</entry>
<entry name="mongo.db.collections">restaurants</entry>

Розділ schemas в config.xml з новими параметрами
<schema name="Mongo" active="true">
<version>1</version>
<createdAt>2016-08-01T21:30:58.096+04:00</createdAt>
<createdBy>admin</createdBy>
<description>Schema for MongoDB</description>
<properties>
<entry name="xdm.schema.store.tx.buffer.size">1024</entry>
<entry name="xdm.schema.data.backup.read">false</entry>
<entry name="xdm.schema.trans.backup.async">0</entry>
<entry name="xdm.schema.store.enabled">true</entry>
<entry name="xdm.schema.thread.pool">10</entry>
<entry name="xdm.schema.data.stats.enabled">true</entry>
<entry name="xdm.schema.query.cache">true</entry>
<entry name="xdm.schema.store.type">mongo</entry>
<entry name="mongo.db.collections">restaurants</entry>
<entry name="xdm.schema.format.default">JSON</entry>
<entry name="xdm.schema.ports.first">10300</entry>
<entry name="xdm.schema.ports.last">10400</entry>
<entry name="xdm.schema.population.size">1</entry>
<entry name="xdm.schema.population.buffer.size">1000000</entry>
<entry name="xdm.schema.data.backup.async">1</entry>
<entry name="xdm.schema.store.data.path">../data/mongo</entry>
<entry name="xdm.schema.dict.backup.sync">0</entry>
<entry name="xdm.schema.trans.backup.sync">1</entry>
<entry name="xdm.schema.query.backup.sync">0</entry>
<entry name="xdm.schema.buffer.size">16</entry>
<entry name="xdm.schema.dict.backup.async">1</entry>
<entry name="xdm.schema.dict.backup.read">true</entry>
<entry name="xdm.schema.trans.backup.read">false</entry>
<entry name="xdm.schema.query.backup.async">0</entry>
<entry name="xdm.schema.members">localhost</entry>
<entry name="xdm.schema.data.backup.sync">0</entry>
<entry name="xdm.schema.partition.count">157</entry>
<entry name="xdm.schema.query.backup.read">true</entry>
<entry name="xdm.schema.transaction.timeout">0</entry>
<entry name="xdm.schema.health.threshold.low">25</entry>
<entry name="xdm.schema.health.threshold.high">0</entry>
<entry name="xdm.schema.query.parallel">true</entry>
<entry name="xdm.schema.partition.pool">32</entry>
<entry name="xqj.schema.baseUri">file:///../data/mongo/</entry>
<entry name="xqj.schema.orderingMode">2</entry>
<entry name="xqj.schema.queryLanguageTypeAndVersion">1</entry>
<entry name="xqj.schema.bindingMode">0</entry>
<entry name="xqj.schema.boundarySpacePolicy">1</entry>
<entry name="xqj.schema.scrollability">1</entry>
<entry name="xqj.schema.holdability">2</entry>
<entry name="xqj.schema.copyNamespacesModePreserve">1</entry>
<entry name="xqj.schema.queryTimeout">0</entry>
<entry name="xqj.schema.defaultFunctionNamespace"></entry>
<entry name="xqj.schema.defaultElementTypeNamespace"></entry>
<entry name="xqj.schema.copyNamespacesModeInherit">1</entry>
<entry name="xqj.schema.defaultOrderForEmptySequences">2</entry>
<entry name="xqj.schema.defaultCollationUri"></entry>
<entry name="xqj.schema.constructionMode">1</entry>
</properties>
<collections>
<collection id="1" name="restaurants">
<version>1</version>
<createdAt>2016-08-01T01:01:26.965+03:00</createdAt>
<createdBy>admin</createdBy>
<description>Mongo restaurants collection</description>
<enabled>true</enabled>
</collection>
</collections>
<fragments/>
<indexes/>
<triggers/>
</schema>


Крім того, потрібно створити новий файл з профілем сервера. В каталозі <BAGRI_HOME>/config створіть файл mongo.properties і вкажіть у ньому використовується сервером схему:

xdm.cluster.node.schemas=Mongo

Переконайтеся, що сервер MongoDB запущений і чекає підключень за адресою, вказаною у налаштуваннях коннектора. Тепер можна стартувати сервер Bagri. В каталозі <BAGRI_HOME>/bin виконайте команду >bgcache.cmd mongo (on Windows) або >./bgcache.sh mongo (on Linux). Даний скрипт стартує одиночний сервер Bagri з настройками з профілю mongo.properties. По закінченні завантаження серверний лог повинен містити наступні рядки:

image

показують, що в конектор ініціалізував схему Mongo і завантажив у неї 25359 документів з зовнішнього сервера MongoDB.

Тепер я покажу, як можна маніпулювати документами JSON з допомогою запитів XQuery.

Для інтерактивного виконання запитів XQuery нам знадобиться клієнт, що дозволяє це робити. З дистрибутивом Bagri поставляється plugin до VisualVM, що надає дану функціональність. Інструкцію з його встановлення дивіться здесь.

Запустіть адміністративний сервер Bagri <BAGRI_HOME>/bin/bgadmin. Відкрийте програму VisualVM, підключіться до адміністративного сервера Bagri Manager і виберіть схему Mongo. Закладка DocumentManagement дозволяє працювати з документами і колекціями:

image

, а закладка QueryManagement запитів XQuery. Виконайте наступний простий запит для вибірки ресторану по його ідентифікатору:

declare namespace m="http://www.w3.org/2005/xpath-functions/map"; 
let $props := map{'method': 'json', 'indent': fn:true()}

for $uri in fn:uri-collection("restaurants")
let $map := fn:json-doc($uri)
where m:get($map, 'restaurant_id') = '40362098'
return (fn:serialize($map, $props), '\&\#xa;')

* Зверніть увагу, що символ перекладу рядка в самої останньої рядку екранований \, так як Хабр перетворює його в дійсний переклад рядка, так що при виконанні запиту символ \ потрібно прибрати.

image

Або інший, для вибірки ресторанів за типом кухні:

declare namespace m="http://www.w3.org/2005/xpath-functions/map"; 
let $props := map{'method': 'json'}

for $uri in fn:uri-collection("restaurants")
let $map := fn:json-doc($uri)
where m:get($map, 'кухня') = 'Bakery'
return (fn:serialize($map, $props), '\&\#xa;')

image

XQuery легко дозволяє робити будь-які вибірки, доступні в Mongo (крім запитів по гео-індексами, прямо з коробки вони ще не підтримуються).

Тепер я покажу запити, які не підтримуються в MongoDB: JOIN. Для цього можна привести дані в колекції restaurants до більш нормалізованої увазі, наприклад відокремити відгуки про ресторани від даних по самому ресторану і зберегти їх в різних колекціях.

Виконайте цей запит і збережіть результати у файл, потім зробіть імпорт отриманих даних в MongoDB, в колекцію rest-short.

declare namespace m="http://www.w3.org/2005/xpath-functions/map";
let $props := map{'method': 'json'}

for $uri in fn:uri-collection("restaurants")
let $rest := fn:json-doc($uri)
let $rest := m:remove($rest, '_id')
let $rest := m:remove($rest, 'grades')
return (fn:serialize($rest, $props), '\&\#xa;')

Наступний запит виводить дані за відгуками. Так само зберегти їх в окремий файл і потім імпортуйте в MongoDB в колекцію grades.

declare namespace a="http://www.w3.org/2005/xpath-functions/array";
declare namespace m="http://www.w3.org/2005/xpath-functions/map";
let $props := map{'method': 'json'}

for $uri in fn:uri-collection("restaurants")
let $rest := fn:json-doc($uri)
let $grades := m:get($rest, 'grades')
return 
for $i in (1 to a:size($grades))
let $grade := a:get($grades, $i)
let $date := m:get($grade, 'date')
return ('{"restaurant_id": "', m:get($rest, 'restaurant_id'), 
'", "date": ', fn:serialize($date, $props), 
', "grade": "', m:get($grade, 'grade'), 
'", "score": "', m:get($grade, 'score'), '"}', '\&\#xa;')

Тепер поправте налаштування схеми, щоб заявити нові колекції для завантаження:

<schema name="Mongo" active="true">
............
<properties>
<entry name="xdm.schema.store.collections">rest-short, grades</entry>
.........
</properties>
<collections>
<collection id="2" name="rest-short">
<version>1</version>
<createdAt>2016-08-01T01:01:26.965+03:00</createdAt>
<createdBy>admin</createdBy>
<description>Restaurant headers collection</description>
<enabled>true</enabled>
</collection>
<collection id="3" name="grades">
<version>1</version>
<createdAt>2016-08-01T01:01:26.965+03:00</createdAt>
<createdBy>admin</createdBy>
<description>Restaurant grades collection</description>
<enabled>true</enabled>
</collection>
</collections>
<fragments/>
<indexes/>
<triggers/>
</schema>

Рестартуйте сервер Bagri для завантаження нових колекцій з даними. Тепер можна перевірити, як працюють join'и. Виконайте наступний запит для формування повної структури restaurants з двох колекцій:

declare namespace m="http://www.w3.org/2005/xpath-functions/map";
let $props := map{'method': 'json'}

for $ruri in fn:uri-collection("rest-short")
let $rest := fn:json-doc($ruri)
let $rid := m:get($rest, 'restaurant_id')
let $addr := m:get($rest, 'address')
let $txt := ('{"restaurant_id": "', $rid,
'", "cuisine": "', m:get($rest, 'кухня'), 
'", "name": "', m:get($rest, 'name'), 
'", "borough": "', m:get($rest, 'borough'), 
'", "address": ', fn:serialize($addr, $props),
', "grades": [')
return ($txt, fn:string-join(
for $guri in fn:uri-collection("grades")
let $grade := fn:json-doc($guri)
let $gid := m:get($grade, 'restaurant_id')
where $gid = $rid
return fn:serialize(m:remove(m:remove($grade, '_id'), 'restaurant_id'), $props), ', '), ']}\&\#xa;')

Отже, ми з вами розглянули, як можна реалізувати DataStore connector до MongoDB і використовувати його в якості системи зберігання документів. Сподіваюся ця стаття зможе стати для вас відправною точкою для написання інших розширень Багри або просто спонукає вас більш докладно ознайомитися з цим цікавим продуктом. Проектом завжди потрібні Java разработчки зацікавлені в розвитку Bagri, більш докладно на код проекту можна подивитися на Гітхабі.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.