AsyncCollections: історія одного велосипеда

З давніх часів я був великим шанувальником System.Collections.Concurrent і BlockingCollection в особливості. Скільки раз це диво інженерної думки виручало в самих різноманітних ситуаціях — не злічити.

З трохи менш давніх часів в побут міцно увійшли async/await. Здавалося б, життя прекрасне, але є одне «але»: асинхронний код міксувати з блокуючим кодом як-то не дуже-то хочеться. А BlockingCollection, як нескладно здогадатися (хоча б з назви), у ряді випадків блокує потік.

Помилковий слід: Nito.AsyncEx
Одного разу я натрапив згадка бібліотеки Nito.AsyncEx за авторством Stephen Cleary, в якій знайшовся клас з інтригуючою назвою AsyncCollection. Однак, поглянувши що у нього знаходиться під капотом, я залишився в деякому подиві: там виявився AsyncLock з цієї бібліотеки, навішений на всі дії над оберненої IProducerConsumerCollection. AsyncLock, в свою чергу, активно юзає звичайні lock-та й тонкий шар магії, розплутувати який мені раптом перехотілося. Навіть якщо ця реалізація робить те що заявлено, вона виглядає дещо навороченно, монстроподібно і, можливо, не дуже оптимально. Невже не можна вирішити цю задачу більш акуратно?

Всі ми знаємо чим загрожують подібні думки. Visual Studio, New project…

AsyncQueue
Для початку визначимося, що ми взагалі хочемо від нашої асинхронної колекції. У якості відправної точки можна взяти наступний інтерфейс:

public interface IAsyncCollection<T>: IEnumerable < T>
{
int Count { get; }
void Add( T item );
Task<T> TakeAsync();
}

Крім того, для простоти зупинимося на тому, що наша колекція — це черга. Чому саме чергу? Так приблизно з тієї ж причини, по якій чергу по дефолту використовується в BlockingCollection.

Далі слід напружена робота думки, пов'язана зі спробою визначити можливі стани нашої колекції. На перший погляд їх може бути 3 штуки:

1. Елементів в колекції немає, але були виклики TakeAsync(), Task-і яких треба завершити, коли елементи з'являться (для простоти і стислості, далі я буду називати їх awaiter-ами). У цьому випадку:
  • Awaiter-и явно потрібно десь зберігати. Напрошується чергу, конкретніше — ConcurrentQueue.
  • Якщо відбувається виклик TakeAsync(), у нас з'являється новий awaiter, закидаємо його в чергу awaiter-ів.
  • Якщо відбувається виклик Add(), у нас з'являється новий елемент, за допомогою якого можна миттєво взяти один з awaiter-ів і завершити його.
2. Awaiter-ів немає, але були виклики Add(). Ситуація повністю симетрична попередньої:
  • Елементи потрібно десь зберігати. Де? У ConcurrentQueue, де ж іще.
  • Якщо відбувається виклик Add(), з'являється новий елемент, закидаємо його в чергу елементів.
  • Якщо відбувається виклик TakeAsync(), з'являється новий awaiter, який можна миттєво завершити, забравши верхній елемент з черги.
3. Обидві черги — і черга awaiter-ів, і черга елементів — порожні. В залежності від наступного дії переходимо або в стан 1, або в стан 2:
  • Якщо відбувається виклик Add(), з'являється новий елемент, намагаємося взяти для нього awaiter з черги, там пусто, намагаємося додати його в чергу елементів...
  • В цей самий момент відбувається виклик TakeAsync(), з'являється новий awaiter, намагаємося взяти для нього елемент з черги, там поки що порожньо, намагаємося додати його в чергу awaiter-ів...
  • Упс. Ми все зламали: awaiter і елемент сидять в різних чергах і чекають один одного. Що робити?
Розвішувати локі не хочеться, ми не для того пішли від напханої ними реалізації з Nito.AsyncEx. Що в таких випадках роблять всякі ConcurrentQueue? Розуміють, що прямо зараз в сусідньому потоці відбувається операція, яка ось-ось завершиться і після якої ми зможемо зробити що-небудь корисне, створюють SpinWait і крутяться в очікуванні. Спробуємо відтворити цю ідею у нас. Потрібно:

  • зрозуміти в якому стані ми знаходимося (1 або 2)
  • одночасно відрапортувати, що ми почали свою операцію, будь то додавання awaiter-а чи додавання елемента
  • в залежності від стану додати awaiter/елемент у чергу, або покрутитися, поки в протилежну чергу не додадуть елемент/awaiter, який ми відразу ж заберемо
