// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ // // BroadcastBlock.cs // // // A propagator that broadcasts incoming messages to all targets, overwriting the current // message in the process. // // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Diagnostics.Contracts; using System.Linq; using System.Security; using System.Threading.Tasks.Dataflow.Internal; namespace System.Threading.Tasks.Dataflow { /// /// Provides a buffer for storing at most one element at time, overwriting each message with the next as it arrives. /// Messages are broadcast to all linked targets, all of which may consume a clone of the message. /// /// Specifies the type of the data buffered by this dataflow block. /// /// exposes at most one element at a time. However, unlike /// , that element will be overwritten as new elements are provided /// to the block. ensures that the current element is broadcast to any /// linked targets before allowing the element to be overwritten. /// [DebuggerDisplay("{DebuggerDisplayContent,nq}")] [DebuggerTypeProxy(typeof(BroadcastBlock<>.DebugView))] public sealed class BroadcastBlock : IPropagatorBlock, IReceivableSourceBlock, IDebuggerDisplay { /// The source side. private readonly BroadcastingSourceCore _source; /// Bounding state for when the block is executing in bounded mode. private readonly BoundingStateWithPostponedAndTask _boundingState; /// Whether all future messages should be declined. private bool _decliningPermanently; /// A task has reserved the right to run the completion routine. private bool _completionReserved; /// Gets the lock used to synchronize incoming requests. 
private object IncomingLock { get { return _source; } }

/// <summary>Initializes the <see cref="BroadcastBlock{T}"/> with the specified cloning function.</summary>
/// <param name="cloningFunction">
/// The function to use to clone the data when offered to other blocks.
/// This may be null to indicate that no cloning need be performed.
/// </param>
public BroadcastBlock(Func<T, T> cloningFunction) :
    this(cloningFunction, DataflowBlockOptions.Default)
{ }

/// <summary>Initializes the <see cref="BroadcastBlock{T}"/> with the specified cloning function and <see cref="DataflowBlockOptions"/>.</summary>
/// <param name="cloningFunction">
/// The function to use to clone the data when offered to other blocks.
/// This may be null to indicate that no cloning need be performed.
/// </param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BroadcastBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public BroadcastBlock(Func<T, T> cloningFunction, DataflowBlockOptions dataflowBlockOptions)
{
    // Validate arguments
    if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
    Contract.EndContractBlock();

    // Ensure we have options that can't be changed by the caller
    dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();

    // Initialize bounding state if necessary
    Action<int> onItemsRemoved = null;
    if (dataflowBlockOptions.BoundedCapacity > 0)
    {
        Debug.Assert(dataflowBlockOptions.BoundedCapacity > 0, "Positive bounding count expected; should have been verified by options ctor");
        onItemsRemoved = OnItemsRemoved;
        _boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
    }

    // Initialize the source side
    _source = new BroadcastingSourceCore<T>(this, cloningFunction, dataflowBlockOptions, onItemsRemoved);

    // It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
    // In those cases we need to fault the target half to drop its buffered messages and to release its
    // reservations. This should not create an infinite loop, because all our implementations are designed
    // to handle multiple completion requests and to carry over only one.
    _source.Completion.ContinueWith((completed, state) =>
    {
        var thisBlock = ((BroadcastBlock<T>)state) as IDataflowBlock;
        Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
        thisBlock.Fault(completed.Exception);
    }, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);

    // Handle async cancellation requests by declining on the target
    Common.WireCancellationToComplete(
        dataflowBlockOptions.CancellationToken, _source.Completion, state => ((BroadcastBlock<T>)state).Complete(), this);
#if FEATURE_TRACING
    DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
    if (etwLog.IsEnabled())
    {
        etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
    }
#endif
}

/// <summary>
/// Requests completion without an exception: marks the block as declining permanently and
/// completes the target side as soon as that is possible.
/// </summary>
public void Complete()
{
    CompleteCore(exception: null, storeExceptionEvenIfAlreadyCompleting: false);
}

/// <summary>
/// Requests completion with an exception. The exception is only recorded if the block has not
/// already started declining permanently.
/// </summary>
/// <param name="exception">The exception to record on the source side.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="exception"/> is null.</exception>
void IDataflowBlock.Fault(Exception exception)
{
    if (exception == null) throw new ArgumentNullException("exception");
    Contract.EndContractBlock();

    CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: false);
}

/// <summary>Shared completion routine used by Complete, Fault and the internal processing paths.</summary>
/// <param name="exception">The exception to store on the source side, or null for a plain completion.</param>
/// <param name="storeExceptionEvenIfAlreadyCompleting">
/// true to store <paramref name="exception"/> even when the block is already declining permanently.
/// </param>
/// <param name="revertProcessingState">
/// true to clear the in-flight input processing task before attempting completion.
/// May only be true when <paramref name="storeExceptionEvenIfAlreadyCompleting"/> is true.
/// </param>
internal void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState = false)
{
    Contract.Requires(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
                    "Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
    Contract.EndContractBlock();

    lock (IncomingLock)
    {
        // Faulting from outside is allowed until we start declining permanently.
        // Faulting from inside is allowed at any time.
        if (exception != null && (!_decliningPermanently || storeExceptionEvenIfAlreadyCompleting))
        {
            _source.AddException(exception);
        }

        // Revert the dirty processing state if requested
        if (revertProcessingState)
        {
            Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
                            "The processing state must be dirty when revertProcessingState==true.");
            _boundingState.TaskForInputProcessing = null;
        }

        // Trigger completion if possible
        _decliningPermanently = true;
        CompleteTargetIfPossible();
    }
}

/// <summary>Links this block to the specified target by forwarding to the source side.</summary>
/// <param name="target">The target to link to.</param>
/// <param name="linkOptions">The options that configure the link.</param>
/// <returns>The disposable returned by the source side's LinkTo.</returns>
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { return _source.LinkTo(target, linkOptions); }

