Rewrite with hard-coded offsets into the PE file format to discern if a binary is PE32 or PE32+, and then to determine if it contains a "CLR Data Directory" entry that looks valid. Tested with PE32 and PE32+ compiled Mono binaries, PE32 and PE32+ native binaries, and a random assortment of garbage files. Former-commit-id: 9e7ac86ec84f653a2f79b87183efd5b0ebda001b
1848 lines
104 KiB
C#
1848 lines
104 KiB
C#
// ==++==
|
|
//
|
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
|
//
|
|
// ==--==
|
|
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
|
|
//
|
|
// BlockingCollection.cs
|
|
//
|
|
// <OWNER>Microsoft</OWNER>
|
|
//
|
|
// A class that implements the bounding and blocking functionality while abstracting away
|
|
// the underlying storage mechanism. This file also contains BlockingCollection's
|
|
// associated debugger view type.
|
|
//
|
|
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
|
|
#pragma warning disable 0420
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Collections;
|
|
using System.Diagnostics;
|
|
using System.Globalization;
|
|
using System.Security.Permissions;
|
|
using System.Runtime.InteropServices;
|
|
using System.Threading;
|
|
|
|
namespace System.Collections.Concurrent
|
|
{
|
|
/// <summary>
|
|
/// Provides blocking and bounding capabilities for thread-safe collections that
|
|
/// implement <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> represents a collection
|
|
/// that allows for thread-safe adding and removing of data.
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is used as a wrapper
|
|
/// for an <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> instance, allowing
|
|
/// removal attempts from the collection to block until data is available to be removed. Similarly,
|
|
/// a <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> can be created to enforce
|
|
/// an upper-bound on the number of data elements allowed in the
|
|
/// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>; addition attempts to the
|
|
/// collection may then block until space is available to store the added items. In this manner,
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is similar to a traditional
|
|
/// blocking queue data structure, except that the underlying data storage mechanism is abstracted
|
|
/// away as an <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>.
|
|
/// </remarks>
|
|
/// <typeparam name="T">Specifies the type of elements in the collection.</typeparam>
|
|
[ComVisible(false)]
|
|
#if !FEATURE_NETCORE
|
|
#pragma warning disable 0618
|
|
[HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
|
|
#pragma warning restore 0618
|
|
#endif
|
|
[DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))]
|
|
[DebuggerDisplay("Count = {Count}, Type = {m_collection}")]
|
|
public class BlockingCollection<T> : IEnumerable<T>, ICollection, IDisposable, IReadOnlyCollection<T>
|
|
{
|
|
private IProducerConsumerCollection<T> m_collection;
|
|
private int m_boundedCapacity;
|
|
private const int NON_BOUNDED = -1;
|
|
private SemaphoreSlim m_freeNodes;
|
|
private SemaphoreSlim m_occupiedNodes;
|
|
private bool m_isDisposed;
|
|
private CancellationTokenSource m_ConsumersCancellationTokenSource;
|
|
private CancellationTokenSource m_ProducersCancellationTokenSource;
|
|
|
|
private volatile int m_currentAdders;
|
|
private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000);
|
|
|
|
#region Properties
|
|
/// <summary>Gets the bounded capacity of this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</summary>
|
|
/// <value>The bounded capacity of this collection, or int.MaxValue if no bound was supplied.</value>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
public int BoundedCapacity
|
|
{
|
|
get
|
|
{
|
|
CheckDisposed();
|
|
return m_boundedCapacity;
|
|
}
|
|
}
|
|
|
|
/// <summary>Gets whether this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding.</summary>
|
|
/// <value>Whether this collection has been marked as complete for adding.</value>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
public bool IsAddingCompleted
|
|
{
|
|
get
|
|
{
|
|
CheckDisposed();
|
|
return (m_currentAdders == COMPLETE_ADDING_ON_MASK);
|
|
}
|
|
}
|
|
|
|
/// <summary>Gets whether this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding and is empty.</summary>
|
|
/// <value>Whether this collection has been marked as complete for adding and is empty.</value>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
public bool IsCompleted
|
|
{
|
|
get
|
|
{
|
|
CheckDisposed();
|
|
return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0));
|
|
}
|
|
}
|
|
|
|
/// <summary>Gets the number of items contained in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
|
|
/// <value>The number of items contained in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</value>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
public int Count
|
|
{
|
|
get
|
|
{
|
|
CheckDisposed();
|
|
return m_occupiedNodes.CurrentCount;
|
|
}
|
|
}
|
|
|
|
/// <summary>Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/> is synchronized.</summary>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
bool ICollection.IsSynchronized
|
|
{
|
|
get
|
|
{
|
|
CheckDisposed();
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets an object that can be used to synchronize access to the <see
|
|
/// cref="T:System.Collections.ICollection"/>. This property is not supported.
|
|
/// </summary>
|
|
/// <exception cref="T:System.NotSupportedException">The SyncRoot property is not supported.</exception>
|
|
object ICollection.SyncRoot
|
|
{
|
|
get
|
|
{
|
|
throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported));
|
|
}
|
|
}
|
|
#endregion
|
|
|
|
|
|
/// <summary>Initializes a new instance of the
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
|
|
/// class without an upper-bound.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue<T></see>.
|
|
/// </remarks>
|
|
public BlockingCollection()
|
|
: this(new ConcurrentQueue<T>())
|
|
{
|
|
}
|
|
|
|
/// <summary>Initializes a new instance of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
|
|
/// class with the specified upper-bound.
|
|
/// </summary>
|
|
/// <param name="boundedCapacity">The bounded size of the collection.</param>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is
|
|
/// not a positive value.</exception>
|
|
/// <remarks>
|
|
/// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue<T></see>.
|
|
/// </remarks>
|
|
public BlockingCollection(int boundedCapacity)
|
|
: this(new ConcurrentQueue<T>(), boundedCapacity)
|
|
{
|
|
}
|
|
|
|
/// <summary>Initializes a new instance of the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
|
|
/// class with the specified upper-bound and using the provided
|
|
/// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
|
|
/// <param name="collection">The collection to use as the underlying data store.</param>
|
|
/// <param name="boundedCapacity">The bounded size of the collection.</param>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is not a positive value.</exception>
|
|
/// <exception cref="System.ArgumentException">The supplied <paramref name="collection"/> contains more values
|
|
/// than is permitted by <paramref name="boundedCapacity"/>.</exception>
|
|
public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
|
|
{
|
|
if (boundedCapacity < 1)
|
|
{
|
|
throw new ArgumentOutOfRangeException(
|
|
"boundedCapacity", boundedCapacity,
|
|
SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange));
|
|
}
|
|
if (collection == null)
|
|
{
|
|
throw new ArgumentNullException("collection");
|
|
}
|
|
int count = collection.Count;
|
|
if (count > boundedCapacity)
|
|
{
|
|
throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity));
|
|
}
|
|
Initialize(collection, boundedCapacity, count);
|
|
}
|
|
|
|
/// <summary>Initializes a new instance of the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
|
|
/// class without an upper-bound and using the provided
|
|
/// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
|
|
/// <param name="collection">The collection to use as the underlying data store.</param>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is
|
|
/// null.</exception>
|
|
public BlockingCollection(IProducerConsumerCollection<T> collection)
|
|
{
|
|
if (collection == null)
|
|
{
|
|
throw new ArgumentNullException("collection");
|
|
}
|
|
Initialize(collection, NON_BOUNDED, collection.Count);
|
|
}
|
|
|
|
/// <summary>Initializes the BlockingCollection instance.</summary>
|
|
/// <param name="collection">The collection to use as the underlying data store.</param>
|
|
/// <param name="boundedCapacity">The bounded size of the collection.</param>
|
|
/// <param name="collectionCount">The number of items currently in the underlying collection.</param>
|
|
private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
|
|
{
|
|
Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);
|
|
|
|
m_collection = collection;
|
|
m_boundedCapacity = boundedCapacity; ;
|
|
m_isDisposed = false;
|
|
m_ConsumersCancellationTokenSource = new CancellationTokenSource();
|
|
m_ProducersCancellationTokenSource = new CancellationTokenSource();
|
|
|
|
if (boundedCapacity == NON_BOUNDED)
|
|
{
|
|
m_freeNodes = null;
|
|
}
|
|
else
|
|
{
|
|
Debug.Assert(boundedCapacity > 0);
|
|
m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
|
|
}
|
|
|
|
|
|
m_occupiedNodes = new SemaphoreSlim(collectionCount);
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// Adds the item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// </summary>
|
|
/// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
|
|
/// <exception cref="T:System.InvalidOperationException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
|
|
/// <remarks>
|
|
/// If a bounded capacity was specified when this instance of
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> was initialized,
|
|
/// a call to Add may block until space is available to store the provided item.
|
|
/// </remarks>
|
|
public void Add(T item)
|
|
{
|
|
#if DEBUG
|
|
bool tryAddReturnValue =
|
|
#endif
|
|
TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken());
|
|
#if DEBUG
|
|
Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// Adds the item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
|
|
/// <remarks>
|
|
/// If a bounded capacity was specified when this instance of
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> was initialized,
|
|
/// a call to <see cref="Add(T,System.Threading.CancellationToken)"/> may block until space is available to store the provided item.
|
|
/// </remarks>
|
|
public void Add(T item, CancellationToken cancellationToken)
|
|
{
|
|
#if DEBUG
|
|
bool tryAddReturnValue =
|
|
#endif
|
|
TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken);
|
|
#if DEBUG
|
|
Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true.");
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// </summary>
|
|
/// <param name="item">The item to be added to the collection.</param>
|
|
/// <returns>true if the <paramref name="item"/> could be added; otherwise, false.</returns>
|
|
/// <exception cref="T:System.InvalidOperationException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
|
|
public bool TryAdd(T item)
|
|
{
|
|
return TryAddWithNoTimeValidation(item, 0, new CancellationToken());
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// </summary>
|
|
/// <param name="item">The item to be added to the collection.</param>
|
|
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
|
|
/// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
|
|
/// </param>
|
|
/// <returns>true if the <paramref name="item"/> could be added to the collection within
|
|
/// the alloted time; otherwise, false.</returns>
|
|
/// <exception cref="T:System.InvalidOperationException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
|
|
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
|
|
/// <see cref="System.Int32.MaxValue"/>.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
|
|
public bool TryAdd(T item, TimeSpan timeout)
|
|
{
|
|
ValidateTimeout(timeout);
|
|
return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, new CancellationToken());
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// </summary>
|
|
/// <param name="item">The item to be added to the collection.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
|
|
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
|
|
/// <returns>true if the <paramref name="item"/> could be added to the collection within
|
|
/// the alloted time; otherwise, false.</returns>
|
|
/// <exception cref="T:System.InvalidOperationException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
|
|
/// negative number other than -1, which represents an infinite time-out.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
|
|
public bool TryAdd(T item, int millisecondsTimeout)
|
|
{
|
|
ValidateMillisecondsTimeout(millisecondsTimeout);
|
|
return TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken());
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="item">The item to be added to the collection.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
|
|
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>true if the <paramref name="item"/> could be added to the collection within
|
|
/// the alloted time; otherwise, false.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
|
|
/// negative number other than -1, which represents an infinite time-out.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
|
|
public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
|
|
{
|
|
ValidateMillisecondsTimeout(millisecondsTimeout);
|
|
return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
|
|
}
|
|
|
|
/// <summary>Adds an item into the underlying data store using its IProducerConsumerCollection<T>.Add
|
|
/// method. If a bounded capacity was specified and the collection was full,
|
|
/// this method will wait for, at most, the timeout period trying to add the item.
|
|
/// If the timeout period was exhaused before successfully adding the item this method will
|
|
/// return false.</summary>
|
|
/// <param name="item">The item to be added to the collection.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to accept the item,
|
|
/// or Timeout.Infinite to wait indefinitely.</param>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>False if the collection remained full till the timeout period was exhausted.True otherwise.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="System.InvalidOperationException">the collection has already been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
|
|
private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
|
|
{
|
|
CheckDisposed();
|
|
|
|
if (cancellationToken.IsCancellationRequested)
|
|
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
|
|
|
|
if (IsAddingCompleted)
|
|
{
|
|
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
|
|
}
|
|
|
|
bool waitForSemaphoreWasSuccessful = true;
|
|
|
|
if (m_freeNodes != null)
|
|
{
|
|
//If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
|
|
//was called concurrently with Adding which is not supported by BlockingCollection.
|
|
CancellationTokenSource linkedTokenSource = null;
|
|
try
|
|
{
|
|
waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
|
|
if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
|
|
{
|
|
linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
|
|
cancellationToken, m_ProducersCancellationTokenSource.Token);
|
|
waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
//if cancellation was via external token, throw an OCE
|
|
if (cancellationToken.IsCancellationRequested)
|
|
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
|
|
|
|
//if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
|
|
//Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested);
|
|
|
|
throw new InvalidOperationException
|
|
(SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
|
|
}
|
|
finally
|
|
{
|
|
if (linkedTokenSource != null)
|
|
{
|
|
linkedTokenSource.Dispose();
|
|
}
|
|
}
|
|
}
|
|
if (waitForSemaphoreWasSuccessful)
|
|
{
|
|
// Update the adders count if the complete adding was not requested, otherwise
|
|
// spins until all adders finish then throw IOE
|
|
// The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have
|
|
// not been finished yet
|
|
SpinWait spinner = new SpinWait();
|
|
while (true)
|
|
{
|
|
int observedAdders = m_currentAdders;
|
|
if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
|
|
{
|
|
spinner.Reset();
|
|
// CompleteAdding is requested, spin then throw
|
|
while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
|
|
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
|
|
}
|
|
if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
|
|
{
|
|
Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
|
|
break;
|
|
}
|
|
spinner.SpinOnce();
|
|
}
|
|
|
|
// This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if:
|
|
// 1- m_collection.TryAdd threw an exception
|
|
// 2- m_collection.TryAdd succeeded
|
|
// 3- m_collection.TryAdd returned false
|
|
// so we put the decrement code in the finally block
|
|
try
|
|
{
|
|
|
|
//TryAdd is guaranteed to find a place to add the element. Its return value depends
|
|
//on the semantics of the underlying store. Some underlying stores will not add an already
|
|
//existing item and thus TryAdd returns false indicating that the size of the underlying
|
|
//store did not increase.
|
|
|
|
|
|
bool addingSucceeded = false;
|
|
try
|
|
{
|
|
//The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
|
|
//This fixes bug #702328, case 2 of 2.
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
addingSucceeded = m_collection.TryAdd(item);
|
|
}
|
|
catch
|
|
{
|
|
//TryAdd did not result in increasing the size of the underlying store and hence we need
|
|
//to increment back the count of the m_freeNodes semaphore.
|
|
if (m_freeNodes != null)
|
|
{
|
|
m_freeNodes.Release();
|
|
}
|
|
throw;
|
|
}
|
|
if (addingSucceeded)
|
|
{
|
|
//After adding an element to the underlying storage, signal to the consumers
|
|
//waiting on m_occupiedNodes that there is a new item added ready to be consumed.
|
|
m_occupiedNodes.Release();
|
|
}
|
|
else
|
|
{
|
|
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed));
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
// decrement the adders count
|
|
Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
|
|
Interlocked.Decrement(ref m_currentAdders);
|
|
}
|
|
|
|
|
|
}
|
|
return waitForSemaphoreWasSuccessful;
|
|
}
|
|
|
|
/// <summary>Takes an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
|
|
/// <returns>The item removed from the collection.</returns>
|
|
/// <exception cref="T:System.OperationCanceledException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
|
|
/// outside of this <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
/// <remarks>A call to <see cref="Take()"/> may block until an item is available to be removed.</remarks>
|
|
public T Take()
|
|
{
|
|
T item;
|
|
|
|
if (!TryTake(out item, Timeout.Infinite, CancellationToken.None))
|
|
{
|
|
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
|
|
}
|
|
|
|
return item;
|
|
}
|
|
|
|
/// <summary>Takes an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
|
|
/// <returns>The item removed from the collection.</returns>
|
|
/// <exception cref="T:System.OperationCanceledException">If the <see cref="CancellationToken"/> is
|
|
/// canceled or the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
|
|
/// as complete with regards to additions.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
|
|
/// outside of this <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
/// <remarks>A call to <see cref="Take(CancellationToken)"/> may block until an item is available to be removed.</remarks>
|
|
public T Take(CancellationToken cancellationToken)
|
|
{
|
|
T item;
|
|
|
|
if (!TryTake(out item, Timeout.Infinite, cancellationToken))
|
|
{
|
|
throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
|
|
}
|
|
|
|
return item;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// </summary>
|
|
/// <param name="item">The item removed from the collection.</param>
|
|
/// <returns>true if an item could be removed; otherwise, false.</returns>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
|
|
/// outside of this <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
public bool TryTake(out T item)
|
|
{
|
|
return TryTake(out item, 0, CancellationToken.None);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// </summary>
|
|
/// <param name="item">The item removed from the collection.</param>
|
|
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
|
|
/// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
|
|
/// </param>
|
|
/// <returns>true if an item could be removed from the collection within
|
|
/// the alloted time; otherwise, false.</returns>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
|
|
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
|
|
/// <see cref="System.Int32.MaxValue"/>.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
|
|
/// outside of this <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
public bool TryTake(out T item, TimeSpan timeout)
|
|
{
|
|
ValidateTimeout(timeout);
|
|
return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// </summary>
|
|
/// <param name="item">The item removed from the collection.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
|
|
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
|
|
/// <returns>true if an item could be removed from the collection within
|
|
/// the alloted time; otherwise, false.</returns>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
|
|
/// negative number other than -1, which represents an infinite time-out.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
|
|
/// outside of this <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
public bool TryTake(out T item, int millisecondsTimeout)
|
|
{
|
|
ValidateMillisecondsTimeout(millisecondsTimeout);
|
|
return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="item">The item removed from the collection.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
|
|
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>true if an item could be removed from the collection within
|
|
/// the alloted time; otherwise, false.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
|
|
/// negative number other than -1, which represents an infinite time-out.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
|
|
/// outside of this <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
|
|
{
|
|
ValidateMillisecondsTimeout(millisecondsTimeout);
|
|
return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null);
|
|
}
|
|
|
|
/// <summary>Takes an item from the underlying data store using its IProducerConsumerCollection<T>.Take
|
|
/// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false)
|
|
/// trying to remove an item. If the timeout period was exhaused before successfully removing an item
|
|
/// this method will return false.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="item">The item removed from the collection.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to have an item available
|
|
/// for removal, or Timeout.Infinite to wait indefinitely.</param>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <param name="combinedTokenSource">A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token
|
|
/// multiple times.</param>
|
|
/// <returns>False if the collection remained empty till the timeout period was exhausted. True otherwise.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
|
|
private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource)
|
|
{
|
|
CheckDisposed();
|
|
item = default(T);
|
|
|
|
if (cancellationToken.IsCancellationRequested)
|
|
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
|
|
|
|
//If the collection is completed then there is no need to wait.
|
|
if (IsCompleted)
|
|
{
|
|
return false;
|
|
}
|
|
bool waitForSemaphoreWasSuccessful = false;
|
|
|
|
// set the combined token source to the combinedToken paramater if it is not null (came from GetConsumingEnumerable)
|
|
CancellationTokenSource linkedTokenSource = combinedTokenSource;
|
|
try
|
|
{
|
|
waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(0);
|
|
if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
|
|
{
|
|
// create the linked token if it is not created yet
|
|
if (combinedTokenSource == null)
|
|
linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken,
|
|
m_ConsumersCancellationTokenSource.Token);
|
|
waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
|
|
}
|
|
}
|
|
//The collection became completed while waiting on the semaphore.
|
|
catch (OperationCanceledException)
|
|
{
|
|
if (cancellationToken.IsCancellationRequested)
|
|
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
|
|
|
|
return false;
|
|
}
|
|
finally
|
|
{
|
|
// only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it
|
|
if (linkedTokenSource != null && combinedTokenSource == null)
|
|
{
|
|
linkedTokenSource.Dispose();
|
|
}
|
|
}
|
|
|
|
if (waitForSemaphoreWasSuccessful)
|
|
{
|
|
bool removeSucceeded = false;
|
|
bool removeFaulted = true;
|
|
try
|
|
{
|
|
//The token may have been canceled before an item arrived, so we need a check after the wait has completed.
|
|
//This fixes bug #702328, case 1 of 2.
|
|
cancellationToken.ThrowIfCancellationRequested();
|
|
|
|
//If an item was successfully removed from the underlying collection.
|
|
removeSucceeded = m_collection.TryTake(out item);
|
|
removeFaulted = false;
|
|
if (!removeSucceeded)
|
|
{
|
|
// Check if the collection is empty which means that the collection was modified outside BlockingCollection
|
|
throw new InvalidOperationException
|
|
(SR.GetString(SR.BlockingCollection_Take_CollectionModified));
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
// removeFaulted implies !removeSucceeded, but the reverse is not true.
|
|
if (removeSucceeded)
|
|
{
|
|
if (m_freeNodes != null)
|
|
{
|
|
Debug.Assert(m_boundedCapacity != NON_BOUNDED);
|
|
m_freeNodes.Release();
|
|
}
|
|
}
|
|
else if (removeFaulted)
|
|
{
|
|
m_occupiedNodes.Release();
|
|
}
|
|
//Last remover will detect that it has actually removed the last item from the
|
|
//collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores
|
|
//so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers
|
|
//but this is not a problem.
|
|
if (IsCompleted)
|
|
{
|
|
CancelWaitingConsumers();
|
|
}
|
|
}
|
|
}
|
|
return waitForSemaphoreWasSuccessful;
|
|
}
|
|
|
|
|
|
|
|
/// <summary>
|
|
/// Adds the specified item to any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item to be added to one of the collections.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element, or at least one of collections has been
|
|
/// marked as complete for adding.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
/// <remarks>
|
|
/// If a bounded capacity was specified when all of the
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized,
|
|
/// a call to AddToAny may block until space is available in one of the collections
|
|
/// to store the provided item.
|
|
/// </remarks>
|
|
public static int AddToAny(BlockingCollection<T>[] collections, T item)
|
|
{
|
|
#if DEBUG
|
|
int tryAddAnyReturnValue =
|
|
#else
|
|
return
|
|
#endif
|
|
TryAddToAny(collections, item, Timeout.Infinite, CancellationToken.None);
|
|
#if DEBUG
|
|
Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
|
|
, "TryAddToAny() was expected to return an index within the bounds of the collections array.");
|
|
return tryAddAnyReturnValue;
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// Adds the specified item to any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item to be added to one of the collections.</param>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element, or at least one of collections has been
|
|
/// marked as complete for adding.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
/// <remarks>
|
|
/// If a bounded capacity was specified when all of the
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized,
|
|
/// a call to AddToAny may block until space is available in one of the collections
|
|
/// to store the provided item.
|
|
/// </remarks>
|
|
public static int AddToAny(BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
|
|
{
|
|
#if DEBUG
|
|
int tryAddAnyReturnValue =
|
|
#else
|
|
return
|
|
#endif
|
|
TryAddToAny(collections, item, Timeout.Infinite, cancellationToken);
|
|
#if DEBUG
|
|
Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length)
|
|
, "TryAddToAny() was expected to return an index within the bounds of the collections array.");
|
|
return tryAddAnyReturnValue;
|
|
#endif
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to add the specified item to any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item to be added to one of the collections.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/>
|
|
/// array to which the item was added, or -1 if the item could not be added.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element, or at least one of collections has been
|
|
/// marked as complete for adding.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
public static int TryAddToAny(BlockingCollection<T>[] collections, T item)
|
|
{
|
|
return TryAddToAny(collections, item, 0, CancellationToken.None);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to add the specified item to any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item to be added to one of the collections.</param>
|
|
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
|
|
/// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
|
|
/// </param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/>
|
|
/// array to which the item was added, or -1 if the item could not be added.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element, or at least one of collections has been
|
|
/// marked as complete for adding.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
|
|
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
|
|
/// <see cref="System.Int32.MaxValue"/>.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
public static int TryAddToAny(BlockingCollection<T>[] collections, T item, TimeSpan timeout)
|
|
{
|
|
ValidateTimeout(timeout);
|
|
return TryAddToAnyCore(collections, item, (int)timeout.TotalMilliseconds, CancellationToken.None);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to add the specified item to any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item to be added to one of the collections.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
|
|
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param> /// <returns>The index of the collection in the <paramref name="collections"/>
|
|
/// array to which the item was added, or -1 if the item could not be added.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element, or at least one of collections has been
|
|
/// marked as complete for adding.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
|
|
/// negative number other than -1, which represents an infinite time-out.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
|
|
{
|
|
ValidateMillisecondsTimeout(millisecondsTimeout);
|
|
return TryAddToAnyCore(collections, item, millisecondsTimeout, CancellationToken.None);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to add the specified item to any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item to be added to one of the collections.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
|
|
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/>
|
|
/// array to which the item was added, or -1 if the item could not be added.</returns>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element, or at least one of collections has been
|
|
/// marked as complete for adding.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
|
|
/// negative number other than -1, which represents an infinite time-out.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
|
|
{
|
|
ValidateMillisecondsTimeout(millisecondsTimeout);
|
|
return TryAddToAnyCore(collections, item, millisecondsTimeout, cancellationToken);
|
|
}
|
|
|
|
/// <summary>Adds an item to anyone of the specified collections.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="collections">The collections into which the item can be added.</param>
|
|
/// <param name="item">The item to be added .</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
|
|
/// operation, or -1 to wait indefinitely.</param>
|
|
/// <param name="externalCancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>The index into collections for the collection which accepted the
|
|
/// adding of the item; -1 if the item could not be added.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
|
|
/// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
|
|
/// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
|
|
/// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
|
|
private static int TryAddToAnyCore(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken externalCancellationToken)
|
|
{
|
|
ValidateCollectionsArray(collections, true);
|
|
const int OPERATION_FAILED = -1;
|
|
|
|
// Copy the wait time to another local variable to update it
|
|
int timeout = millisecondsTimeout;
|
|
|
|
uint startTime = 0;
|
|
if (millisecondsTimeout != Timeout.Infinite)
|
|
{
|
|
startTime = (uint)Environment.TickCount;
|
|
}
|
|
|
|
// Fast path for adding if there is at least one unbounded collection
|
|
int index = TryAddToAnyFast(collections, item);
|
|
if (index > -1)
|
|
return index;
|
|
|
|
|
|
// Get wait handles and the tokens for all collections,
|
|
// and construct a single combined token from all the tokens,
|
|
// add the combined token handle to the handles list
|
|
// call WaitAny for all handles
|
|
// After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
|
|
// If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
|
|
// or one if the collection is AddingCompleted then throw AE
|
|
CancellationToken[] collatedCancellationTokens;
|
|
List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, true, out collatedCancellationTokens);
|
|
|
|
//Loop until one of these conditions is met:
|
|
// 1- The operation is succeeded
|
|
// 2- The timeout expired for try* versions
|
|
// 3- The external token is cancelled, throw
|
|
// 4- There is at least one collection marked as adding completed then throw
|
|
while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
|
|
{
|
|
index = -1;
|
|
|
|
using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
|
|
{
|
|
handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
|
|
|
|
//Wait for any collection to become available.
|
|
index = WaitHandle.WaitAny(handles.ToArray(), timeout, false);
|
|
|
|
handles.RemoveAt(handles.Count - 1); //remove the linked token
|
|
|
|
if (linkedTokenSource.IsCancellationRequested)
|
|
{
|
|
if (externalCancellationToken.IsCancellationRequested) //case#3
|
|
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
|
|
else //case#4
|
|
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
|
|
}
|
|
}
|
|
|
|
Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
|
|
|
|
if (index == WaitHandle.WaitTimeout) //case#2
|
|
return OPERATION_FAILED;
|
|
|
|
//If the timeout period was not exhausted and the appropriate operation succeeded.
|
|
if (collections[index].TryAdd(item)) //case#1
|
|
return index;
|
|
|
|
// Update the timeout
|
|
if (millisecondsTimeout != Timeout.Infinite)
|
|
timeout = UpdateTimeOut(startTime, millisecondsTimeout);
|
|
}
|
|
|
|
// case #2
|
|
return OPERATION_FAILED;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Fast path for TryAddToAny to find a non bounded collection and add the items in it
|
|
/// </summary>
|
|
/// <param name="collections">The collections list</param>
|
|
/// <param name="item">The item to be added</param>
|
|
/// <returns>The index which the item has been added, -1 if failed</returns>
|
|
private static int TryAddToAnyFast(BlockingCollection<T>[] collections, T item)
|
|
{
|
|
for (int i = 0; i < collections.Length; i++)
|
|
{
|
|
if (collections[i].m_freeNodes == null)
|
|
{
|
|
#if DEBUG
|
|
bool result =
|
|
#endif
|
|
collections[i].TryAdd(item);
|
|
#if DEBUG
|
|
Debug.Assert(result);
|
|
#endif
|
|
return i;
|
|
}
|
|
}
|
|
return -1;
|
|
}
|
|
/// <summary>
|
|
/// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Compeleted collections
|
|
/// </summary>
|
|
/// <param name="collections">The blocking collections</param>
|
|
/// <param name="externalCancellationToken">The original CancellationToken</param>
|
|
/// <param name="isAddOperation">True if Add or TryAdd, false if Take or TryTake</param>
|
|
/// <param name="cancellationTokens">Complete list of cancellationTokens to observe</param>
|
|
/// <returns>The collections wait handles</returns>
|
|
private static List<WaitHandle> GetHandles(BlockingCollection<T>[] collections, CancellationToken externalCancellationToken, bool isAddOperation, out CancellationToken[] cancellationTokens)
|
|
{
|
|
|
|
Debug.Assert(collections != null);
|
|
List<WaitHandle> handlesList = new List<WaitHandle>(collections.Length + 1); // + 1 for the external token handle to be added
|
|
List<CancellationToken> tokensList = new List<CancellationToken>(collections.Length + 1); // + 1 for the external token
|
|
tokensList.Add(externalCancellationToken);
|
|
|
|
//Read the appropriate WaitHandle based on the operation mode.
|
|
if (isAddOperation)
|
|
{
|
|
|
|
for (int i = 0; i < collections.Length; i++)
|
|
{
|
|
if (collections[i].m_freeNodes != null)
|
|
{
|
|
handlesList.Add(collections[i].m_freeNodes.AvailableWaitHandle);
|
|
tokensList.Add(collections[i].m_ProducersCancellationTokenSource.Token);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
for (int i = 0; i < collections.Length; i++)
|
|
{
|
|
if (collections[i].IsCompleted) //exclude Completed collections if it is take operation
|
|
continue;
|
|
|
|
handlesList.Add(collections[i].m_occupiedNodes.AvailableWaitHandle);
|
|
tokensList.Add(collections[i].m_ConsumersCancellationTokenSource.Token);
|
|
}
|
|
}
|
|
|
|
cancellationTokens = tokensList.ToArray();
|
|
return handlesList;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Helper function to measure and update the wait time
|
|
/// </summary>
|
|
/// <param name="startTime"> The first time (in milliseconds) observed when the wait started</param>
|
|
/// <param name="originalWaitMillisecondsTimeout">The orginal wait timeoutout in milliseconds</param>
|
|
/// <returns>The new wait time in milliseconds, -1 if the time expired</returns>
|
|
private static int UpdateTimeOut(uint startTime, int originalWaitMillisecondsTimeout)
|
|
{
|
|
if (originalWaitMillisecondsTimeout == 0)
|
|
{
|
|
return 0;
|
|
}
|
|
// The function must be called in case the time out is not infinite
|
|
Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite);
|
|
|
|
uint elapsedMilliseconds = (uint)Environment.TickCount - startTime;
|
|
|
|
// Check the elapsed milliseconds is greater than max int because this property is uint
|
|
if (elapsedMilliseconds > int.MaxValue)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
// Subtract the elapsed time from the current wait time
|
|
int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds; ;
|
|
if (currentWaitTimeout <= 0)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
return currentWaitTimeout;
|
|
}
|
|
/// <summary>
|
|
/// Takes an item from any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item removed from one of the collections.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/> array from which
|
|
/// the item was removed, or -1 if an item could not be removed.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
|
|
/// outside of its <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
/// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
|
|
public static int TakeFromAny(BlockingCollection<T>[] collections, out T item)
|
|
{
|
|
return TakeFromAny(collections, out item, CancellationToken.None);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Takes an item from any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item removed from one of the collections.</param>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/> array from which
|
|
/// the item was removed, or -1 if an item could not be removed.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
|
|
/// outside of its <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
/// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
|
|
public static int TakeFromAny(BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
|
|
{
|
|
int returnValue = TryTakeFromAnyCore(collections, out item, Timeout.Infinite, true, cancellationToken);
|
|
Debug.Assert((returnValue >= 0 && returnValue < collections.Length)
|
|
, "TryTakeFromAny() was expected to return an index within the bounds of the collections array.");
|
|
return returnValue;
|
|
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to remove an item from any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item removed from one of the collections.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/> array from which
|
|
/// the item was removed, or -1 if an item could not be removed.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
|
|
/// outside of its <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
/// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
|
|
public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item)
|
|
{
|
|
return TryTakeFromAny(collections, out item, 0);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to remove an item from any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item removed from one of the collections.</param>
|
|
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
|
|
/// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
|
|
/// </param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/> array from which
|
|
/// the item was removed, or -1 if an item could not be removed.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
|
|
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
|
|
/// <see cref="System.Int32.MaxValue"/>.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
|
|
/// outside of its <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
/// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
|
|
public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
|
|
{
|
|
ValidateTimeout(timeout);
|
|
return TryTakeFromAnyCore(collections, out item, (int)timeout.TotalMilliseconds, false, CancellationToken.None);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to remove an item from any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item removed from one of the collections.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
|
|
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/> array from which
|
|
/// the item was removed, or -1 if an item could not be removed.</returns>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
|
|
/// negative number other than -1, which represents an infinite time-out.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
|
|
/// outside of its <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
/// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
|
|
public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
|
|
{
|
|
ValidateMillisecondsTimeout(millisecondsTimeout);
|
|
return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, CancellationToken.None);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attempts to remove an item from any one of the specified
|
|
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="collections">The array of collections.</param>
|
|
/// <param name="item">The item removed from one of the collections.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
|
|
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>The index of the collection in the <paramref name="collections"/> array from which
|
|
/// the item was removed, or -1 if an item could not be removed.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
|
|
/// a 0-length array or contains a null element.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
|
|
/// negative number other than -1, which represents an infinite time-out.</exception>
|
|
/// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
|
|
/// outside of its <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
|
|
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
|
|
/// 62 for STA and 63 for MTA.</exception>
|
|
/// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
|
|
public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
|
|
{
|
|
ValidateMillisecondsTimeout(millisecondsTimeout);
|
|
return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, cancellationToken);
|
|
}
|
|
|
|
/// <summary>Takes an item from anyone of the specified collections.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="collections">The collections from which the item can be removed.</param>
|
|
/// <param name="item">The item removed and returned to the caller.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
|
|
/// operation, or -1 to wait indefinitely.</param>
|
|
/// <param name="isTakeOperation">True if Take, false if TryTake.</param>
|
|
/// <param name="externalCancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>The index into collections for the collection which accepted the
|
|
/// removal of the item; -1 if the item could not be removed.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
|
|
/// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
|
|
/// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
|
|
/// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
|
|
private static int TryTakeFromAnyCore(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
|
|
{
|
|
|
|
ValidateCollectionsArray(collections, false);
|
|
|
|
//try the fast path first
|
|
for (int i = 0; i < collections.Length; i++)
|
|
{
|
|
// Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
|
|
if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
|
|
return i;
|
|
}
|
|
|
|
//Fast path failed, try the slow path
|
|
return TryTakeFromAnyCoreSlow(collections, out item, millisecondsTimeout, isTakeOperation, externalCancellationToken);
|
|
}
|
|
|
|
|
|
/// <summary>Takes an item from anyone of the specified collections.
|
|
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
|
|
/// canceled.
|
|
/// </summary>
|
|
/// <param name="collections">The collections copy from which the item can be removed.</param>
|
|
/// <param name="item">The item removed and returned to the caller.</param>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
|
|
/// operation, or -1 to wait indefinitely.</param>
|
|
/// <param name="isTakeOperation">True if Take, false if TryTake.</param>
|
|
/// <param name="externalCancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>The index into collections for the collection which accepted the
|
|
/// removal of the item; -1 if the item could not be removed.</returns>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
/// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
|
|
/// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
|
|
/// null element. Also, if atleast one of the collections has been marked complete for adds.</exception>
|
|
/// <exception cref="System.ObjectDisposedException">If atleast one of the collections has been disposed.</exception>
|
|
private static int TryTakeFromAnyCoreSlow(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
|
|
{
|
|
|
|
const int OPERATION_FAILED = -1;
|
|
|
|
// Copy the wait time to another local variable to update it
|
|
int timeout = millisecondsTimeout;
|
|
|
|
uint startTime = 0;
|
|
if (millisecondsTimeout != Timeout.Infinite)
|
|
{
|
|
startTime = (uint)Environment.TickCount;
|
|
}
|
|
|
|
|
|
//Loop until one of these conditions is met:
|
|
// 1- The operation is succeeded
|
|
// 2- The timeout expired for try* versions
|
|
// 3- The external token is cancelled, throw
|
|
// 4- The operation is TryTake and all collections are marked as completed, return false
|
|
// 5- The operation is Take and all collection are marked as completed, throw
|
|
while (millisecondsTimeout == Timeout.Infinite || timeout >= 0)
|
|
{
|
|
// Get wait handles and the tokens for all collections,
|
|
// and construct a single combined token from all the tokens,
|
|
// add the combined token handle to the handles list
|
|
// call WaitAny for all handles
|
|
// After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
|
|
// If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
|
|
// or one if the collection is Completed then exclude it and retry
|
|
CancellationToken[] collatedCancellationTokens;
|
|
List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, false, out collatedCancellationTokens);
|
|
|
|
if (handles.Count == 0 && isTakeOperation) //case#5
|
|
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantTakeAnyWhenAllDone), "collections");
|
|
|
|
else if (handles.Count == 0) //case#4
|
|
break;
|
|
|
|
|
|
//Wait for any collection to become available.
|
|
using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
|
|
{
|
|
handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list
|
|
int index = WaitHandle.WaitAny(handles.ToArray(), timeout, false);
|
|
|
|
if (linkedTokenSource.IsCancellationRequested && externalCancellationToken.IsCancellationRequested)//case#3
|
|
throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
|
|
|
|
|
|
else if (!linkedTokenSource.IsCancellationRequested)// if no eiter internal or external cancellation trquested
|
|
{
|
|
Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));
|
|
if (index == WaitHandle.WaitTimeout) //case#2
|
|
break;
|
|
|
|
// adjust the index in case one or more handles removed because they are completed
|
|
if (collections.Length != handles.Count - 1) // -1 because of the combined token handle
|
|
{
|
|
for (int i = 0; i < collections.Length; i++)
|
|
{
|
|
if (collections[i].m_occupiedNodes.AvailableWaitHandle == handles[index])
|
|
{
|
|
index = i;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (collections[index].TryTake(out item)) //case#1
|
|
return index;
|
|
}
|
|
}
|
|
|
|
// Update the timeout
|
|
if (millisecondsTimeout != Timeout.Infinite)
|
|
timeout = UpdateTimeOut(startTime, millisecondsTimeout);
|
|
}
|
|
|
|
item = default(T); //case#2
|
|
return OPERATION_FAILED;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Marks the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances
|
|
/// as not accepting any more additions.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// After a collection has been marked as complete for adding, adding to the collection is not permitted
|
|
/// and attempts to remove from the collection will not wait when the collection is empty.
|
|
/// </remarks>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
public void CompleteAdding()
|
|
{
|
|
CheckDisposed();
|
|
|
|
if (IsAddingCompleted)
|
|
return;
|
|
|
|
SpinWait spinner = new SpinWait();
|
|
while (true)
|
|
{
|
|
int observedAdders = m_currentAdders;
|
|
if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
|
|
{
|
|
spinner.Reset();
|
|
// If there is another COmpleteAdding in progress waiting the current adders, then spin until it finishes
|
|
while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
|
|
return;
|
|
}
|
|
|
|
if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders)
|
|
{
|
|
spinner.Reset();
|
|
while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce();
|
|
|
|
if (Count == 0)
|
|
{
|
|
CancelWaitingConsumers();
|
|
}
|
|
|
|
// We should always wake waiting producers, and have them throw exceptions as
|
|
// Add&CompleteAdding should not be used concurrently.
|
|
CancelWaitingProducers();
|
|
return;
|
|
|
|
}
|
|
spinner.SpinOnce();
|
|
}
|
|
}
|
|
|
|
/// <summary>Cancels the semaphores.</summary>
|
|
private void CancelWaitingConsumers()
|
|
{
|
|
m_ConsumersCancellationTokenSource.Cancel();
|
|
}
|
|
|
|
private void CancelWaitingProducers()
|
|
{
|
|
m_ProducersCancellationTokenSource.Cancel();
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
/// Releases resources used by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
|
|
/// </summary>
|
|
public void Dispose()
|
|
{
|
|
Dispose(true);
|
|
GC.SuppressFinalize(this);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Releases resources used by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
|
|
/// </summary>
|
|
/// <param name="disposing">Whether being disposed explicitly (true) or due to a finalizer (false).</param>
|
|
protected virtual void Dispose(bool disposing)
|
|
{
|
|
if (!m_isDisposed)
|
|
{
|
|
if (m_freeNodes != null)
|
|
{
|
|
m_freeNodes.Dispose();
|
|
}
|
|
|
|
m_occupiedNodes.Dispose();
|
|
|
|
m_isDisposed = true;
|
|
}
|
|
}
|
|
|
|
/// <summary>Copies the items from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance into a new array.</summary>
|
|
/// <returns>An array containing copies of the elements of the collection.</returns>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <remarks>
|
|
/// The copied elements are not removed from the collection.
|
|
/// </remarks>
|
|
public T[] ToArray()
|
|
{
|
|
CheckDisposed();
|
|
return m_collection.ToArray();
|
|
}
|
|
|
|
/// <summary>Copies all of the items in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance
|
|
/// to a compatible one-dimensional array, starting at the specified index of the target array.
|
|
/// </summary>
|
|
/// <param name="array">The one-dimensional array that is the destination of the elements copied from
|
|
/// the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance. The array must have zero-based indexing.</param>
|
|
/// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="array"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
|
|
/// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater
|
|
/// than the length of the <paramref name="array"/>.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
public void CopyTo(T[] array, int index)
|
|
{
|
|
((ICollection)this).CopyTo(array, index);
|
|
}
|
|
|
|
/// <summary>Copies all of the items in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance
|
|
/// to a compatible one-dimensional array, starting at the specified index of the target array.
|
|
/// </summary>
|
|
/// <param name="array">The one-dimensional array that is the destination of the elements copied from
|
|
/// the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance. The array must have zero-based indexing.</param>
|
|
/// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
|
|
/// <exception cref="T:System.ArgumentNullException">The <paramref name="array"/> argument is
|
|
/// null.</exception>
|
|
/// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
|
|
/// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater
|
|
/// than the length of the <paramref name="array"/>, the array is multidimensional, or the type parameter for the collection
|
|
/// cannot be cast automatically to the type of the destination array.</exception>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
void ICollection.CopyTo(Array array, int index)
|
|
{
|
|
CheckDisposed();
|
|
|
|
//We don't call m_collection.CopyTo() directly because we rely on Array.Copy method to customize
|
|
//all array exceptions.
|
|
T[] collectionSnapShot = m_collection.ToArray();
|
|
|
|
try
|
|
{
|
|
Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length);
|
|
}
|
|
catch (ArgumentNullException)
|
|
{
|
|
throw new ArgumentNullException("array");
|
|
}
|
|
catch (ArgumentOutOfRangeException)
|
|
{
|
|
throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative));
|
|
}
|
|
catch (ArgumentException)
|
|
{
|
|
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index");
|
|
}
|
|
catch (RankException)
|
|
{
|
|
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array");
|
|
}
|
|
catch (InvalidCastException)
|
|
{
|
|
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
|
|
}
|
|
catch (ArrayTypeMismatchException)
|
|
{
|
|
throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
|
|
}
|
|
}
|
|
|
|
/// <summary>Provides a consuming <see cref="T:System.Collections.Generics.IEnumerable{T}"/> for items in the collection.</summary>
|
|
/// <returns>An <see cref="T:System.Collections.Generics.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
public IEnumerable<T> GetConsumingEnumerable()
|
|
{
|
|
return GetConsumingEnumerable(CancellationToken.None);
|
|
}
|
|
|
|
/// <summary>Provides a consuming <see cref="T:System.Collections.Generics.IEnumerable{T}"/> for items in the collection.
|
|
/// Calling MoveNext on the returned enumerable will block if there is no data available, or will
|
|
/// throw an <see cref="System.OperationCanceledException"/> if the <see cref="CancellationToken"/> is canceled.
|
|
/// </summary>
|
|
/// <param name="cancellationToken">A cancellation token to observe.</param>
|
|
/// <returns>An <see cref="T:System.Collections.Generics.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
|
|
public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
|
|
{
|
|
CancellationTokenSource linkedTokenSource = null;
|
|
try
|
|
{
|
|
linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token);
|
|
while (!IsCompleted)
|
|
{
|
|
T item;
|
|
if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource))
|
|
{
|
|
yield return item;
|
|
}
|
|
}
|
|
}
|
|
finally
|
|
{
|
|
if (linkedTokenSource != null)
|
|
{
|
|
linkedTokenSource.Dispose();
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>Provides an <see cref="T:System.Collections.Generics.IEnumerator{T}"/> for items in the collection.</summary>
|
|
/// <returns>An <see cref="T:System.Collections.Generics.IEnumerator{T}"/> for the items in the collection.</returns>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
IEnumerator<T> IEnumerable<T>.GetEnumerator()
|
|
{
|
|
CheckDisposed();
|
|
return m_collection.GetEnumerator();
|
|
|
|
}
|
|
|
|
/// <summary>Provides an <see cref="T:System.Collections.IEnumerator"/> for items in the collection.</summary>
|
|
/// <returns>An <see cref="T:System.Collections.IEnumerator"/> for the items in the collection.</returns>
|
|
/// <exception cref="T:System.ObjectDisposedException">The <see
|
|
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
|
|
IEnumerator IEnumerable.GetEnumerator()
|
|
{
|
|
return ((IEnumerable<T>)this).GetEnumerator();
|
|
}
|
|
|
|
/// <summary>Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny()
|
|
/// and TryTakeFromAny().</summary>
|
|
/// <param name="collections">The collections to/from which an item should be added/removed.</param>
|
|
/// <param name="operationMode">Indicates whether this method is called to Add or Take.</param>
|
|
/// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
|
|
/// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
|
|
/// null element. Also, if at least one of the collections has been marked complete for adds.</exception>
|
|
/// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
|
|
private static void ValidateCollectionsArray(BlockingCollection<T>[] collections, bool isAddOperation)
|
|
{
|
|
if (collections == null)
|
|
{
|
|
throw new ArgumentNullException("collections");
|
|
}
|
|
else if (collections.Length < 1)
|
|
{
|
|
throw new ArgumentException(
|
|
SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections");
|
|
}
|
|
else if ((!IsSTAThread && collections.Length > 63) || (IsSTAThread && collections.Length > 62))
|
|
//The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, and we reserve one for CancellationToken
|
|
{
|
|
throw new ArgumentOutOfRangeException(
|
|
"collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize));
|
|
}
|
|
|
|
for (int i = 0; i < collections.Length; ++i)
|
|
{
|
|
if (collections[i] == null)
|
|
{
|
|
throw new ArgumentException(
|
|
SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections");
|
|
}
|
|
|
|
if (collections[i].m_isDisposed)
|
|
throw new ObjectDisposedException(
|
|
"collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems));
|
|
|
|
if (isAddOperation && collections[i].IsAddingCompleted)
|
|
{
|
|
throw new ArgumentException(
|
|
SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
|
|
}
|
|
}
|
|
}
|
|
|
|
private static bool IsSTAThread
|
|
{
|
|
get
|
|
{
|
|
#if !SILVERLIGHT
|
|
return Thread.CurrentThread.GetApartmentState() == ApartmentState.STA;
|
|
#else
|
|
return false;
|
|
#endif
|
|
}
|
|
}
|
|
|
|
// ---------
|
|
// Private Helpers.
|
|
/// <summary>Centeralizes the logic of validating the timeout input argument.</summary>
|
|
/// <param name="timeout">The TimeSpan to wait for to successfully complete an operation on the collection.</param>
|
|
/// <exception cref="System.ArgumentOutOfRangeException">If the number of millseconds represented by the timeout
|
|
/// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite</exception>
|
|
private static void ValidateTimeout(TimeSpan timeout)
|
|
{
|
|
long totalMilliseconds = (long)timeout.TotalMilliseconds;
|
|
if ((totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue) && (totalMilliseconds != Timeout.Infinite))
|
|
{
|
|
throw new ArgumentOutOfRangeException("timeout", timeout,
|
|
String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
|
|
}
|
|
}
|
|
|
|
/// <summary>Centralizes the logic of validating the millisecondsTimeout input argument.</summary>
|
|
/// <param name="millisecondsTimeout">The number of milliseconds to wait for to successfully complete an
|
|
/// operation on the collection.</param>
|
|
/// <exception cref="System.ArgumentOutOfRangeException">If the number of millseconds is less than 0 and not
|
|
/// equal to Timeout.Infinite.</exception>
|
|
private static void ValidateMillisecondsTimeout(int millisecondsTimeout)
|
|
{
|
|
if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite))
|
|
{
|
|
throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout,
|
|
String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
|
|
}
|
|
}
|
|
|
|
/// <summary>Throws a System.ObjectDisposedException if the collection was disposed</summary>
|
|
/// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
|
|
private void CheckDisposed()
|
|
{
|
|
if (m_isDisposed)
|
|
{
|
|
throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed));
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// <summary>A debugger view of the blocking collection that makes it simple to browse the
|
|
/// collection's contents at a point in time.</summary>
|
|
/// <typeparam name="T">The type of element that the BlockingCollection will hold.</typeparam>
|
|
internal sealed class SystemThreadingCollections_BlockingCollectionDebugView<T>
|
|
{
|
|
private BlockingCollection<T> m_blockingCollection; // The collection being viewed.
|
|
|
|
/// <summary>Constructs a new debugger view object for the provided blocking collection object.</summary>
|
|
/// <param name="collection">A blocking collection to browse in the debugger.</param>
|
|
public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollection<T> collection)
|
|
{
|
|
if (collection == null)
|
|
{
|
|
throw new ArgumentNullException("collection");
|
|
}
|
|
|
|
m_blockingCollection = collection;
|
|
}
|
|
|
|
/// <summary>Returns a snapshot of the underlying collection's elements.</summary>
|
|
[DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
|
|
public T[] Items
|
|
{
|
|
get
|
|
{
|
|
return m_blockingCollection.ToArray();
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
#pragma warning restore 0420
|