[The Methanum project] Створення інструментарію для побудови розподілених систем з топологією "Зірка"



Зірка — на сьогоднішній день найбільш поширена топологія комп'ютерних мереж. Така структура володіє рядом переваг: легкість масштабування, надійністю (вихід з ладу однієї машини не позначається на інших) і простота адміністрування. Звичайно це рішення з фізичного рівня давно реалізовано і на програмному рівні. Тим не менш, представляю на суд читачів свою версію інструментарію .Net для побудови розподілених систем з топологією зірка.

Системи, побудовані на основі такої топології можуть бути структурно організовані, наприклад, як на зображенні нижче.



У цій статті я опишу створення мінімального інструментарію, без багатьох корисних фіч, які колись використовував у своїх розробках на базі представленої архітектури. Однак цього цілком достатньо для побудови реально корисних систем.

Methanum



Проект отримав кодову назву Methanum виключно через структурної схожості топології з молекулою метану :). Центральний вузол виконує роль комунікатора названий «Core». До ядра підключаються інші додатки мережі і підписуються на події. Так само кожне додаток мережі може випускати події. Таким чином, через події здійснюється обмін даними в мережі. Події — це сериализуемый клас Event, який може містити довільні дані. Event мінімально містить 2 поля — рядковий поле Destination, классифицирующее подія, і поле Data, містить словник key value. Key — це рядок, ім'я аргументу, Value — має тип object і може містити примітиви (int, double, bool...). Для структур доводиться кілька допомогти системі сериализации їх.

Для початку створимо проект «methanum» бібліотеки класів на C# і по ходу тексту будемо додавати файли.

Event



Як вже було сказано дані передаються за допомогою подій. Подія — це клас, що включає поле з даними Data і поле для ідентифікації події Destination. Також я залишив ще два поля: Id — унікальний ідентифікатор і DataTime містить час створення події. Ці додаткові поля потрібні виключно для зручності, наприклад, для аналізу логів. Клас події так само містить деяку кількість методів, покликаних спростити життя програміста, їх призначення думаю буде зрозуміло з коду та додаткових пояснень не потребує.

Event.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Web.Script.Serialization;

namespace methanum
{
[DataContract]
[KnownType(typeof(List<DateTime>))]
public class Event {
/// <summary>
/// A unique id of the event
/ / / < /summary>
[DataMember]
public Guid Id { set; get; }

/// <summary>
/// DateTime of event creation
/ / / < /summary>
[DataMember]
public DateTime DataTime { get; set; }

/// <summary>
/// Target
/ / / < /summary>
[DataMember]
public string Destination { get; set; }


/// <summary>
/// Data container
/ / / < /summary>
[DataMember]
public Dictionary<string, object> Data { get; set; }

public Event() {
Init();
}

public Event(string destination) {
Init();
Destination = destination;
}

private void Init() {
Data = new Dictionary<string, object>();
Id = Guid.NewGuid();
DataTime = DateTime.Now;
}


public override string ToString() {

var properties = GetType().GetProperties();

var sb = new StringBuilder();
sb.AppendFormat("[{0}]", GetType().Name);

foreach (var property in properties) {
if (property.Name == "Data") {
sb.Append("\nData = ");
string s = string.Format(" {0}", '{');
s = Data.Keys.Aggregate(s,
(current, key) => current + String.Format("\n {0}\t{1}", key, Data[key]));
sb.AppendFormat("{0}\n{1}", s, '}');

}
else sb.AppendFormat("\n{0} = {1};", property.Name, property.GetValue(this, null));
}

return sb.ToString();
}

public void SetData(string key, object obj) {
Data[key] = obj;
}

public object GetObj(string key) {
return !Data.ContainsKey(key) ? null : Data[key];
}

public double GetDbl(string key) {
return !Data.ContainsKey(key) ? Double.NaN : Convert.ToDouble(Data[key]);
}

public int GetInt(string key) {
return !Data.ContainsKey(key) ? Int32.MinValue : Convert.ToInt32(Data[key]);
}

public bool GetBool(string key) {
return Data.ContainsKey(key) && Convert.ToBoolean(Data[key]);
}


public string GetStr(string key) {
return !Data.ContainsKey(key) ? null : Convert.ToString(Data[key]);
}

public void SetCustomData(string key, object value) {
var serializer = new JavaScriptSerializer();
var str = serializer.Serialize(value);
SetData(key, str);
}

public object GetCustom(string key, Type valueType) {
if (!Data.ContainsKey(key))
return null;

if (Data[key].GetType() != typeof(string))
return null;

var serializer = new JavaScriptSerializer();
var str = (string) Data[key];
var obj = serializer.Deserialize(str, valueType);

return obj;
}
}
}