/// <summary>Attempts to receive the current message by forwarding to the source side.</summary>
/// <param name="filter">An optional predicate the message must satisfy; null accepts any message.</param>
/// <param name="item">The received item, if any.</param>
/// <returns>true if an item was received; otherwise, false.</returns>
public Boolean TryReceive(Predicate<T> filter, out T item) { return _source.TryReceive(filter, out item); }

/// <summary>Attempts to receive all available items by forwarding to the source side.</summary>
/// <param name="items">The received items, if any.</param>
/// <returns>true if any items were received; otherwise, false.</returns>
Boolean IReceivableSourceBlock<T>.TryReceiveAll(out IList<T> items) { return _source.TryReceiveAll(out items); }

/// <summary>Gets the completion task of the source side, which represents the completion of this block.</summary>
public Task Completion { get { return _source.Completion; } }

/// <summary>Offers a message to this block, which accepts, postpones or declines it.</summary>
/// <param name="messageHeader">The header of the message being offered; must be valid.</param>
/// <param name="messageValue">The value of the message being offered.</param>
/// <param name="source">The block offering the message. May be null only when <paramref name="consumeToAccept"/> is false.</param>
/// <param name="consumeToAccept">true if the message must be consumed from <paramref name="source"/> before it is accepted.</param>
/// <returns>The status describing what happened to the offered message.</returns>
/// <exception cref="System.ArgumentException">
/// The <paramref name="messageHeader"/> is not valid, or <paramref name="source"/> is null while
/// <paramref name="consumeToAccept"/> is true.
/// </exception>
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
    Contract.EndContractBlock();

    lock (IncomingLock)
    {
        // If we've already stopped accepting messages, decline permanently
        if (_decliningPermanently)
        {
            CompleteTargetIfPossible();
            return DataflowMessageStatus.DecliningPermanently;
        }

        // We can directly accept the message if:
        //      1) we are not bounding, OR
        //      2) we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing.
        // (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
        // (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
        // accepting new ones directly into the queue.)
        if (_boundingState == null
                ||
            (_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
        {
            // Consume the message from the source if necessary
            if (consumeToAccept)
            {
                Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");

                bool consumed;
                messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
                if (!consumed) return DataflowMessageStatus.NotAvailable;
            }

            // Once consumed, pass it to the delegate
            _source.AddMessage(messageValue);
            if (_boundingState != null) _boundingState.CurrentCount += 1; // track this new item against our bound
            return DataflowMessageStatus.Accepted;
        }
        // Otherwise, we try to postpone if a source was provided
        else if (source != null)
        {
            Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
                "PostponedMessages must have been initialized during construction in bounding mode.");

            _boundingState.PostponedMessages.Push(source, messageHeader);
            return DataflowMessageStatus.Postponed;
        }
        // We can't do anything else about this message
        return DataflowMessageStatus.Declined;
    }
}

/// <summary>Notifies the block that one or more items was removed from the queue.</summary>
/// <param name="numItemsRemoved">The number of items removed.</param>
private void OnItemsRemoved(int numItemsRemoved)
{
    Contract.Requires(numItemsRemoved > 0, "Should only be called for a positive number of items removed.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // If we're bounding, we need to know when an item is removed so that we
    // can update the count that's mirroring the actual count in the source's queue,
    // and potentially kick off processing to start consuming postponed messages.
    if (_boundingState != null)
    {
        lock (IncomingLock)
        {
            // Decrement the count, which mirrors the count in the source half
            Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
                "It should be impossible to have a negative number of items.");
            _boundingState.CurrentCount -= numItemsRemoved;

            ConsumeAsyncIfNecessary();
            CompleteTargetIfPossible();
        }
    }
}

/// <summary>Called when postponed messages may need to be consumed.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
internal void ConsumeAsyncIfNecessary(bool isReplacementReplica = false)
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
    Debug.Assert(_boundingState != null, "Must be in bounded mode.");

    if (!_decliningPermanently &&
        _boundingState.TaskForInputProcessing == null &&
        _boundingState.PostponedMessages.Count > 0 &&
        _boundingState.CountIsLessThanBound)
    {
        // Create task and store into _taskForInputProcessing prior to scheduling the task
        // so that _taskForInputProcessing will be visibly set in the task loop.
        _boundingState.TaskForInputProcessing =
            new Task(state => ((BroadcastBlock<T>)state).ConsumeMessagesLoopCore(), this,
                        Common.GetCreationOptionsForTask(isReplacementReplica));

#if FEATURE_TRACING
        DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
        if (etwLog.IsEnabled())
        {
            etwLog.TaskLaunchedForMessageHandling(
                this, _boundingState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
                _boundingState.PostponedMessages.Count);
        }
#endif

        // Start the task handling scheduling exceptions
        Exception exception = Common.StartTaskSafe(_boundingState.TaskForInputProcessing, _source.DataflowBlockOptions.TaskScheduler);
        if (exception != null)
        {
            // Get out from under currently held locks. Complete re-acquires the locks it needs.
            Task.Factory.StartNew(exc => CompleteCore(exception: (Exception)exc, storeExceptionEvenIfAlreadyCompleting: true, revertProcessingState: true),
                                exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
        }
    }
}

/// <summary>Task body used to consume postponed messages.</summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ConsumeMessagesLoopCore()
{
    Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
        "May only be called in bounded mode and when a task is in flight.");
    Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
        "This must only be called from the in-flight processing task.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    try
    {
        int maxMessagesPerTask = _source.DataflowBlockOptions.ActualMaxMessagesPerTask;
        for (int i = 0;
            i < maxMessagesPerTask && ConsumeAndStoreOneMessageIfAvailable();
            i++)
            ;
    }
    catch (Exception exception)
    {
        // Prevent the creation of new processing tasks
        CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: true);
    }
    finally
    {
        lock (IncomingLock)
        {
            // We're no longer processing, so null out the processing task
            _boundingState.TaskForInputProcessing = null;

            // However, we may have given up early because we hit our own configured
            // processing limits rather than because we ran out of work to do.  If that's
            // the case, make sure we spin up another task to keep going.
            ConsumeAsyncIfNecessary(isReplacementReplica: true);

            // If, however, we stopped because we ran out of work to do and we
            // know we'll never get more, then complete.
            CompleteTargetIfPossible();
        }
    }
}

