// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BatchBlock.cs
//
//
// A propagator block that groups individual messages into arrays of messages.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Security;
using System.Threading.Tasks.Dataflow.Internal;
namespace System.Threading.Tasks.Dataflow
{
    /// Provides a dataflow block that batches inputs into arrays.
    /// Specifies the type of data put into batches.
    [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
    [DebuggerTypeProxy(typeof(BatchBlock<>.DebugView))]
    [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
    public sealed class BatchBlock : IPropagatorBlock, IReceivableSourceBlock, IDebuggerDisplay
    {
        /// The target half of this batch.
        private readonly BatchBlockTargetCore _target;
        /// The source half of this batch.
        private readonly SourceCore _source;
        /// Initializes this  with the specified batch size.
        /// The number of items to group into a batch.
        /// The  must be positive.
        public BatchBlock(Int32 batchSize) :
            this(batchSize, GroupingDataflowBlockOptions.Default)
        { }
        /// Initializes this  with the  specified batch size, declining option, and block options.
        /// The number of items to group into a batch.
        /// The options with which to configure this .
        /// The  must be positive.
        /// The  must be no greater than the value of the BoundedCapacity option if a non-default value has been set.
        /// The  is null (Nothing in Visual Basic).
        public BatchBlock(Int32 batchSize, GroupingDataflowBlockOptions dataflowBlockOptions)
        {
            // Validate arguments
            if (batchSize < 1) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_GenericPositive);
            if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
            if (dataflowBlockOptions.BoundedCapacity > 0 && dataflowBlockOptions.BoundedCapacity < batchSize) throw new ArgumentOutOfRangeException("batchSize", SR.ArgumentOutOfRange_BatchSizeMustBeNoGreaterThanBoundedCapacity);
            Contract.EndContractBlock();
            // Ensure we have options that can't be changed by the caller
            dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
            // Initialize bounding actions
            Action, int> onItemsRemoved = null;
            Func, T[], IList, int> itemCountingFunc = null;
            if (dataflowBlockOptions.BoundedCapacity > 0)
            {
                onItemsRemoved = (owningSource, count) => ((BatchBlock)owningSource)._target.OnItemsRemoved(count);
                itemCountingFunc = (owningSource, singleOutputItem, multipleOutputItems) => BatchBlockTargetCore.CountItems(singleOutputItem, multipleOutputItems);
            }
            // Initialize source
            _source = new SourceCore(this, dataflowBlockOptions,
                owningSource => ((BatchBlock)owningSource)._target.Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false),
                onItemsRemoved, itemCountingFunc);
            // Initialize target
            _target = new BatchBlockTargetCore(this, batchSize, batch => _source.AddMessage(batch), dataflowBlockOptions);
            // When the target is done, let the source know it won't be getting any more data
            _target.Completion.ContinueWith(delegate { _source.Complete(); },
                CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
            // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
            // In those cases we need to fault the target half to drop its buffered messages and to release its 
            // reservations. This should not create an infinite loop, because all our implementations are designed
            // to handle multiple completion requests and to carry over only one.
            _source.Completion.ContinueWith((completed, state) =>
            {
                var thisBlock = ((BatchBlock)state) as IDataflowBlock;
                Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
                thisBlock.Fault(completed.Exception);
            }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
            // Handle async cancellation requests by declining on the target
            Common.WireCancellationToComplete(
                dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BatchBlockTargetCore)state).Complete(exception: null, dropPendingMessages: true, releaseReservedMessages: false), _target);
#if FEATURE_TRACING
            DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
            if (etwLog.IsEnabled())
            {
                etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
            }
#endif
        }
        /// 
        public void Complete() { _target.Complete(exception: null, dropPendingMessages: false, releaseReservedMessages: false); }
        /// 
        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception == null) throw new ArgumentNullException("exception");
            Contract.EndContractBlock();
            _target.Complete(exception, dropPendingMessages: true, releaseReservedMessages: false);
        }
        /// 
        /// Triggers the  to initiate a batching operation even if the number
        /// of currently queued or postponed items is less than the .
        /// 
        /// 
        /// In greedy mode, a batch will be generated from queued items even if fewer exist than the batch size.  
        /// In non-greedy mode, a batch will be generated asynchronously from postponed items even if
        /// fewer than the batch size can be consumed.
        /// 
        public void TriggerBatch() { _target.TriggerBatch(); }
        /// 
        public IDisposable LinkTo(ITargetBlock target, DataflowLinkOptions linkOptions)
        {
            return _source.LinkTo(target, linkOptions);
        }
        /// 
        public Boolean TryReceive(Predicate filter, out T[] item)
        {
            return _source.TryReceive(filter, out item);
        }
        /// 
        public bool TryReceiveAll(out IList items) { return _source.TryReceiveAll(out items); }
        /// 
        public int OutputCount { get { return _source.OutputCount; } }
        /// 
        public Task Completion { get { return _source.Completion; } }
        /// Gets the size of the batches generated by this .
        /// 
        /// If the number of items provided to the block is not evenly divisible by the batch size provided
        /// to the block's constructor, the block's final batch may contain fewer than the requested number of items.
        /// 
        public Int32 BatchSize { get { return _target.BatchSize; } }
        /// 
        DataflowMessageStatus ITargetBlock.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock source, Boolean consumeToAccept)
        {
            return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
        /// 
        T[] ISourceBlock.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock target, out Boolean messageConsumed)
        {
            return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
        }
        /// 
        bool ISourceBlock.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock target)
        {
            return _source.ReserveMessage(messageHeader, target);
        }
        /// 
        void ISourceBlock.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock target)
        {
            _source.ReleaseReservation(messageHeader, target);
        }
        /// Gets the number of messages waiting to be offered.  This must only be used from the debugger as it avoids taking necessary locks.
        private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
        /// 
        public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
        /// The data to display in the debugger display attribute.
        [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
        private object DebuggerDisplayContent
        {
            get
            {
                return string.Format("{0}, BatchSize={1}, OutputCount={2}",
                    Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
                    BatchSize,
                    OutputCountForDebugger);
            }
        }
        /// Gets the data to display in the debugger display attribute for this instance.
        object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
        /// Provides a debugger type proxy for the Batch.
        private sealed class DebugView
        {
            /// The batch block being viewed.
            private BatchBlock _batchBlock;
            /// The target half being viewed.
            private readonly BatchBlockTargetCore.DebuggingInformation _targetDebuggingInformation;
            /// The source half of the block being viewed.
            private readonly SourceCore.DebuggingInformation _sourceDebuggingInformation;
            /// Initializes the debug view.
            /// The batch being viewed.
            public DebugView(BatchBlock batchBlock)
            {
                Contract.Requires(batchBlock != null, "Need a block with which to construct the debug view");
                _batchBlock = batchBlock;
                _targetDebuggingInformation = batchBlock._target.GetDebuggingInformation();
                _sourceDebuggingInformation = batchBlock._source.GetDebuggingInformation();
            }
            /// Gets the messages waiting to be processed.
            public IEnumerable InputQueue { get { return _targetDebuggingInformation.InputQueue; } }
            /// Gets the messages waiting to be received.
            public IEnumerable OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
            /// Gets the number of batches that have been completed.
            public long BatchesCompleted { get { return _targetDebuggingInformation.NumberOfBatchesCompleted; } }
            /// Gets the task being used for input processing.
            public Task TaskForInputProcessing { get { return _targetDebuggingInformation.TaskForInputProcessing; } }
            /// Gets the task being used for output processing.
            public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
            /// Gets the DataflowBlockOptions used to configure this block.
            public GroupingDataflowBlockOptions DataflowBlockOptions { get { return _targetDebuggingInformation.DataflowBlockOptions; } }
            /// Gets the size of batches generated by the block.
            public int BatchSize { get { return _batchBlock.BatchSize; } }
            /// Gets whether the block is declining further messages.
            public bool IsDecliningPermanently { get { return _targetDebuggingInformation.IsDecliningPermanently; } }
            /// Gets whether the block is completed.
            public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
            /// Gets the block's Id.
            public int Id { get { return Common.GetBlockId(_batchBlock); } }
            /// Gets the messages postponed by this batch.
            public QueuedMap, DataflowMessageHeader> PostponedMessages { get { return _targetDebuggingInformation.PostponedMessages; } }
            /// Gets the set of all targets linked from this block.
            public TargetRegistry LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
            /// Gets the set of all targets linked from this block.
            public ITargetBlock NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
        }
        /// Provides the core target implementation for a Batch.
        [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
        [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
        private sealed class BatchBlockTargetCore
        {
            /// The messages in this target.
            private readonly Queue _messages = new Queue();
            /// A task representing the completion of the block.
            private readonly TaskCompletionSource _completionTask = new TaskCompletionSource();
            /// Gets the object used as the incoming lock.
            private object IncomingLock { get { return _completionTask; } }
            /// The target that owns this target core.
            private readonly BatchBlock _owningBatch;
            /// The batch size.
            private readonly int _batchSize;
            /// State used when in non-greedy mode.
            private readonly NonGreedyState _nonGreedyState;
            /// Bounding state for when the block is executing in bounded mode.
            private readonly BoundingState _boundingState;
            /// The options associated with this block.
            private readonly GroupingDataflowBlockOptions _dataflowBlockOptions;
            /// The action invoked with a completed batch.
            private readonly Action _batchCompletedAction;
            /// Whether to stop accepting new messages.
            private bool _decliningPermanently;
            /// Whether we've completed at least one batch.
            private long _batchesCompleted;
            /// Whether someone has reserved the right to call CompleteBlockOncePossible.
            private bool _completionReserved;
            /// State used only when in non-greedy mode.
            private sealed class NonGreedyState
            {
                /// Collection of postponed messages.
                internal readonly QueuedMap, DataflowMessageHeader> PostponedMessages;
                /// A temporary array used to store data retrieved from PostponedMessages.
                internal readonly KeyValuePair, DataflowMessageHeader>[] PostponedMessagesTemp;
                /// A temporary list used in non-greedy mode when consuming postponed messages to store successfully reserved messages.
                internal readonly List, KeyValuePair>> ReservedSourcesTemp;
                /// Whether the next batching operation should accept fewer than BatchSize items.
                /// This value may be read not under a lock, but it must only be written to protected by the IncomingLock.
                internal bool AcceptFewerThanBatchSize;
                /// The task used to process messages.
                internal Task TaskForInputProcessing;
                /// Initializes the NonGreedyState.
                /// The batch size used by the BatchBlock.
                internal NonGreedyState(int batchSize)
                {
                    // A non-greedy batch requires at least batchSize sources to be successful.
                    // Thus, we initialize our collections to be able to store at least that many elements
                    // in order to avoid unnecessary allocations below that point.
                    Contract.Requires(batchSize > 0, "A positive batch size is required");
                    PostponedMessages = new QueuedMap, DataflowMessageHeader>(batchSize);
                    PostponedMessagesTemp = new KeyValuePair, DataflowMessageHeader>[batchSize];
                    ReservedSourcesTemp = new List, KeyValuePair>>(batchSize);
                }
            }
            /// Initializes this target core with the specified configuration.
            /// The owning batch target.
            /// The number of items to group into a batch.
            /// The delegate to invoke when a batch is completed.
            /// The options with which to configure this .  Assumed to be immutable.
            /// The  must be positive.
            /// The  is null (Nothing in Visual Basic).
            internal BatchBlockTargetCore(BatchBlock owningBatch, Int32 batchSize, Action batchCompletedAction, GroupingDataflowBlockOptions dataflowBlockOptions)
            {
                Contract.Requires(owningBatch != null, "This batch target core must be associated with a batch block.");
                Contract.Requires(batchSize >= 1, "Batch sizes must be positive.");
                Contract.Requires(batchCompletedAction != null, "Completion action must be specified.");
                Contract.Requires(dataflowBlockOptions != null, "Options required to configure the block.");
                // Store arguments
                _owningBatch = owningBatch;
                _batchSize = batchSize;
                _batchCompletedAction = batchCompletedAction;
                _dataflowBlockOptions = dataflowBlockOptions;
                // We'll be using _nonGreedyState even if we are greedy with bounding
                bool boundingEnabled = dataflowBlockOptions.BoundedCapacity > 0;
                if (!_dataflowBlockOptions.Greedy || boundingEnabled) _nonGreedyState = new NonGreedyState(batchSize);
                if (boundingEnabled) _boundingState = new BoundingState(dataflowBlockOptions.BoundedCapacity);
            }
            /// 
            /// Triggers a batching operation even if the number of currently queued or postponed items is less than the .
            /// 
            internal void TriggerBatch()
            {
                lock (IncomingLock)
                {
                    // If we shouldn't be doing any more work, bail.  Otherwise, note that we're willing to 
                    // accept fewer items in the next batching operation, and ensure processing is kicked off.
                    if (!_decliningPermanently && !_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
                    {
                        if (_nonGreedyState == null)
                        {
                            MakeBatchIfPossible(evenIfFewerThanBatchSize: true);
                        }
                        else
                        {
                            _nonGreedyState.AcceptFewerThanBatchSize = true;
                            ProcessAsyncIfNecessary();
                        }
                    }
                    CompleteBlockIfPossible();
                }
            }
            /// 
            internal DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock source, Boolean consumeToAccept)
            {
                // Validate arguments
                if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
                if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
                Contract.EndContractBlock();
                lock (IncomingLock)
                {
                    // If we shouldn't be accepting more messages, don't.
                    if (_decliningPermanently)
                    {
                        CompleteBlockIfPossible();
                        return DataflowMessageStatus.DecliningPermanently;
                    }
                    // We can directly accept the message if:
                    //      1) we are being greedy AND we are not bounding, OR 
                    //      2) we are being greedy AND we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing. 
                    // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
                    // (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
                    // accepting new ones directly into the queue.)
                    if (_dataflowBlockOptions.Greedy &&
                            (_boundingState == null
                                ||
                             (_boundingState.CountIsLessThanBound && _nonGreedyState.PostponedMessages.Count == 0 && _nonGreedyState.TaskForInputProcessing == null)))
                    {
                        // Consume the message from the source if necessary
                        if (consumeToAccept)
                        {
                            Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
                            bool consumed;
                            messageValue = source.ConsumeMessage(messageHeader, _owningBatch, out consumed);
                            if (!consumed) return DataflowMessageStatus.NotAvailable;
                        }
                        // Once consumed, enqueue it.
                        _messages.Enqueue(messageValue);
                        if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound
                        // Now start declining if the number of batches we've already made plus 
                        // the number we can make from data already enqueued meets our quota.
                        if (!_decliningPermanently &&
                            (_batchesCompleted + (_messages.Count / _batchSize)) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
                        {
                            _decliningPermanently = true;
                        }
                        // Now that we have a message, see if we can make forward progress.
                        MakeBatchIfPossible(evenIfFewerThanBatchSize: false);
                        CompleteBlockIfPossible();
                        return DataflowMessageStatus.Accepted;
                    }
                    // Otherwise, we try to postpone if a source was provided
                    else if (source != null)
                    {
                        Debug.Assert(_nonGreedyState != null, "_nonGreedyState must have been initialized during construction in non-greedy mode.");
                        // We always postpone using _nonGreedyState even if we are being greedy with bounding
                        _nonGreedyState.PostponedMessages.Push(source, messageHeader);
                        // In non-greedy mode, we need to see if batch could be completed
                        if (!_dataflowBlockOptions.Greedy) ProcessAsyncIfNecessary();
                        return DataflowMessageStatus.Postponed;
                    }
                    // We can't do anything else about this message
                    return DataflowMessageStatus.Declined;
                }
            }
            /// Completes/faults the block.
            /// In general, it is not safe to pass releaseReservedMessages:true, because releasing of reserved messages
            /// is done without taking a lock. We pass releaseReservedMessages:true only when an exception has been 
            /// caught inside the message processing loop which is a single instance at any given moment.
            [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
            internal void Complete(Exception exception, bool dropPendingMessages, bool releaseReservedMessages, bool revertProcessingState = false)
            {
                // Ensure that no new messages may be added
                lock (IncomingLock)
                {
                    // Faulting from outside is allowed until we start declining permanently.
                    // Faulting from inside is allowed at any time.
                    if (exception != null && (!_decliningPermanently || releaseReservedMessages))
                    {
                        // Record the exception in the source.
                        // The source, which exposes its Completion to the public will take this
                        // into account and will complete in Faulted state.
                        _owningBatch._source.AddException(exception);
                    }
                    // Drop pending messages if requested
                    if (dropPendingMessages) _messages.Clear();
                }
                // Release reserved messages if requested.
                // This must be done from outside the lock.
                if (releaseReservedMessages)
                {
                    try { ReleaseReservedMessages(throwOnFirstException: false); }
                    catch (Exception e) { _owningBatch._source.AddException(e); }
                }
                // Triggering completion requires the lock
                lock (IncomingLock)
                {
                    // Revert the dirty processing state if requested
                    if (revertProcessingState)
                    {
                        Debug.Assert(_nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null,
                                        "The processing state must be dirty when revertProcessingState==true.");
                        _nonGreedyState.TaskForInputProcessing = null;
                    }
                    // Trigger completion
                    _decliningPermanently = true;
                    CompleteBlockIfPossible();
                }
            }
            /// 
            internal Task Completion { get { return _completionTask.Task; } }
            /// Gets the size of the batches generated by this .
            internal Int32 BatchSize { get { return _batchSize; } }
            /// Gets whether the target has had cancellation requested or an exception has occurred.
            private bool CanceledOrFaulted
            {
                get
                {
                    return _dataflowBlockOptions.CancellationToken.IsCancellationRequested || _owningBatch._source.HasExceptions;
                }
            }
            /// Returns the available capacity to bring in postponed items. The exact values above _batchSize don't matter.
            private int BoundedCapacityAvailable
            {
                get
                {
                    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
                    return _boundingState != null ?
                                _dataflowBlockOptions.BoundedCapacity - _boundingState.CurrentCount :
                                _batchSize;
                }
            }
            /// Completes the block once all completion conditions are met.
            private void CompleteBlockIfPossible()
            {
                Common.ContractAssertMonitorStatus(IncomingLock, held: true);
                if (!_completionReserved)
                {
                    bool currentlyProcessing = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
                    bool completedAllDesiredBatches = _batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups;
                    bool noMoreMessages = _decliningPermanently && _messages.Count < _batchSize;
                    bool complete = !currentlyProcessing && (completedAllDesiredBatches || noMoreMessages || CanceledOrFaulted);
                    if (complete)
                    {
                        _completionReserved = true;
                        // Make sure the target is declining
                        _decliningPermanently = true;
                        // If we still have straggling items remaining, make them into their own batch even though there are fewer than batchSize
                        if (_messages.Count > 0) MakeBatchIfPossible(evenIfFewerThanBatchSize: true);
                        // We need to complete the block, but we may have arrived here from an external
                        // call to the block.  To avoid running arbitrary code in the form of 
                        // completion task continuations in that case, do it in a separate task.
                        Task.Factory.StartNew(thisTargetCore =>
                        {
                            var targetCore = (BatchBlockTargetCore)thisTargetCore;
                            // Release any postponed messages
                            List exceptions = null;
                            if (targetCore._nonGreedyState != null)
                            {
                                // Note: No locks should be held at this point
                                Common.ReleaseAllPostponedMessages(targetCore._owningBatch,
                                                                   targetCore._nonGreedyState.PostponedMessages,
                                                                   ref exceptions);
                            }
                            if (exceptions != null)
                            {
                                // It is important to migrate these exceptions to the source part of the owning batch,
                                // because that is the completion task that is publically exposed.
                                targetCore._owningBatch._source.AddExceptions(exceptions);
                            }
                            // Target's completion task is only available internally with the sole purpose
                            // of releasing the task that completes the parent. Hence the actual reason
                            // for completing this task doesn't matter.
                            targetCore._completionTask.TrySetResult(default(VoidResult));
                        }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
                    }
                }
            }
            /// 
            /// Gets whether we should launch further synchronous or asynchronous processing
            /// to create batches.
            /// 
            private bool BatchesNeedProcessing
            {
                get
                {
                    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
                    // If we're currently processing asynchronously, let that async task
                    // handle all work; nothing more to do here.  If we're not currently processing
                    // but cancellation has been requested, don't do more work either.
                    bool completedAllDesiredBatches = _batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups;
                    bool currentlyProcessing = _nonGreedyState != null && _nonGreedyState.TaskForInputProcessing != null;
                    if (completedAllDesiredBatches || currentlyProcessing || CanceledOrFaulted) return false;
                    // Now, if it's possible to create a batch from queued items or if there are enough
                    // postponed items to attempt a batch, batches need processing.
                    int neededMessageCountToCompleteBatch = _batchSize - _messages.Count;
                    int boundedCapacityAvailable = BoundedCapacityAvailable;
                    // We have items queued up sufficient to make up a batch
                    if (neededMessageCountToCompleteBatch <= 0) return true;
                    if (_nonGreedyState != null)
                    {
                        // We can make a triggered batch using postponed messages
                        if (_nonGreedyState.AcceptFewerThanBatchSize &&
                            (_messages.Count > 0 || (_nonGreedyState.PostponedMessages.Count > 0 && boundedCapacityAvailable > 0)))
                            return true;
                        if (_dataflowBlockOptions.Greedy)
                        {
                            // We are in greedy mode and we have postponed messages. 
                            // (In greedy mode we only postpone due to lack of bounding capacity.) 
                            // And now we have capacity to consume some postponed messages. 
                            // (In greedy mode we can/should consume as many postponed messages as we can even  
                            // if those messages are insufficient to make up a batch.)
                            if (_nonGreedyState.PostponedMessages.Count > 0 && boundedCapacityAvailable > 0) return true;
                        }
                        else
                        {
                            // We are in non-greedy mode and we have enough postponed messages and bounding capacity to make a full batch
                            if (_nonGreedyState.PostponedMessages.Count >= neededMessageCountToCompleteBatch &&
                                boundedCapacityAvailable >= neededMessageCountToCompleteBatch)
                                return true;
                        }
                    }
                    // There is no other reason to kick off a processing task
                    return false;
                }
            }
            /// Called when new messages are available to be processed.
            /// Whether this call is the continuation of a previous message loop.
            private void ProcessAsyncIfNecessary(bool isReplacementReplica = false)
            {
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: true);
                if (BatchesNeedProcessing)
                {
                    ProcessAsyncIfNecessary_Slow(isReplacementReplica);
                }
            }
            /// 
            /// Slow path for ProcessAsyncIfNecessary. 
            /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined.
            /// 
            private void ProcessAsyncIfNecessary_Slow(bool isReplacementReplica)
            {
                Contract.Requires(BatchesNeedProcessing, "There must be a batch that needs processing.");
                // Create task and store into _taskForInputProcessing prior to scheduling the task
                // so that _taskForInputProcessing will be visibly set in the task loop.
                _nonGreedyState.TaskForInputProcessing = new Task(thisBatchTarget => ((BatchBlockTargetCore)thisBatchTarget).ProcessMessagesLoopCore(), this,
                                                    Common.GetCreationOptionsForTask(isReplacementReplica));
#if FEATURE_TRACING
                DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
                if (etwLog.IsEnabled())
                {
                    etwLog.TaskLaunchedForMessageHandling(
                        _owningBatch, _nonGreedyState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
                        _messages.Count + (_nonGreedyState != null ? _nonGreedyState.PostponedMessages.Count : 0));
                }
#endif
                // Start the task handling scheduling exceptions
                Exception exception = Common.StartTaskSafe(_nonGreedyState.TaskForInputProcessing, _dataflowBlockOptions.TaskScheduler);
                if (exception != null)
                {
                    // Get out from under currently held locks. Complete re-acquires the locks it needs.
                    Task.Factory.StartNew(exc => Complete(exception: (Exception)exc, dropPendingMessages: true, releaseReservedMessages: true, revertProcessingState: true),
                                        exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
                }
            }
            /// Task body used to process messages.
            [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
            private void ProcessMessagesLoopCore()
            {
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                try
                {
                    int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask;
                    int timesThroughLoop = 0;
                    bool madeProgress;
                    do
                    {
                        // Determine whether a batch has been forced/triggered.
                        // (If the value is read as false and is set to true immediately afterwards,
                        // we'll simply force the next time around.  The only code that can
                        // set the value to false is this function, after reading a true value.)
                        bool triggered = Volatile.Read(ref _nonGreedyState.AcceptFewerThanBatchSize);
                        // Retrieve postponed items:
                        //      In non-greedy mode: Reserve + Consume
                        //      In greedy bounded mode: Consume (without a prior reservation)
                        if (!_dataflowBlockOptions.Greedy) RetrievePostponedItemsNonGreedy(allowFewerThanBatchSize: triggered);
                        else RetrievePostponedItemsGreedyBounded(allowFewerThanBatchSize: triggered);
                        // Try to make a batch if there are enough buffered messages
                        lock (IncomingLock)
                        {
                            madeProgress = MakeBatchIfPossible(evenIfFewerThanBatchSize: triggered);
                            // Reset the trigger flag if:
                            // - We made a batch, regardless of whether it came due to a trigger or not.
                            // - We tried to make a batch due to a trigger, but were unable to, which
                            //   could happen if we're unable to consume any of the postponed messages.
                            if (madeProgress || triggered) _nonGreedyState.AcceptFewerThanBatchSize = false;
                        }
                        timesThroughLoop++;
                    } while (madeProgress && timesThroughLoop < maxMessagesPerTask);
                }
                catch (Exception exc)
                {
                    Complete(exc, dropPendingMessages: false, releaseReservedMessages: true);
                }
                finally
                {
                    lock (IncomingLock)
                    {
                        // We're no longer processing, so null out the processing task
                        _nonGreedyState.TaskForInputProcessing = null;
                        // However, we may have given up early because we hit our own configured
                        // processing limits rather than because we ran out of work to do.  If that's
                        // the case, make sure we spin up another task to keep going.
                        ProcessAsyncIfNecessary(isReplacementReplica: true);
                        // If, however, we stopped because we ran out of work to do and we
                        // know we'll never get more, then complete.
                        CompleteBlockIfPossible();
                    }
                }
            }
            /// Create a batch from the available items.
            /// 
            /// Whether to make a batch even if there are fewer than BatchSize items available.
            /// 
            /// true if a batch was created and published; otherwise, false.
            private bool MakeBatchIfPossible(bool evenIfFewerThanBatchSize)
            {
                Common.ContractAssertMonitorStatus(IncomingLock, held: true);
                // Is a full batch available?
                bool fullBatch = _messages.Count >= _batchSize;
                // If so, or if it's ok to make a batch with fewer than batchSize, make one.
                if (fullBatch || (evenIfFewerThanBatchSize && _messages.Count > 0))
                {
                    var newBatch = new T[fullBatch ? _batchSize : _messages.Count];
                    for (int i = 0; i < newBatch.Length; i++) newBatch[i] = _messages.Dequeue();
                    _batchCompletedAction(newBatch);
                    _batchesCompleted++;
                    if (_batchesCompleted >= _dataflowBlockOptions.ActualMaxNumberOfGroups) _decliningPermanently = true;
                    return true;
                }
                // No batch could be created
                else return false;
            }
            /// Retrieves postponed items in non-greedy mode if we have enough to make a batch.
            /// Whether we'll accept consuming fewer elements than the defined batch size.
            private void RetrievePostponedItemsNonGreedy(bool allowFewerThanBatchSize)
            {
                Contract.Requires(!_dataflowBlockOptions.Greedy, "This method may only be used in non-greedy mode.");
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                // Shortcuts just to keep the code cleaner
                QueuedMap, DataflowMessageHeader> postponed = _nonGreedyState.PostponedMessages;
                KeyValuePair, DataflowMessageHeader>[] postponedTemp = _nonGreedyState.PostponedMessagesTemp;
                List, KeyValuePair>> reserved = _nonGreedyState.ReservedSourcesTemp;
                // Clear the temporary buffer.  This is safe to do without a lock because
                // it is only accessed by the serial message loop.
                reserved.Clear();
                int poppedInitially;
                int boundedCapacityAvailable;
                lock (IncomingLock)
                {
                    // The queue must be empty between batches in non-greedy mode
                    Debug.Assert(_messages.Count == 0, "The queue must be empty between batches in non-greedy mode");
                    // If there are not enough postponed items (or if we're not allowing consumption), there's nothing more to be done
                    boundedCapacityAvailable = BoundedCapacityAvailable;
                    if (_decliningPermanently ||
                        postponed.Count == 0 ||
                        boundedCapacityAvailable <= 0 ||
                        (!allowFewerThanBatchSize && (postponed.Count < _batchSize || boundedCapacityAvailable < _batchSize)))
                        return;
                    // Grab an initial batch of postponed messages.
                    poppedInitially = postponed.PopRange(postponedTemp, 0, _batchSize);
                    Debug.Assert(allowFewerThanBatchSize ? poppedInitially > 0 : poppedInitially == _batchSize,
                                    "We received fewer than we expected based on the previous check.");
                } // Release the lock.  We must not hold it while calling Reserve/Consume/Release.
                // Try to reserve the initial batch of messages.
                for (int i = 0; i < poppedInitially; i++)
                {
                    KeyValuePair, DataflowMessageHeader> sourceAndMessage = postponedTemp[i];
                    if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, _owningBatch))
                    {
                        var reservedMessage = new KeyValuePair(sourceAndMessage.Value, default(T));
                        var reservedSourceAndMessage = new KeyValuePair, KeyValuePair>(sourceAndMessage.Key, reservedMessage);
                        reserved.Add(reservedSourceAndMessage);
                    }
                }
                Array.Clear(postponedTemp, 0, postponedTemp.Length); // clear out the temp array so as not to hold onto messages too long
                // If we didn't reserve enough to make a batch, start picking off postponed messages
                // one by one until we either have enough reserved or we run out of messages
                while (reserved.Count < _batchSize)
                {
                    KeyValuePair, DataflowMessageHeader> sourceAndMessage;
                    lock (IncomingLock)
                    {
                        if (!postponed.TryPop(out sourceAndMessage)) break;
                    } // Release the lock.  We must not hold it while calling Reserve/Consume/Release.
                    if (sourceAndMessage.Key.ReserveMessage(sourceAndMessage.Value, _owningBatch))
                    {
                        var reservedMessage = new KeyValuePair(sourceAndMessage.Value, default(T));
                        var reservedSourceAndMessage = new KeyValuePair, KeyValuePair>(sourceAndMessage.Key, reservedMessage);
                        reserved.Add(reservedSourceAndMessage);
                    }
                }
                Debug.Assert(reserved.Count <= _batchSize, "Expected the number of reserved sources to be <= the number needed for a batch.");
                // We've now reserved what we can.  Either consume them all or release them all.
                if (reserved.Count > 0)
                {
                    // TriggerBatch adds a complication here.  It's possible that while we've been reserving
                    // messages, Post has been used to queue up a bunch of messages to the batch,
                    // and that if the batch has a max group count and enough messages were posted,
                    // we could now be declining.  In that case, if we don't specially handle the situation,
                    // we could consume messages that we won't be able to turn into a batch, since MaxNumberOfGroups
                    // implies the block will only ever output a maximum number of batches.  To handle this,
                    // we start declining before consuming, now that we know we'll have enough to form a batch.
                    // (If an exception occurs after we do this, we'll be shutting down the block anyway.)
                    // This is also why we still reserve/consume rather than just consume in forced mode, 
                    // so that we only consume if we're able to turn what we consume into a batch.
                    bool shouldProceedToConsume = true;
                    if (allowFewerThanBatchSize)
                    {
                        lock (IncomingLock)
                        {
                            if (!_decliningPermanently &&
                                (_batchesCompleted + 1) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
                            // Note that this logic differs from the other location where we do a similar check.
                            // Here we want to know whether we're one shy of meeting our quota, because we'll accept
                            // any size batch.  Elsewhere, we need to know whether we have the right number of messages
                            // queued up.
                            {
                                shouldProceedToConsume = !_decliningPermanently;
                                _decliningPermanently = true;
                            }
                        }
                    }
                    if (shouldProceedToConsume && (allowFewerThanBatchSize || reserved.Count == _batchSize))
                    {
                        ConsumeReservedMessagesNonGreedy();
                    }
                    else
                    {
                        ReleaseReservedMessages(throwOnFirstException: true);
                    }
                }
                // Clear out the reserved list, so as not to hold onto values longer than necessary.
                // We don't do this in case of failure, because the higher-level exception handler
                // accesses the list to try to release reservations.
                reserved.Clear();
            }
            /// Retrieves postponed items in greedy bounded mode.
            /// Whether we'll accept consuming fewer elements than the defined batch size.
            private void RetrievePostponedItemsGreedyBounded(bool allowFewerThanBatchSize)
            {
                Contract.Requires(_dataflowBlockOptions.Greedy, "This method may only be used in greedy mode.");
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Contract.Requires(_boundingState != null, "Bounding state is required when in bounded mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                // Shortcuts just to keep the code cleaner
                QueuedMap, DataflowMessageHeader> postponed = _nonGreedyState.PostponedMessages;
                KeyValuePair, DataflowMessageHeader>[] postponedTemp = _nonGreedyState.PostponedMessagesTemp;
                List, KeyValuePair>> reserved = _nonGreedyState.ReservedSourcesTemp;
                // Clear the temporary buffer.  This is safe to do without a lock because
                // it is only accessed by the serial message loop.
                reserved.Clear();
                int poppedInitially;
                int boundedCapacityAvailable;
                int itemCountNeededToCompleteBatch;
                lock (IncomingLock)
                {
                    // If there are not enough postponed items (or if we're not allowing consumption), there's nothing more to be done
                    boundedCapacityAvailable = BoundedCapacityAvailable;
                    itemCountNeededToCompleteBatch = _batchSize - _messages.Count;
                    if (_decliningPermanently ||
                        postponed.Count == 0 ||
                        boundedCapacityAvailable <= 0)
                        return;
                    // Grab an initial batch of postponed messages.
                    if (boundedCapacityAvailable < itemCountNeededToCompleteBatch) itemCountNeededToCompleteBatch = boundedCapacityAvailable;
                    poppedInitially = postponed.PopRange(postponedTemp, 0, itemCountNeededToCompleteBatch);
                    Debug.Assert(poppedInitially > 0, "We received fewer than we expected based on the previous check.");
                } // Release the lock.  We must not hold it while calling Reserve/Consume/Release.
                // Treat popped messages as reserved. 
                // We don't have to formally reserve because we are in greedy mode.
                for (int i = 0; i < poppedInitially; i++)
                {
                    KeyValuePair, DataflowMessageHeader> sourceAndMessage = postponedTemp[i];
                    var reservedMessage = new KeyValuePair(sourceAndMessage.Value, default(T));
                    var reservedSourceAndMessage = new KeyValuePair, KeyValuePair>(sourceAndMessage.Key, reservedMessage);
                    reserved.Add(reservedSourceAndMessage);
                }
                Array.Clear(postponedTemp, 0, postponedTemp.Length); // clear out the temp array so as not to hold onto messages too long
                // If we didn't reserve enough to make a batch, start picking off postponed messages
                // one by one until we either have enough reserved or we run out of messages
                while (reserved.Count < itemCountNeededToCompleteBatch)
                {
                    KeyValuePair, DataflowMessageHeader> sourceAndMessage;
                    lock (IncomingLock)
                    {
                        if (!postponed.TryPop(out sourceAndMessage)) break;
                    } // Release the lock.  We must not hold it while calling Reserve/Consume/Release.
                    var reservedMessage = new KeyValuePair(sourceAndMessage.Value, default(T));
                    var reservedSourceAndMessage = new KeyValuePair, KeyValuePair>(sourceAndMessage.Key, reservedMessage);
                    reserved.Add(reservedSourceAndMessage);
                }
                Debug.Assert(reserved.Count <= itemCountNeededToCompleteBatch, "Expected the number of reserved sources to be <= the number needed for a batch.");
                // We've gotten as many postponed messages as we can. Try to consume them.
                if (reserved.Count > 0)
                {
                    // TriggerBatch adds a complication here.  It's possible that while we've been reserving
                    // messages, Post has been used to queue up a bunch of messages to the batch,
                    // and that if the batch has a max group count and enough messages were posted,
                    // we could now be declining.  In that case, if we don't specially handle the situation,
                    // we could consume messages that we won't be able to turn into a batch, since MaxNumberOfGroups
                    // implies the block will only ever output a maximum number of batches.  To handle this,
                    // we start declining before consuming, now that we know we'll have enough to form a batch.
                    // (If an exception occurs after we do this, we'll be shutting down the block anyway.)
                    // This is also why we still reserve/consume rather than just consume in forced mode, 
                    // so that we only consume if we're able to turn what we consume into a batch.
                    bool shouldProceedToConsume = true;
                    if (allowFewerThanBatchSize)
                    {
                        lock (IncomingLock)
                        {
                            if (!_decliningPermanently &&
                                (_batchesCompleted + 1) >= _dataflowBlockOptions.ActualMaxNumberOfGroups)
                            // Note that this logic differs from the other location where we do a similar check.
                            // Here we want to know whether we're one shy of meeting our quota, because we'll accept
                            // any size batch.  Elsewhere, we need to know whether we have the right number of messages
                            // queued up.
                            {
                                shouldProceedToConsume = !_decliningPermanently;
                                _decliningPermanently = true;
                            }
                        }
                    }
                    if (shouldProceedToConsume)
                    {
                        ConsumeReservedMessagesGreedyBounded();
                    }
                }
                // Clear out the reserved list, so as not to hold onto values longer than necessary.
                // We don't do this in case of failure, because the higher-level exception handler
                // accesses the list to try to release reservations.
                reserved.Clear();
            }
            /// 
            /// Consumes all of the reserved messages stored in the non-greedy state's temporary reserved source list.
            /// 
            private void ConsumeReservedMessagesNonGreedy()
            {
                Contract.Requires(!_dataflowBlockOptions.Greedy, "This method may only be used in non-greedy mode.");
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Contract.Requires(_nonGreedyState.ReservedSourcesTemp != null, "ReservedSourcesTemp should have been initialized.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                // Consume the reserved items and store the data.
                List, KeyValuePair>> reserved = _nonGreedyState.ReservedSourcesTemp;
                for (int i = 0; i < reserved.Count; i++)
                {
                    // We can only store the data into _messages while holding the IncomingLock, we 
                    // don't want to allocate extra objects for each batch, and we don't want to 
                    // take and release the lock for each individual item... but we do need to use
                    // the consumed message rather than the initial one.  To handle this, because KeyValuePair is immutable,
                    // we store a new KVP with the newly consumed message back into the temp list, so that we can
                    // then enumerate the temp list en mass while taking the lock once afterwards.
                    KeyValuePair, KeyValuePair> sourceAndMessage = reserved[i];
                    reserved[i] = default(KeyValuePair, KeyValuePair>); // in case of exception from ConsumeMessage
                    bool consumed;
                    T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
                    if (!consumed)
                    {
                        // The protocol broke down, so throw an exception, as this is fatal.  Before doing so, though,
                        // null out all of the messages we've already consumed, as a higher-level event handler
                        // should try to release everything in the reserved list.
                        for (int prev = 0; prev < i; prev++) reserved[prev] = default(KeyValuePair, KeyValuePair>);
                        throw new InvalidOperationException(SR.InvalidOperation_FailedToConsumeReservedMessage);
                    }
                    var consumedMessage = new KeyValuePair(sourceAndMessage.Value.Key, consumedValue);
                    var consumedSourceAndMessage = new KeyValuePair, KeyValuePair>(sourceAndMessage.Key, consumedMessage);
                    reserved[i] = consumedSourceAndMessage;
                }
                lock (IncomingLock)
                {
                    // Increment the bounding count with the number of consumed messages 
                    if (_boundingState != null) _boundingState.CurrentCount += reserved.Count;
                    // Enqueue the consumed mesasages
                    foreach (KeyValuePair, KeyValuePair> sourceAndMessage in reserved)
                    {
                        _messages.Enqueue(sourceAndMessage.Value.Value);
                    }
                }
            }
            /// 
            /// Consumes all of the reserved messages stored in the non-greedy state's temporary reserved source list.
            /// 
            private void ConsumeReservedMessagesGreedyBounded()
            {
                Contract.Requires(_dataflowBlockOptions.Greedy, "This method may only be used in greedy mode.");
                Contract.Requires(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Contract.Requires(_nonGreedyState.ReservedSourcesTemp != null, "ReservedSourcesTemp should have been initialized.");
                Contract.Requires(_boundingState != null, "Bounded state is required for bounded mode.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                // Consume the reserved items and store the data.
                int consumedCount = 0;
                List, KeyValuePair>> reserved = _nonGreedyState.ReservedSourcesTemp;
                for (int i = 0; i < reserved.Count; i++)
                {
                    // We can only store the data into _messages while holding the IncomingLock, we 
                    // don't want to allocate extra objects for each batch, and we don't want to 
                    // take and release the lock for each individual item... but we do need to use
                    // the consumed message rather than the initial one.  To handle this, because KeyValuePair is immutable,
                    // we store a new KVP with the newly consumed message back into the temp list, so that we can
                    // then enumerate the temp list en mass while taking the lock once afterwards.
                    KeyValuePair, KeyValuePair> sourceAndMessage = reserved[i];
                    reserved[i] = default(KeyValuePair, KeyValuePair>); // in case of exception from ConsumeMessage
                    bool consumed;
                    T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value.Key, _owningBatch, out consumed);
                    if (consumed)
                    {
                        var consumedMessage = new KeyValuePair(sourceAndMessage.Value.Key, consumedValue);
                        var consumedSourceAndMessage = new KeyValuePair, KeyValuePair>(sourceAndMessage.Key, consumedMessage);
                        reserved[i] = consumedSourceAndMessage;
                        // Keep track of the actually consumed messages
                        consumedCount++;
                    }
                }
                lock (IncomingLock)
                {
                    // Increment the bounding count with the number of consumed messages 
                    if (_boundingState != null) _boundingState.CurrentCount += consumedCount;
                    // Enqueue the consumed mesasages
                    foreach (KeyValuePair, KeyValuePair> sourceAndMessage in reserved)
                    {
                        // If we didn't consume this message, the KeyValuePai will be default, i.e. the source will be null
                        if (sourceAndMessage.Key != null) _messages.Enqueue(sourceAndMessage.Value.Value);
                    }
                }
            }
            /// 
            /// Releases all of the reserved messages stored in the non-greedy state's temporary reserved source list.
            /// 
            /// 
            /// Whether to allow an exception from a release to propagate immediately,
            /// or to delay propagation until all releases have been attempted.
            /// 
            [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
            internal void ReleaseReservedMessages(bool throwOnFirstException)
            {
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                Debug.Assert(_nonGreedyState != null, "Non-greedy state is required for non-greedy mode.");
                Debug.Assert(_nonGreedyState.ReservedSourcesTemp != null, "Should have been initialized");
                List exceptions = null;
                List, KeyValuePair>> reserved = _nonGreedyState.ReservedSourcesTemp;
                for (int i = 0; i < reserved.Count; i++)
                {
                    KeyValuePair, KeyValuePair> sourceAndMessage = reserved[i];
                    reserved[i] = default(KeyValuePair, KeyValuePair>);
                    ISourceBlock source = sourceAndMessage.Key;
                    KeyValuePair message = sourceAndMessage.Value;
                    if (source != null && message.Key.IsValid)
                    {
                        try { source.ReleaseReservation(message.Key, _owningBatch); }
                        catch (Exception e)
                        {
                            if (throwOnFirstException) throw;
                            if (exceptions == null) exceptions = new List(1);
                            exceptions.Add(e);
                        }
                    }
                }
                if (exceptions != null) throw new AggregateException(exceptions);
            }
            /// Notifies the block that one or more items was removed from the queue.
            /// The number of items removed.
            internal void OnItemsRemoved(int numItemsRemoved)
            {
                Contract.Requires(numItemsRemoved > 0, "Should only be called for a positive number of items removed.");
                Common.ContractAssertMonitorStatus(IncomingLock, held: false);
                // If we're bounding, we need to know when an item is removed so that we
                // can update the count that's mirroring the actual count in the source's queue,
                // and potentially kick off processing to start consuming postponed messages.
                if (_boundingState != null)
                {
                    lock (IncomingLock)
                    {
                        // Decrement the count, which mirrors the count in the source half
                        Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
                            "It should be impossible to have a negative number of items.");
                        _boundingState.CurrentCount -= numItemsRemoved;
                        ProcessAsyncIfNecessary();
                        CompleteBlockIfPossible();
                    }
                }
            }
            /// Counts the input items in a single output item or in a list of output items.
            /// A single output item. Only considered if multipleOutputItems == null.
            /// A list of output items. May be null.
            internal static int CountItems(T[] singleOutputItem, IList multipleOutputItems)
            {
                // If multipleOutputItems == null, then singleOutputItem is the subject of counting
                if (multipleOutputItems == null) return singleOutputItem.Length;
                // multipleOutputItems != null. Count the elements in each item.
                int count = 0;
                foreach (T[] item in multipleOutputItems) count += item.Length;
                return count;
            }
            /// Gets the number of messages waiting to be processed.  This must only be used from the debugger as it avoids taking necessary locks.
            private int InputCountForDebugger { get { return _messages.Count; } }
            /// Gets information about this helper to be used for display in a debugger.
            /// Debugging information about this target.
            internal DebuggingInformation GetDebuggingInformation() { return new DebuggingInformation(this); }
            /// Gets the object to display in the debugger display attribute.
            [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
            private object DebuggerDisplayContent
            {
                get
                {
                    var displayBatch = _owningBatch as IDebuggerDisplay;
                    return string.Format("Block=\"{0}\"",
                        displayBatch != null ? displayBatch.Content : _owningBatch);
                }
            }
            /// Provides a wrapper for commonly needed debugging information.
            internal sealed class DebuggingInformation
            {
                /// The target being viewed.
                private BatchBlockTargetCore _target;
                /// Initializes the debugging helper.
                /// The target being viewed.
                public DebuggingInformation(BatchBlockTargetCore target) { _target = target; }
                /// Gets the messages waiting to be processed.
                public IEnumerable InputQueue { get { return _target._messages.ToList(); } }
                /// Gets the task being used for input processing.
                public Task TaskForInputProcessing { get { return _target._nonGreedyState != null ? _target._nonGreedyState.TaskForInputProcessing : null; } }
                /// Gets the collection of postponed messages.
                public QueuedMap, DataflowMessageHeader> PostponedMessages { get { return _target._nonGreedyState != null ? _target._nonGreedyState.PostponedMessages : null; } }
                /// Gets whether the block is declining further messages.
                public bool IsDecliningPermanently { get { return _target._decliningPermanently; } }
                /// Gets the DataflowBlockOptions used to configure this block.
                public GroupingDataflowBlockOptions DataflowBlockOptions { get { return _target._dataflowBlockOptions; } }
                /// Gets the number of batches that have been completed.
                public long NumberOfBatchesCompleted { get { return _target._batchesCompleted; } }
            }
        }
    }
}