man!( Go => D ).concurrency

Багатозадачність — це те, що реалізовано в Go по справжньому добре, хоч і не ідеально. Приємний синтаксис з терпким присмаком, прості і потужні абстракції, підкуповують своєю витонченістю у порівнянні з іншими імперативними мовами. А спробувавши краще, вже так не хочеться скочуватися до посередності. Тому, якщо і переходити на іншу мову, то він повинен бути ще більш виразний і з не менш розумної реалізацією багатозадачності.

Якщо ви вже награлися з Go, втомилися від копипасты, ручного жонглювання мьютексами і всерйоз подумуєте про придбання протезів для рук, то дозвольте запропонувати вашій увазі переклад Tour of the Go з еквівалентним кодом на D і короткими поясненнями.

Частина перша. Основи.

Частина п'ята. Співпрограми.
Coroutines
Go
package main

import (
"fmt"
"time"
)

func say(string s) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}

func main() {
go say("world")
say("привіт")
}

Go дозволяє легко і просто запустити будь-яку функцію паралельним потоком і продовжити роботу, не чекаючи її завершення. Всі гопотоки (волокна, fibers, співпрограми, coroutines, greenlets) выполяются кооперативно на обмеженому числі нативних потоків (нитках, threads), тим самим максимально утилізуючи ядра процесора (cores). Стандартна бібліотека D підтримує волокна, але лише в рамках однієї нитки і не вміє балансувати волокна на кілька ниток. Але такий планувальник реалізований у проекті vibe.d, правда синтаксис запуску паралельного потоку все ще не настільки лаконічний як в Go. Тому ми скористаємося бібліотекою go.d предоставляющией шаблон "go!" для паралельного запуску функцій. Крім того, слідуючи кращим практикам, код прикладів ми будемо оформляти у вигляді тестів.

D
unittest
{
import core.time;
import std.range;
import jin.go;

__gshared static string[] log;

static void saying( string message )
{
foreach( _ ; 3.iota ) {
sleep( 100.msecs );
log ~= message;
}
}

go!saying( "привіт" );
sleep( 50.msecs );
saying( "world" );

log.assertEq([ "привіт" , "world" , "привіт" , "world" , "привіт" , "world" ]);
}

В D не прийнято велосипедить без потреби, тому цикл ми реалізували через итерирование по послідовності натуральних чисел. Функцію "saying" нам довелося оголосити статичної, щоб вона не мала доступу до локальних змінних, що небезпечно при паралельному виконанні її в різних нитках. Якщо зробити цю функцію замиканням, прибравши "static", то даний код не відбудеться створення — завдяки шаблонної магії компілятор не дозволить нам направляти пістолет у власні кінцівки. В Go ж питання конкурентного доступу залишається на совісті програміста, у якого, в більшості випадків, її немає.

Buffered Channels
Go
package main

import "fmt"

func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}

Запускати паралельні потоки було б не так корисно, якщо б не було можливості їх синхронізації. Go використовує досить елегантну абстракцію для цього — канали. Канали являють з себе типізовані черги повідомлень. Якщо потік намагається прочитати щось з пустого каналу, то він блокується в очікуванні іншого потоку, які ці дані туди запише. І навпаки, якщо спробує записати в переповнений канал, то заблокується, поки інший потік не віднімає з каналу хоча б одне повідомлення. Канали легко і просто замінюють такі абстракції як ледачі генератори, події і обіцянки, несучи з собою набагато більше сценаріїв використання.
У стандартній бібліотеці D для спілкування між потоками використовується прийом/передача абстрактних повідомлень. Тобто, знаючи id потоку ви можете послати йому довільне повідомлення, а він повинен його розпакувати і якось обробити. Досить не зручний механізм. Vibe.d вводить абстракцію потоку байт з поведінкою, аналогічним гоканалам. Але часто потрібно не просто байти передавати, а деякі структури. Крім того, що в Go, що в D, межпотоковая комунікація реалізована через захоплення м'ютексу, що має досить відомі проблеми. Тому ми знову скористаємося бібліотекою go.d, надає нам типізовані wait-free канали.

D
unittest
{
import jin.go;

auto numbers = new Channel!int(2);
numbers.next = 1;
numbers.next = 2;
numbers.next.assertEq( 1 );
numbers.next.assertEq( 2 );
}

Віртуальне властивість "next", звичайно, не так наочно, як стрілочка в Go, зате компілятор пильно стежить за положенням нашого пістолета, і не дозволяє передати через канал типи, не безпечні для паралельного використання різних ниток. Однак є один момент — ці канали вимагають, щоб у них було не більше одного читача і не більше одного письменника. На жаль, поки за цим доводиться стежити вручну, але в майбутньому напевно і тут компілятор перейде в наші союзники.
Також варто відзначити, що розмір каналу в Go за замовчуванням дорівнює одному елементу, а в go.d близько 512 байт.

Channels
Go
package main

import "fmt"

func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send to sum c
}

