Message Passing в F#. Застосування MailboxProcessor

Дана стаття продовжує серію публікацій про технології, які ми використовуємо для розробки сервісу перевірки доступності веб-сайтів HostTracker.
Сьогодні мова піде про…

MailboxProcessor


image



MailboxProcessor — клас, який дозволяє створити агент мовою F#, який складається з двох частин — черги повідомлень і функції, яка обробляє повідомлення з черги. Він надає наступний інтерфейс взаємодії:

Post — відправити повідомлення 'msg чергу процесора асинхронно, не чекаючи його обробки;

PostAndReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg) -> 'reply — відправити повідомлення з асинхронним каналом очікування результату AsyncReplyChannel. Потік, який викликав даний метод, очікує виклик AsyncReplyChannel.Reply: 'reply -> unit з обробника повідомлень з черги для отримання резульатата. msgBuilder — лямбда-функція, яка компонуються повідомлення з каналу;

PostAndAsyncReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg) -> Async<'reply> — аналогічний до PostAndReply, але очікування результату не відбувається. Повертається асинхронне обчислення, результатом якого є передане в AsyncReplyChannel.Reply значення. Клієнт сам вирішує яким чином очікувати результат — віддавши поточний потік пулу або блокуючи його;

TryPostAndReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg)*?timeout:int -> 'reply option — аналогічний PostAndReply, але очікування відбувається протягом timeout мілісекунд. Якщо результат за цей час отримано, то метод повертає Some(result). В іншому випадку повертає None;

PostAndTryAsyncReply: (msgBuilder: AsyncReplyChannel<'reply> -> 'msg)*?timeout:int -> Async<'reply option> — аналогічний TryPostAndReply, але метод повертає асинхронні обчислення;

Receive: unit -> Async<'msg>; TryReceive: timeout:int -> Async<'msg option> — асинхронні обчислення, які повертають повідомлення в чергу в порядку FIFO (TryReceive — асинхронні обчислення, які очікують протягом timeout мілісекунд на повідомлення і повертає Some(msg), якщо воно прийшло в даний період часу, і None, якщо немає). Використовується, в основному, на стороні обробника повідомлень і дуже рідко на стороні клієнта;

Scan: (scanner: 'msg -> Async<'T> option) -> Async<'T>; TryScan: (scanner: 'msg -> Async<'T> option)*?timeout:int -> Async<'T option> — повертають асинхронні обчислення, які будуть перевіряти чергу повідомлень на наявність такого повідомлення, для якого scanner зможе побудувати асинхронні обчислення. Для Scan обчислення, створені з допомогою scanner, будуть виконані після його побудови, і результат стане результатом асинхронного обчислення, яке поверне Scan. Для TryScan задається час сканування черги. Якщо протягом даного часу scanner побудує обчислення, то вони виконуються і їх результат стане результатом асинхронного обчислення, створеного TryScan. В іншому випадку асинхронне обчислення повертає None.

У формуванні MailboxProcessor-а приймає участь лямбда-функція, яка генерує асинхронні обчислення, які буде виконувати агент. Дана функція приймає як параметр сам процесор, що дозволяє в створеному обчисленні використовувати вищеописаний інтерфейс взаємодії з чергою (дуже рідко застосовуються методи PostAndReply, часто Post, Receive, Scan). Таким чином, відбувається обробка черги асинхронними обчисленнями. Клієнт, побудований процесором, використовує той же інтерфейс для взаємодії з чергою повідомлень, але зазвичай (у більшості сценаріїв) виконання методів Receive, Scan на його боці не відбувається.

Використання MailboxProcessor:

• Кеші (caches) даних при роботі з їхніми сховищами (SQL бази даних, ...). Незалежно від типу сховища, в додатку часто виникає завдання доступу до даних за певними критеріями (як правило, по ідентифікатору запису). Це може привести до помітного зниження швидкодії системи. Рішення на базі MaiboxProcessor наведено в наступному коді:

// Повідомлення кеша:
// GetFromCache: id - ідентифікатор ресурсу - generic
// AsyncReplyChannel<Choice<'res, exn>> - канал відповіді для передачі ресурсу 
// ClearCache - чистка кеша
type cacheCommands<'id'res> = 
| GetFromCache of 'id * AsyncReplyChannel<Choice<'res, exn>>
| ClearCache