/// <summary>
/// Retrieves one postponed message if there's room and if we can consume a postponed message.
/// Stores any consumed message into the source half.
/// </summary>
/// <returns>true if a message could be consumed and stored; otherwise, false.</returns>
/// <remarks>This must only be called from the asynchronous processing loop.</remarks>
private bool ConsumeAndStoreOneMessageIfAvailable()
{
    Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
        "May only be called in bounded mode and when a task is in flight.");
    Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
        "This must only be called from the in-flight processing task.");
    Common.ContractAssertMonitorStatus(IncomingLock, held: false);

    // Loop through the postponed messages until we get one.
    while (true)
    {
        // Get the next item to retrieve.  If there are no more, bail.
        KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
        lock (IncomingLock)
        {
            if (!_boundingState.CountIsLessThanBound) return false;
            if (!_boundingState.PostponedMessages.TryPop(out sourceAndMessage)) return false;

            // Optimistically assume we're going to get the item. This avoids taking the lock
            // again if we're right.  If we're wrong, we decrement it later under lock.
            _boundingState.CurrentCount++;
        }

        // Consume the item
        bool consumed = false;
        try
        {
            T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
            if (consumed)
            {
                _source.AddMessage(consumedValue);
                return true;
            }
        }
        finally
        {
            // We didn't get the item, so decrement the count to counteract our optimistic assumption.
            if (!consumed)
            {
                lock (IncomingLock) _boundingState.CurrentCount--;
            }
        }
    }
}

/// <summary>Completes the target, notifying the source, once all completion conditions are met.</summary>
/// <remarks>
/// Must be called with the incoming lock held. Completion is reserved at most once, and only when the
/// block is declining permanently and no input processing task is in flight.
/// </remarks>
private void CompleteTargetIfPossible()
{
    Common.ContractAssertMonitorStatus(IncomingLock, held: true);
    if (_decliningPermanently &&
        !_completionReserved &&
        (_boundingState == null || _boundingState.TaskForInputProcessing == null))
    {
        _completionReserved = true;

        // If we're in bounding mode and we have any postponed messages, we need to clear them,
        // which means calling back to the source, which means we need to escape the incoming lock.
        if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
        {
            Task.Factory.StartNew(state =>
            {
                var thisBroadcastBlock = (BroadcastBlock<T>)state;

                // Release any postponed messages
                List<Exception> exceptions = null;
                if (thisBroadcastBlock._boundingState != null)
                {
                    // Note: No locks should be held at this point
                    Common.ReleaseAllPostponedMessages(thisBroadcastBlock,
                                                        thisBroadcastBlock._boundingState.PostponedMessages,
                                                        ref exceptions);
                }

                if (exceptions != null)
                {
                    // It is important to migrate these exceptions to the source part of the owning batch,
                    // because that is the completion task that is publicly exposed.
                    thisBroadcastBlock._source.AddExceptions(exceptions);
                }

                thisBroadcastBlock._source.Complete();
            }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
        }
        // Otherwise, we can just decline the source directly.
        else
        {
            _source.Complete();
        }
    }
}

/// <summary>Consumes a previously offered message by forwarding to the source side.</summary>
/// <param name="messageHeader">The header of the message to consume.</param>
/// <param name="target">The target requesting the message.</param>
/// <param name="messageConsumed">Set by the source side to indicate whether the message was consumed.</param>
/// <returns>The value returned by the source side.</returns>
T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
{
    return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
}

/// <summary>Reserves a previously offered message by forwarding to the source side.</summary>
/// <param name="messageHeader">The header of the message to reserve.</param>
/// <param name="target">The target requesting the reservation.</param>
/// <returns>The value returned by the source side.</returns>
bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    return _source.ReserveMessage(messageHeader, target);
}

/// <summary>Releases a previous reservation by forwarding to the source side.</summary>
/// <param name="messageHeader">The header of the reserved message.</param>
/// <param name="target">The target releasing the reservation.</param>
void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
    _source.ReleaseReservation(messageHeader, target);
}

/// <summary>Gets whether the block currently has a value, for use by the DebuggerDisplayAttribute. This must not throw even if HasValue is false.</summary>
private bool HasValueForDebugger { get { return _source.GetDebuggingInformation().HasValue; } }

/// <summary>Gets the block's current value, for use by the DebuggerDisplayAttribute. This must not throw even if HasValue is false.</summary>
private T ValueForDebugger { get { return _source.GetDebuggingInformation().Value; } }

/// <summary>Returns the debugger name of this block, derived from the block and its options.</summary>
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }

/// <summary>The data to display in the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        return string.Format("{0}, HasValue={1}, Value={2}",
            Common.GetNameForDebugger(this, _source.DataflowBlockOptions), HasValueForDebugger, ValueForDebugger);
    }
}

/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }

/// <summary>Provides a debugger type proxy for the BroadcastBlock.</summary>
private sealed class DebugView
{
    /// <summary>The BroadcastBlock being debugged.</summary>
    private readonly BroadcastBlock<T> _broadcastBlock;
    /// <summary>Debug info about the source side of the broadcast.</summary>
    private readonly BroadcastingSourceCore<T>.DebuggingInformation _sourceDebuggingInformation;

    /// <summary>Initializes the debug view.</summary>
    /// <param name="broadcastBlock">The BroadcastBlock being debugged.</param>
    public DebugView(BroadcastBlock<T> broadcastBlock)
    {
        Contract.Requires(broadcastBlock != null, "Need a block with which to construct the debug view.");
        _broadcastBlock = broadcastBlock;
        _sourceDebuggingInformation = broadcastBlock._source.GetDebuggingInformation();
    }