func main() {
s := []int{7, 2, 8, -9, 4, 0}

c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c

fmt.Println(x, y, x+y) // -5 17 12
}

В Go робота з каналом захищена мьютексом, так що ви можете використовувати його для комунікації відразу з декількома потоками, коли вам не важливо, в якому порядку вони нададуть дані. Канали з бібліотеки go.d, навпаки, неблокирующие, тому в такому сценарії їх використовувати не можна — для кожного потоку необхідно створити свій комунікаційний канал. Для спрощення роботи зі списками каналів бібліотека надає структури-балансировщики Inputs і Outputs. В даному випадку нам буде потрібно Inputs, яка по черзі читає з кожного непустого каналу, зареєстрованого в ній.

D
unittest
{
import std.algorithm;
import std.range;
import jin.go;

static auto summing( Channel!int sums , const int[] numbers ) {
sums.next = numbers.sum;
}

immutable int[] numbers = [ 7 , 2 , 8 , -9 , 4 , 0 ];

Inputs!int sums;
go!summing( sums.make(1) , numbers[ 0 .. $/2 ] );
go!summing( sums.make(1) , numbers[ $/2 .. $ ] );
auto res = sums.take(2).array;

( res ~ res.sum ).assertEq([ 17 , -5 , 12 ]);
}

Як звичайно, ми не пишемо руками підсумовування діапазону, а використовуємо для цього стандартний узагальнений алгоритм "sum". Щоб подібні алгоритми працювали з вашим типом даних достатньо реалізувати один з інтерфейсів діапазонів, які, зрозуміло, реалізовані як в Channel, так і в Inputs, і в Outputs. Алгоритм "take" видає ледачий діапазон, повертає вказану кількість перших елементів вихідного діапазону. А алгоритм "array" вигрібає з діапазону всі елементи і повертає нативний масив з ними. Зверніть увагу, що кожному потоку ми передаємо окремий канал одиничної довжини і зріз незмінного масиву (привіт, паралелізм!).

Range and Close
Go
package main

import (
"fmt"
)

func fibonacci(int n, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close©
}

func main() {
c := make(chan int, 10)
go fibonacci(cap©, c)
for i := range c {
fmt.Println(i)
}
}

Як видно, в Go ми теж можемо итерироваться по каналу, послідовно отримуючи з нього чергові елементи. Щоб не зависнути в нескінченному циклі, такі канали повинні закриватися передавальною стороною, щоб приймаюча могла зрозуміти, що більше даних не буде і цикл пора закінчувати. В D ми б написали практично те ж саме, хіба що оголосили б ряд Фібоначчі у вигляді математичної рекурентной формули.

D
unittest
{
import std.range;
import jin.go;

static auto fibonacci( Channel!int numbers , int count )
{
auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( count );
foreach( x ; range ) numbers.next = x;
numbers.close();
}

auto numbers = new Channel!int(10);
go!fibonacci( numbers , numbers.size );

numbers.array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
}

Але можна ще більше спростити код, знаючи, що шаблон "go!" сам вміє перекладати значення з діапазону канал.

D
unittest
{
import std.range;
import jin.go;

static auto fibonacci( int limit )
{
return recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( limit );
}

fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
go!fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
}

Таким чином, функції зовсім не обов'язково знати нічого про канали, щоб мати можливість запускати її паралельним потоком, а потім чекати від неї результату.

Select
Go
package main

import "fmt"

func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}

func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}

Go має спеціальний лаконічний синтаксис для одночасної роботи з декількома каналами. D нічого такого, звісно, не має. Однак, еквівалентний функціонал реалізується не особливо складніше ручної реалізацією циклу спостереження.

D
unittest
{
import std.range;
import jin.go;

__gshared int[] log;

static auto fibonacci( Channel!int numbers , Channel!bool control )
{
auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 );

while( !control.closed )
{
if( numbers.needed ) numbers.next = range.next;
yield;
}

log ~= -1;
numbers.close();
}

static void print( Channel!bool control Channel!int numbers )
{
foreach( i ; 10.iota ) log ~= numbers.next;
control.close();
}

auto numbers = new Channel!int(1);
auto control = new Channel!bool(1);

go!print( control , numbers );
go!fibonacci( numbers , control );

while( !control.empty || !numbers.empty ) yield;

log.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 , -1 ]);
}

Як видно, нам довелося позбутися замикання, а в циклах додати "yield", щоб конкуруючі волокна теж могли щось зробити, поки поточне висить в очікуванні.

Default Selection
Go
package main

import (
"fmt"
"time"
)

func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}

Спеціальний синтаксис в Go дозволяє зробити що-то, якщо ні в одному з каналів не було активності. В D у вас, тим не менш, більше контролю над потоком виконання.

