// ==++== // // Copyright (c) Microsoft Corporation. All rights reserved. // // ==--== // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // BlockingCollection.cs // // Microsoft // // A class that implements the bounding and blocking functionality while abstracting away // the underlying storage mechanism. This file also contains BlockingCollection's // associated debugger view type. // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- #pragma warning disable 0420 using System; using System.Collections.Generic; using System.Collections; using System.Diagnostics; using System.Globalization; using System.Security.Permissions; using System.Runtime.InteropServices; using System.Threading; namespace System.Collections.Concurrent { /// /// Provides blocking and bounding capabilities for thread-safe collections that /// implement . /// /// /// represents a collection /// that allows for thread-safe adding and removing of data. /// is used as a wrapper /// for an instance, allowing /// removal attempts from the collection to block until data is available to be removed. Similarly, /// a can be created to enforce /// an upper-bound on the number of data elements allowed in the /// ; addition attempts to the /// collection may then block until space is available to store the added items. In this manner, /// is similar to a traditional /// blocking queue data structure, except that the underlying data storage mechanism is abstracted /// away as an . /// /// Specifies the type of elements in the collection. [ComVisible(false)] #if !FEATURE_NETCORE #pragma warning disable 0618 [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)] #pragma warning restore 0618 #endif [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))] [DebuggerDisplay("Count = {Count}, Type = {m_collection}")] public class BlockingCollection : IEnumerable, ICollection, IDisposable, IReadOnlyCollection { private IProducerConsumerCollection m_collection; private int m_boundedCapacity; private const int NON_BOUNDED = -1; private SemaphoreSlim m_freeNodes; private SemaphoreSlim m_occupiedNodes; private bool m_isDisposed; private CancellationTokenSource m_ConsumersCancellationTokenSource; private CancellationTokenSource m_ProducersCancellationTokenSource; private volatile int m_currentAdders; private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000); #region Properties /// Gets the bounded capacity of this instance. /// The bounded capacity of this collection, or int.MaxValue if no bound was supplied. /// The has been disposed. public int BoundedCapacity { get { CheckDisposed(); return m_boundedCapacity; } } /// Gets whether this has been marked as complete for adding. /// Whether this collection has been marked as complete for adding. /// The has been disposed. public bool IsAddingCompleted { get { CheckDisposed(); return (m_currentAdders == COMPLETE_ADDING_ON_MASK); } } /// Gets whether this has been marked as complete for adding and is empty. /// Whether this collection has been marked as complete for adding and is empty. /// The has been disposed. public bool IsCompleted { get { CheckDisposed(); return (IsAddingCompleted && (m_occupiedNodes.CurrentCount == 0)); } } /// Gets the number of items contained in the . /// The number of items contained in the . /// The has been disposed. public int Count { get { CheckDisposed(); return m_occupiedNodes.CurrentCount; } } /// Gets a value indicating whether access to the is synchronized. /// The has been disposed. bool ICollection.IsSynchronized { get { CheckDisposed(); return false; } } /// /// Gets an object that can be used to synchronize access to the . This property is not supported. /// /// The SyncRoot property is not supported. object ICollection.SyncRoot { get { throw new NotSupportedException(SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported)); } } #endregion /// Initializes a new instance of the /// /// class without an upper-bound. /// /// /// The default underlying collection is a ConcurrentQueue<T>. /// public BlockingCollection() : this(new ConcurrentQueue()) { } /// Initializes a new instance of the /// class with the specified upper-bound. /// /// The bounded size of the collection. /// The is /// not a positive value. /// /// The default underlying collection is a ConcurrentQueue<T>. /// public BlockingCollection(int boundedCapacity) : this(new ConcurrentQueue(), boundedCapacity) { } /// Initializes a new instance of the /// class with the specified upper-bound and using the provided /// as its underlying data store. /// The collection to use as the underlying data store. /// The bounded size of the collection. /// The argument is /// null. /// The is not a positive value. /// The supplied contains more values /// than is permitted by . public BlockingCollection(IProducerConsumerCollection collection, int boundedCapacity) { if (boundedCapacity < 1) { throw new ArgumentOutOfRangeException( "boundedCapacity", boundedCapacity, SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange)); } if (collection == null) { throw new ArgumentNullException("collection"); } int count = collection.Count; if (count > boundedCapacity) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity)); } Initialize(collection, boundedCapacity, count); } /// Initializes a new instance of the /// class without an upper-bound and using the provided /// as its underlying data store. /// The collection to use as the underlying data store. /// The argument is /// null. public BlockingCollection(IProducerConsumerCollection collection) { if (collection == null) { throw new ArgumentNullException("collection"); } Initialize(collection, NON_BOUNDED, collection.Count); } /// Initializes the BlockingCollection instance. /// The collection to use as the underlying data store. /// The bounded size of the collection. /// The number of items currently in the underlying collection. private void Initialize(IProducerConsumerCollection collection, int boundedCapacity, int collectionCount) { Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED); m_collection = collection; m_boundedCapacity = boundedCapacity; ; m_isDisposed = false; m_ConsumersCancellationTokenSource = new CancellationTokenSource(); m_ProducersCancellationTokenSource = new CancellationTokenSource(); if (boundedCapacity == NON_BOUNDED) { m_freeNodes = null; } else { Debug.Assert(boundedCapacity > 0); m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount); } m_occupiedNodes = new SemaphoreSlim(collectionCount); } /// /// Adds the item to the . /// /// The item to be added to the collection. The value can be a null reference. /// The has been marked /// as complete with regards to additions. /// The has been disposed. /// The underlying collection didn't accept the item. /// /// If a bounded capacity was specified when this instance of /// was initialized, /// a call to Add may block until space is available to store the provided item. /// public void Add(T item) { #if DEBUG bool tryAddReturnValue = #endif TryAddWithNoTimeValidation(item, Timeout.Infinite, new CancellationToken()); #if DEBUG Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true."); #endif } /// /// Adds the item to the . /// A is thrown if the is /// canceled. /// /// The item to be added to the collection. The value can be a null reference. /// A cancellation token to observe. /// If the is canceled. /// The has been marked /// as complete with regards to additions. /// The has been disposed. /// The underlying collection didn't accept the item. /// /// If a bounded capacity was specified when this instance of /// was initialized, /// a call to may block until space is available to store the provided item. /// public void Add(T item, CancellationToken cancellationToken) { #if DEBUG bool tryAddReturnValue = #endif TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken); #if DEBUG Debug.Assert(tryAddReturnValue, "TryAdd() was expected to return true."); #endif } /// /// Attempts to add the specified item to the . /// /// The item to be added to the collection. /// true if the could be added; otherwise, false. /// The has been marked /// as complete with regards to additions. /// The has been disposed. /// The underlying collection didn't accept the item. public bool TryAdd(T item) { return TryAddWithNoTimeValidation(item, 0, new CancellationToken()); } /// /// Attempts to add the specified item to the . /// /// The item to be added to the collection. /// A that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// true if the could be added to the collection within /// the alloted time; otherwise, false. /// The has been marked /// as complete with regards to additions. /// The has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . /// The underlying collection didn't accept the item. public bool TryAdd(T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, new CancellationToken()); } /// /// Attempts to add the specified item to the . /// /// The item to be added to the collection. /// The number of milliseconds to wait, or (-1) to wait indefinitely. /// true if the could be added to the collection within /// the alloted time; otherwise, false. /// The has been marked /// as complete with regards to additions. /// The has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. /// The underlying collection didn't accept the item. public bool TryAdd(T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken()); } /// /// Attempts to add the specified item to the . /// A is thrown if the is /// canceled. /// /// The item to be added to the collection. /// The number of milliseconds to wait, or (-1) to wait indefinitely. /// A cancellation token to observe. /// true if the could be added to the collection within /// the alloted time; otherwise, false. /// If the is canceled. /// The has been marked /// as complete with regards to additions. /// The has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. /// The underlying collection didn't accept the item. public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken); } /// Adds an item into the underlying data store using its IProducerConsumerCollection<T>.Add /// method. If a bounded capacity was specified and the collection was full, /// this method will wait for, at most, the timeout period trying to add the item. /// If the timeout period was exhaused before successfully adding the item this method will /// return false. /// The item to be added to the collection. /// The number of milliseconds to wait for the collection to accept the item, /// or Timeout.Infinite to wait indefinitely. /// A cancellation token to observe. /// False if the collection remained full till the timeout period was exhausted.True otherwise. /// If the is canceled. /// the collection has already been marked /// as complete with regards to additions. /// If the collection has been disposed. /// The underlying collection didn't accept the item. private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken) { CheckDisposed(); if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); if (IsAddingCompleted) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed)); } bool waitForSemaphoreWasSuccessful = true; if (m_freeNodes != null) { //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding() //was called concurrently with Adding which is not supported by BlockingCollection. CancellationTokenSource linkedTokenSource = null; try { waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0); if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0) { linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( cancellationToken, m_ProducersCancellationTokenSource.Token); waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token); } } catch (OperationCanceledException) { //if cancellation was via external token, throw an OCE if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx. //Contract.Assert(m_ProducersCancellationTokenSource.Token.IsCancellationRequested); throw new InvalidOperationException (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd)); } finally { if (linkedTokenSource != null) { linkedTokenSource.Dispose(); } } } if (waitForSemaphoreWasSuccessful) { // Update the adders count if the complete adding was not requested, otherwise // spins until all adders finish then throw IOE // The idea behind to spin untill all adders finish, is to avoid to return to the caller with IOE while there are still some adders have // not been finished yet SpinWait spinner = new SpinWait(); while (true) { int observedAdders = m_currentAdders; if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0) { spinner.Reset(); // CompleteAdding is requested, spin then throw while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed)); } if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders) { Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit."); break; } spinner.SpinOnce(); } // This outer try/finally to workaround of repeating the decrement adders code 3 times, because we should decrement the adders if: // 1- m_collection.TryAdd threw an exception // 2- m_collection.TryAdd succeeded // 3- m_collection.TryAdd returned false // so we put the decrement code in the finally block try { //TryAdd is guaranteed to find a place to add the element. Its return value depends //on the semantics of the underlying store. Some underlying stores will not add an already //existing item and thus TryAdd returns false indicating that the size of the underlying //store did not increase. bool addingSucceeded = false; try { //The token may have been canceled before the collection had space available, so we need a check after the wait has completed. //This fixes bug #702328, case 2 of 2. cancellationToken.ThrowIfCancellationRequested(); addingSucceeded = m_collection.TryAdd(item); } catch { //TryAdd did not result in increasing the size of the underlying store and hence we need //to increment back the count of the m_freeNodes semaphore. if (m_freeNodes != null) { m_freeNodes.Release(); } throw; } if (addingSucceeded) { //After adding an element to the underlying storage, signal to the consumers //waiting on m_occupiedNodes that there is a new item added ready to be consumed. m_occupiedNodes.Release(); } else { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed)); } } finally { // decrement the adders count Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0); Interlocked.Decrement(ref m_currentAdders); } } return waitForSemaphoreWasSuccessful; } /// Takes an item from the . /// The item removed from the collection. /// The is empty and has been marked /// as complete with regards to additions. /// The has been disposed. /// The underlying collection was modified /// outside of this instance. /// A call to may block until an item is available to be removed. public T Take() { T item; if (!TryTake(out item, Timeout.Infinite, CancellationToken.None)) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone)); } return item; } /// Takes an item from the . /// The item removed from the collection. /// If the is /// canceled or the is empty and has been marked /// as complete with regards to additions. /// The has been disposed. /// The underlying collection was modified /// outside of this instance. /// A call to may block until an item is available to be removed. public T Take(CancellationToken cancellationToken) { T item; if (!TryTake(out item, Timeout.Infinite, cancellationToken)) { throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone)); } return item; } /// /// Attempts to remove an item from the . /// /// The item removed from the collection. /// true if an item could be removed; otherwise, false. /// The has been disposed. /// The underlying collection was modified /// outside of this instance. public bool TryTake(out T item) { return TryTake(out item, 0, CancellationToken.None); } /// /// Attempts to remove an item from the . /// /// The item removed from the collection. /// A that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. /// The has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . /// The underlying collection was modified /// outside of this instance. public bool TryTake(out T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null); } /// /// Attempts to remove an item from the . /// /// The item removed from the collection. /// The number of milliseconds to wait, or (-1) to wait indefinitely. /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. /// The has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. /// The underlying collection was modified /// outside of this instance. public bool TryTake(out T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null); } /// /// Attempts to remove an item from the . /// A is thrown if the is /// canceled. /// /// The item removed from the collection. /// The number of milliseconds to wait, or (-1) to wait indefinitely. /// A cancellation token to observe. /// true if an item could be removed from the collection within /// the alloted time; otherwise, false. /// If the is canceled. /// The has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. /// The underlying collection was modified /// outside of this instance. public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null); } /// Takes an item from the underlying data store using its IProducerConsumerCollection<T>.Take /// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false) /// trying to remove an item. If the timeout period was exhaused before successfully removing an item /// this method will return false. /// A is thrown if the is /// canceled. /// /// The item removed from the collection. /// The number of milliseconds to wait for the collection to have an item available /// for removal, or Timeout.Infinite to wait indefinitely. /// A cancellation token to observe. /// A combined cancellation token if created, it is only created by GetConsumingEnumerable to avoid creating the linked token /// multiple times. /// False if the collection remained empty till the timeout period was exhausted. True otherwise. /// If the is canceled. /// If the collection has been disposed. private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource) { CheckDisposed(); item = default(T); if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); //If the collection is completed then there is no need to wait. if (IsCompleted) { return false; } bool waitForSemaphoreWasSuccessful = false; // set the combined token source to the combinedToken paramater if it is not null (came from GetConsumingEnumerable) CancellationTokenSource linkedTokenSource = combinedTokenSource; try { waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(0); if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0) { // create the linked token if it is not created yet if (combinedTokenSource == null) linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token); waitForSemaphoreWasSuccessful = m_occupiedNodes.Wait(millisecondsTimeout, linkedTokenSource.Token); } } //The collection became completed while waiting on the semaphore. catch (OperationCanceledException) { if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken); return false; } finally { // only dispose the combined token source if we created it here, otherwise the caller (GetConsumingEnumerable) is responsible for disposing it if (linkedTokenSource != null && combinedTokenSource == null) { linkedTokenSource.Dispose(); } } if (waitForSemaphoreWasSuccessful) { bool removeSucceeded = false; bool removeFaulted = true; try { //The token may have been canceled before an item arrived, so we need a check after the wait has completed. //This fixes bug #702328, case 1 of 2. cancellationToken.ThrowIfCancellationRequested(); //If an item was successfully removed from the underlying collection. removeSucceeded = m_collection.TryTake(out item); removeFaulted = false; if (!removeSucceeded) { // Check if the collection is empty which means that the collection was modified outside BlockingCollection throw new InvalidOperationException (SR.GetString(SR.BlockingCollection_Take_CollectionModified)); } } finally { // removeFaulted implies !removeSucceeded, but the reverse is not true. if (removeSucceeded) { if (m_freeNodes != null) { Debug.Assert(m_boundedCapacity != NON_BOUNDED); m_freeNodes.Release(); } } else if (removeFaulted) { m_occupiedNodes.Release(); } //Last remover will detect that it has actually removed the last item from the //collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores //so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers //but this is not a problem. if (IsCompleted) { CancelWaitingConsumers(); } } } return waitForSemaphoreWasSuccessful; } /// /// Adds the specified item to any one of the specified /// instances. /// /// The array of collections. /// The item to be added to one of the collections. /// The index of the collection in the array to which the item was added. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. /// At least one of the instances has been disposed. /// At least one underlying collection didn't accept the item. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. /// /// If a bounded capacity was specified when all of the /// instances were initialized, /// a call to AddToAny may block until space is available in one of the collections /// to store the provided item. /// public static int AddToAny(BlockingCollection[] collections, T item) { #if DEBUG int tryAddAnyReturnValue = #else return #endif TryAddToAny(collections, item, Timeout.Infinite, CancellationToken.None); #if DEBUG Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length) , "TryAddToAny() was expected to return an index within the bounds of the collections array."); return tryAddAnyReturnValue; #endif } /// /// Adds the specified item to any one of the specified /// instances. /// A is thrown if the is /// canceled. /// /// The array of collections. /// The item to be added to one of the collections. /// A cancellation token to observe. /// The index of the collection in the array to which the item was added. /// If the is canceled. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. /// At least one of the instances has been disposed. /// At least one underlying collection didn't accept the item. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. /// /// If a bounded capacity was specified when all of the /// instances were initialized, /// a call to AddToAny may block until space is available in one of the collections /// to store the provided item. /// public static int AddToAny(BlockingCollection[] collections, T item, CancellationToken cancellationToken) { #if DEBUG int tryAddAnyReturnValue = #else return #endif TryAddToAny(collections, item, Timeout.Infinite, cancellationToken); #if DEBUG Debug.Assert((tryAddAnyReturnValue >= 0 && tryAddAnyReturnValue < collections.Length) , "TryAddToAny() was expected to return an index within the bounds of the collections array."); return tryAddAnyReturnValue; #endif } /// /// Attempts to add the specified item to any one of the specified /// instances. /// /// The array of collections. /// The item to be added to one of the collections. /// The index of the collection in the /// array to which the item was added, or -1 if the item could not be added. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. /// At least one of the instances has been disposed. /// At least one underlying collection didn't accept the item. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. public static int TryAddToAny(BlockingCollection[] collections, T item) { return TryAddToAny(collections, item, 0, CancellationToken.None); } /// /// Attempts to add the specified item to any one of the specified /// instances. /// /// The array of collections. /// The item to be added to one of the collections. /// A that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// The index of the collection in the /// array to which the item was added, or -1 if the item could not be added. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. /// At least one of the instances has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . /// At least one underlying collection didn't accept the item. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. public static int TryAddToAny(BlockingCollection[] collections, T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryAddToAnyCore(collections, item, (int)timeout.TotalMilliseconds, CancellationToken.None); } /// /// Attempts to add the specified item to any one of the specified /// instances. /// /// The array of collections. /// The item to be added to one of the collections. /// The number of milliseconds to wait, or (-1) to wait indefinitely. /// The index of the collection in the /// array to which the item was added, or -1 if the item could not be added. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. /// At least one of the instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. /// At least one underlying collection didn't accept the item. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. public static int TryAddToAny(BlockingCollection[] collections, T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddToAnyCore(collections, item, millisecondsTimeout, CancellationToken.None); } /// /// Attempts to add the specified item to any one of the specified /// instances. /// A is thrown if the is /// canceled. /// /// The array of collections. /// The item to be added to one of the collections. /// The number of milliseconds to wait, or (-1) to wait indefinitely. /// The index of the collection in the /// array to which the item was added, or -1 if the item could not be added. /// A cancellation token to observe. /// If the is canceled. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element, or at least one of collections has been /// marked as complete for adding. /// At least one of the instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. /// At least one underlying collection didn't accept the item. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. public static int TryAddToAny(BlockingCollection[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryAddToAnyCore(collections, item, millisecondsTimeout, cancellationToken); } /// Adds an item to anyone of the specified collections. /// A is thrown if the is /// canceled. /// /// The collections into which the item can be added. /// The item to be added . /// The number of milliseconds to wait for a collection to accept the /// operation, or -1 to wait indefinitely. /// A cancellation token to observe. /// The index into collections for the collection which accepted the /// adding of the item; -1 if the item could not be added. /// If the is canceled. /// If the collections argument is null. /// If the collections argument is a 0-length array or contains a /// null element. Also, if atleast one of the collections has been marked complete for adds. /// If atleast one of the collections has been disposed. private static int TryAddToAnyCore(BlockingCollection[] collections, T item, int millisecondsTimeout, CancellationToken externalCancellationToken) { ValidateCollectionsArray(collections, true); const int OPERATION_FAILED = -1; // Copy the wait time to another local variable to update it int timeout = millisecondsTimeout; uint startTime = 0; if (millisecondsTimeout != Timeout.Infinite) { startTime = (uint)Environment.TickCount; } // Fast path for adding if there is at least one unbounded collection int index = TryAddToAnyFast(collections, item); if (index > -1) return index; // Get wait handles and the tokens for all collections, // and construct a single combined token from all the tokens, // add the combined token handle to the handles list // call WaitAny for all handles // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE // or one if the collection is AddingCompleted then throw AE CancellationToken[] collatedCancellationTokens; List handles = GetHandles(collections, externalCancellationToken, true, out collatedCancellationTokens); //Loop until one of these conditions is met: // 1- The operation is succeeded // 2- The timeout expired for try* versions // 3- The external token is cancelled, throw // 4- There is at least one collection marked as adding completed then throw while (millisecondsTimeout == Timeout.Infinite || timeout >= 0) { index = -1; using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens)) { handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list //Wait for any collection to become available. index = WaitHandle.WaitAny(handles.ToArray(), timeout, false); handles.RemoveAt(handles.Count - 1); //remove the linked token if (linkedTokenSource.IsCancellationRequested) { if (externalCancellationToken.IsCancellationRequested) //case#3 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken); else //case#4 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections"); } } Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count)); if (index == WaitHandle.WaitTimeout) //case#2 return OPERATION_FAILED; //If the timeout period was not exhausted and the appropriate operation succeeded. if (collections[index].TryAdd(item)) //case#1 return index; // Update the timeout if (millisecondsTimeout != Timeout.Infinite) timeout = UpdateTimeOut(startTime, millisecondsTimeout); } // case #2 return OPERATION_FAILED; } /// /// Fast path for TryAddToAny to find a non bounded collection and add the items in it /// /// The collections list /// The item to be added /// The index which the item has been added, -1 if failed private static int TryAddToAnyFast(BlockingCollection[] collections, T item) { for (int i = 0; i < collections.Length; i++) { if (collections[i].m_freeNodes == null) { #if DEBUG bool result = #endif collections[i].TryAdd(item); #if DEBUG Debug.Assert(result); #endif return i; } } return -1; } /// /// Local static method, used by TryAddTakeAny to get the wait handles for the collection, with exclude option to exclude the Compeleted collections /// /// The blocking collections /// The original CancellationToken /// True if Add or TryAdd, false if Take or TryTake /// Complete list of cancellationTokens to observe /// The collections wait handles private static List GetHandles(BlockingCollection[] collections, CancellationToken externalCancellationToken, bool isAddOperation, out CancellationToken[] cancellationTokens) { Debug.Assert(collections != null); List handlesList = new List(collections.Length + 1); // + 1 for the external token handle to be added List tokensList = new List(collections.Length + 1); // + 1 for the external token tokensList.Add(externalCancellationToken); //Read the appropriate WaitHandle based on the operation mode. if (isAddOperation) { for (int i = 0; i < collections.Length; i++) { if (collections[i].m_freeNodes != null) { handlesList.Add(collections[i].m_freeNodes.AvailableWaitHandle); tokensList.Add(collections[i].m_ProducersCancellationTokenSource.Token); } } } else { for (int i = 0; i < collections.Length; i++) { if (collections[i].IsCompleted) //exclude Completed collections if it is take operation continue; handlesList.Add(collections[i].m_occupiedNodes.AvailableWaitHandle); tokensList.Add(collections[i].m_ConsumersCancellationTokenSource.Token); } } cancellationTokens = tokensList.ToArray(); return handlesList; } /// /// Helper function to measure and update the wait time /// /// The first time (in milliseconds) observed when the wait started /// The orginal wait timeoutout in milliseconds /// The new wait time in milliseconds, -1 if the time expired private static int UpdateTimeOut(uint startTime, int originalWaitMillisecondsTimeout) { if (originalWaitMillisecondsTimeout == 0) { return 0; } // The function must be called in case the time out is not infinite Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite); uint elapsedMilliseconds = (uint)Environment.TickCount - startTime; // Check the elapsed milliseconds is greater than max int because this property is uint if (elapsedMilliseconds > int.MaxValue) { return 0; } // Subtract the elapsed time from the current wait time int currentWaitTimeout = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds; ; if (currentWaitTimeout <= 0) { return 0; } return currentWaitTimeout; } /// /// Takes an item from any one of the specified /// instances. /// /// The array of collections. /// The item removed from one of the collections. /// The index of the collection in the array from which /// the item was removed, or -1 if an item could not be removed. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element. /// At least one of the instances has been disposed. /// At least one of the underlying collections was modified /// outside of its instance. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. /// A call to TakeFromAny may block until an item is available to be removed. public static int TakeFromAny(BlockingCollection[] collections, out T item) { return TakeFromAny(collections, out item, CancellationToken.None); } /// /// Takes an item from any one of the specified /// instances. /// A is thrown if the is /// canceled. /// /// The array of collections. /// The item removed from one of the collections. /// A cancellation token to observe. /// The index of the collection in the array from which /// the item was removed, or -1 if an item could not be removed. /// The argument is /// null. /// If the is canceled. /// The argument is /// a 0-length array or contains a null element. /// At least one of the instances has been disposed. /// At least one of the underlying collections was modified /// outside of its instance. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. /// A call to TakeFromAny may block until an item is available to be removed. public static int TakeFromAny(BlockingCollection[] collections, out T item, CancellationToken cancellationToken) { int returnValue = TryTakeFromAnyCore(collections, out item, Timeout.Infinite, true, cancellationToken); Debug.Assert((returnValue >= 0 && returnValue < collections.Length) , "TryTakeFromAny() was expected to return an index within the bounds of the collections array."); return returnValue; } /// /// Attempts to remove an item from any one of the specified /// instances. /// /// The array of collections. /// The item removed from one of the collections. /// The index of the collection in the array from which /// the item was removed, or -1 if an item could not be removed. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element. /// At least one of the instances has been disposed. /// At least one of the underlying collections was modified /// outside of its instance. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. /// A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item) { return TryTakeFromAny(collections, out item, 0); } /// /// Attempts to remove an item from any one of the specified /// instances. /// /// The array of collections. /// The item removed from one of the collections. /// A that represents the number of milliseconds /// to wait, or a that represents -1 milliseconds to wait indefinitely. /// /// The index of the collection in the array from which /// the item was removed, or -1 if an item could not be removed. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element. /// At least one of the instances has been disposed. /// is a negative number /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than /// . /// At least one of the underlying collections was modified /// outside of its instance. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. /// A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, TimeSpan timeout) { ValidateTimeout(timeout); return TryTakeFromAnyCore(collections, out item, (int)timeout.TotalMilliseconds, false, CancellationToken.None); } /// /// Attempts to remove an item from any one of the specified /// instances. /// /// The array of collections. /// The item removed from one of the collections. /// The number of milliseconds to wait, or (-1) to wait indefinitely. /// The index of the collection in the array from which /// the item was removed, or -1 if an item could not be removed. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element. /// At least one of the instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. /// At least one of the underlying collections was modified /// outside of its instance. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. /// A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, CancellationToken.None); } /// /// Attempts to remove an item from any one of the specified /// instances. /// A is thrown if the is /// canceled. /// /// The array of collections. /// The item removed from one of the collections. /// The number of milliseconds to wait, or (-1) to wait indefinitely. /// A cancellation token to observe. /// The index of the collection in the array from which /// the item was removed, or -1 if an item could not be removed. /// If the is canceled. /// The argument is /// null. /// The argument is /// a 0-length array or contains a null element. /// At least one of the instances has been disposed. /// is a /// negative number other than -1, which represents an infinite time-out. /// At least one of the underlying collections was modified /// outside of its instance. /// The count of is greater than the maximum size of /// 62 for STA and 63 for MTA. /// A call to TryTakeFromAny may block until an item is available to be removed. public static int TryTakeFromAny(BlockingCollection[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken) { ValidateMillisecondsTimeout(millisecondsTimeout); return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, cancellationToken); } /// Takes an item from anyone of the specified collections. /// A is thrown if the is /// canceled. /// /// The collections from which the item can be removed. /// The item removed and returned to the caller. /// The number of milliseconds to wait for a collection to accept the /// operation, or -1 to wait indefinitely. /// True if Take, false if TryTake. /// A cancellation token to observe. /// The index into collections for the collection which accepted the /// removal of the item; -1 if the item could not be removed. /// If the is canceled. /// If the collections argument is null. /// If the collections argument is a 0-length array or contains a /// null element. Also, if atleast one of the collections has been marked complete for adds. /// If atleast one of the collections has been disposed. private static int TryTakeFromAnyCore(BlockingCollection[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken) { ValidateCollectionsArray(collections, false); //try the fast path first for (int i = 0; i < collections.Length; i++) { // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item)) return i; } //Fast path failed, try the slow path return TryTakeFromAnyCoreSlow(collections, out item, millisecondsTimeout, isTakeOperation, externalCancellationToken); } /// Takes an item from anyone of the specified collections. /// A is thrown if the is /// canceled. /// /// The collections copy from which the item can be removed. /// The item removed and returned to the caller. /// The number of milliseconds to wait for a collection to accept the /// operation, or -1 to wait indefinitely. /// True if Take, false if TryTake. /// A cancellation token to observe. /// The index into collections for the collection which accepted the /// removal of the item; -1 if the item could not be removed. /// If the is canceled. /// If the collections argument is null. /// If the collections argument is a 0-length array or contains a /// null element. Also, if atleast one of the collections has been marked complete for adds. /// If atleast one of the collections has been disposed. private static int TryTakeFromAnyCoreSlow(BlockingCollection[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken) { const int OPERATION_FAILED = -1; // Copy the wait time to another local variable to update it int timeout = millisecondsTimeout; uint startTime = 0; if (millisecondsTimeout != Timeout.Infinite) { startTime = (uint)Environment.TickCount; } //Loop until one of these conditions is met: // 1- The operation is succeeded // 2- The timeout expired for try* versions // 3- The external token is cancelled, throw // 4- The operation is TryTake and all collections are marked as completed, return false // 5- The operation is Take and all collection are marked as completed, throw while (millisecondsTimeout == Timeout.Infinite || timeout >= 0) { // Get wait handles and the tokens for all collections, // and construct a single combined token from all the tokens, // add the combined token handle to the handles list // call WaitAny for all handles // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE // or one if the collection is Completed then exclude it and retry CancellationToken[] collatedCancellationTokens; List handles = GetHandles(collections, externalCancellationToken, false, out collatedCancellationTokens); if (handles.Count == 0 && isTakeOperation) //case#5 throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantTakeAnyWhenAllDone), "collections"); else if (handles.Count == 0) //case#4 break; //Wait for any collection to become available. using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens)) { handles.Add(linkedTokenSource.Token.WaitHandle); // add the combined token to the handles list int index = WaitHandle.WaitAny(handles.ToArray(), timeout, false); if (linkedTokenSource.IsCancellationRequested && externalCancellationToken.IsCancellationRequested)//case#3 throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken); else if (!linkedTokenSource.IsCancellationRequested)// if no eiter internal or external cancellation trquested { Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count)); if (index == WaitHandle.WaitTimeout) //case#2 break; // adjust the index in case one or more handles removed because they are completed if (collections.Length != handles.Count - 1) // -1 because of the combined token handle { for (int i = 0; i < collections.Length; i++) { if (collections[i].m_occupiedNodes.AvailableWaitHandle == handles[index]) { index = i; break; } } } if (collections[index].TryTake(out item)) //case#1 return index; } } // Update the timeout if (millisecondsTimeout != Timeout.Infinite) timeout = UpdateTimeOut(startTime, millisecondsTimeout); } item = default(T); //case#2 return OPERATION_FAILED; } /// /// Marks the instances /// as not accepting any more additions. /// /// /// After a collection has been marked as complete for adding, adding to the collection is not permitted /// and attempts to remove from the collection will not wait when the collection is empty. /// /// The has been disposed. public void CompleteAdding() { CheckDisposed(); if (IsAddingCompleted) return; SpinWait spinner = new SpinWait(); while (true) { int observedAdders = m_currentAdders; if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0) { spinner.Reset(); // If there is another COmpleteAdding in progress waiting the current adders, then spin until it finishes while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); return; } if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders | COMPLETE_ADDING_ON_MASK, observedAdders) == observedAdders) { spinner.Reset(); while (m_currentAdders != COMPLETE_ADDING_ON_MASK) spinner.SpinOnce(); if (Count == 0) { CancelWaitingConsumers(); } // We should always wake waiting producers, and have them throw exceptions as // Add&CompleteAdding should not be used concurrently. CancelWaitingProducers(); return; } spinner.SpinOnce(); } } /// Cancels the semaphores. private void CancelWaitingConsumers() { m_ConsumersCancellationTokenSource.Cancel(); } private void CancelWaitingProducers() { m_ProducersCancellationTokenSource.Cancel(); } /// /// Releases resources used by the instance. /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Releases resources used by the instance. /// /// Whether being disposed explicitly (true) or due to a finalizer (false). protected virtual void Dispose(bool disposing) { if (!m_isDisposed) { if (m_freeNodes != null) { m_freeNodes.Dispose(); } m_occupiedNodes.Dispose(); m_isDisposed = true; } } /// Copies the items from the instance into a new array. /// An array containing copies of the elements of the collection. /// The has been disposed. /// /// The copied elements are not removed from the collection. /// public T[] ToArray() { CheckDisposed(); return m_collection.ToArray(); } /// Copies all of the items in the instance /// to a compatible one-dimensional array, starting at the specified index of the target array. /// /// The one-dimensional array that is the destination of the elements copied from /// the instance. The array must have zero-based indexing. /// The zero-based index in at which copying begins. /// The argument is /// null. /// The argument is less than zero. /// The argument is equal to or greater /// than the length of the . /// The has been disposed. public void CopyTo(T[] array, int index) { ((ICollection)this).CopyTo(array, index); } /// Copies all of the items in the instance /// to a compatible one-dimensional array, starting at the specified index of the target array. /// /// The one-dimensional array that is the destination of the elements copied from /// the instance. The array must have zero-based indexing. /// The zero-based index in at which copying begins. /// The argument is /// null. /// The argument is less than zero. /// The argument is equal to or greater /// than the length of the , the array is multidimensional, or the type parameter for the collection /// cannot be cast automatically to the type of the destination array. /// The has been disposed. void ICollection.CopyTo(Array array, int index) { CheckDisposed(); //We don't call m_collection.CopyTo() directly because we rely on Array.Copy method to customize //all array exceptions. T[] collectionSnapShot = m_collection.ToArray(); try { Array.Copy(collectionSnapShot, 0, array, index, collectionSnapShot.Length); } catch (ArgumentNullException) { throw new ArgumentNullException("array"); } catch (ArgumentOutOfRangeException) { throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative)); } catch (ArgumentException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index"); } catch (RankException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array"); } catch (InvalidCastException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array"); } catch (ArrayTypeMismatchException) { throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array"); } } /// Provides a consuming for items in the collection. /// An that removes and returns items from the collection. /// The has been disposed. public IEnumerable GetConsumingEnumerable() { return GetConsumingEnumerable(CancellationToken.None); } /// Provides a consuming for items in the collection. /// Calling MoveNext on the returned enumerable will block if there is no data available, or will /// throw an if the is canceled. /// /// A cancellation token to observe. /// An that removes and returns items from the collection. /// The has been disposed. /// If the is canceled. public IEnumerable GetConsumingEnumerable(CancellationToken cancellationToken) { CancellationTokenSource linkedTokenSource = null; try { linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token); while (!IsCompleted) { T item; if (TryTakeWithNoTimeValidation(out item, Timeout.Infinite, cancellationToken, linkedTokenSource)) { yield return item; } } } finally { if (linkedTokenSource != null) { linkedTokenSource.Dispose(); } } } /// Provides an for items in the collection. /// An for the items in the collection. /// The has been disposed. IEnumerator IEnumerable.GetEnumerator() { CheckDisposed(); return m_collection.GetEnumerator(); } /// Provides an for items in the collection. /// An for the items in the collection. /// The has been disposed. IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable)this).GetEnumerator(); } /// Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny() /// and TryTakeFromAny(). /// The collections to/from which an item should be added/removed. /// Indicates whether this method is called to Add or Take. /// If the collections argument is null. /// If the collections argument is a 0-length array or contains a /// null element. Also, if at least one of the collections has been marked complete for adds. /// If at least one of the collections has been disposed. private static void ValidateCollectionsArray(BlockingCollection[] collections, bool isAddOperation) { if (collections == null) { throw new ArgumentNullException("collections"); } else if (collections.Length < 1) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections"); } else if ((!IsSTAThread && collections.Length > 63) || (IsSTAThread && collections.Length > 62)) //The number of WaitHandles must be <= 64 for MTA, and <=63 for STA, and we reserve one for CancellationToken { throw new ArgumentOutOfRangeException( "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize)); } for (int i = 0; i < collections.Length; ++i) { if (collections[i] == null) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections"); } if (collections[i].m_isDisposed) throw new ObjectDisposedException( "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems)); if (isAddOperation && collections[i].IsAddingCompleted) { throw new ArgumentException( SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections"); } } } private static bool IsSTAThread { get { #if !SILVERLIGHT return Thread.CurrentThread.GetApartmentState() == ApartmentState.STA; #else return false; #endif } } // --------- // Private Helpers. /// Centeralizes the logic of validating the timeout input argument. /// The TimeSpan to wait for to successfully complete an operation on the collection. /// If the number of millseconds represented by the timeout /// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite private static void ValidateTimeout(TimeSpan timeout) { long totalMilliseconds = (long)timeout.TotalMilliseconds; if ((totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue) && (totalMilliseconds != Timeout.Infinite)) { throw new ArgumentOutOfRangeException("timeout", timeout, String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue)); } } /// Centralizes the logic of validating the millisecondsTimeout input argument. /// The number of milliseconds to wait for to successfully complete an /// operation on the collection. /// If the number of millseconds is less than 0 and not /// equal to Timeout.Infinite. private static void ValidateMillisecondsTimeout(int millisecondsTimeout) { if ((millisecondsTimeout < 0) && (millisecondsTimeout != Timeout.Infinite)) { throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout, String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue)); } } /// Throws a System.ObjectDisposedException if the collection was disposed /// If the collection has been disposed. private void CheckDisposed() { if (m_isDisposed) { throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed)); } } } /// A debugger view of the blocking collection that makes it simple to browse the /// collection's contents at a point in time. /// The type of element that the BlockingCollection will hold. internal sealed class SystemThreadingCollections_BlockingCollectionDebugView { private BlockingCollection m_blockingCollection; // The collection being viewed. /// Constructs a new debugger view object for the provided blocking collection object. /// A blocking collection to browse in the debugger. public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollection collection) { if (collection == null) { throw new ArgumentNullException("collection"); } m_blockingCollection = collection; } /// Returns a snapshot of the underlying collection's elements. [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)] public T[] Items { get { return m_blockingCollection.ToArray(); } } } } #pragma warning restore 0420