// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ActionBlock.cs
//
//
// A target block that executes an action for each message.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Runtime.CompilerServices;
using System.Threading.Tasks.Dataflow.Internal;
namespace System.Threading.Tasks.Dataflow
{
    /// Provides a dataflow block that invokes a provided  delegate for every data element received.
    /// Specifies the type of data operated on by this .
    [DebuggerDisplay("{DebuggerDisplayContent,nq}")]
    [DebuggerTypeProxy(typeof(ActionBlock<>.DebugView))]
    public sealed class ActionBlock : ITargetBlock, IDebuggerDisplay
    {
        /// The core implementation of this message block when in default mode.
        private readonly TargetCore _defaultTarget;
        /// The core implementation of this message block when in SPSC mode.
        private readonly SpscTargetCore _spscTarget;
        /// Initializes the  with the specified .
        /// The action to invoke with each data element received.
        /// The  is null (Nothing in Visual Basic).
        public ActionBlock(Action action) :
            this((Delegate)action, ExecutionDataflowBlockOptions.Default)
        { }
        /// Initializes the  with the specified  and .
        /// The action to invoke with each data element received.
        /// The options with which to configure this .
        /// The  is null (Nothing in Visual Basic).
        /// The  is null (Nothing in Visual Basic).
        public ActionBlock(Action action, ExecutionDataflowBlockOptions dataflowBlockOptions) :
            this((Delegate)action, dataflowBlockOptions)
        { }
        /// Initializes the  with the specified .
        /// The action to invoke with each data element received.
        /// The  is null (Nothing in Visual Basic).
        public ActionBlock(Func action) :
            this((Delegate)action, ExecutionDataflowBlockOptions.Default)
        { }
        /// Initializes the  with the specified  and .
        /// The action to invoke with each data element received.
        /// The options with which to configure this .
        /// The  is null (Nothing in Visual Basic).
        /// The  is null (Nothing in Visual Basic).
        public ActionBlock(Func action, ExecutionDataflowBlockOptions dataflowBlockOptions) :
            this((Delegate)action, dataflowBlockOptions)
        { }
        /// Initializes the  with the specified delegate and options.
        /// The action to invoke with each data element received.
        /// The options with which to configure this .
        /// The  is null (Nothing in Visual Basic).
        /// The  is null (Nothing in Visual Basic).
        private ActionBlock(Delegate action, ExecutionDataflowBlockOptions dataflowBlockOptions)
        {
            // Validate arguments
            if (action == null) throw new ArgumentNullException("action");
            if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
            Contract.Ensures((_spscTarget != null) ^ (_defaultTarget != null), "One and only one of the two targets must be non-null after construction");
            Contract.EndContractBlock();
            // Ensure we have options that can't be changed by the caller
            dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
            // Based on the mode, initialize the target.  If the user specifies SingleProducerConstrained,
            // we'll try to employ an optimized mode under a limited set of circumstances.
            var syncAction = action as Action;
            if (syncAction != null &&
                dataflowBlockOptions.SingleProducerConstrained &&
                dataflowBlockOptions.MaxDegreeOfParallelism == 1 &&
                !dataflowBlockOptions.CancellationToken.CanBeCanceled &&
                dataflowBlockOptions.BoundedCapacity == DataflowBlockOptions.Unbounded)
            {
                // Initialize the SPSC fast target to handle the bulk of the processing.
                // The SpscTargetCore is only supported when BoundedCapacity, CancellationToken,
                // and MaxDOP are all their default values.  It's also only supported for sync
                // delegates and not for async delegates.
                _spscTarget = new SpscTargetCore(this, syncAction, dataflowBlockOptions);
            }
            else
            {
                // Initialize the TargetCore which handles the bulk of the processing.
                // The default target core can handle all options and delegate flavors.
                if (syncAction != null) // sync
                {
                    _defaultTarget = new TargetCore(this,
                        messageWithId => ProcessMessage(syncAction, messageWithId),
                        null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion);
                }
                else // async
                {
                    var asyncAction = action as Func;
                    Debug.Assert(asyncAction != null, "action is of incorrect delegate type");
                    _defaultTarget = new TargetCore(this,
                        messageWithId => ProcessMessageWithTask(asyncAction, messageWithId),
                        null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion | TargetCoreOptions.UsesAsyncCompletion);
                }
                // Handle async cancellation requests by declining on the target
                Common.WireCancellationToComplete(
                    dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore)state).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
            }
#if FEATURE_TRACING
            DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
            if (etwLog.IsEnabled())
            {
                etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
            }
#endif
        }
        /// Processes the message with a user-provided action.
        /// The action to use to process the message.
        /// The message to be processed.
        private void ProcessMessage(Action action, KeyValuePair messageWithId)
        {
            try
            {
                action(messageWithId.Key);
            }
            catch (Exception exc)
            {
                // If this exception represents cancellation, swallow it rather than shutting down the block.
                if (!Common.IsCooperativeCancellation(exc)) throw;
            }
            finally
            {
                // We're done synchronously processing an element, so reduce the bounding count
                // that was incrementing when this element was enqueued.
                if (_defaultTarget.IsBounded) _defaultTarget.ChangeBoundingCount(-1);
            }
        }
        /// Processes the message with a user-provided action that returns a task.
        /// The action to use to process the message.
        /// The message to be processed.
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
        private void ProcessMessageWithTask(Func action, KeyValuePair messageWithId)
        {
            Contract.Requires(action != null, "action needed for processing");
            // Run the action to get the task that represents the operation's completion
            Task task = null;
            Exception caughtException = null;
            try
            {
                task = action(messageWithId.Key);
            }
            catch (Exception exc) { caughtException = exc; }
            // If no task is available, we're done.
            if (task == null)
            {
                // If we didn't get a task because an exception occurred,
                // store it (if the exception was cancellation, just ignore it).
                if (caughtException != null && !Common.IsCooperativeCancellation(caughtException))
                {
                    Common.StoreDataflowMessageValueIntoExceptionData(caughtException, messageWithId.Key);
                    _defaultTarget.Complete(caughtException, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
                }
                // Signal that we're done this async operation.
                _defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
                return;
            }
            else if (task.IsCompleted)
            {
                AsyncCompleteProcessMessageWithTask(task);
            }
            else
            {
                // Otherwise, join with the asynchronous operation when it completes.
                task.ContinueWith((completed, state) =>
                {
                    ((ActionBlock)state).AsyncCompleteProcessMessageWithTask(completed);
                }, this, CancellationToken.None, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
            }
        }
        /// Completes the processing of an asynchronous message.
        /// The completed task.
        private void AsyncCompleteProcessMessageWithTask(Task completed)
        {
            Contract.Requires(completed != null, "Need completed task for processing");
            Contract.Requires(completed.IsCompleted, "The task to be processed must be completed by now.");
            // If the task faulted, store its errors. We must add the exception before declining
            // and signaling completion, as the exception is part of the operation, and the completion conditions
            // depend on this.
            if (completed.IsFaulted)
            {
                _defaultTarget.Complete(completed.Exception, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: true);
            }
            // Regardless of faults, note that we're done processing.  There are
            // no outputs to keep track of for action block, so we always decrement 
            // the bounding count here (the callee will handle checking whether
            // we're actually in a bounded mode).
            _defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
        }
        /// 
        public void Complete()
        {
            if (_defaultTarget != null)
            {
                _defaultTarget.Complete(exception: null, dropPendingMessages: false);
            }
            else
            {
                _spscTarget.Complete(exception: null);
            }
        }
        /// 
        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception == null) throw new ArgumentNullException("exception");
            Contract.EndContractBlock();
            if (_defaultTarget != null)
            {
                _defaultTarget.Complete(exception, dropPendingMessages: true);
            }
            else
            {
                _spscTarget.Complete(exception);
            }
        }
        /// 
        public Task Completion
        {
            get { return _defaultTarget != null ? _defaultTarget.Completion : _spscTarget.Completion; }
        }
        /// Posts an item to the .
        /// The item being offered to the target.
        /// true if the item was accepted by the target block; otherwise, false.
        /// 
        /// This method will return once the target block has decided to accept or decline the item,
        /// but unless otherwise dictated by special semantics of the target block, it does not wait
        /// for the item to actually be processed (for example, 
        /// will return from Post as soon as it has stored the posted item into its input queue).  From the perspective
        /// of the block's processing, Post is asynchronous. For target blocks that support postponing offered messages, 
        /// or for blocks that may do more processing in their Post implementation, consider using
        ///  SendAsync, 
        /// which will return immediately and will enable the target to postpone the posted message and later consume it 
        /// after SendAsync returns.
        /// 
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool Post(TInput item)
        {
            // Even though this method is available with the exact same functionality as an extension method
            // on ITargetBlock, using that extension method goes through an interface call on ITargetBlock,
            // which for very high-throughput scenarios shows up as noticeable overhead on certain architectures.  
            // We can eliminate that call for direct ActionBlock usage by providing the same method as an instance method.
            return _defaultTarget != null ?
                _defaultTarget.OfferMessage(Common.SingleMessageHeader, item, null, false) == DataflowMessageStatus.Accepted :
                _spscTarget.Post(item);
        }
        /// 
        DataflowMessageStatus ITargetBlock.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock source, Boolean consumeToAccept)
        {
            return _defaultTarget != null ?
                _defaultTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept) :
                _spscTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
        /// 
        public int InputCount
        {
            get { return _defaultTarget != null ? _defaultTarget.InputCount : _spscTarget.InputCount; }
        }
        /// Gets the number of messages waiting to be processed. This must only be used from the debugger.
        private int InputCountForDebugger
        {
            get { return _defaultTarget != null ? _defaultTarget.GetDebuggingInformation().InputCount : _spscTarget.InputCount; }
        }
        /// 
        public override string ToString()
        {
            return Common.GetNameForDebugger(this, _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions);
        }
        /// The data to display in the debugger display attribute.
        [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
        private object DebuggerDisplayContent
        {
            get
            {
                return string.Format("{0}, InputCount={1}",
                    Common.GetNameForDebugger(this, _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions),
                    InputCountForDebugger);
            }
        }
        /// Gets the data to display in the debugger display attribute for this instance.
        object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
        /// Provides a debugger type proxy for the Call.
        private sealed class DebugView
        {
            /// The action block being viewed.
            private readonly ActionBlock _actionBlock;
            /// The action block's default target being viewed.
            private readonly TargetCore.DebuggingInformation _defaultDebugInfo;
            /// The action block's SPSC target being viewed.
            private readonly SpscTargetCore.DebuggingInformation _spscDebugInfo;
            /// Initializes the debug view.
            /// The target being debugged.
            public DebugView(ActionBlock actionBlock)
            {
                Contract.Requires(actionBlock != null, "Need a block with which to construct the debug view.");
                _actionBlock = actionBlock;
                if (_actionBlock._defaultTarget != null)
                {
                    _defaultDebugInfo = actionBlock._defaultTarget.GetDebuggingInformation();
                }
                else
                {
                    _spscDebugInfo = actionBlock._spscTarget.GetDebuggingInformation();
                }
            }
            /// Gets the messages waiting to be processed.
            public IEnumerable InputQueue
            {
                get { return _defaultDebugInfo != null ? _defaultDebugInfo.InputQueue : _spscDebugInfo.InputQueue; }
            }
            /// Gets any postponed messages.
            public QueuedMap, DataflowMessageHeader> PostponedMessages
            {
                get { return _defaultDebugInfo != null ? _defaultDebugInfo.PostponedMessages : null; }
            }
            /// Gets the number of outstanding input operations.
            public Int32 CurrentDegreeOfParallelism
            {
                get { return _defaultDebugInfo != null ? _defaultDebugInfo.CurrentDegreeOfParallelism : _spscDebugInfo.CurrentDegreeOfParallelism; }
            }
            /// Gets the ExecutionDataflowBlockOptions used to configure this block.
            public ExecutionDataflowBlockOptions DataflowBlockOptions
            {
                get { return _defaultDebugInfo != null ? _defaultDebugInfo.DataflowBlockOptions : _spscDebugInfo.DataflowBlockOptions; }
            }
            /// Gets whether the block is declining further messages.
            public bool IsDecliningPermanently
            {
                get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsDecliningPermanently : _spscDebugInfo.IsDecliningPermanently; }
            }
            /// Gets whether the block is completed.
            public bool IsCompleted
            {
                get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsCompleted : _spscDebugInfo.IsCompleted; }
            }
            /// Gets the block's Id.
            public int Id { get { return Common.GetBlockId(_actionBlock); } }
        }
    }
}