Перші дві вимоги дуже вже сильно нагадують роботу класу Interlocked; для зберігання стану можна використовувати що-то типу балансу черг: TakeAsync() атомарно зменшує баланс на одиницю, Add() атомарно ж його збільшує. І за значенням балансу, яке поверне Interlocked.Increment/Interlocked.Decrement, можна дізнатися про те, що гряде новий елемент/awaiter, ще до того, як він з'явиться у відповідній черзі. Досить балачок, спробуємо закодить все вищеперелічене:

public class AsyncQueue<T>: IAsyncCollection<T>
{
private ConcurrentQueue<T> _itemQueue = new ConcurrentQueue<T>();
private ConcurrentQueue<TaskCompletionSource<T>> _awaiterQueue = new ConcurrentQueue<TaskCompletionSource<T>>();

// _queueBalance < 0 means there are free awaiters and not enough items.
// _queueBalance > 0 means the opposite is true.
private long _queueBalance = 0;

public void Add( T item )
{
long balanceAfterCurrentItem = Interlocked.Increment( ref _queueBalance );

if ( balanceAfterCurrentItem > 0 )
{
// Items are dominating, so we can safely add a new item to the queue.
_itemQueue.Enqueue( item );
}
else
{
// There's at least one awaiter available or being added as we're speaking, so we're giving the item to it.

TaskCompletionSource<TItem> awaiter;
SpinWait spin = new SpinWait();

while ( !_awaiterQueue.TryDequeue( out awaiter ) )
spin.SpinOnce();

awaiter.SetResult( item );
}
}

public Task<T> TakeAsync()
{
long balanceAfterCurrentAwaiter = Interlocked.Decrement( ref _queueBalance );

if ( balanceAfterCurrentAwaiter < 0 )
{
// Awaiters are dominating, so we can safely add a new awaiter to the queue.

var taskSource = new TaskCompletionSource<TItem>();
_awaiterQueue.Enqueue( taskSource );
return taskSource.Task;
}
else
{
// There's at least one item available or being added, so we're returning it directly.

T item;
SpinWait spin = new SpinWait();

while ( !_itemQueue.TryTake( out item ) )
spin.SpinOnce();

return Task.FromResult( item );
}
}
}

Тестуємо, з подивом виявляємо, що воно начебто навіть працює. Перемога? З одного боку, та, з іншого, разогнавшийся творчий порив так просто не зупинити…

Корисні (і не дуже) плюшки

Подивимось уважно на те, що у нас вийшло. Синхронний Add(), асинхронний TakeAsync()… Стоп, асинхронний метод без можливості його скасування? Непорядок. Виправляємо.

По-перше, при скасуванні CancellationToken-а потрібно негайно скасувати відповідний таск:

public Task<TItem> TakeAsync( CancellationToken cancellationToken )
{
// ...

if ( balanceAfterCurrentAwaiter < 0 )
{
var taskSource = new TaskCompletionSource<TItem>();
_awaiterQueue.Enqueue( taskSource );

cancellationToken.Register(
state =>
{
TaskCompletionSource<T> awaiter = state as TaskCompletionSource<T>;
awaiter.TrySetCanceled();
},
taskSource,
useSynchronizationContext : false );

return taskSource.Task;
}
else
{
// ...
}
}

По-друге, виколупати скасований awaiter звідкись з середини черги ми явно не зможемо, тому потрібно навчити Add() пропускати цей скасований awaiter. Баланс при цьому чарівним чином підтримується автоматично:

private bool TryAdd( TItem item )
{
long balanceAfterCurrentItem = Interlocked.Increment( ref _queueBalance );

if ( balanceAfterCurrentItem > 0 )
{
_itemQueue.Enqueue( item );
return true;
}
else
{
TaskCompletionSource<T> awaiter;
SpinWait spin = new SpinWait();

while ( !_awaiterQueue.TryDequeue( out awaiter ) )
spin.SpinOnce();

// Returns false if the cancellation occurred earlier.
return awaiter.TrySetResult( item );
}
}

public void Add( TItem item )
{
while ( !TryAdd( item ) ) ;
}

По-третє, старий метод TakeAsync() (який без CancellationToken) взагалі можна винести в extension до інтерфейсу IAsyncCollection:

