Готуємо багатопоточність з core.async

image
Усе більше набирає популярність патерн використання каналів при створенні
багатопоточних додатків. Ідея не нова, її дизайн закладено ще в далекому 1978 році
у вигляді CSP.Найбільш відома реалізація зараз повсюдно використовується в Golang.
Ми ж у статті розглянемо реалізацію CSP в core.async для Clojure, якщо цікаво, ласкаво просимо під кат.

У статті будуть розглянуті прості і базові практики для роботи з core.async, описаного у статті буде достатньо для гарного старту у багатопоточне програмування.
На відміну від Golang де парадигма роботи з потоками через канали вбудована в сам мову, core.async є просто бібліотекою для Clojure, якщо вам імпонує інша парадигма, то вибір є: pulsar, promesa, manifold
При цьому core.async і promesa можна також використовувати на стороні браузера в ClojureScript, природно в цьому випадку ні про яку багатопоточності говорити не доводиться, так як все це добро компілюється в ES5 і виконуються у браузері, але знайомий інтерфейс і зручна робота з асинхронностью може добре послужити.
Так що ж нам дає core.async? Якщо пояснювати на пальцях то core.async нам надає диспетчеризацію через go-блоки в свій фіксований Thread Pool складається з 8 тредів (розмір Thread Pool можна змінювати через спеціальну опцію). При надходженні повідомлення в канал, core.async сам знайде вільний потік, і передасть йому завдання, які поставить повідомлення в чергу. Хто вперше чує про Thread Pool можна почитати хорошу замітку по паттерну Worker Thread


Приклад № 1
(defonce log-chan (chan))

(defn loop-worker [msg]
(println msg))

(go-loop []
(let [msg (<! log-chan]
(loop-worker msg)
(recur)))

У прикладі вище, ми створили канал
log-chan
, і визначили функцію loop-worker яка буде обробляти повідомлення з каналу.Потім створили go-block з нескінченним циклом, помістивши туди наш
loop-worker
.Тепер ми можемо відправити дані в канал:
(>!! log-chan "привіт")

Функція loop-worker була винесена окремо за go-block навмисне, задля зручною її налагодження через REPL.
Саме тіло go-loop так як це макрос запікається десь всередині core.async, і перекомпіляція його на льоту в REPL носить дивний характер, тому обробник простіше винести окремо і жити спокійно.
Тут варто зауважити що ніякого нескінченного циклу в звичному його розумінні go-loop не робить.
Після отримання повідомлення, відбувається разове виконання функції обробника, а потім go-block паркується функцією
<!
яка буде чекати нового повідомлення. Таким чином можна створювати скільки завгодно багато каналів і обробників до них.
В межах go-блоку функція читання з каналу
<!
здійснює паркування потоку.
За межами go блоку є можливість використовувати для читання з каналу функцію
<!!
яка блокує основний потік до отримання повідомлення. Поведінка
<!!
можна порівняти з функцією await в ES7.
Parking go блоку, цей термін core.async означає, що потік звільнений, і доступний для інших завдань. Також існує термін blocking, який означає, що потік буде безпосередньо заблокований і недоступний для нових завдань до його звільнення.
У прикладі №1 є вада, якщо у loop-worker буде викликаний
Exception
, то відбудеться переривання виконання форми та
(recur)
ніколи не буде викликаний, отже очікування даних з каналу
log-chan
припиниться, виправимо це в прикладі № 2.
Приклад № 2
(defonce log-chan (chan))

(defn loop-worker [msg]
(throw (Exception. "my exception message")))

(go-loop []
(let [msg (<! log-chan)
res (try
(loop-worker msg)
:ok
(catch Exception e
(println (.getMessage e))
:error))]
(recur)))

У цьому прикладі ми обернули весь виклик loop-worker у форму
try
, а змінна
res
, буде містити прапор, який повідомляє про успішне виконання форми або ж про помилку. Цей прапор може знадобитися, наприклад, якщо ми захочемо закрити канал в разі помилки. Робочий приклад цього підходу можна подивитися тут
Приклад № 3
(let [c1 (go (<! (timeout (rand-int 1000))) 5)
c2 (go (<! (timeout (rand-int 1000))) 7)]
(go (let [v1 (<! c1)
v2 (<! c2)]
(println {:v1 v1
:v2 v2
:summ (+ v1 v2)}))))

Цей приклад буде чекати результату від усіх асинхронних операцій перелічених в блоці
let
. Ця практика дуже зручна для вирішення проблеми callback hall в JavaScript, і черговий привід порадіти що це можна використовувати на стороні браузера в особі ClojureScript.
Приклад № 4
(defn upload 
"upload emulator"
[headshot c time]
(go (Thread/sleep time)
(>! c headshot)))

(let [c1 (chan) c2 (chan]
(upload "pic1.jpg" c1 30)
(upload "pic2.jpg" c2 40)
(let [[headshot channel] (alts!! [c1 c2 (timeout 20)])]
(if headshot
(println "Sending headshot notification for" headshot)
(println "Timed out!"))))

У цьому прикладі ми створили функцію upload эмулирующую асинхронну операцію, в даному випадку завантаження файлу. Останнім аргументом upload, приймає час затримки в мілісекундах. За допомогою функції alts!!! ми можемо отримати перший же результат, який нам поверне один з перерахованих у векторі каналів. У нашому векторі, останнім каналом йде
(timeout 20)
, цей канал нам поверне результат через 20 мілісекунд, і це буде першим значенням, яке буде записане в змінну
headshot
та буде продовжено виконання форми. Таким чином даний приклад емулює установку часу на timeout, протягом якого ми будемо чекати виконання набору асинхронних операцій.
Приклад № 5
(def ping (chan))
(def pong (chan))

(go-loop []
(let [msg (<! ping)]
(when (= msg :ping)
(println msg)
(>! pong :pong)
(Thread/sleep 1000))
(recur)))

(go-loop []
(let [msg (<! pong)]
(when (= msg :pong)
(println msg)
(>! ping :ping)
(Thread/sleep 1000))
(recur)))

(>!! ping :ping)

Приклад спілкування двох каналів, класичний Ping-Pong.
Це був останній приклад який я хотів показати. Також окремо варто виділити наявність у clojure типів даних, створених спеціально для запису туди інформації в декілька потоків, це atom і agent а також загальну иммутабельность інших типів, все це дуже полегшує життя розробника при розробці багатопотокового додатку.


Корисні посилання:
» clojure.com/blog/2013/06/28/clojure-core-async-channels.html
» github.com/clojure/core.async
» github.com/clojure/core.async/wiki/Getting-Started
» www.braveclojure.com/core-async/
» go.cognitect.com/core_async_webinar_recording
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.