Введення в CDRS, Cassandra driver повністю написаний на Rust

CDRS (Cassandra driver written in Rust) — це мій власний open source проект, який я зважився розробляти після того, як виявив, що в плані драйверів для Cassandra у Rust екосистемі утворився дефіцит.
Звичайно, я не скажу, що їх зовсім немає. Вони є, але одна частина це закинуті в зародковому стані Hello World пакети, а друга частина-це, напевно, єдиний binding до драйверу DataStax, написаному на C++.
Що стосується CDRS, то засобами Rust він повністю імплементує специфікацію 4-ї версії протоколу.
cargo.toml
Щоб включити драйвер в свій проект, як правило, необхідно наступне.
По-перше, додати CDRS в секцію
dependencies
вашого
cargo.toml
файл:
[dependencies]
cdrs = "1.0.0-beta.1"

Це дозволить використовувати TCP з'єднання без шифрування.
Якщо ви маєте намір створювати SSL-шифрування з'єднання зі своєю базою даних, то CDRS повинен бути включений з фичей "ssl":
[dependencies]
openssl = "0.9.6"
[dependencies.cdrs]
version = "1.0.0-beta.1"
features = ["ssl"]

По-друге, додати його в
lib.rs

extern crate CDRS

Установка з'єднання

TCP з'єднання

Для установки не шифрованого з'єднання вам знадобляться наступні модулі
use cdrs::client::CDRS;
use cdrs::authenticators::{NoneAuthenticator, PasswordAuthenticator};
use cdrs::transport::TransportPlain;

Якщо так сталося, що ваш кластер не вимагає авторизації паролем, з'єднання може бути встановлене таким чином:
let кодів = NoneAuthenticator;
let addr = "127.0.0.1:9042";
let tcp_transport = TransportPlain::new(addr).unwrap();

// pass кодів and transport into CDRS' constructor
let client = CDRS::new(tcp_transport, кодів);
use cdrs::compression;
// start session without compression
let mut session = try!(client.start(compression::None));

Для установки з'єднання, що вимагає авторизації паролем, замість
NoneAuthenticator
використовувати
PasswordAuthenticator
:
let кодів = PasswordAuthenticator::new("user", "pass");

TLS з'єднання

Встановлення з'єднання TLS багато в чому схоже на процес, описаний в попередньому кроці, за винятком того, що вам знадобиться PEM сертифікат для створення SSL транспорту.
use cdrs::client::CDRS;
use cdrs::authenticators::PasswordAuthenticator;
use cdrs::transport::TransportTls;
use openssl::ssl::{SslConnectorBuilder, SslMethod};
use std::path::Path;

let кодів = PasswordAuthenticator::new("user", "pass");
let addr = "127.0.0.1:9042";

// here needs to be a path of your SSL certificate
let path = Path::new("./node0.cer.pem");
let mut ssl_connector_builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
ssl_connector_builder.builder_mut().set_ca_file(path).unwrap();
let connector = ssl_connector_builder.build();

let ssl_transport = TransportTls::new(addr, &connector).unwrap();

// pass кодів and SSL transport into CDRS' constructor
let client = CDRS::new(ssl_transport, кодів);

Connection pool

Для більш простого управління існуючими з'єднанням CDRS містить
ConnectionManager
, який по своїй суті є адаптор для r2d2.
use cdrs::connection_manager::ConnectionManager;
//...
let config = r2d2::Config::builder()
.pool_size(3)
.build();
let transport = TransportPlain::new(ADDR).unwrap();
let кодів = PasswordAuthenticator::new(USER, PASS);
let manager = ConnectionManager::new(transport, кодів, Compression::None);

let pool = r2d2::Pool::new(config, manager).unwrap();

for _ in 0..20 {
let pool = pool.clone();
thread::spawn(move || {
let conn = pool.get().unwrap();
// use the connection
// it will be returned to the pool when it falls out of scope.
});
}

Стиснення — lz4 і snappy
Щоб використовувати
lz4
та
snappy
стиснення, достатньо передати в конструктор сесії бажаний декодер:
// session without compression
let mut session_res = client.start(compression::None);
// session with lz4 compression
let mut session_res = client.start(compression::Lz4);
// session with snappy compression
let mut session_res = client.start(compression::Snappy);

Далі CDRS самостійно повідомить кластеру, що він готовий приймати інформацію в стислому вигляді з обраним декодером. Подальша розпакування буде проходити автоматично і не вимагає яких-небудь подальших дій від розробника.
Виконання запитів
Виконання запитів до Cassandra сервера проходить виключно в рамках існуючої сесії, після вибору методів авторизації, стиснення, а також типу транспорту.
Для виконання того чи іншого запиту необхідно створити об'єкт
Query
, який з першого погляду може здатися дещо надмірною для простих запитів, оскільки містить безліч параметрів, які, ймовірно, використовуються не так часто.
З цієї причини був створений
builder
, який спрощує процес конфігурування запиту. Наприклад, для простого '
USE my_namespace;
' досить просто
let create_query: Query = QueryBuilder::new("USE my_namespace;").finalize();
let with_tracing = false;
let with_warnings = false;