    /// <summary>Gets the messages waiting to be processed.</summary>
    public IEnumerable<T> InputQueue { get { return _sourceDebuggingInformation.InputQueue; } }
    /// <summary>Gets whether the broadcast has a current value.</summary>
    public bool HasValue { get { return _broadcastBlock.HasValueForDebugger; } }
    /// <summary>Gets the broadcast's current value.</summary>
    public T Value { get { return _broadcastBlock.ValueForDebugger; } }

    /// <summary>Gets the task being used for output processing.</summary>
    public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }

    /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
    public DataflowBlockOptions DataflowBlockOptions { get { return _sourceDebuggingInformation.DataflowBlockOptions; } }
    /// <summary>Gets whether the block is declining further messages.</summary>
    public bool IsDecliningPermanently { get { return _broadcastBlock._decliningPermanently; } }
    /// <summary>Gets whether the block is completed.</summary>
    public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
    /// <summary>Gets the block's Id.</summary>
    public int Id { get { return Common.GetBlockId(_broadcastBlock); } }

    /// <summary>Gets the set of all targets linked from this block.</summary>
    public TargetRegistry<T> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
    /// <summary>Gets the target that the next message is reserved for, as reported by the source side.</summary>
    public ITargetBlock<T> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}

/// <summary>Provides the core implementation of the source side of a <see cref="BroadcastBlock{T}"/>.</summary>
/// <typeparam name="TOutput">Specifies the type of data supplied by the source core.</typeparam>
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
private sealed class BroadcastingSourceCore<TOutput>
{
    /// <summary>A registry used to store all linked targets and information about them.</summary>
    private readonly TargetRegistry<TOutput> _targetRegistry;
    /// <summary>All of the output messages queued up to be received by consumers/targets.</summary>
    private readonly Queue<TOutput> _messages = new Queue<TOutput>();
    /// <summary>A TaskCompletionSource that represents the completion of this block.</summary>
    private readonly TaskCompletionSource<VoidResult> _completionTask = new TaskCompletionSource<VoidResult>();
    /// <summary>
    /// An action to be invoked on the owner block when an item is removed.
    /// This may be null if the owner block doesn't need to be notified.
    /// </summary>
    private readonly Action<int> _itemsRemovedAction;

    /// <summary>Gets the object to use as the outgoing lock.</summary>
    private object OutgoingLock { get { return _completionTask; } }
    /// <summary>Gets the object to use as the value lock.</summary>
    private object ValueLock { get { return _targetRegistry; } }

    /// <summary>The source block that utilizes this helper.</summary>
    private readonly BroadcastBlock<TOutput> _owningSource;
    /// <summary>The options used to configure this block's execution.</summary>
    private readonly DataflowBlockOptions _dataflowBlockOptions;
    /// <summary>The cloning function to use.</summary>
    private readonly Func<TOutput, TOutput> _cloningFunction;

    /// <summary>An indicator whether _currentMessage has a value.</summary>
    private bool _currentMessageIsValid;
    /// <summary>The message currently being broadcast.</summary>
    private TOutput _currentMessage;
    /// <summary>The target that the next message is reserved for, or null if nothing is reserved.</summary>
    private ITargetBlock<TOutput> _nextMessageReservedFor;
    /// <summary>Whether this block should again attempt to offer messages to targets.</summary>
    private bool _enableOffering;
    /// <summary>Whether all future messages should be declined.</summary>
    private bool _decliningPermanently;
    /// <summary>The task used to process the output and offer it to targets.</summary>
    private Task _taskForOutputProcessing;
    /// <summary>Exceptions that may have occurred and gone unhandled during processing.</summary>
    private List<Exception> _exceptions;
    /// <summary>Counter for message IDs unique within this source block.</summary>
    private long _nextMessageId = 1; // We are going to use this value before incrementing.
    /// <summary>Whether someone has reserved the right to call CompleteBlockOncePossible.</summary>
    private bool _completionReserved;