Gate



Суть ядра полягає в реалізації інтерфейсу, назвемо його «інтерфейс воріт». Основна мета воріт — надання функціоналу для реєстрації клієнтів і асинхронної посилки подій в обох напрямах (від додатка до ядра і назад).

IGate.cs
using System.ServiceModel;

namespace methanum {
[ServiceContract(CallbackContract = typeof(IListener))]
public interface IGate {
[OperationContract]
void Subscribe();

[OperationContract]
void KillConnection();

[OperationContract]
void Fire(Event evt);
}
}



Контракт даних у нас дуплексний, в прямому напрямку — від додатка до ядра — стріляємо події через IGate викликом методу void Fire(Event evt). Зворотний виклик — від ядра до додатка — відбувається через IListener інтерфейс, про який буде пізніше.
Ворота працюють за наступним принципом. Коли стартує ядро, створюється об'єкт класу Gate, успадкованого від інтерфейсу IGate. В Gate є статичне поле _subscribers, в якому зберігаються всі активні підключення до ядра. При виклику методу Subscribe(), додаємо поточне підключення, якщо воно ще не додано. Метод KillConnection() служить для видалення поточного з'єднання. Самим цікавим є метод Fire(Event evt), але і в ньому немає ні чого складного. Половину методу докапываемся до Ip адреси і порту, тільки щоб вивести інформацію в консоль. Я залишив цю частину коду виключно для того, щоб продемонструвати, як отримати доступ до адреси з'єднання, наприклад, щоб фільтрувати або логировать події щодо дозволених адрес. Основна робота цього методу полягає в обході всіх наявних підключень і асинхронного виклику методу Receive у їхніх слухачів IListener. Якщо виявляємо закрите підключення, то його негайно видаляємо зі списку активних підключень.

Gate.cs
using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;

namespace methanum {
public class Gate : IGate {
private static List<OperationContext> _subscribers;

public Gate() {
if (_subscribers == null)
_subscribers = new List<OperationContext>();
}

public void Subscribe() {
var oc = OperationContext.Current;

if (!_subscribers.Exists(c => c.SessionId == oc.SessionId)) {
_subscribers.Add(oc);
Console.WriteLine("(subscribe \"{0}\")", oc.SessionId);
}
}

public void KillConnection() {
var oc = OperationContext.Current;
_subscribers.RemoveAll(c => c.SessionId == oc.SessionId);

Console.WriteLine("(kill \"{0}\")", oc.SessionId);
}

public void Fire(Event evt) {
var currentOperationContext = OperationContext.Current;
var remoteEndpointMessageProperty =
currentOperationContext.IncomingMessageProperties[RemoteEndpointMessageProperty.Name] as
RemoteEndpointMessageProperty;
var ip = "";
var port = 0;

if (remoteEndpointMessageProperty != null) {
ip = remoteEndpointMessageProperty.Address;
port = remoteEndpointMessageProperty.Port;
}

Console.WriteLine("(Fire (event . \"{0}\") (from . \"{1}:{2}\") (subscribers . {3}))", evt.Id, ip port, _subscribers.Count);

for (var i = _subscribers.Count - 1; i >= 0; i--) {
var oc = _subscribers[i];

if (oc.Channel.State == CommunicationState.Opened) {
var channel = oc.GetCallbackChannel<IListener>();


try {
((DelegateReceive) (channel.Receive)).BeginInvoke(evt, null, null);
}
catch (Exception e) {
Console.WriteLine(e.Message);
}
}
else {
_subscribers.RemoveAt(i);
Console.WriteLine("(dead . \"{0}\")", oc.SessionId);
}
}
}
}
}