public interface IAsyncCollection<T>: IEnumerable < T>
{
int Count { get; }
void Add( T item );
Task<T> TakeAsync( CancellationToken cancellationToken );
}

public static class AsyncCollectionExtensions
{
public static Task<T> TakeAsync<T>( this IAsyncCollection<T> collection )
{
return collection.TakeAsync( CancellationToken.None );
}
}

До речі, про IAsyncCollection. Якщо придивитися, то наша реалізація AsyncQueue не зобов'язана бути прибита цвяхами до ConcurrentQueue, для зберігання елементів підійде будь-яка потокобезопасная IProducerConsumerCollection. Наприклад, ConcurrentStack. Тому можна зробити ось так:

public class AsyncCollection<TItem, TItemQueue>: IAsyncCollection<TItem>
where TItemQueue: IProducerConsumerCollection<TItem>new()
{
private TItemQueue _itemQueue = new TItemQueue();
private ConcurrentQueue<TaskCompletionSource<TItem>> _awaiterQueue = new ConcurrentQueue<TaskCompletionSource<TItem>>();

// ...
}

public class AsyncQueue<T>: AsyncCollection<T, ConcurrentQueue<T>>
{
}

public class AsyncStack<T>: AsyncCollection<T, ConcurrentStack<T>>
{
}

З одного боку, хотілося б не плодити type parameter-и, а просто приймати IProducerConsumerCollection в конструкторі, але ось біда: нам можуть підсунути колекцію, на яку вже посилаються зовні і в яку можуть зовні ж понапихать елементів (або, що ще гірше, забрати частину наших елементів), там самим зруйнувавши синхронізацію між реальним станом колекції і запам'ятовані балансом. З factory method та ж проблема, що колекцію доведеться створювати самим.

Бенчмарки!

Настав час поміряти швидкість роботи нашого велосипеда. Для прогону бенчмарків є пакет BenchmarkDotNet, реалізує купу дрібних деталей, які бажано враховувати при прогоні бенчмарків, так що його і заюзаем. Загальна ідея бенчмарку полягає в наступному:

class AsyncQueueBenchmark
{
private const int _consumerThreadCount = 3;
private const int _producerThreadCount = 3;
private const int _itemsAddedPerThread = 10000;
private const int _itemsAddedTotal = _producerThreadCount * _itemsAddedPerThread;

private IAsyncCollection<int> _currentQueue;
private CancellationTokenSource _cancelSource;
private int _itemsTaken;

// Виконання цього методу буде вимірюватися
private void DdosCurrentQueue()
{
_consumerTasks = Enumerable.Range( 0, _consumerThreadCount )
.Select( _ => Task.Run( () => RunConsumerAsync() ) )
.ToArray();

_producerTasks = Enumerable.Range( 0, _producerThreadCount )
.Select( _ => Task.Run( () => RunProducer() ) )
.ToArray();

Task.WaitAll( _producerTasks );
Task.WaitAll( _consumerTasks );
}

private async Task RunConsumerAsync()
{
try
{
CancellationToken cancelToken = _cancelSource.Token;

while ( _itemsTaken < _itemsAddedTotal && !cancelToken.IsCancellationRequested )
{
int item = await _currentQueue.TakeAsync( cancelToken );
int itemsTakenLocal = Interlocked.Increment( ref _itemsTaken );

if ( itemsTakenLocal >= _itemsAddedTotal )
{
_cancelSource.Cancel();
break;
}
}
}
catch ( OperationCanceledException )
{
}
}

private void RunProducer()
{
for ( int i = 0; i < _itemsAddedPerThread; i++ )
{
int item = 42;
_currentQueue.Add( item );
}
}

Тобто просто беремо фіксовану пачку елементів, фигачим їх в чергу в декілька потоків, паралельно в кілька ж потоків цю чергу розгрібаємо, засікаємо скільки часу на це піде. Підсовуємо різні реалізації IAsyncCollection, порівнюємо. У забігу беруть участь:

1. Свеженавелосипеденный AsyncQueue
2. Nito.AsyncEx.AsyncCollection в наступному вигляді:

class NitoAsyncCollectionAdapter<T>: IAsyncCollection<T>
{
private Nito.AsyncEx.AsyncCollection<T> _collection;

public NitoAsyncCollectionAdapter()
{
_collection = new Nito.AsyncEx.AsyncCollection<T>();
}

#region IAsyncCollection<T> Members

public void Add( T item )
{
_collection.Add( item );
}

public Task<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
{
return _collection.TakeAsync( cancellationToken );
}

#endregion
}

3. BlockingCollection (ну як же не порівняти з нею) у вигляді:

class BlockingCollectionAdapter<T>: IAsyncCollection<T>
{
private BlockingCollection<T> _collection;

public BlockingCollectionAdapter()
{
_collection = new BlockingCollection<T>();
}

#region IAsyncCollection<T> Members

public void Add( T item )
{
_collection.Add( item );
}

public Task<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
{
T item = _collection.Take( cancellationToken );
return Task.FromResult( item );
}

#endregion
}

Результати:

HellBrick.AsyncCollections.Asyncqueue: 1ms | Stats: MedianTicks= 3368, MedianMs= 1, Error=06.34%
Nito.AsyncEx.AsyncCollection: 12ms | Stats: MedianTicks=40503, MedianMs=12, Error=31.36%
System.Concurrent.BlockingCollection: 2ms | Stats: MedianTicks= 7222, MedianMs= 2, Error=38.82%

Інтуїтивна оцінка Nito.AsyncEx.AsyncCollection не підвела: це дійсно монструозная тормознутая хрень. Але найцікавіше: нам вдалося обігнати BlockingCollection по продуктивності і при цьому обійтися без блокування потоків. Win! Відкриваємо тортик або будь-яку іншу бонусну смакоту і їдемо далі.

AsyncBatchQueue
Мені періодично доводилося використовувати невеликий врапперов над BlockingCollection, який брав на вхід поодинокі елементи і віддавав їх пачками певного розміру. При цьому, якщо за певний час потрібну кількість елементів так і не набралося, спрацьовував таймер і робив примусовий flush того що ми встигли набрати. Хто хоче асинхронну версію такої штуки? Я хочу.

Для початку обійдемося без таймера і ручного flush-а. Зібрані пачки елементів логічно зберігати і віддавати засобами нашої нової AsyncQueue:

public class AsyncBatchQueue<T>
{
private int _batchSize;
private Batch _currentBatch;
private AsyncQueue<IReadOnlyList<T>> _batchQueue = new AsyncQueue<IReadOnlyList<T>>();

public AsyncBatchQueue( int batchSize )
{
_batchSize = batchSize;
_currentBatch = new Batch( this );
}

public void Add( T item )
{
SpinWait spin = new SpinWait();

while ( !_currentBatch.TryAdd( item ) )
spin.SpinOnce();
}

public Task<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken )
{
return _batchQueue.TakeAsync( cancellationToken );
}

private class Batch: IReadOnlyList<T>
{
private AsyncBatchQueue<T> _queue;
// ?

public Batch( AsyncBatchQueue<T> queue )
{
_queue = queue;
}

public bool TryAdd( T item )
{
// ?
}
}
}

Що тут відбувається: в методі Add потрібно спробувати додбавить елемент в поточний batch і, якщо ми його заповнили, за-flush-ить його в _batchQueue. При цьому цілком можлива ситуація, коли інший потік нас випередив, в даний момент займається додаванням/flush-му, але при цьому ще не встиг записати в _currentBatch посилання на новий (порожній) batch. Звідси старий добрий SpinWait.

Основна магія буде в nested класі Batch, ідея якого самим нахабним чином запозичена з реалізації ConcurrentQueue (до речі, якщо хто не читав исходники, рекомендую ознайомитися: там є багато цікавого). Ця ідея полягає в наступному:

  • Елементи зберігаємо в звичайному масиві, благо розмір ми знаємо заздалегідь
  • Проблеми з concurrency вирішуються з допомогою Interlocked.Increment поля, де зберігається індекс последнеего вставлений елемент
  • Якщо потік захопив останній слот масиву, то на нього [потік, не слот] покладається відповідальність по виконанню flush-а поточного batch-а
  • Якщо потік захопив слот, що виходить за межі масиву, то нам не пощастило: цей batch вже заповнений і потоку потрібно крутитися в очікуванні нового
Виглядає це як-то так. (Обережно, код поки нежиттєздатний! Трохи пізніше розповім чому.)