    /// <summary>Initializes the source core.</summary>
    /// <param name="owningSource">The source utilizing this core.</param>
    /// <param name="cloningFunction">The function to use to clone the data when offered to other blocks.  May be null.</param>
    /// <param name="dataflowBlockOptions">The options to use to configure the block.</param>
    /// <param name="itemsRemovedAction">Action to invoke when an item is removed.</param>
internal BroadcastingSourceCore(
    BroadcastBlock<TOutput> owningSource,
    Func<TOutput, TOutput> cloningFunction,
    DataflowBlockOptions dataflowBlockOptions,
    Action<int> itemsRemovedAction)
{
    Contract.Requires(owningSource != null, "Must be associated with a broadcast block.");
    Contract.Requires(dataflowBlockOptions != null, "Options are required to configure this block.");

    // Store the arguments
    _owningSource = owningSource;
    _cloningFunction = cloningFunction;
    _dataflowBlockOptions = dataflowBlockOptions;
    _itemsRemovedAction = itemsRemovedAction;

    // Construct members that depend on the arguments
    _targetRegistry = new TargetRegistry<TOutput>(_owningSource);
}

/// <summary>Attempts to hand back a clone of the current message.</summary>
/// <param name="filter">An optional predicate the message must satisfy; null means all messages pass.</param>
/// <param name="item">The cloned message on success; otherwise the default value.</param>
/// <returns>true if there is a valid current message that passes the filter; otherwise, false.</returns>
internal Boolean TryReceive(Predicate<TOutput> filter, out TOutput item)
{
    // Take the lock only long enough to get the message,
    // synchronizing with other activities on the block.
    // We don't want to execute the user-provided cloning delegate
    // while holding the lock.
    TOutput message;
    bool isValid;
    lock (OutgoingLock)
    {
        lock (ValueLock)
        {
            message = _currentMessage;
            isValid = _currentMessageIsValid;
        }
    }

    // Clone and hand back a message if we have one and if it passes the filter.
    // (A null filter means all messages pass.)
    if (isValid && (filter == null || filter(message)))
    {
        item = CloneItem(message);
        return true;
    }
    else
    {
        item = default(TOutput);
        return false;
    }
}

/// <summary>Attempts to receive the single item this block may have.</summary>
/// <param name="items">A one-element array holding the item on success; otherwise null.</param>
/// <returns>true if an item was received; otherwise, false.</returns>
internal Boolean TryReceiveAll(out IList<TOutput> items)
{
    // Try to receive the one item this block may have.
    // If we can, give back an array of one item. Otherwise, give back null.
    TOutput item;
    if (TryReceive(null, out item))
    {
        items = new TOutput[] { item };
        return true;
    }
    else
    {
        items = null;
        return false;
    }
}

/// <summary>Adds a message to the source block for propagation.</summary>
/// <param name="item">The item to be wrapped in a message to be added.</param>
internal void AddMessage(TOutput item)
{
    // This method must not take the outgoing lock, as it will be called in situations
    // where a derived type's incoming lock is held.  The lock leveling structure
    // we're employing is such that outgoing may be held while acquiring incoming, but
    // of course not the other way around.  This is the reason why DataflowSourceBlock
    // needs ValueLock as well.  Otherwise, it would be pure overhead.
    lock (ValueLock)
    {
        if (_decliningPermanently) return;
        _messages.Enqueue(item);
        if (_messages.Count == 1) _enableOffering = true;
        OfferAsyncIfNecessary();
    }
}

/// <summary>Informs the block that it will not be receiving additional messages.</summary>
internal void Complete()
{
    lock (ValueLock)
    {
        _decliningPermanently = true;

        // Complete may be called in a context where an incoming lock is held.  We need to
        // call CompleteBlockIfPossible, but we can't do so if the incoming lock is held.
        // However, now that _decliningPermanently has been set, the timing of
        // CompleteBlockIfPossible doesn't matter, so we schedule it to run asynchronously
        // and take the necessary locks in a situation where we're sure it won't cause a problem.
        Task.Factory.StartNew(state =>
        {
            var thisSourceCore = (BroadcastingSourceCore<TOutput>)state;
            lock (thisSourceCore.OutgoingLock)
            {
                lock (thisSourceCore.ValueLock)
                {
                    thisSourceCore.CompleteBlockIfPossible();
                }
            }
        }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
    }
}

/// <summary>Clones the item.</summary>
/// <param name="item">The item to clone.</param>
/// <returns>The cloned item, or <paramref name="item"/> itself when no cloning function was supplied.</returns>
private TOutput CloneItem(TOutput item)
{
    return _cloningFunction != null ?
        _cloningFunction(item) :
        item;
}

/// <summary>Offers the current message to a specific target.</summary>
/// <param name="target">The target to which to offer the current message.</param>
private void OfferCurrentMessageToNewTarget(ITargetBlock<TOutput> target)
{
    Contract.Requires(target != null, "Target required to offer messages to.");
    Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
    Common.ContractAssertMonitorStatus(ValueLock, held: false);

    // Get the current message if there is one
    TOutput currentMessage;
    bool isValid;
    lock (ValueLock)
    {
        currentMessage = _currentMessage;
        isValid = _currentMessageIsValid;
    }

    // If there is no valid message yet, there is nothing to offer
    if (!isValid) return;

    // Offer it to the target.
    // We must not increment the message ID here. We only do that when we populate _currentMessage, i.e. when we dequeue.
    bool useCloning = _cloningFunction != null;
    DataflowMessageStatus result = target.OfferMessage(new DataflowMessageHeader(_nextMessageId), currentMessage, _owningSource, consumeToAccept: useCloning);

    // If accepted and the target was linked as "unlinkAfterOne", remove it
    if (result == DataflowMessageStatus.Accepted)
    {
        if (!useCloning)
        {
            // If accepted and the target was linked as "once", mark it for removal.
            // If we were forcing consumption, this removal would have already
            // happened in ConsumeMessage.
            _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
        }
    }
    // If declined permanently, remove it
    else if (result == DataflowMessageStatus.DecliningPermanently)
    {
        _targetRegistry.Remove(target);
    }
    else Debug.Assert(result != DataflowMessageStatus.NotAvailable, "Messages from a Broadcast should never be missed.");
}

/// <summary>Offers messages to targets.</summary>
/// <returns>
/// true if a message was dequeued and made the current message; false if a reservation is outstanding
/// or there are no queued messages.
/// </returns>
private bool OfferToTargets()
{
    Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
    Common.ContractAssertMonitorStatus(ValueLock, held: false);

    DataflowMessageHeader header = default(DataflowMessageHeader);
    TOutput message = default(TOutput);
    int numDequeuedMessages = 0;
    lock (ValueLock)
    {
        // If there's a reservation or there aren't any more messages,
        // there's nothing for us to do.  If there's no reservation
        // and a message is available, dequeue the next one and store it
        // as the new current.  If we're now at 0 message, disable further
        // propagation until more messages arrive.
        if (_nextMessageReservedFor == null && _messages.Count > 0)
        {
            // If there are no targets registered, we might as well empty out the broadcast,
            // keeping just the last.  Otherwise, it'll happen anyway, but much more expensively.
            if (_targetRegistry.FirstTargetNode == null)
            {
                while (_messages.Count > 1)
                {
                    _messages.Dequeue();
                    numDequeuedMessages++;
                }
            }

            // Get the next message to offer
            Debug.Assert(_messages.Count > 0, "There must be at least one message to dequeue.");
            _currentMessage = message = _messages.Dequeue();
            numDequeuedMessages++;
            _currentMessageIsValid = true;
            header = new DataflowMessageHeader(++_nextMessageId);
            if (_messages.Count == 0) _enableOffering = false;
        }
        else
        {
            _enableOffering = false;
            return false;
        }
    } // must not hold ValueLock when calling out to targets

    // Offer the message
    if (header.IsValid)
    {
        // Notify the owner block that our count has decreased
        if (_itemsRemovedAction != null) _itemsRemovedAction(numDequeuedMessages);

        // Offer it to each linked target in turn.
        TargetRegistry<TOutput>.LinkedTargetInfo cur = _targetRegistry.FirstTargetNode;
        while (cur != null)
        {
            // Note that during OfferMessage, a target may call ConsumeMessage, which may unlink the target
            // if the target is registered as "once".  Doing so will remove the target from the targets list.
            // As such, we avoid using an enumerator over _targetRegistry and instead capture the next node
            // before offering, so that if the current element is removed, it won't affect the rest of our walk.
            TargetRegistry<TOutput>.LinkedTargetInfo next = cur.Next;
            ITargetBlock<TOutput> target = cur.Target;
            OfferMessageToTarget(header, message, target);
            cur = next;
        }
    }
    return true;
}

/// <summary>Offers the specified message to the specified target.</summary>
/// <param name="header">The header of the message to offer.</param>
/// <param name="message">The message to offer.</param>
/// <param name="target">The target to which the message should be offered.</param>
/// <remarks>
/// This will remove the target from the target registry if the result of the propagation demands it.
/// </remarks>
private void OfferMessageToTarget(DataflowMessageHeader header, TOutput message, ITargetBlock<TOutput> target)
{
    Common.ContractAssertMonitorStatus(OutgoingLock, held: true);
    Common.ContractAssertMonitorStatus(ValueLock, held: false);

    // Offer the message.  If there's a cloning function, we force the target to
    // come back to us to consume the message, allowing us the opportunity to run
    // the cloning function once we know they want the data.  If there is no cloning
    // function, there's no reason for them to call back here.
    bool useCloning = _cloningFunction != null;
    switch (target.OfferMessage(header, message, _owningSource, consumeToAccept: useCloning))
    {
        case DataflowMessageStatus.Accepted:
            if (!useCloning)
            {
                // If accepted and the target was linked as "once", mark it for removal.
                // If we were forcing consumption, this removal would have already
                // happened in ConsumeMessage.
                _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
            }
            break;

        case DataflowMessageStatus.DecliningPermanently:
            // If declined permanently, mark the target for removal
            _targetRegistry.Remove(target);
            break;

        case DataflowMessageStatus.NotAvailable:
            Debug.Assert(false, "Messages from a Broadcast should never be missed.");
            break;
            // No action required for Postponed or Declined
    }
}

/// <summary>Called when we want to enable asynchronously offering message to targets.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
private void OfferAsyncIfNecessary(bool isReplacementReplica = false)
{
    Common.ContractAssertMonitorStatus(ValueLock, held: true);
    // This method must not take the OutgoingLock.

    bool currentlyProcessing = _taskForOutputProcessing != null;
    bool processingToDo = _enableOffering && _messages.Count > 0;

    // If there's any work to be done...
    if (!currentlyProcessing && processingToDo && !CanceledOrFaulted)
    {
        // Create task and store into _taskForOutputProcessing prior to scheduling the task
        // so that _taskForOutputProcessing will be visibly set in the task loop.
        _taskForOutputProcessing = new Task(thisSourceCore => ((BroadcastingSourceCore<TOutput>)thisSourceCore).OfferMessagesLoopCore(), this,
                                            Common.GetCreationOptionsForTask(isReplacementReplica));

#if FEATURE_TRACING
        DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
        if (etwLog.IsEnabled())
        {
            etwLog.TaskLaunchedForMessageHandling(
                _owningSource, _taskForOutputProcessing, DataflowEtwProvider.TaskLaunchedReason.OfferingOutputMessages, _messages.Count);
        }
#endif

        // Start the task handling scheduling exceptions
        Exception exception = Common.StartTaskSafe(_taskForOutputProcessing, _dataflowBlockOptions.TaskScheduler);
        if (exception != null)
        {
            // First, log the exception while the processing state is dirty which is preventing the block from completing.
            // Then revert the proactive processing state changes.
            // And last, try to complete the block.
            AddException(exception);
            _decliningPermanently = true;
            _taskForOutputProcessing = null;

            // Get out from under currently held locks - ValueLock is taken, but OutgoingLock may not be.
            // Re-take the locks on a separate thread.
            Task.Factory.StartNew(state =>
            {
                var thisSourceCore = (BroadcastingSourceCore<TOutput>)state;
                lock (thisSourceCore.OutgoingLock)
                {
                    lock (thisSourceCore.ValueLock)
                    {
                        thisSourceCore.CompleteBlockIfPossible();
                    }
                }
            }, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
        }
    }
}

/// <summary>Task body used to process messages.</summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] private void OfferMessagesLoopCore() { try { int maxMessagesPerTask = _dataflowBlockOptions.ActualMaxMessagesPerTask; lock (OutgoingLock) { // Offer as many messages as we can for (int counter = 0; counter < maxMessagesPerTask && !CanceledOrFaulted; counter++) { if (!OfferToTargets()) break; } } } catch (Exception exception) { _owningSource.CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: true); } finally { lock (OutgoingLock) { lock (ValueLock) { // We're no longer processing, so null out the processing task _taskForOutputProcessing = null; // However, we may have given up early because we hit our own configured // processing limits rather than because we ran out of work to do. If that's // the case, make sure we spin up another task to keep going. OfferAsyncIfNecessary(isReplacementReplica: true); // If, however, we stopped because we ran out of work to do and we // know we'll never get more, then complete. CompleteBlockIfPossible(); } } } } /// Completes the block's processing if there's nothing left to do and never will be. private void CompleteBlockIfPossible() { Common.ContractAssertMonitorStatus(OutgoingLock, held: true); Common.ContractAssertMonitorStatus(ValueLock, held: true); if (!_completionReserved) { bool currentlyProcessing = _taskForOutputProcessing != null; bool noMoreMessages = _decliningPermanently && _messages.Count == 0; // Are we done forever? bool complete = !currentlyProcessing && (noMoreMessages || CanceledOrFaulted); if (complete) { CompleteBlockIfPossible_Slow(); } } } /// /// Slow path for CompleteBlockIfPossible. /// Separating out the slow path into its own method makes it more likely that the fast path method will get inlined. 
/// <remarks>
/// Sets the completion-reserved flag and queues CompleteBlockOncePossible on the default scheduler.
/// </remarks>
private void CompleteBlockIfPossible_Slow()
{
    Contract.Requires(_taskForOutputProcessing == null, "There must be no processing tasks.");
    Contract.Requires(
        (_decliningPermanently && _messages.Count == 0) || CanceledOrFaulted,
        "There must be no more messages or the block must be canceled or faulted.");

    _completionReserved = true;

    // Finish on a separate task so the work runs outside of the locks currently held.
    Task.Factory.StartNew(
        state => { ((BroadcastingSourceCore)state).CompleteBlockOncePossible(); },
        this,
        CancellationToken.None,
        Common.GetCreationOptionsForTask(),
        TaskScheduler.Default);
}

/// <summary>
/// Completes the block. This must only be called once, and only once all of the completion conditions are met.
/// As such, it must only be called from CompleteBlockIfPossible.
/// </summary>
private void CompleteBlockOncePossible()
{
    TargetRegistry.LinkedTargetInfo targetsToNotify;
    List capturedExceptions;

    // Drop the target registry entries and buffered messages to help avoid memory leaks.
    // _currentMessage is intentionally left alone; it remains the current message forever.
    lock (OutgoingLock)
    {
        // Keep the detached list of targets so completion can be propagated to them below.
        targetsToNotify = _targetRegistry.ClearEntryPoints();

        lock (ValueLock)
        {
            _messages.Clear();

            // Take ownership of the exception list and null the field, so that an exception
            // added this late lands in a separate list (that will be ignored).
            capturedExceptions = _exceptions;
            _exceptions = null;
        }
    }

    if (capturedExceptions != null)
    {
        // Exceptions were recorded: finish faulted.
        _completionTask.TrySetException(capturedExceptions);
    }
    else if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested)
    {
        // Cancellation was requested: finish canceled.
        _completionTask.TrySetCanceled();
    }
    else
    {
        // Neither: finish successfully.
        _completionTask.TrySetResult(default(VoidResult));
    }

    // The completion task is now completed, so completion may be propagated to the linked targets.
    _targetRegistry.PropagateCompletion(targetsToNotify);
#if FEATURE_TRACING
    DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
    if (etwLog.IsEnabled())
    {
        etwLog.DataflowBlockCompleted(_owningSource);
    }
#endif
}

/// <summary>
/// Links <paramref name="target"/> to this source and offers it the current message. If completion
/// has already been reserved, the target is only offered the current message and is not registered.
/// </summary>
/// <param name="target">The target to link.</param>
/// <param name="linkOptions">The options governing the link.</param>
/// <returns>
/// Disposables.Nop when completion has already been reserved; otherwise the unlinker produced by
/// Common.CreateUnlinker for the registered target.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="target"/> or <paramref name="linkOptions"/> is null.
/// </exception>
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
internal IDisposable LinkTo(ITargetBlock target, DataflowLinkOptions linkOptions)
{
    // Validate arguments
    if (target == null) throw new ArgumentNullException("target");
    if (linkOptions == null) throw new ArgumentNullException("linkOptions");
    Contract.EndContractBlock();

    lock (OutgoingLock)
    {
        if (!_completionReserved)
        {
            // Register first, then offer. Offering may cause the target to be removed
            // (e.g. if it's unlinkAfterOne); in the reverse order the target would end up
            // being added after it was "removed".
            _targetRegistry.Add(ref target, linkOptions);
            OfferCurrentMessageToNewTarget(target);
            return Common.CreateUnlinker(OutgoingLock, _targetRegistry, target);
        }

        // Completion has finished or at least started: offer the current message, propagate
        // completion if that was requested, and then there is nothing more to do.
        OfferCurrentMessageToNewTarget(target);
        if (linkOptions.PropagateCompletion)
        {
            Common.PropagateCompletionOnceCompleted(_completionTask.Task, target);
        }
        return Disposables.Nop;
    }
}

/// <summary>
/// Hands a clone of the current message to <paramref name="target"/> if
/// <paramref name="messageHeader"/> identifies the next message to be served.
/// </summary>
/// <param name="messageHeader">The header of the message being consumed.</param>
/// <param name="target">The target consuming the message.</param>
/// <param name="messageConsumed">
/// Set to true when the message was consumed; false when the header does not match the next message.
/// </param>
/// <returns>The result of CloneItem on the current message, or default(TOutput) when nothing was consumed.</returns>
/// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
/// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
internal TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock target, out Boolean messageConsumed)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    TOutput snapshot;
    lock (OutgoingLock) // We may currently be calling out under this lock to the target; requires it to be reentrant
    {
        lock (ValueLock)
        {
            // Only the next message to be served up can be consumed.
            if (messageHeader.Id != _nextMessageId)
            {
                messageConsumed = false;
                return default(TOutput);
            }

            // A caller holding the reservation gives it up by consuming.
            // Other targets are still allowed to take the message while a reservation exists.
            if (_nextMessageReservedFor == target)
            {
                _nextMessageReservedFor = null;
                _enableOffering = true;
            }

            _targetRegistry.Remove(target, onlyIfReachedMaxMessages: true);
            OfferAsyncIfNecessary();
            CompleteBlockIfPossible();

            // Capture the value here; it is cloned after the locks are released.
            snapshot = _currentMessage;
        }
    }