Listener



Щоб передати повідомлення від ядра до клієнту достатньо одного методу Receive, який визначений в інтерфейсі IListener.

IListener.cs
using System.ServiceModel;

namespace methanum {
public delegate void DelegateReceive(Event evt);
interface IListener {
[OperationContract(IsOneWay = true)]
void Receive(Event evt);
}
}



Від інтерфейсу IListener успадковується клас Connector, який реалізує всю логіку взаємодії клієнтського додатка і ядра. При створенні екземпляра класу створюється підключення до ядра, через який передаються і приймаються повідомлення. Відправка і отримання повідомлень відбувається в окремих потоках, щоб виключити блокування додатків і ядра. Щоб розрізняти події, що в них є поле Destination. Фільтрувати події за допомогою if-then-else або switch-case конструкцій незручно, тому був реалізований механізм, що дозволяє зіставити кожному питанню, що події у відповідність обробник. Таке зіставлення зберігається в словнику Dictionary<string, CbHandler> _handlers;. Коли подія прийнято, відбувається пошук у словнику і, якщо ключ знайдено, викликається відповідний обробник.

Connector.cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Threading;

namespace methanum {
public delegate void CbHandler(Event evt);

public class Connector : IListener {
private Dictionary<string, CbHandler> _handlers;
private NetTcpBinding _binding;
private EndpointAddress _endpointToAddress;
private InstanceContext _instance;
private DuplexChannelFactory<IGate> _channelFactory;
private IGate _channel;
private Thread _fireThread;
private List<Event> _eventQueue;

public event CbHandler ReceiveEvent;

private bool _isSubscribed;

private object _channelSync = new object();

protected virtual void OnReceive(Event evt) {
CbHandler handler = ReceiveEvent;
if (handler != null) handler.BeginInvoke(evt, null, null);
}

//localhost:2255
public Connector(string ipAddress) {
init(ipAddress);
}

private void init(string ipAddress) {
_handlers = new Dictionary<string, CbHandler>();
_binding = new NetTcpBinding();
_endpointToAddress = new EndpointAddress(string.Format("net.tcp://{0}", ipAddress));
_instance = new InstanceContext(this);

Conect();

_eventQueue = new List<Event>();

_fireThread = new Thread(FireProc);
_fireThread.IsBackground = true;
_fireThread.Start();
}

private void Conect() {
_isSubscribed = false;

while (!_isSubscribed) {
try {
_channelFactory = new DuplexChannelFactory<IGate>(_instance, _binding, _endpointToAddress);

_channel = _channelFactory.CreateChannel();

_channel.Subscribe();
_isSubscribed = true;
}
catch (Exception e) {
if (!(e is EndpointNotFoundException)) throw e;

Thread.Sleep(1000);
}
}
}

private void ReConect() {
lock (_channelSync) {
try {
_channel.KillConnection();
}
catch (Exception e) {
Console.WriteLine("(ReConect-exception \"{0}\"", e.Message);
}
Conect();
}
}

public void Fire(Event evt) {
lock (_eventQueue) {
_eventQueue.Add(evt);
}
}

private void FireProc() {
while (true) {
var isHasEventsToFire = false;

lock (_eventQueue) {
isHasEventsToFire = _eventQueue.Any();
}

if (_isSubscribed && isHasEventsToFire) {
Event evt;

lock (_eventQueue) {
evt = _eventQueue.First();
}

try {
lock (_eventQueue) {
_eventQueue.Remove(evt);
}

_channel.Fire(evt); 
}
catch (Exception) {
if (_isSubscribed)
_isSubscribed = false;
ReConect();
}
} else Thread.Sleep(10);
}
}

public void SetHandler(string destination, CbHandler handler) {
_handlers[destination] = handler;
}

public void DeleteHandler(string destination) {
if(_handlers.ContainsKey(destination)) _handlers.Remove(destination);
}

public void Receive(Event evt) {
if (_handlers.ContainsKey(evt.Destination)) {
_handlers[evt.Destination].BeginInvoke(evt, null, null);
}

OnReceive(evt);
}

public static void HoldProcess() {
var processName = Process.GetCurrentProcess().ProcessName;
var defColor = Console.ForegroundColor;

Console.ForegroundColor = ConsoleColor.Green;

Console.WriteLine("The {0} is ready", processName);
Console.WriteLine("Press < Enter > to terminate {0}", processName);

Console.ForegroundColor = defColor;

Console.ReadLine();
}
}
}