// клас Cache інкапсулює MailboxProcessor
// getById - функція отримання даних з джерела в випадку кеш промаху
// cleanupCacheInterval - інтервал очищення кешу
type Cache<'itemId,'item when 'itemId : comparison > (getById, cleanupCacheInterval) =
// MailboxProcessor працює на пулі потоків
let inbox = MailboxProcessor.Start(fun inbox ->
// цикл обробки повідомлень в черзі
// cache - стан кеша - Map - відображення id на ресурс
let rec loop cache = async {
let! res = inbox.Receive() 
//беремо нове повідомлення
//потік MailboxProcessor повернеться пулу до появи повідомлення
match with res
| GetFromCache (itemId:'itemId, rc) ->
let (item, cache')= 
match Map.tryFind itemId cache with 
| None -> 
// кеш промах
try
match getById itemId with
| None -> (Choice2Of2(KeyNotFoundException() :> exn), cache)
| Some (item:'item) -> 
(Choice1Of2(item), Map.add itemId item cache)
with exc ->
(Choice2Of2(exc), cache)
| Some item ->
// кеш попадання
(Choice1Of2(item), cache)
rc.Reply item //повертаємо результат
return! loop cache' //йдемо на нову ітерацію циклу
| ClearCache -> 
return! loop Map.empty //повне очищення кешу
}
loop Map.empty)
//цикл очищення кеша через інтервали фиксированые
let rec runCleanupCicle () =
async {
do! Async.Sleep cleanupCacheInterval //потік повернеться пулу
inbox.Post ClearCache
return! runCleanupCicle ()
}
do if cleanupCacheInterval > 0 then 
runCleanupCicle () |> Async.Start
with
//інтерфейс кеша
member x.TryGet(itemId) =
match inbox.PostAndReply( fun rc -> GetFromCache (itemId, rc) ) with
| Choice1Of2 item -> Some item
| _ -> None
member x.Get(itemId) =
match inbox.PostAndReply( fun rc -> GetFromCache (itemId, rc) ) with
| Choice1Of2 item -> item
| Choice2Of2 e -> raise e
member x.Cleanup() =
inbox.Post ClearCache


Інша проблема виникає при операція модифікації даних у сховище (insert, delete, update). Такі операції оптимально групувати (batch mode). Для цього можна реалізувати кеш угруповання дані з різних потоків:

// повідомлення кеша:
// Save-зберегти сутність в кеші
// SaveToDB: збереження даних у джерелі
// UpdateSaveState: операція в потоці кеша над повним його станом
type commandsSave<'itemId, 'item when 'itemId: comparison> = 
| Save of 'itemId * 'item
| SaveToDB
| UpdateSaveState of (Map<'itemId, 'item> -> Map<'itemId, 'item>)

// saveToDB - безпосереднє звернення до джерела з передачею Map<'itemId, 'item>
// savingInterval - інтервал звернення до джерела
type SavingCache<'itemId, 'item when 'itemId : comparison>(saveToDB, savingInterval) = 
let inbox = 
MailboxProcessor.Start(
fun inbox ->
// цикл обробки повідомлень
let rec loop cache = 
async{
let! msg = inbox.Receive() 
let cache' =
match msg with
| Save (key: 'itemId, value: 'item) ->
Map.add key value cache //додавання даних у кеш
| SaveToDB ->
saveToDB cache
Map.empty
| UpdateSaveState updater ->
updater cache //зміни стану кеша через функцію updater (повідомлення Save - приватний випадок) 
return! loop cache'
}
loop Map.empty
)
// цикл збереження даних
let rec runCleanSaveCicle () = 
async{
do! Async.Sleep savingInterval
inbox.Post(SaveToDB)
return! runCleanSaveCicle()
}
do if savingInterval > 0 then runCleanSaveCicle() |> Async.Start
with
member x.Save(itemId, item) = inbox.Post( Save (itemId, item) )
member x.UpdateBy f = inbox.Post <| UpdateSaveState f


• MailboxProcessor і є машиною станів, яка має хоча б один стан. Для набору станів, кількість яких більше одного, для кожного з них визначається набір повідомлень, які можуть бути оброблені. Під час обробки певного повідомлення можливий перехід в інший стан. Лямбда-функція, яка передається в конструктор MailboxProcessor визначає набір асинхронних рекурсивних обчислень, по одному на кожне стан машини. У кожному такому обчисленні відбувається очікування одного повідомлення з усіх можливих з черги (Receive), або певної підмножини повідомлень (Scan). Після очікування відбувається обробка, при якій можливий перехід до обчислення для іншого стану машини, продовження роботи з поточним станом або завершення роботи. Далі йде приклад — проксі для роботи з віддаленим агентом. Визначено стану working, stoped, recovery. Всі повідомлення представлені у вигляді типу RemoteAgentProxyCommand:

// ProcessData - обробити повідомлення
// Reconfigure - 
//

private type RemoteAgentProxyCommand<'data, 'result> =
| ProcessData of 'data * ('result option -> unit)
| Reconfigure of string
| Suspend | Resume
| CleanupExpiredData

type RemoteAgentProxy<'data, 'result>(transport, ?cleanupInterval) =
let cleanupInterval = defaultArg cleanupInterval 60000 
let processor = MailboxProcessor.Start(fun inbox -> 
let cleanup state now = ...//очищення стану
let send state = async { ... } //відсилання даних по мережі
// робочий стан 
let rec working (state: Map<_, _>) = async { 
let! msgOpt = inbox.TryReceive(1000) //спроба отримати повідомлення за 1 секунду
let now = DateTime.UtcNow
match with msgOpt
| None when state.Count = 0 -> //повідомлення не отримано але і стан порожнє
return! working state //нова ітерація в поточному стані
| None -> // повідомлення не отримано але стан не пусте 
let nextStateHandler = // наступне дію процесора
async { 
try
let! state' = send state //намагаємося відіслати стан мережі
return! working state' //стан відіслано без проблем - продовжуємо цикл обочего стану
with e -> 
return! recovery(state, 10000) //стався мережевий збій - переходимо в стан відновлення
}
return! nextStateHandler
| Some CleanupExpiredData -> return! working (cleanup state now) //очищення кешу
| Some (ProcessData (data, channelReply)) -> //додавання елемента даних на відсилання 
let expiration = DateTime.UtcNow.AddMilliseconds(float cleanupInterval)
return! working (Map.add (Guid.NewGuid()) 
(expiration, data, channelReply) state)
| Some Suspend -> //команда зупинити роботу
return! stoped (working state) //перехід в стан stoped 
| Some _ -> //ігноруємо інші повідомлення
return! working state
}
and stoped nextStateHandler = inbox.Scan(function | Resume -> Some(nextStateHandler) | _ -> None) //чекаємо повідомлення Resume 
and recovery(state, timeToRecieve) = async { //відновлення з'єднання
let! nextTimeToRecieve =
if timeToRecieve <= 100 then 
async {
try 
let! state' = send state
return Choice1Of2 state'
with e -> return Choice2Of2 10000
}
else async.Return <| Choice2Of2 timeToRecieve
match with nextTimeToRecieve
| Choice1Of2 state -> return! working state
| Choice2Of2 time ->
let nextDateTime = DateTime.UtcNow.AddMilliseconds(float time)
let! msg = inbox.TryReceive(timeToRecieve)
let now = DateTime.UtcNow
let nextTime = 
int (nextDateTime - now).TotalMilliseconds
match msg with
| Some (ProcessData (data, channelReply)) -> 
channelReply None
return! recovery(state, nextTime)
| Some CleanupExpiredData -> return! recovery (cleanup state now, nextTime)
| Some Suspend -> return! stoped (recovery(state, nextTime))
| None -> return! recovery(state, 0)
| _ -> return! recovery(state, nextTime)
}
working Map.empty)


• MailboxProcessor можна використовувати для організації асинхронного багаторазового каналу передачі даних. Його мета — передати дані з однієї частини програми в іншу (передача можлива між різними потоками), при цьому жорстко не пов'язуючи ці частини. Канал — кортеж з двох функцій: функції для відправки даних і функції для очікування їх отримання без блокування потоку:

let CreateAsyncChannel<'a> () = //'a - тип даних для передачі через канал
//MailboxProcessor синхронізує доступ до даних
let inbox = new MailboxProcessor<_>( fun inbox ->
// стан процесора при отриманні AsyncReplyChannel для відповіді
let rec waitResponse (repl:AsyncReplyChannel<'a*bool>) = 
inbox.Scan <| function
| GetResult repl -> //якщо знаходимо новий AsyncReplyChannel в черзі повідомлень - використовуємо його
Some <| waitResponse repl
| IntermediateResult res -> //проміжний результат в черзі
repl.Reply (res, false)
Some <| waitRepl ()
| Result res -> //фінальний результат в черзі
repl.Reply (res, true)
Some <| async.Return ()
and waitRepl () = 
//початковий стан процесора - очікуємо в черзі повідомлення GetResult 
//при цьому всі інші повідомлення залишаються в черзі процесора
//repl - канал для передачі відповіді
inbox.Scan <| function
| GetResult repl -> Some <| waitResponse repl
| _ -> None
waitRepl ()
)
inbox.Start()
// перша функція кортежу resultWaiter - очікування даних 
let resultWaiter timeout = 
inbox.PostAndTryAsyncReply ((fun replChannel -> GetResult replChannel), timeout)
// друга функція кортежу postResult - багаторазова відсилання даних
// дані збираються в черзі MailboxProcessor
let postResult closeChannel = 
if closeChannel then Result else IntermediateResult
>> inbox.Post
(resultWaiter, postResult)


Таким чином на основі MailboxProcessor

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.