    messageConsumed = true;
    return CloneItem(snapshot);
}

/// <summary>
/// Attempts to reserve the next message for <paramref name="target"/>.
/// </summary>
/// <param name="messageHeader">The header of the message to reserve.</param>
/// <param name="target">The target requesting the reservation.</param>
/// <returns>
/// true if no reservation was outstanding and the header matches the next message; otherwise false.
/// </returns>
/// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
/// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
internal Boolean ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock target)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    lock (OutgoingLock)
    {
        // Someone already holds a reservation.
        if (_nextMessageReservedFor != null) return false;

        lock (ValueLock)
        {
            // The requested message is not the next one in line.
            if (messageHeader.Id != _nextMessageId) return false;

            // Grant the reservation and pause offering while it is held.
            _nextMessageReservedFor = target;
            _enableOffering = false;
            return true;
        }
    }
}

/// <summary>
/// Releases the reservation held by <paramref name="target"/> and reoffers the current message to it.
/// </summary>
/// <param name="messageHeader">The header of the reserved message.</param>
/// <param name="target">The target releasing its reservation.</param>
/// <exception cref="ArgumentException"><paramref name="messageHeader"/> is not valid.</exception>
/// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
/// <exception cref="InvalidOperationException">
/// <paramref name="target"/> does not hold the reservation, or the header does not match the next message.
/// </exception>
internal void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock target)
{
    // Validate arguments
    if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
    if (target == null) throw new ArgumentNullException("target");
    Contract.EndContractBlock();

    lock (OutgoingLock)
    {
        // Only the holder of the reservation may release it.
        if (_nextMessageReservedFor != target) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);

        TOutput currentValue;
        lock (ValueLock)
        {
            // The header must refer to the message at the head of the queue.
            if (messageHeader.Id != _nextMessageId) throw new InvalidOperationException(SR.InvalidOperation_MessageNotReservedByTarget);

            // Drop the reservation and let the message be offered to all targets again.
            _nextMessageReservedFor = null;
            _enableOffering = true;
            currentValue = _currentMessage;
            OfferAsyncIfNecessary();
        }

        // Explicitly reoffer this message to the releaser. Otherwise a target with join behavior
        // could wait forever for an offer from this broadcast even though data is available.
        // This is strictly needed only when _messages.Count == 0 (if it's > 0 the message will be
        // overwritten by the asynchronous offering), but for consistency it is always done.
        OfferMessageToTarget(messageHeader, currentValue, target);
    }
}