Для зручності створимо ще один невеличкий клас, стартує сервіс.

SrvRunner.cs
using System;
using System.ServiceModel;

namespace methanum {
public class SrvRunner {
private ServiceHost _sHost;

public void Start(int port) {
var uris = new[] { new Uri(string.Format("net.tcp://0.0.0.0:{0}", port)) };

_sHost = new ServiceHost(typeof (Gate), uris);

_sHost.Open();

foreach (var uri2 in _sHost.BaseAddresses) {
Console.WriteLine("Start on: {0}", uri2.ToString());
}
}

public void Stop() {
_sHost.Close();
}
}
}



Core



Ми реалізували всі класи, необхідні для комунікації наших додатків. Залишилося створити ядро, до якого будуть підключатися наші додатки. Для цього у вирішенні створюємо проект «Core» консольного додатка до нього підключаємо збірку methanum. Взагалі, ми вже все написали, залишилося тільки запустити.

CoreMain.cs
using System;
using System.Linq;
using methanum;

namespace Core {
internal class CoreMain {
private static void Main(string[] args) {
int port = 0;
if ((!args.Any()) || (!int.TryParse(args[0], out port))) {
Console.WriteLine("Usage:");
Console.WriteLine("Core.exe port");
Environment.Exit(1);
}

try {
var coreSrv = new SrvRunner();
coreSrv.Start(port);

Console.WriteLine("The Core is ready.");
Console.WriteLine("Press < ENTER > to terminate Core.");
Console.ReadLine();

coreSrv.Stop();
}
catch (Exception e) {
Console.WriteLine(e.Message);
}
}
}
}



Приклад



Для демострації створимо примітивний месенджер: створюємо ще одне консольний додаток, додаємо посилання на збірку methanum і вставляємо вміст файлу Program.cs.

Program.cs
using System;
using System.Linq;
using methanum;

namespace ClentExamle {
class Program {
static void Main(string[] args) {

if ((!args.Any())) {
Console.WriteLine("Usage:");
Console.WriteLine("ClentExample.exe coreAddress:port");
Environment.Exit(1);
}

var userName = "";

while (String.IsNullOrWhiteSpace(userName)) {
Console.WriteLine("Please write user name:");
userName = Console.ReadLine(); 
}

try {
var maingate = new Connector(args[0]);

maingate.SetHandler("message", MsgHandler);

Console.WriteLine("Hello {0}, now you can send messages", userName);

while (true) {
var msg = Console.ReadLine();
var evt = new Event("message");
evt.SetData("name", userName);
evt.SetData("text", msg);

maingate.Fire(evt);
}
}
catch (Exception e) {
Console.WriteLine(e.Message);
}
}

private static void MsgHandler(Event evt) {
Console.WriteLine("[{0}] >> {1}", evt.GetStr("name"), evt.GetStr("text"));
}

}
}



Тепер запускаємо додаток Core.exe вказавши в командному рядку порт, наприклад «Core 2255». Потім стартуємо кілька примірників ClentExample.exe командою «ClentExample localhost:2255». Програми пропонують ввести ім'я користувача, після чого підключаються до ядра. У результаті, виходить широкомовний примітивний чат: кожне нове повідомлення надсилається викликом maingate.Fire(evt), приймається в оброблювачі MsgHandler(Event evt).



Повний вихідний файл доступний на гихабе methanum.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.