D
unittest
{
import core.time;
import jin.go;

static auto after( Channel!bool channel , Duration dur )
{
sleep( dur );
if( !channel.closed ) channel.next = true;
}

static auto tick( Channel!bool channel , Duration dur )
{
while( !channel.closed ) after( channel , dur );
}

auto ticks = go!tick( 101.msecs );
auto booms = go!after( 501.msecs );

string log;

while( booms.clear )
{
while( !ticks.clear ) {
log ~= "tick";
ticks.popFront;
}
log ~= ".";
sleep( 51.msecs );
}
log ~= "BOOM!";

log.assertEq( "..tick..tick..tick..tick..BOOM!" );
}

Примітною особливістю є те, що нам не потрібно вручну створювати канал. Якщо функція першим аргументом приймає канал і ми його не передали, то він буде створений автоматично і повернений як результат роботи шаблону "go!", що дуже зручно. Функції "after" і "tick" дуже специфічні, щоб вносити їх у загальну бібліотеку, але реалізації у них досить прості.

Mutex
В деяких випадках без розділяється змінюваного стану все ж не обійтися і тут нам на допомогу приходять блокування.

Go
package main

import (
"fmt"
"sync"
"time"
)

// SafeCounter is safe to use concurrently.
type SafeCounter struct {
v map[string]int
mux sync.Mutex
}

// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(string key) {
c.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
c.v[key]++
c.mux.Unlock()
}

// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(string key) int {
c.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
defer c.mux.Unlock()
return c.v[key]
}

func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}

time.Sleep(time.Second)
fmt.Println(c.Value("somekey"))
}

Так, саме так, реалізація розділяється змінюваного стану в Go — це біль і страждання. Один невірний рух при роботі з мьютексами і ви раптом виявите у себе фантомні кінцівки. Не кажучи вже про те, що компілятор навіть не натякне вам про те, де м'ютекси необхідні. А ось компілятор D вас сильно наругает за спробу роботи з незахищеним змінним станом з різних потоків. А найпростіший спосіб захистити стан при багатопотоковому виконання — реалізувати синхронізований клас.

D
unittest
{
import core.atomic;
import core.time;
import std.range;
import std.typecons;
import jin.go;

synchronized class SafeCounter
{
private int[string] store;

void inc( string key )
{
++ store[key];
}

auto opIndex( string key )
{
return store[ key ];
}
void opIndexUnary( string op = "++" )( string key )
{
this.inc( key );
}
}

static counter = new shared SafeCounter;

static void working( int i )
{
counter ++ ["somekey"];
}

foreach( i ; 1000.iota ) {
go!working( i );
}

sleep( 1.seconds );

counter["somekey"].assertEq( 1000 );
}

Особливість синхронізованого класу в тому, що для нього автоматично створюється м'ютекс і при виклику будь-якого публічного методу цей м'ютекс захоплюється, звільняючись лише при виході з методу. При цьому всі внутрішній стан зобов'язана бути приватним. Але тут є одна неприємна особливість (а насправді дуже небезпечний і прикрий баг компілятора): шаблонні методи, такі як, наприклад, "opIndexUnary!", не загортаються в захоплення м'ютексу. Тому ми створили окремий публічний метод "inc", який і викликаємо з шаблонного методу. Внутрішня реалізація вийшла вже не настільки красивою, зате зовнішній інтерфейс вийшов як рідний. Отриманий "shared SafeCounter" ми вже можемо спокійно передавати через канал і використовувати безпосередньо з різних потоків.

Яку частину переводити наступною?

/>
/>


<input type=«checkbox» id=«vv72308»
class=«checkbox js-field-data»
name=«variant[]»
value=«72308» />
Керування потоком виконання
<input type=«checkbox» id=«vv72310»
class=«checkbox js-field-data»
name=«variant[]»
value=«72310» />
Складові типи
<input type=«checkbox» id=«vv72312»
class=«checkbox js-field-data»
name=«variant[]»
value=«72312» />
Методи

Проголосувало 30 осіб. Утрималося 8 чоловік.


Ну що, на який прикладної мова переходимо?

/>
/>

<input type=«radio» id=«vv72314»
class=«radio js-field-data»
name=«variant[]»
value=«72314» />
C
<input type=«radio» id=«vv72316»
class=«radio js-field-data»
name=«variant[]»
value=«72316» />
C++
<input type=«radio» id=«vv72318»
class=«radio js-field-data»
name=«variant[]»
value=«72318» />
D
<input type=«radio» id=«vv72320»
class=«radio js-field-data»
name=«variant[]»
value=«72320» />
Go
<input type=«radio» id=«vv72322»
class=«radio js-field-data»
name=«variant[]»
value=«72322» />
Rust
<input type=«radio» id=«vv72324»
class=«radio js-field-data»
name=«variant[]»
value=«72324» />
Java
<input type=«radio» id=«vv72326»
class=«radio js-field-data»
name=«variant[]»
value=«72326» />
C#
<input type=«radio» id=«vv72328»
class=«radio js-field-data»
name=«variant[]»
value=«72328» />
Інший

Проголосувало 88 осіб. Утрималося 13 осіб.


Тільки зареєстровані користувачі можуть брати участь в опитуванні. Увійдіть, будь ласка.


Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.