/// <summary>Gets whether the source has had cancellation requested or an exception has occurred.</summary>
private bool CanceledOrFaulted
{
    get
    {
        // Cancellation is honored as soon as the CancellationToken has been signaled.
        if (_dataflowBlockOptions.CancellationToken.IsCancellationRequested) return true;

        // Faulting is honored after an exception has been encountered and the owning block
        // has invoked Complete on us.
        return Volatile.Read(ref _exceptions) != null && _decliningPermanently;
    }
}

/// <summary>Adds an individual exception to this source.</summary>
/// <param name="exception">The exception to add</param>
internal void AddException(Exception exception)
{
    Contract.Requires(exception != null, "An exception to add is required.");
    Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");

    // The exception list is modified only while holding ValueLock.
    lock (ValueLock)
    {
        Common.AddException(ref _exceptions, exception);
    }
}

/// <summary>Adds exceptions to this source.</summary>
/// <param name="exceptions">The exceptions to add</param>
internal void AddExceptions(List exceptions)
{
    Contract.Requires(exceptions != null, "A list of exceptions to add is required.");
    Contract.Requires(!Completion.IsCompleted || Completion.IsFaulted, "The block must either not be completed or be faulted if we're still storing exceptions.");

    // Add the whole batch under a single acquisition of ValueLock.
    lock (ValueLock)
    {
        foreach (Exception item in exceptions)
        {
            Common.AddException(ref _exceptions, item);
        }
    }
}