private class Batch: IReadOnlyList<T>
{
private AsyncBatchQueue<T> _queue;
private T[] _items;
private int _lastReservationIndex = -1;
private int _count = -1;

public Batch( AsyncBatchQueue<T> queue )
{
_queue = queue;
_items = new T[ _queue._batchSize ];
}

public bool TryAdd( T item )
{
int index = Interlocked.Increment( ref _lastReservationIndex );

// The following is true if someone has beaten us to the last slot and we have to wait until the next batch along comes.
if ( index >= _queue._batchSize )
return false;

// The following is true if we've taken the last slot, which means we're obligated to flush the current batch and create a new one.
if ( index == _queue._batchSize - 1 )
FlushInternal( _queue._batchSize );

_items[ index ] = item;
return true;
}

private void FlushInternal( int count )
{
_count = count;
_queue._currentBatch = new Batch( _queue );
_queue._batchQueue.Add( this );
}
}

Далі було б непогано все ж реалізувати IReadOnlyList. Тут спливає один нюанс: ніхто не гарантує, що коли ми за-flush-або batch, всі елементи масиву заповнені реальними даними. Потік, схвативший останній елемент, міг просто виявитися швидше. Напрошується рішення: для кожного слота масиву зберігати прапор, який визначає, чи можна читати відповідне значення.

private class Batch: IReadOnlyList<T>
{
// ...
private bool[] _finalizationFlags;

public Batch( AsyncBatchQueue<T> queue )
{
// ...
_finalizationFlags = new bool[ _queue._batchSize ];
}

public bool TryAdd( T item )
{
// ...
_items[ index ] = item;
_finalizationFlags[ index ] = true;

return true;
}

public T this[ int index ]
{
get
{
if ( index >= _count )
throw new IndexOutOfRangeException();

return GetItemWithoutValidation( index );
}
}

private T GetItemWithoutValidation( int index )
{
SpinWait spin = new SpinWait();
while ( !_finalizationFlags[ index ] )
spin.SpinOnce();

return _items[ index ];
}

// ... інші методи реалізується через GetItemWithoutValidation
}


А тепер починається справжня магія. Проблема в тому, що в коді є купа місць, де компілятор з процесором можуть все зіпсувати, переставляючи інструкції місцями і кэшируя те, що кешувати категорично не можна.

1. В AsyncBatchCollection.Add() значення _currentBatch може бути прочитано одного разу і закэшировано, в результаті чого, якщо batch заповнився, потік буде крутитися вічно. volatile поспішає на допомогу:

public class AsyncBatchQueue<T>
{
// ...
private volatile Batch _currentBatch;
// ...
}

2. В методі FlushInternal() batch може додатися у вихідну чергу до того, як буде заповнено поле _count. Встромляємо full fence:

private void FlushInternal( int count )
{
_count = count;
_queue._currentBatch = new Batch( _queue );

// The full fence ensures that the current batch will never be added to the queue before _count is set.
Thread.MemoryBarrier();

_queue._batchQueue.Add( this );
}

3. В методі TryAdd інструкції записи в _items[ index ] і _finalizationFlags[ index ] можуть бути переставлені місцями. Знову встромляємо full fence:

public bool TryAdd( T item )
{
// ...

// The full fence prevents setting finalization flag before the actual item value is written.
_items[ index ] = item;
Thread.MemoryBarrier();
_finalizationFlags[ index ] = true;

return true;
}

4. Зворотна проблема (читання елемента перед прапором) може відбутися в GetItemWithoutValidation. Встромляємо самі-знаєте-що:

private T GetItemWithoutValidation( int index )
{
SpinWait spin = new SpinWait();
while ( !_finalizationFlags[ index ] )
spin.SpinOnce();

// The full fence prevents reading item value before finalization flag is set.
Thread.MemoryBarrier();
return _items[ index ];
}

5. Все в тому ж методі значення _finalizationFlags[ index ] може бути закэшировано, з-за чого потік буде крутитися вічно. Зазвичай подібне вирішується навішуванням на полі модифікатора volatile, але зробити це з елементом масиву не представляється можливим, тому ну ви зрозуміли:

private T GetItemWithoutValidation( int index )
{
SpinWait spin = new SpinWait();
while ( !_finalizationFlags[ index ] )
{
spin.SpinOnce();

// The full fence prevents caching any part of _finalizationFlags[ index ] expression.
Thread.MemoryBarrier();
}

// ...
}

Тут, до речі, варто зробити невеликий відступConcurrentQueue аналогічна проблема вирішується вельми незвичайним чином:
internal volatile VolatileBool[] m_state;

struct VolatileBool
{
public VolatileBool(bool value)
{
m_value = value;
}
public volatile bool m_value;
}

