// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BlockingCollection.cs
//
// Microsoft
//
// A class that implements the bounding and blocking functionality while abstracting away
// the underlying storage mechanism. This file also contains BlockingCollection's
// associated debugger view type.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
#pragma warning disable 0420
using System;
using System.Collections.Generic;
using System.Collections;
using System.Diagnostics;
using System.Globalization;
using System.Security.Permissions;
using System.Runtime.InteropServices;
using System.Threading;
namespace System.Collections.Concurrent
{
///
/// Provides blocking and bounding capabilities for thread-safe collections that
/// implement .
///
///
/// represents a collection
/// that allows for thread-safe adding and removing of data.
/// is used as a wrapper
/// for an instance, allowing
/// removal attempts from the collection to block until data is available to be removed. Similarly,
/// a can be created to enforce
/// an upper-bound on the number of data elements allowed in the
/// ; addition attempts to the
/// collection may then block until space is available to store the added items. In this manner,
/// is similar to a traditional
/// blocking queue data structure, except that the underlying data storage mechanism is abstracted
/// away as an .
///
/// Specifies the type of elements in the collection.
[ComVisible(false)]
#if !FEATURE_NETCORE
#pragma warning disable 0618
[HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
#pragma warning restore 0618
#endif
[DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))]
[DebuggerDisplay("Count = {Count}, Type = {m_collection}")]
public class BlockingCollection : IEnumerable, ICollection, IDisposable, IReadOnlyCollection
{
private IProducerConsumerCollection m_collection;
private int m_boundedCapacity;
private const int NON_BOUNDED = -1;
private SemaphoreSlim m_freeNodes;
private SemaphoreSlim m_occupiedNodes;
private bool m_isDisposed;
private CancellationTokenSource m_ConsumersCancellationTokenSource;
private CancellationTokenSource m_ProducersCancellationTokenSource;
private volatile int m_currentAdders;
private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000);
#region Properties
/// Gets the bounded capacity of this instance.
/// The bounded capacity of this collection, or int.MaxValue if no bound was supplied.
/// The has been disposed.
public int BoundedCapacity
{
get
{
CheckDisposed();
return m_boundedCapacity;
}
}
/// Gets whether this has been marked as complete for adding.
/// Whether this collection has been marked as complete for adding.
/// The has been disposed.
public bool IsAddingCompleted
{
get
{
CheckDisposed();
return (m_currentAdders == COMPLETE_ADDING_ON_MASK);
}
}
/// Gets whether this has been marked as complete for adding and is empty.
/// Whether this collection has been marked as complete for adding and is empty.
/// The has been disposed.
public bool IsCompleted
{
get
{
CheckDisposed();
return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0));
}
}
/// Gets the number of items contained in the .
/// The number of items contained in the .
/// The has been disposed.
public int Count
{
get
{
CheckDisposed();
return m_occupiedNodes.CurrentCount;
}
}
/// Gets a value indicating whether access to the is synchronized.
/// The has been disposed.
bool ICollection.IsSynchronized
{
get
{
CheckDisposed();
return false;
}
}
///
/// Gets an object that can be used to synchronize access to the . This property is not supported.
///
/// The SyncRoot property is not supported.
object ICollection.SyncRoot
{
get
{
throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported));
}
}
#endregion
/// Initializes a new instance of the
///
/// class without an upper-bound.
///
///
/// The default underlying collection is a ConcurrentQueue<T>.
///
public BlockingCollection()
: this(new ConcurrentQueue())
{
}
/// Initializes a new instance of the
/// class with the specified upper-bound.
///
/// The bounded size of the collection.
/// The is
/// not a positive value.
///
/// The default underlying collection is a ConcurrentQueue<T>.
///
public BlockingCollection(int boundedCapacity)
: this(new ConcurrentQueue(), boundedCapacity)
{
}
/// Initializes a new instance of the
/// class with the specified upper-bound and using the provided
/// as its underlying data store.
/// The collection to use as the underlying data store.
/// The bounded size of the collection.
/// The argument is
/// null.
/// The is not a positive value.
/// The supplied contains more values
/// than is permitted by .
public BlockingCollection(IProducerConsumerCollection collection, int boundedCapacity)
{
if (boundedCapacity < 1)
{
throw new ArgumentOutOfRangeException(
"boundedCapacity", boundedCapacity,
SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange));
}
if (collection == null)
{
throw new ArgumentNullException("collection");
}
int count = collection.Count;
if (count > boundedCapacity)
{
throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity));
}
Initialize(collection, boundedCapacity, count);
}
/// Initializes a new instance of the
/// class without an upper-bound and using the provided
/// as its underlying data store.
/// The collection to use as the underlying data store.
/// The argument is
/// null.
public BlockingCollection(IProducerConsumerCollection collection)
{
if (collection == null)
{
throw new ArgumentNullException("collection");
}
Initialize(collection, NON_BOUNDED, collection.Count);
}
/// Initializes the BlockingCollection instance.
/// The collection to use as the underlying data store.
/// The bounded size of the collection.
/// The number of items currently in the underlying collection.
private void Initialize(IProducerConsumerCollection collection, int boundedCapacity, int collectionCount)
{
Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);
m_collection = collection;
m_boundedCapacity = boundedCapacity; ;
m_isDisposed = false;
m_ConsumersCancellationTokenSource = new CancellationTokenSource();
m_ProducersCancellationTokenSource = new CancellationTokenSource();
if (boundedCapacity == NON_BOUNDED)
{
m_freeNodes = null;
}
else
{
Debug.Assert(boundedCapacity > 0);
m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
}
m_occupiedNodes = new SemaphoreSlim(collectionCount);
}
///
/// Adds the item to the .
///
/// The item to be added to the collection. The value can be a null reference.
/// The has been marked
/// as complete with regards to additions.
/// The has been disposed.
/// The underlying collection didn't accept the item.
///
/// If a bounded capacity was specified when this instance of
/// was initialized,
/// a call to Add may block until space is available to store the provided item.
///
public void Add(T item)
{
#if DEBUG
bool tryAddReturnValue =
#endif
TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken());
#if DEBUG
Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
#endif
}
///
/// Adds the item to the .
/// A is thrown if the is
/// canceled.
///
/// The item to be added to the collection. The value can be a null reference.
/// A cancellation token to observe.
/// If the is canceled.
/// The has been marked
/// as complete with regards to additions.
/// The has been disposed.
/// The underlying collection didn't accept the item.
///
/// If a bounded capacity was specified when this instance of
/// was initialized,
/// a call to may block until space is available to store the provided item.
///
public void Add(T item, CancellationToken cancellationToken)
{
#if DEBUG
bool tryAddReturnValue =
#endif
TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken);
#if DEBUG
Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
#endif
}
///
/// Attempts to add the specified item to the .
///
/// The item to be added to the collection.
/// true if the could be added; otherwise, false.
/// The has been marked
/// as complete with regards to additions.
/// The has been disposed.
/// The underlying collection didn't accept the item.
public bool TryAdd(T item)
{
return TryAddWithNoTimeValidation(item, 0, new CancellationToken());
}
///
/// Attempts to add the specified item to the .
///
/// The item to be added to the collection.
/// A that represents the number of milliseconds
/// to wait, or a that represents -1 milliseconds to wait indefinitely.
///
/// true if the could be added to the collection within
/// the alloted time; otherwise, false.
/// The has been marked
/// as complete with regards to additions.
/// The has been disposed.
/// is a negative number
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
/// .
/// The underlying collection didn't accept the item.
public bool TryAdd(T item, TimeSpan timeout)
{
ValidateTimeout(timeout);
return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, new CancellationToken());
}
///
/// Attempts to add the specified item to the .
///
/// The item to be added to the collection.
/// The number of milliseconds to wait, or (-1) to wait indefinitely.
/// true if the could be added to the collection within
/// the alloted time; otherwise, false.
/// The has been marked
/// as complete with regards to additions.
/// The has been disposed.
/// is a
/// negative number other than -1, which represents an infinite time-out.
/// The underlying collection didn't accept the item.
public bool TryAdd(T item, int millisecondsTimeout)
{
ValidateMillisecondsTimeout(millisecondsTimeout);
return TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken());
}
///
/// Attempts to add the specified item to the .
/// A is thrown if the is
/// canceled.
///
/// The item to be added to the collection.
/// The number of milliseconds to wait, or (-1) to wait indefinitely.
/// A cancellation token to observe.
/// true if the could be added to the collection within
/// the alloted time; otherwise, false.
/// If the is canceled.
/// The has been marked
/// as complete with regards to additions.
/// The has been disposed.
/// is a
/// negative number other than -1, which represents an infinite time-out.
/// The underlying collection didn't accept the item.
public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
ValidateMillisecondsTimeout(millisecondsTimeout);
return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
}
/// Adds an item into the underlying data store using its IProducerConsumerCollection<T>.Add
/// method. If a bounded capacity was specified and the collection was full,
/// this method will wait for, at most, the timeout period trying to add the item.
/// If the timeout period was exhaused before successfully adding the item this method will
/// return false.
/// The item to be added to the collection.
/// The number of milliseconds to wait for the collection to accept the item,
/// or Timeout.Infinite to wait indefinitely.
/// A cancellation token to observe.
/// False if the collection remained full till the timeout period was exhausted.True otherwise.
/// If the is canceled.
/// the collection has already been marked
/// as complete with regards to additions.
/// If the collection has been disposed.
/// The underlying collection didn't accept the item.
private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
CheckDisposed();
if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
if (IsAddingCompleted)
{
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
}
bool waitForSemaphoreWasSuccessful = true;
if (m_freeNodes != null)
{
//If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
//was called concurrently with Adding which is not supported by BlockingCollection.
CancellationTokenSource linkedTokenSource = null;
try
{
waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
{
linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken, m_ProducersCancellationTokenSource.Token);
waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
}
}
catch (OperationCanceledException)
{
//if cancellation was via external token, throw an OCE
if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
//if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
//Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested);
throw new InvalidOperationException
(SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
}
finally
{
if (linkedTokenSource != null)
{
linkedTokenSource.Dispose();
}
}
}
if (waitForSemaphoreWasSuccessful)
{
// Update the adders count if the complete adding was not requested, otherwise
// spins until all adders finish then throw IOE
// The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
// not been finished yet
SpinWait spinner = new SpinWait();
while (true)
{
int observedAdders = m_currentAdders;
if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
{
spinner.Reset();
// CompleteAdding is requested, spin then throw
while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
}
if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
{
Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
break;
}
spinner.SpinOnce();
}
// This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
// 1- m_collection.TryAdd threw an exception
// 2- m_collection.TryAdd succeeded
// 3- m_collection.TryAdd returned false
// so we put the decrement code in the finally block
try
{
//TryAdd is guaranteed to find a place to add the element. Its return value depends
//on the semantics of the underlying store. Some underlying stores will not add an already
//existing item and thus TryAdd returns false indicating that the size of the underlying
//store did not increase.
bool addingSucceeded = false;
try
{
//The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
//This fixes bug #702328, case 2 of 2.
cancellationToken.ThrowIfCancellationRequested();
addingSucceeded = m_collection.TryAdd(item);
}
catch
{
//TryAdd did not result in increasing the size of the underlying store and hence we need
//to increment back the count of the m_freeNodes semaphore.
if (m_freeNodes != null)
{
m_freeNodes.Release();
}
throw;
}
if (addingSucceeded)
{
//After adding an element to the underlying storage, signal to the consumers
//waiting on m_occupiedNodes that there is a new item added ready to be consumed.
m_occupiedNodes.Release();
}
else
{
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed));
}
}
finally
{
// decrement the adders count
Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
Interlocked.Decrement(ref m_currentAdders);
}
}
return waitForSemaphoreWasSuccessful;
}
/// Takes an item from the .
/// The item removed from the collection.
/// The is empty and has been marked
/// as complete with regards to additions.
/// The has been disposed.
/// The underlying collection was modified
/// outside of this instance.
/// A call to may block until an item is available to be removed.
public T Take()
{
T item;
if (!TryTake(out item, Timeout.Infinite, CancellationToken.None))
{
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
}
return item;
}
/// Takes an item from the .
/// The item removed from the collection.
/// If the is
/// canceled or the is empty and has been marked
/// as complete with regards to additions.
/// The has been disposed.
/// The underlying collection was modified
/// outside of this instance.
/// A call to may block until an item is available to be removed.
public T Take(CancellationToken cancellationToken)
{
T item;
if (!TryTake(out item, Timeout.Infinite, cancellationToken))
{
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
}
return item;
}
///
/// Attempts to remove an item from the .
///
/// The item removed from the collection.
/// true if an item could be removed; otherwise, false.
/// The has been disposed.
/// The underlying collection was modified
/// outside of this instance.
public bool TryTake(out T item)
{
return TryTake(out item, 0, CancellationToken.None);
}
///
/// Attempts to remove an item from the .
///
/// The item removed from the collection.
/// A that represents the number of milliseconds
/// to wait, or a that represents -1 milliseconds to wait indefinitely.
///
/// true if an item could be removed from the collection within
/// the alloted time; otherwise, false.
/// The has been disposed.
/// is a negative number
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
/// .
/// The underlying collection was modified
/// outside of this instance.
public bool TryTake(out T item, TimeSpan timeout)
{
ValidateTimeout(timeout);
return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null);
}
///
/// Attempts to remove an item from the .
///
/// The item removed from the collection.
/// The number of milliseconds to wait, or (-1) to wait indefinitely.
/// true if an item could be removed from the collection within
/// the alloted time; otherwise, false.
/// The has been disposed.
/// is a
/// negative number other than -1, which represents an infinite time-out.
/// The underlying collection was modified
/// outside of this instance.
public bool TryTake(out T item, int millisecondsTimeout)
{
ValidateMillisecondsTimeout(millisecondsTimeout);
return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null);
}
///
/// Attempts to remove an item from the .
/// A is thrown if the is
/// canceled.
///
/// The item removed from the collection.
/// The number of milliseconds to wait, or (-1) to wait indefinitely.
/// A cancellation token to observe.
/// true if an item could be removed from the collection within
/// the alloted time; otherwise, false.
/// If the is canceled.
/// The has been disposed.
/// is a
/// negative number other than -1, which represents an infinite time-out.
/// The underlying collection was modified
/// outside of this instance.
public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
ValidateMillisecondsTimeout(millisecondsTimeout);
return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null);
}
/// Takes an item from the underlying data store using its IProducerConsumerCollection<T>.Take
/// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false)
/// trying to remove an item. If the timeout period was exhaused before successfully removing an item
/// this method will return false.
/// A is thrown if the is
/// canceled.
///
/// The item removed from the collection.
/// The number of milliseconds to wait for the collection to have an item available
/// for removal, or Timeout.Infinite to wait indefinitely.
/// A cancellation token to observe.
/// A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token
/// multiple times.
/// False if the collection remained empty till the timeout period was exhausted. True otherwise.
/// If the is canceled.
/// If the collection has been disposed.
private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource)
{
CheckDisposed();
item = default(T);
if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
//If the collection is completed then there is no need to wait.
if (IsCompleted)
{
return false;
}
bool waitForSemaphoreWasSuccessful = false;
// set the combined token source to the combinedToken paramater if it is not null (came from GetConsumingEnumerable)
CancellationTokenSource linkedTokenSource = combinedTokenSource;
try
{
waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(0);
if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
{
// create the linked token if it is not created yet
if (combinedTokenSource == null)
linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken,
m_ConsumersCancellationTokenSource.Token);
waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
}
}
//The collection became completed while waiting on the semaphore.
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
return false;
}
finally
{
// only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it
if (linkedTokenSource != null && combinedTokenSource == null)
{
linkedTokenSource.Dispose();
}
}
if (waitForSemaphoreWasSuccessful)
{
bool removeSucceeded = false;
bool removeFaulted = true;
try
{
//The token may have been canceled before an item arrived, so we need a check after the wait has completed.
//This fixes bug #702328, case 1 of 2.
cancellationToken.ThrowIfCancellationRequested();
//If an item was successfully removed from the underlying collection.
removeSucceeded = m_collection.TryTake(out item);
removeFaulted = false;
if (!removeSucceeded)
{
// Check if the collection is empty which means that the collection was modified outside BlockingCollection
throw new InvalidOperationException
(SR.GetString(SR.BlockingCollection_Take_CollectionModified));
}
}
finally
{
// removeFaulted implies !removeSucceeded, but the reverse is not true.
if (removeSucceeded)
{
if (m_freeNodes != null)
{
Debug.Assert(m_boundedCapacity != NON_BOUNDED);
m_freeNodes.Release();
}
}
else if (removeFaulted)
{
m_occupiedNodes.Release();
}
//Last remover will detect that it has actually removed the last item from the
//collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores
//so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers
//but this is not a problem.
if (IsCompleted)
{
CancelWaitingConsumers();
}
}
}
return waitForSemaphoreWasSuccessful;
}
///
/// Adds the specified item to any one of the specified
/// instances.
///
/// The array of collections.
/// The item to be added to one of the collections.
/// The index of the collection in the array to which the item was added.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.
/// At least one of the instances has been disposed.
/// At least one underlying collection didn't accept the item.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
///
/// If a bounded capacity was specified when all of the
/// instances were initialized,
/// a call to AddToAny may block until space is available in one of the collections
/// to store the provided item.
///
public static int AddToAny(BlockingCollection[] collections, T item)
{
#if DEBUG
int tryAddAnyReturnValue =
#else
return
#endif
TryAddToAny(collections, item, Timeout.Infinite, CancellationToken.None);
#if DEBUG
Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
, "TryAddToAny() was expected to return an index within the bounds of the collections array.");
return tryAddAnyReturnValue;
#endif
}
///
/// Adds the specified item to any one of the specified
/// instances.
/// A is thrown if the is
/// canceled.
///
/// The array of collections.
/// The item to be added to one of the collections.
/// A cancellation token to observe.
/// The index of the collection in the array to which the item was added.
/// If the is canceled.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.
/// At least one of the instances has been disposed.
/// At least one underlying collection didn't accept the item.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
///
/// If a bounded capacity was specified when all of the
/// instances were initialized,
/// a call to AddToAny may block until space is available in one of the collections
/// to store the provided item.
///
public static int AddToAny(BlockingCollection[] collections, T item, CancellationToken cancellationToken)
{
#if DEBUG
int tryAddAnyReturnValue =
#else
return
#endif
TryAddToAny(collections, item, Timeout.Infinite, cancellationToken);
#if DEBUG
Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
, "TryAddToAny() was expected to return an index within the bounds of the collections array.");
return tryAddAnyReturnValue;
#endif
}
///
/// Attempts to add the specified item to any one of the specified
/// instances.
///
/// The array of collections.
/// The item to be added to one of the collections.
/// The index of the collection in the
/// array to which the item was added, or -1 if the item could not be added.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.
/// At least one of the instances has been disposed.
/// At least one underlying collection didn't accept the item.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
public static int TryAddToAny(BlockingCollection[] collections, T item)
{
return TryAddToAny(collections, item, 0, CancellationToken.None);
}
///
/// Attempts to add the specified item to any one of the specified
/// instances.
///
/// The array of collections.
/// The item to be added to one of the collections.
/// A that represents the number of milliseconds
/// to wait, or a that represents -1 milliseconds to wait indefinitely.
///
/// The index of the collection in the
/// array to which the item was added, or -1 if the item could not be added.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.
/// At least one of the instances has been disposed.
/// is a negative number
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
/// .
/// At least one underlying collection didn't accept the item.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
public static int TryAddToAny(BlockingCollection[] collections, T item, TimeSpan timeout)
{
ValidateTimeout(timeout);
return TryAddToAnyCore(collections, item, (int)timeout.TotalMilliseconds, CancellationToken.None);
}
///
/// Attempts to add the specified item to any one of the specified
/// instances.
///
/// The array of collections.
/// The item to be added to one of the collections.
/// The number of milliseconds to wait, or (-1) to wait indefinitely. /// The index of the collection in the
/// array to which the item was added, or -1 if the item could not be added.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.
/// At least one of the instances has been disposed.
/// is a
/// negative number other than -1, which represents an infinite time-out.
/// At least one underlying collection didn't accept the item.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
public static int TryAddToAny(BlockingCollection[] collections, T item, int millisecondsTimeout)
{
ValidateMillisecondsTimeout(millisecondsTimeout);
return TryAddToAnyCore(collections, item, millisecondsTimeout, CancellationToken.None);
}
///
/// Attempts to add the specified item to any one of the specified
/// instances.
/// A is thrown if the is
/// canceled.
///
/// The array of collections.
/// The item to be added to one of the collections.
/// The number of milliseconds to wait, or (-1) to wait indefinitely.
/// The index of the collection in the
/// array to which the item was added, or -1 if the item could not be added.
/// A cancellation token to observe.
/// If the is canceled.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.
/// At least one of the instances has been disposed.
/// is a
/// negative number other than -1, which represents an infinite time-out.
/// At least one underlying collection didn't accept the item.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
public static int TryAddToAny(BlockingCollection[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
ValidateMillisecondsTimeout(millisecondsTimeout);
return TryAddToAnyCore(collections, item, millisecondsTimeout, cancellationToken);
}
/// Adds an item to anyone of the specified collections.
/// A is thrown if the is
/// canceled.
///
/// The collections into which the item can be added.
/// The item to be added .
/// The number of milliseconds to wait for a collection to accept the
/// operation, or -1 to wait indefinitely.
/// A cancellation token to observe.
/// The index into collections for the collection which accepted the
/// adding of the item; -1 if the item could not be added.
/// If the is canceled.
/// If the collections argument is null.
/// If the collections argument is a 0-length array or contains a
/// null element. Also, if atleast one of the collections has been marked complete for adds.
/// If atleast one of the collections has been disposed.
private static int TryAddToAnyCore(BlockingCollection[] collections, T item, int millisecondsTimeout, CancellationToken externalCancellationToken)
{
ValidateCollectionsArray(collections, true);
const int OPERATION_FAILED = -1;
// Copy the wait time to another local variable to update it
int timeout = millisecondsTimeout;
uint startTime = 0;
if (millisecondsTimeout != Timeout.Infinite)
{
startTime = (uint)Environment.TickCount;
}
// Fast path for adding if there is at least one unbounded collection
int index = TryAddToAnyFast(collections, item);
if (index > -1)
return index;
// Get wait handles and the tokens for all collections,
// and construct a single combined token from all the tokens,
// add the combined token handle to the handles list
// call WaitAny for all handles
// After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
// If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
// or one if the collection is AddingCompleted then throw AE
CancellationToken[] collatedCancellationTokens;
List handles = GetHandles(collections, externalCancellationToken, true, out collatedCancellationTokens);
//Loop until one of these conditions is met:
// 1- The operation is succeeded
// 2- The timeout expired for try* versions
// 3- The external token is cancelled, throw
// 4- There is at least one collection marked as adding completed then throw
while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
{
index = -1;
using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
{
handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
//Wait for any collection to become available.
index = WaitHandle.WaitAny(handles.ToArray(), timeout, false);
handles.RemoveAt(handles.Count - 1); //remove the linked token
if (linkedTokenSource.IsCancellationRequested)
{
if (externalCancellationToken.IsCancellationRequested) //case#3
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
else //case#4
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
}
}
Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
if (index == WaitHandle.WaitTimeout) //case#2
return OPERATION_FAILED;
//If the timeout period was not exhausted and the appropriate operation succeeded.
if (collections[index].TryAdd(item)) //case#1
return index;
// Update the timeout
if (millisecondsTimeout != Timeout.Infinite)
timeout = UpdateTimeOut(startTime, millisecondsTimeout);
}
// case #2
return OPERATION_FAILED;
}
///
/// Fast path for TryAddToAny to find a non bounded collection and add the items in it
///
/// The collections list
/// The item to be added
/// The index which the item has been added, -1 if failed
private static int TryAddToAnyFast(BlockingCollection[] collections, T item)
{
for (int i = 0; i < collections.Length; i++)
{
if (collections[i].m_freeNodes == null)
{
#if DEBUG
bool result =
#endif
collections[i].TryAdd(item);
#if DEBUG
Debug.Assert(result);
#endif
return i;
}
}
return -1;
}
///
/// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Compeleted collections
///
/// The blocking collections
/// The original CancellationToken
/// True if Add or TryAdd, false if Take or TryTake
/// Complete list of cancellationTokens to observe
/// The collections wait handles
private static List GetHandles(BlockingCollection[] collections, CancellationToken externalCancellationToken, bool isAddOperation, out CancellationToken[] cancellationTokens)
{
Debug.Assert(collections != null);
List handlesList = new List(collections.Length + 1); // + 1 for the external token handle to be added
List tokensList = new List(collections.Length + 1); // + 1 for the external token
tokensList.Add(externalCancellationToken);
//Read the appropriate WaitHandle based on the operation mode.
if (isAddOperation)
{
for (int i = 0; i < collections.Length; i++)
{
if (collections[i].m_freeNodes != null)
{
handlesList.Add(collections[i].m_freeNodes.AvailableWaitHandle);
tokensList.Add(collections[i].m_ProducersCancellationTokenSource.Token);
}
}
}
else
{
for (int i = 0; i < collections.Length; i++)
{
if (collections[i].IsCompleted) //exclude Completed collections if it is take operation
continue;
handlesList.Add(collections[i].m_occupiedNodes.AvailableWaitHandle);
tokensList.Add(collections[i].m_ConsumersCancellationTokenSource.Token);
}
}
cancellationTokens = tokensList.ToArray();
return handlesList;
}
///
/// Helper function to measure and update the wait time
///
/// The first time (in milliseconds) observed when the wait started
/// The orginal wait timeoutout in milliseconds
/// The new wait time in milliseconds, -1 if the time expired
private static int UpdateTimeOut(uint startTime, int originalWaitMillisecondsTimeout)
{
if (originalWaitMillisecondsTimeout == 0)
{
return 0;
}
// The function must be called in case the time out is not infinite
Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite);
uint elapsedMilliseconds = (uint)Environment.TickCount - startTime;
// Check the elapsed milliseconds is greater than max int because this property is uint
if (elapsedMilliseconds > int.MaxValue)
{
return 0;
}
// Subtract the elapsed time from the current wait time
int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds; ;
if (currentWaitTimeout <= 0)
{
return 0;
}
return currentWaitTimeout;
}
///
/// Takes an item from any one of the specified
/// instances.
///
/// The array of collections.
/// The item removed from one of the collections.
/// The index of the collection in the array from which
/// the item was removed, or -1 if an item could not be removed.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element.
/// At least one of the instances has been disposed.
/// At least one of the underlying collections was modified
/// outside of its instance.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
/// A call to TakeFromAny may block until an item is available to be removed.
public static int TakeFromAny(BlockingCollection[] collections, out T item)
{
return TakeFromAny(collections, out item, CancellationToken.None);
}
///
/// Takes an item from any one of the specified
/// instances.
/// A is thrown if the is
/// canceled.
///
/// The array of collections.
/// The item removed from one of the collections.
/// A cancellation token to observe.
/// The index of the collection in the array from which
/// the item was removed, or -1 if an item could not be removed.
/// The argument is
/// null.
/// If the is canceled.
/// The argument is
/// a 0-length array or contains a null element.
/// At least one of the instances has been disposed.
/// At least one of the underlying collections was modified
/// outside of its instance.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
/// A call to TakeFromAny may block until an item is available to be removed.
public static int TakeFromAny(BlockingCollection[] collections, out T item, CancellationToken cancellationToken)
{
int returnValue = TryTakeFromAnyCore(collections, out item, Timeout.Infinite, true, cancellationToken);
Debug.Assert((returnValue >= 0 && returnValue < collections.Length)
, "TryTakeFromAny() was expected to return an index within the bounds of the collections array.");
return returnValue;
}
///
/// Attempts to remove an item from any one of the specified
/// instances.
///
/// The array of collections.
/// The item removed from one of the collections.
/// The index of the collection in the array from which
/// the item was removed, or -1 if an item could not be removed.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element.
/// At least one of the instances has been disposed.
/// At least one of the underlying collections was modified
/// outside of its instance.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
/// A call to TryTakeFromAny may block until an item is available to be removed.
public static int TryTakeFromAny(BlockingCollection[] collections, out T item)
{
return TryTakeFromAny(collections, out item, 0);
}
///
/// Attempts to remove an item from any one of the specified
/// instances.
///
/// The array of collections.
/// The item removed from one of the collections.
/// A that represents the number of milliseconds
/// to wait, or a that represents -1 milliseconds to wait indefinitely.
///
/// The index of the collection in the array from which
/// the item was removed, or -1 if an item could not be removed.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element.
/// At least one of the instances has been disposed.
/// is a negative number
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
/// .
/// At least one of the underlying collections was modified
/// outside of its instance.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
/// A call to TryTakeFromAny may block until an item is available to be removed.
public static int TryTakeFromAny(BlockingCollection[] collections, out T item, TimeSpan timeout)
{
ValidateTimeout(timeout);
return TryTakeFromAnyCore(collections, out item, (int)timeout.TotalMilliseconds, false, CancellationToken.None);
}
///
/// Attempts to remove an item from any one of the specified
/// instances.
///
/// The array of collections.
/// The item removed from one of the collections.
/// The number of milliseconds to wait, or (-1) to wait indefinitely.
/// The index of the collection in the array from which
/// the item was removed, or -1 if an item could not be removed.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element.
/// At least one of the instances has been disposed.
/// is a
/// negative number other than -1, which represents an infinite time-out.
/// At least one of the underlying collections was modified
/// outside of its instance.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
/// A call to TryTakeFromAny may block until an item is available to be removed.
public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout)
{
ValidateMillisecondsTimeout(millisecondsTimeout);
return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, CancellationToken.None);
}
///
/// Attempts to remove an item from any one of the specified
/// instances.
/// A is thrown if the is
/// canceled.
///
/// The array of collections.
/// The item removed from one of the collections.
/// The number of milliseconds to wait, or (-1) to wait indefinitely.
/// A cancellation token to observe.
/// The index of the collection in the array from which
/// the item was removed, or -1 if an item could not be removed.
/// If the is canceled.
/// The argument is
/// null.
/// The argument is
/// a 0-length array or contains a null element.
/// At least one of the instances has been disposed.
/// is a
/// negative number other than -1, which represents an infinite time-out.
/// At least one of the underlying collections was modified
/// outside of its instance.
/// The count of is greater than the maximum size of
/// 62 for STA and 63 for MTA.
/// A call to TryTakeFromAny may block until an item is available to be removed.
public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
ValidateMillisecondsTimeout(millisecondsTimeout);
return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, cancellationToken);
}
/// Takes an item from anyone of the specified collections.
/// A is thrown if the is
/// canceled.
///
/// The collections from which the item can be removed.
/// The item removed and returned to the caller.
/// The number of milliseconds to wait for a collection to accept the
/// operation, or -1 to wait indefinitely.
/// True if Take, false if TryTake.
/// A cancellation token to observe.
/// The index into collections for the collection which accepted the
/// removal of the item; -1 if the item could not be removed.
/// If the is canceled.
/// If the collections argument is null.
/// If the collections argument is a 0-length array or contains a
/// null element. Also, if atleast one of the collections has been marked complete for adds.
/// If atleast one of the collections has been disposed.
private static int TryTakeFromAnyCore(BlockingCollection[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
{
ValidateCollectionsArray(collections, false);
//try the fast path first
for (int i = 0; i < collections.Length; i++)
{
// Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
return i;
}
//Fast path failed, try the slow path
return TryTakeFromAnyCoreSlow(collections, out item, millisecondsTimeout, isTakeOperation, externalCancellationToken);
}
/// Takes an item from anyone of the specified collections.
/// A is thrown if the is
/// canceled.
///
/// The collections copy from which the item can be removed.
/// The item removed and returned to the caller.
/// The number of milliseconds to wait for a collection to accept the
/// operation, or -1 to wait indefinitely.
/// True if Take, false if TryTake.
/// A cancellation token to observe.
/// The index into collections for the collection which accepted the
/// removal of the item; -1 if the item could not be removed.
/// If the is canceled.
/// If the collections argument is null.
/// If the collections argument is a 0-length array or contains a
/// null element. Also, if atleast one of the collections has been marked complete for adds.
/// If atleast one of the collections has been disposed.
private static int TryTakeFromAnyCoreSlow(BlockingCollection[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
{
const int OPERATION_FAILED = -1;
// Copy the wait time to another local variable to update it
int timeout = millisecondsTimeout;
uint startTime = 0;
if (millisecondsTimeout != Timeout.Infinite)
{
startTime = (uint)Environment.TickCount;
}
//Loop until one of these conditions is met:
// 1- The operation is succeeded
// 2- The timeout expired for try* versions
// 3- The external token is cancelled, throw
// 4- The operation is TryTake and all collections are marked as completed, return false
// 5- The operation is Take and all collection are marked as completed, throw
while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
{
// Get wait handles and the tokens for all collections,
// and construct a single combined token from all the tokens,
// add the combined token handle to the handles list
// call WaitAny for all handles
// After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
// If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
// or one if the collection is Completed then exclude it and retry
CancellationToken[] collatedCancellationTokens;
List handles = GetHandles(collections, externalCancellationToken, false, out collatedCancellationTokens);
if (handles.Count == 0 && isTakeOperation) //case#5
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantTakeAnyWhenAllDone), "collections");
else if (handles.Count == 0) //case#4
break;
//Wait for any collection to become available.
using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
{
handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
int index = WaitHandle.WaitAny(handles.ToArray(), timeout, false);
if (linkedTokenSource.IsCancellationRequested && externalCancellationToken.IsCancellationRequested)//case#3
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
else if (!linkedTokenSource.IsCancellationRequested)// if no eiter internal or external cancellation trquested
{
Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
if (index == WaitHandle.WaitTimeout) //case#2
break;
// adjust the index in case one or more handles removed because they are completed
if (collections.Length != handles.Count - 1) // -1 because of the combined token handle
{
for (int i = 0; i < collections.Length; i++)
{
if (collections[i].m_occupiedNodes.AvailableWaitHandle == handles[index])
{
index = i;
break;
}
}
}
if (collections[index].TryTake(out item)) //case#1
return index;
}
}
// Update the timeout
if (millisecondsTimeout != Timeout.Infinite)
timeout = UpdateTimeOut(startTime, millisecondsTimeout);
}
item = default(T); //case#2
return OPERATION_FAILED;
}
///
/// Marks the instances
/// as not accepting any more additions.
///
///
/// After a collection has been marked as complete for adding, adding to the collection is not permitted
/// and attempts to remove from the collection will not wait when the collection is empty.
///
/// The has been disposed.
public void CompleteAdding()
{
CheckDisposed();
if (IsAddingCompleted)
return;
SpinWait spinner = new SpinWait();
while (true)
{
int observedAdders = m_currentAdders;
if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
{
spinner.Reset();
// If there is another COmpleteAdding in progress waiting the current adders, then spin until it finishes
while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
return;
}
if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders)
{
spinner.Reset();
while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
if (Count == 0)
{
CancelWaitingConsumers();
}
// We should always wake waiting producers, and have them throw exceptions as
// Add&CompleteAdding should not be used concurrently.
CancelWaitingProducers();
return;
}
spinner.SpinOnce();
}
}
/// Cancels the semaphores.
private void CancelWaitingConsumers()
{
m_ConsumersCancellationTokenSource.Cancel();
}
private void CancelWaitingProducers()
{
m_ProducersCancellationTokenSource.Cancel();
}
///
/// Releases resources used by the instance.
///
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
///
/// Releases resources used by the instance.
///
/// Whether being disposed explicitly (true) or due to a finalizer (false).
protected virtual void Dispose(bool disposing)
{
if (!m_isDisposed)
{
if (m_freeNodes != null)
{
m_freeNodes.Dispose();
}
m_occupiedNodes.Dispose();
m_isDisposed = true;
}
}
/// Copies the items from the instance into a new array.
/// An array containing copies of the elements of the collection.
/// The has been disposed.
///
/// The copied elements are not removed from the collection.
///
public T[] ToArray()
{
CheckDisposed();
return m_collection.ToArray();
}
/// Copies all of the items in the instance
/// to a compatible one-dimensional array, starting at the specified index of the target array.
///
/// The one-dimensional array that is the destination of the elements copied from
/// the instance. The array must have zero-based indexing.
/// The zero-based index in at which copying begins.
/// The argument is
/// null.
/// The argument is less than zero.
/// The argument is equal to or greater
/// than the length of the .
/// The has been disposed.
public void CopyTo(T[] array, int index)
{
((ICollection)this).CopyTo(array, index);
}
/// Copies all of the items in the instance
/// to a compatible one-dimensional array, starting at the specified index of the target array.
///
/// The one-dimensional array that is the destination of the elements copied from
/// the instance. The array must have zero-based indexing.
/// The zero-based index in at which copying begins.
/// The argument is
/// null.
/// The argument is less than zero.
/// The argument is equal to or greater
/// than the length of the , the array is multidimensional, or the type parameter for the collection
/// cannot be cast automatically to the type of the destination array.
/// The has been disposed.
void ICollection.CopyTo(Array array, int index)
{
CheckDisposed();
//We don't call m_collection.CopyTo() directly because we rely on Array.Copy method to customize
//all array exceptions.
T[] collectionSnapShot = m_collection.ToArray();
try
{
Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length);
}
catch (ArgumentNullException)
{
throw new ArgumentNullException("array");
}
catch (ArgumentOutOfRangeException)
{
throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative));
}
catch (ArgumentException)
{
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index");
}
catch (RankException)
{
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array");
}
catch (InvalidCastException)
{
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
}
catch (ArrayTypeMismatchException)
{
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
}
}
/// Provides a consuming for items in the collection.
/// An that removes and returns items from the collection.
/// The has been disposed.
public IEnumerable GetConsumingEnumerable()
{
return GetConsumingEnumerable(CancellationToken.None);
}
/// Provides a consuming for items in the collection.
/// Calling MoveNext on the returned enumerable will block if there is no data available, or will
/// throw an if the is canceled.
///
/// A cancellation token to observe.
/// An that removes and returns items from the collection.
/// The has been disposed.
/// If the is canceled.
public IEnumerable GetConsumingEnumerable(CancellationToken cancellationToken)
{
CancellationTokenSource linkedTokenSource = null;
try
{
linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token);
while (!IsCompleted)
{
T item;
if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource))
{
yield return item;
}
}
}
finally
{
if (linkedTokenSource != null)
{
linkedTokenSource.Dispose();
}
}
}
/// Provides an for items in the collection.
/// An for the items in the collection.
/// The has been disposed.
IEnumerator IEnumerable.GetEnumerator()
{
CheckDisposed();
return m_collection.GetEnumerator();
}
/// Provides an for items in the collection.
/// An for the items in the collection.
/// The has been disposed.
IEnumerator IEnumerable.GetEnumerator()
{
return ((IEnumerable)this).GetEnumerator();
}
/// Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny()
/// and TryTakeFromAny().
/// The collections to/from which an item should be added/removed.
/// Indicates whether this method is called to Add or Take.
/// If the collections argument is null.
/// If the collections argument is a 0-length array or contains a
/// null element. Also, if at least one of the collections has been marked complete for adds.
/// If at least one of the collections has been disposed.
private static void ValidateCollectionsArray(BlockingCollection[] collections, bool isAddOperation)
{
if (collections == null)
{
throw new ArgumentNullException("collections");
}
else if (collections.Length < 1)
{
throw new ArgumentException(
SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections");
}
else if ((!IsSTAThread && collections.Length > 63) || (IsSTAThread && collections.Length > 62))
//The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, and we reserve one for CancellationToken
{
throw new ArgumentOutOfRangeException(
"collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize));
}
for (int i = 0; i < collections.Length; ++i)
{
if (collections[i] == null)
{
throw new ArgumentException(
SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections");
}
if (collections[i].m_isDisposed)
throw new ObjectDisposedException(
"collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems));
if (isAddOperation && collections[i].IsAddingCompleted)
{
throw new ArgumentException(
SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
}
}
}
private static bool IsSTAThread
{
get
{
#if !SILVERLIGHT
return Thread.CurrentThread.GetApartmentState() == ApartmentState.STA;
#else
return false;
#endif
}
}
// ---------
// Private Helpers.
/// Centeralizes the logic of validating the timeout input argument.
/// The TimeSpan to wait for to successfully complete an operation on the collection.
/// If the number of millseconds represented by the timeout
/// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite
private static void ValidateTimeout(TimeSpan timeout)
{
long totalMilliseconds = (long)timeout.TotalMilliseconds;
if ((totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue) && (totalMilliseconds != Timeout.Infinite))
{
throw new ArgumentOutOfRangeException("timeout", timeout,
String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
}
}
/// Centralizes the logic of validating the millisecondsTimeout input argument.
/// The number of milliseconds to wait for to successfully complete an
/// operation on the collection.
/// If the number of millseconds is less than 0 and not
/// equal to Timeout.Infinite.
private static void ValidateMillisecondsTimeout(int millisecondsTimeout)
{
if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite))
{
throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout,
String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
}
}
/// Throws a System.ObjectDisposedException if the collection was disposed
/// If the collection has been disposed.
private void CheckDisposed()
{
if (m_isDisposed)
{
throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed));
}
}
}
/// A debugger view of the blocking collection that makes it simple to browse the
/// collection's contents at a point in time.
/// The type of element that the BlockingCollection will hold.
internal sealed class SystemThreadingCollections_BlockingCollectionDebugView
{
private BlockingCollection m_blockingCollection; // The collection being viewed.
/// Constructs a new debugger view object for the provided blocking collection object.
/// A blocking collection to browse in the debugger.
public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollection collection)
{
if (collection == null)
{
throw new ArgumentNullException("collection");
}
m_blockingCollection = collection;
}
/// Returns a snapshot of the underlying collection's elements.
[DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
public T[] Items
{
get
{
return m_blockingCollection.ToArray();
}
}
}
}
#pragma warning restore 0420