/// <summary>Gets the task exposed by this source's completion task source.</summary>
internal Task Completion
{
    get { return _completionTask.Task; }
}

/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
internal DataflowBlockOptions DataflowBlockOptions
{
    get { return _dataflowBlockOptions; }
}

/// <summary>Gets the object to display in the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
    get
    {
        // Prefer the owning block's own debugger content when it provides one.
        var displaySource = _owningSource as IDebuggerDisplay;
        object blockDescription = displaySource != null ? displaySource.Content : _owningSource;
        return string.Format("Block=\"{0}\"", blockDescription);
    }
}

/// <summary>Gets information about this helper to be used for display in a debugger.</summary>
/// <returns>Debugging information about this source core.</returns>
/// <remarks>A new DebuggingInformation wrapper around this instance is created on every call.</remarks>
internal DebuggingInformation GetDebuggingInformation()
{
    return new DebuggingInformation(this);
}

/// <summary>Provides debugging information about the source core.</summary>
internal sealed class DebuggingInformation
{
    /// <summary>The source being viewed.</summary>
    /// <remarks>Assigned only in the constructor, hence readonly.</remarks>
    private readonly BroadcastingSourceCore _source;

    /// <summary>Initializes the type proxy.</summary>
    /// <param name="source">The source being viewed.</param>
    public DebuggingInformation(BroadcastingSourceCore source)
    {
        _source = source;
    }

    /// <summary>Gets whether the source contains a current message.</summary>
    public bool HasValue { get { return _source._currentMessageIsValid; } }

    /// <summary>Gets the value of the source's current message.</summary>
    public TOutput Value { get { return _source._currentMessage; } }

    /// <summary>Gets the number of messages waiting to be made current.</summary>
    public int InputCount { get { return _source._messages.Count; } }

    /// <summary>Gets the messages available for receiving.</summary>
    /// <remarks>Returns a copy of the pending messages produced by ToList.</remarks>
    public IEnumerable InputQueue { get { return _source._messages.ToList(); } }

    /// <summary>Gets the task being used for output processing.</summary>
    public Task TaskForOutputProcessing { get { return _source._taskForOutputProcessing; } }

    /// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
    public DataflowBlockOptions DataflowBlockOptions { get { return _source._dataflowBlockOptions; } }

    /// <summary>Gets whether the block is declining further messages.</summary>
    public bool IsDecliningPermanently { get { return _source._decliningPermanently; } }

    /// <summary>Gets whether the block is completed.</summary>
    public bool IsCompleted { get { return _source.Completion.IsCompleted; } }

    /// <summary>Gets the set of all targets linked from this block.</summary>
    public TargetRegistry LinkedTargets { get { return _source._targetRegistry; } }

    /// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
    public ITargetBlock NextMessageReservedFor { get { return _source._nextMessageReservedFor; } }
}

// The three braces below close enclosing scopes whose headers precede this section of the file.
}
}
}