Якби VolatileBool був класом замість структури, все було б дуже просто: навіть якщо посилання на екземпляр VolatileBool де-небудь закэшируют, читання volatile m_value гарантовано буде повертати реальне значення поля. Чому цей фінт працює зі структурою, якій належить бути скопійованої в момент виклику m_state[ index ], я так і не зрозумів.

Начебто небезпечні місця на цьому скінчилися і базовий функціонал повинен працювати (принаймні, мені щиро хотілося б у це вірити).

А тепер вкорячиваем таймер

Все начебто добре, але є один (вже не пов'язаний з багатопоточністю) нюанс: якщо в колекцію додадуть кількість елементів, не кратне batchSize, то залишок ми ніколи не побачимо. Потрібна можливість робити flush вручну, а краще — по таймеру. Найпростіший спосіб — зробити так, щоб виклик методу Flush() намагався схопити останній слот в масиві, таким чином, позначивши batch як заповнений. При цьому потрібно в обов'язковому порядку запам'ятати останнє реальне значення _lastReservationIndex, інакше ми не зможемо дізнатися, скільки реально слотів зайнято (спойлер: тут на допомогу приходить Interlocked.CompareExchange()). Всього можливі 5 варіантів розвитку подій:

  1. _lastReservationIndex < 0. flush-ить нічого.
  2. _lastReservationIndex >= _queue._batchSize. FlushInternal() виконає потік, схвативший останній слот, робити нічого не треба.
  3. _lastReservationIndex придатний і у нас вийшло атомарно встановити його в _queue._batchSize. Ми знаємо реальний кількість елементів в масиві, можна робити FlushInternal().
  4. Між читанням минулого значення _lastReservationIndex і записом туди ж нового значення проліз інший потік і схопив останній елемент. По суті, ситуація повторює варіант №2: нічого не робимо.
  5. То ж що у №4, але batch не заповнений. Крутимося, пробуємо ще раз.
public class AsyncBatchQueue<T>: IEnumerable<IReadOnlyList<T>>
{
// ...

public void Flush()
{
SpinWait spin = new SpinWait();
while ( !_currentBatch.TryFlush() )
spin.SpinOnce();
}

// ...

private class Batch: IReadOnlyList<T>
{
// ... 

public bool TryFlush()
{
int expectedPreviousReservation = Volatile.Read( ref _lastReservationIndex );

// We don't flush if the batch doesn't have any items or if another thread is about to flush it
// However, we report success to avoid unnecessary spinning.
if ( expectedPreviousReservation < 0 || expectedPreviousReservation >= _queue._batchSize )
return true;

int previousReservation = Interlocked.CompareExchange( ref _lastReservationIndex, _queue._batchSize, expectedPreviousReservation );

// Flush reservation has succeeded.
if ( expectedPreviousReservation == previousReservation )
{
FlushInternal( previousReservation + 1 );
return true;
}

// The following is true if someone has completed the batch by the time we tried to flush it.
// Therefore the batch will be flushed anyway even if we don't do anything.
// The opposite means someone has slipped in an update and we have to spin.
return previousReservation >= _queue._batchSize;
}

// ...
}
}

Готово! Залишилося навісити зверху таймер — це настільки позбавлена магії процес, що я спробую обійтися без копипейста пов'язаного з ним коду. Бенчмарків теж не буде, тому що я не знаю з ким можна було б порівняти продуктивність.

Що далі?
По-перше, обидві розглянуті колекції страждають від одного тонкого недоліку. Якщо хто-небудь зробить Thread.Abort(), то в самий несподіваний момент може вилетіти ThreadAbortException і зруйнувати настільки ретельно підтримувану консистентним стану колекцій. У вищезгаданій ConcurrentQueue (та й у безлічі інших місць) ця проблема вирішується досить екстравагантним чином:

try
{
}
finally
{
// Insert Thread.Abort()-safe code here
}

Кейс досить рідкісний, але на всяк випадок було б непогано від нього захиститися. Можливо, коли-небудь я це все зроблю.

По-друге, для щастя не вистачає ще як мінімум однієї асинхронної колекції: приоритезированной черги. І, на відміну від BlockingCollection, тривіальної реалізації з використанням TakeFromAny() на горизонті не видно. Продовження буде?..

P.S.
Для тих, хто героїчно дочитав до кінця:

Nuget package: www.nuget.org/packages/AsyncCollections/
Source code: github.com/HellBrick/AsyncCollections

Якщо є критика, баги, побажання або просто здорові думки — пишіть, буду радий обговорити.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.