Concurrency структури .net. ConcurrentQueue зсередини

ConcurrentQueue можна віднести до lock-free конкурентних структур даних. В її реалізації немає блокувань (lock, Mutex...) і реалізована вона з використанням:
— класичної функції CompareExchange;
— SpinWait
— volatile (використовується як memory-barrier)
В основу ConcurrentQueue закладена структура ring-buffer (кільцевої буфер).


Ring-buffer (кільцевий буфер)

Кільцевий буфер ідеально підходить для реалізації структури даних «черга» (FIFO).

В його основі лежить масив даних і 2 покажчика — початок (start) і кінець (end).

Передбачено дві основні операції:
  1. Push — додавання в кінець. При додаванні нового елементів в буфер, лічильник end збільшується на 1 і на його місце записується новий елемент. Якщо ми вперлися в верхню межу масиву, то значення end обнуляється («переходить» на початок масиву) і елементи починають записуватися в початок масиву. Запис можлива поки індекс end не досяг індексу start.
  2. Pop — вибірка елементів спочатку. Вибірка елементів відбувається з елемента start, послідовно збільшуючи його значення, до тих поки не досягне end. Вибірка можлива, поки індекс start не досяг індексу end.


Блоковий кільцевої буфер

Пристрій ConcurrentQueue трохи складніше, ніж класичний кільцевої буфер. В його реалізації використовується поняття сегмента (Segment). ConcurrentQueue складається із зв'язаного списку (однонаправленої) сегментів. Розмір сегмента дорівнює 32.
private class Segment {
volatile VolatileBool[] m_state;
volatile T[] m_array;
volatile int m_low;
volatile int m_high;
volatile Segment m_next;
}

Спочатку в ConcurrentQueue створюється 1 сегмент

По мірі необхідності до нього праворуч додається нові сегменти


У результаті виходить односпрямований зв'язаний список. Початок пов'язаного списку задає m_head, кінець — m_tail. Обмеження:
  • m_head сегмент може мати порожні клітинки тільки зліва
  • m_tail сегмент може мати порожні клітинки тільки праворуч
  • якщо m_head = m_tail то пусті клітинки можуть бути як зліва, так і справа.
  • В сегментах, між m_head і m_tail порожніх клітинок бути не може.

Додавання елемента (Enqueue)

Нижче представлений приблизний алгоритм додавання елементів в сегмент.
  • Збільшується m_high на 1
  • масив m_array з індексом m_high записується нове значення.
index = Interlocked.Increment(ref this.m_high);
if (index <= 31)
{
m_array[index] = value;
m_state[index].m_value = true;
}

m_state — масив стану осередків, якщо значення true — елемент записаний в клітинку, якщо false — ще ні. По суті, це якийсь «Commit» запису. Потрібен він для того, щоб між операціями збільшення індексу Interlocked.Increment і записом значення m_array[index] = value не відбулося читання елемента іншим потоком. Тоді читання даних буде здійснюватися після виконання:
while (!this.m_state[index].m_value)
spinWait2.SpinOnce();


Додавання нового сегмента (Segment.Grow)

Як тільки m_high поточного сегмента стає рівним 31, запис у поточний сегмент припиняється і створюється новий сегмент (поточні сегменти продовжують жити своїм життям).
m_next = new ConcurrentQueue<T>.Segment(this.m_index + 1L, this.m_source);
m_source.m_tail = this.m_next;

m_next — посилання на наступний сегмент
m_source.m_tail — посилання останній сегмент списку сегментів.

Вибірка елемента (TryDequeue)

В основі вибірки елементів з черги лежать дві базові функціональності:
  • Interlocked.CompareExchange — атомарна операція, яка записує значення змінної, в разі якщо її значення дорівнює сравниваемому значенням.
    public static extern int CompareExchange(ref int location1, int value, int comparand);
    

  • SpinWait, з MSDN
    System.Threading.SpinWait is a lightweight synchronization type that you can use in low-level scenarios to avoid the expensive context switches and kernel transitions that are required for kernel events. On multicore computers, when a resource is not expected to be held for long periods of time, it can be more efficient waiting for a thread to spin in user mode for a few dozen or a few hundred cycles, and then retry to acquire the resource. If the resource is available after spinning, then you have saved several thousand cycles. If the resource is still not available, then you have spent only a few cycles and can still enter a kernel-based wait. This spinning-then-waiting combination is sometimes referred to as a two-phase wait operation.
Приблизний алгоритм роботи вибірки:
  1. Отримати m_low
  2. Збільшити m_low на 1, з використанням CompareExchange
  3. Якщо m_low більше 31 — перейти на наступний сегмент
  4. Дочекатися коміта (m_state[low].m_value) елемента з індексом m_low.
SpinWait spinWait1 = new SpinWait();
int low = this.Low;
if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low)
{
SpinWait spinWait2 = new SpinWait();
while (!this.m_state[low].m_value)
spinWait2.SpinOnce();
result = this.m_array[low];


Count vs IsEmpty

Код IsEmpty:
ConcurrentQueue<T>.Segment segment = this.m_head;
if (!segment.IsEmpty)
return false;
if (segment.Next == null)
return true;
SpinWait spinWait = new SpinWait();
for (; segment.IsEmpty; segment = this.m_head)
{
if (segment.Next == null)
return true;
spinWait.SpinOnce();
}
return false;

Тобто, по суті, це знайти перший непорожній сегмент. Якщо він знайдений — черга не порожня.

Код Count:
ConcurrentQueue<T>.Segment head;
ConcurrentQueue<T>.Segment tail;
int headLow;
int tailHigh;
this.GetHeadTailPositions(head-out, out tail, out headLow, out tailHigh);
if (head == tail)
return tailHigh - headLow + 1;
return 32 - headLow + 32 * (int) (tail.m_index - head.m_index - 1L) + (tailHigh + 1);

По суті, він шукає перший і останній сегмент і на основі цих двох сегментів обчислює кількість елементів.
Висновок — операція Count буде займати більше процесорного часу, ніж IsEmpty.

Снепшот & GetEnumerator

Структура ConcurrentQueue підтримує технологію снепшотов для отримання цілісного набору елементів.
Цілісні повертають дані:
  • ToArray
  • ICollection.CopyTo
  • GetEnumerator
Оператори вище так само працюю без блокувань, а цілісність досягається за рахунок введення лічильника
volatile int m_numSnapshotTakers
в рамках всієї черги — число операцій, які працюють з снепшотами в поточний момент часу. Тобто кожна операція, яка хоче отримати цілісну картину, повинна реалізувати наступний код:
Interlocked.Increment(ref this.m_numSnapshotTakers);
try
{
...//Ітератор по всім сегментам
}
finally
{
Interlocked.Decrement(ref this.m_numSnapshotTakers);
}

У доповненні до цього, зміни у нас «пише» тільки операція Dequeue, тому тільки в ній перевіряється необхідність видаляти посилання на елемент черги:
if (this.m_source.m_numSnapshotTakers <= 0)
this.m_array[low] = default (T);


Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.