let switched = session.query(create_query, with_tracing, with_warnings).is_ok();

Створення нової таблиці

Щоб створити нову таблицю в Cassandra кластері, як і раніше, необхідно спочатку сконфігурувати
Query
і після цього виконати запит:
use std::default::Default;
use cdrs::query::{Query, QueryBuilder};
use cdrs::consistency::Consistency;

let mut create_query: Query = QueryBuilder::new("CREATE TABLE keyspace.authors (
id int,
name text,
messages list<text>,
PRIMARY KEY (id)
);")
.consistency(Consistency::One)
.finalize();
let with_tracing = false;
let with_warnings = false;

let table_created = session.query(create_query, with_tracing, with_warnings).is_ok();

Що стосується самого CQL запиту на створення нової таблиці, то за більш повною інформацією краще звернутися до спеціалізованих ресурсів, наприклад, DataStax.

SELECT запит і маппінг результатів

Припустимо, що в нашій базі даних є таблиця авторів, при чому кожен автор має список своїх повідомлень. Нехай ці повідомлення зберігаються всередині list-колонки. У термінах Rust автор повинен мати наступний вигляд:
struct Author {
pub name: String,
pub messages: Vec<String>
}

Сам запит може бути виконаний через
Session::query
метод, як це було зроблено у випадку створення таблиці. Природно, CQL повинен бути в даному разі чимось на зразок '
SELECT * FROM keyspace.authors;
'. Якщо таблиця містить дані про якихось авторів, ми можемо спробувати відобразити отримані дані в колекцію Rust структур, типу '
Vec<Author>
'
//...
use cdrs::error::{Result as CResult};
let res_body = parsed.get_body();
let rows = res_body.into_rows().unwrap();
let messages: Vec<Author> = rows
.iter()
.map(|row| {
let name: String = row.get_by_name("name").unwrap();
let messages: Vec<String> = row
// unwrap Option<CResult<T>>, де T implements AsRust
.get_by_name("messages").unwrap().unwrap()
.as_rust().unwrap();
return Author {
author:name,
text: messages
};
})
.collect();

Під час відображення результатів слід звернути увагу на наступні трейты:
  1. IntoRustByName. Говорячи простою мовою, цей трейт застосовується по відношенню до складних типів Cassandra таким, як row (яка, строго кажучи, не є окремим типом, визначеними у специфікації, але за своїм внутрішнім устроєм може розглядатися, як щось близьке до User Defined Type) і UDT. Грубо кажучи,
    get_by_name
    намагається відшукати "властивість" по його імені, і якщо знаходить, то повертає результат перетворення цієї властивості до Rust типу або до CDRS типів, таких як
    List
    , 'Map',
    UDT
    . Самі ж ці типи є відображення відповідних типів даних, визначених у специфікації.
  2. AsRust. Цей трейт призначений для кінцевого відображення у Rust типи. Повний список имплиментаторов можна побачити у приведеному посиланню.

Prepare & Execute

Іноді буває зручним спочатку один раз підготувати складний запит, а після цього виконати його декілька раз з різними даними у різний час. Для цього чудово підходить Prepare & Execute.
// prepare just once
let insert_table_cql = " insert into user_keyspace.users (user_name, password, gender, session_token, state) values (?, ?, ?, ?, ?)";

let prepared = session.prepare(insert_table_cql.to_string(), true, true)
.unwrap()
.get_body()
.into_prepared()
.unwrap();

// execute later and possible few times with different values
let v: Vec<Value> = vec![Value::new_normal(String::from("john").into_bytes()),
Value::new_normal(String::from("pwd").into_bytes()),
Value::new_normal(String::from("male").into_bytes()),
Value::new_normal(String::from("09000").into_bytes()),
Value::new_normal(String::from("FL").into_bytes())];
let execution_params = QueryParamsBuilder::new(Consistency::One).values(v).finalize();
// without tracing and warnings
let executed = session.execute(prepared.id execution_params, false, false);

Також має сенс комбінувати Prepare & Batch для виконання відразу декількох підготовлених запитів. Найпростіший приклад Batch також можна знайти на примерах.
Cassandra events
Крім усього вищеописаного, CDRS надає можливість підписатися і стежити за подіями, які публікує сервер.
let (mut listener, stream) = session.listen_for(vec![SimpleServerEvent::SchemaChange]).unwrap();

thread::spawn(move || listener.start(&Compression::None).unwrap());

let topology_changes = stream
// inspects all events in a stream
.inspect(|event| println!("inspect event {:?}", event))
// filter by подієві s type: topology changes
.filter(|event| event == &SimpleServerEvent::TopologyChange)
// filter by подієві s specific information: new node was added
.filter(|event| {
match event {
&ServerEvent::TopologyChange(ref event) => {
event.change_type == TopologyChangeType::NewNode
},
_ => false
}
});

println!("Start listen for server events");

for change in topology_changes {
println!("server event {:?}", change);
}

Щоб знайти повний список подій краще всього звернутися в саму спецификацию, а також до документації драйвера.
У майбутньому є плани використовувати події для "розумного" load balancing.
Корисні посилання


Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.