Imported Upstream version 5.16.0.100

Former-commit-id: 38faa55fb9669e35e7d8448b15c25dc447f25767
This commit is contained in:
Xamarin Public Jenkins (auto-signing)
2018-08-07 15:19:03 +00:00
parent 0a9828183b
commit 7d7f676260
4419 changed files with 170950 additions and 90273 deletions

View File

@ -1 +0,0 @@
a083e9d9bebedacd697da65dcdcec0bf21c9666d

View File

@ -1,414 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DataflowBlockOptions.cs
//
//
// DataflowBlockOptions types for configuring dataflow blocks
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>
/// Provides options used to configure the processing performed by dataflow blocks.
/// </summary>
/// <remarks>
/// <see cref="DataflowBlockOptions"/> is mutable and can be configured through its properties.
/// When specific configuration options are not set, the following defaults are used:
/// <list type="table">
/// <listheader>
/// <term>Options</term>
/// <description>Default</description>
/// </listheader>
/// <item>
/// <term>TaskScheduler</term>
/// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
/// </item>
/// <item>
/// <term>MaxMessagesPerTask</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>CancellationToken</term>
/// <description><see cref="System.Threading.CancellationToken.None"/></description>
/// </item>
/// <item>
/// <term>BoundedCapacity</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>NameFormat</term>
/// <description>"{0} Id={1}"</description>
/// </item>
/// </list>
/// Dataflow blocks capture the state of the options at their construction. Subsequent changes
/// to the provided <see cref="DataflowBlockOptions"/> instance should not affect the behavior
/// of a dataflow block.
/// </remarks>
[DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}")]
public class DataflowBlockOptions
{
/// <summary>
/// A constant used to specify an unlimited quantity for <see cref="DataflowBlockOptions"/> members
/// that provide an upper bound. This field is constant.
/// </summary>
public const Int32 Unbounded = -1;
/// <summary>The scheduler to use for scheduling tasks to process messages.</summary>
private TaskScheduler _taskScheduler = TaskScheduler.Default;
/// <summary>The cancellation token to monitor for cancellation requests.</summary>
private CancellationToken _cancellationToken = CancellationToken.None;
/// <summary>The maximum number of messages that may be processed per task.</summary>
private Int32 _maxMessagesPerTask = Unbounded;
/// <summary>The maximum number of messages that may be buffered by the block.</summary>
private Int32 _boundedCapacity = Unbounded;
/// <summary>The name format to use for creating a name for a block.</summary>
private string _nameFormat = "{0} Id={1}"; // see NameFormat property for a description of format items
/// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
/// </remarks>
internal static readonly DataflowBlockOptions Default = new DataflowBlockOptions();
/// <summary>Returns this <see cref="DataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
/// <returns>An instance of the options that may be cached by the block.</returns>
internal DataflowBlockOptions DefaultOrClone()
{
return (this == Default) ?
this :
new DataflowBlockOptions
{
TaskScheduler = this.TaskScheduler,
CancellationToken = this.CancellationToken,
MaxMessagesPerTask = this.MaxMessagesPerTask,
BoundedCapacity = this.BoundedCapacity,
NameFormat = this.NameFormat
};
}
/// <summary>Initializes the <see cref="DataflowBlockOptions"/>.</summary>
public DataflowBlockOptions() { }
/// <summary>Gets or sets the <see cref="System.Threading.Tasks.TaskScheduler"/> to use for scheduling tasks.</summary>
public TaskScheduler TaskScheduler
{
get { return _taskScheduler; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value == null) throw new ArgumentNullException("value");
_taskScheduler = value;
}
}
/// <summary>Gets or sets the <see cref="System.Threading.CancellationToken"/> to monitor for cancellation requests.</summary>
public CancellationToken CancellationToken
{
get { return _cancellationToken; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
_cancellationToken = value;
}
}
/// <summary>Gets or sets the maximum number of messages that may be processed per task.</summary>
public Int32 MaxMessagesPerTask
{
get { return _maxMessagesPerTask; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
_maxMessagesPerTask = value;
}
}
/// <summary>Gets a MaxMessagesPerTask value that may be used for comparison purposes.</summary>
/// <returns>The maximum value, usable for comparison purposes.</returns>
/// <remarks>Unlike MaxMessagesPerTask, this property will always return a positive value.</remarks>
internal Int32 ActualMaxMessagesPerTask
{
get { return (_maxMessagesPerTask == Unbounded) ? Int32.MaxValue : _maxMessagesPerTask; }
}
/// <summary>Gets or sets the maximum number of messages that may be buffered by the block.</summary>
public Int32 BoundedCapacity
{
get { return _boundedCapacity; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
_boundedCapacity = value;
}
}
/// <summary>
/// Gets or sets the format string to use when a block is queried for its name.
/// </summary>
/// <remarks>
/// The name format may contain up to two format items. {0} will be substituted
/// with the block's name. {1} will be substituted with the block's Id, as is
/// returned from the block's Completion.Id property.
/// </remarks>
public string NameFormat
{
get { return _nameFormat; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value == null) throw new ArgumentNullException("value");
_nameFormat = value;
}
}
}
/// <summary>
/// Provides options used to configure the processing performed by dataflow blocks that
/// process each message through the invocation of a user-provided delegate, blocks such
/// as <see cref="ActionBlock{T}"/> and <see cref="TransformBlock{TInput,TOutput}"/>.
/// </summary>
/// <remarks>
/// <see cref="ExecutionDataflowBlockOptions"/> is mutable and can be configured through its properties.
/// When specific configuration options are not set, the following defaults are used:
/// <list type="table">
/// <listheader>
/// <term>Options</term>
/// <description>Default</description>
/// </listheader>
/// <item>
/// <term>TaskScheduler</term>
/// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
/// </item>
/// <item>
/// <term>CancellationToken</term>
/// <description><see cref="System.Threading.CancellationToken.None"/></description>
/// </item>
/// <item>
/// <term>MaxMessagesPerTask</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>BoundedCapacity</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>NameFormat</term>
/// <description>"{0} Id={1}"</description>
/// </item>
/// <item>
/// <term>MaxDegreeOfParallelism</term>
/// <description>1</description>
/// </item>
/// <item>
/// <term>SingleProducerConstrained</term>
/// <description>false</description>
/// </item>
/// </list>
/// Dataflow block captures the state of the options at their construction. Subsequent changes
/// to the provided <see cref="ExecutionDataflowBlockOptions"/> instance should not affect the behavior
/// of a dataflow block.
/// </remarks>
[DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}, MaxDegreeOfParallelism = {MaxDegreeOfParallelism}")]
public class ExecutionDataflowBlockOptions : DataflowBlockOptions
{
/// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
/// </remarks>
internal new static readonly ExecutionDataflowBlockOptions Default = new ExecutionDataflowBlockOptions();
/// <summary>Returns this <see cref="ExecutionDataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
/// <returns>An instance of the options that may be cached by the block.</returns>
internal new ExecutionDataflowBlockOptions DefaultOrClone()
{
return (this == Default) ?
this :
new ExecutionDataflowBlockOptions
{
TaskScheduler = this.TaskScheduler,
CancellationToken = this.CancellationToken,
MaxMessagesPerTask = this.MaxMessagesPerTask,
BoundedCapacity = this.BoundedCapacity,
NameFormat = this.NameFormat,
MaxDegreeOfParallelism = this.MaxDegreeOfParallelism,
SingleProducerConstrained = this.SingleProducerConstrained
};
}
/// <summary>The maximum number of tasks that may be used concurrently to process messages.</summary>
private Int32 _maxDegreeOfParallelism = 1;
/// <summary>Whether the code using this block will only ever have a single producer accessing the block at any given time.</summary>
private Boolean _singleProducerConstrained = false;
/// <summary>Initializes the <see cref="ExecutionDataflowBlockOptions"/>.</summary>
public ExecutionDataflowBlockOptions() { }
/// <summary>Gets the maximum number of messages that may be processed by the block concurrently.</summary>
public Int32 MaxDegreeOfParallelism
{
get { return _maxDegreeOfParallelism; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
_maxDegreeOfParallelism = value;
}
}
/// <summary>
/// Gets whether code using the dataflow block is constrained to one producer at a time.
/// </summary>
/// <remarks>
/// This property defaults to false, such that the block may be used by multiple
/// producers concurrently. This property should only be set to true if the code
/// using the block can guarantee that it will only ever be used by one producer
/// (e.g. a source linked to the block) at a time, meaning that methods like Post,
/// Complete, Fault, and OfferMessage will never be called concurrently. Some blocks
/// may choose to capitalize on the knowledge that there will only be one producer at a time
/// in order to provide better performance.
/// </remarks>
public Boolean SingleProducerConstrained
{
get { return _singleProducerConstrained; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
_singleProducerConstrained = value;
}
}
/// <summary>Gets a MaxDegreeOfParallelism value that may be used for comparison purposes.</summary>
/// <returns>The maximum value, usable for comparison purposes.</returns>
/// <remarks>Unlike MaxDegreeOfParallelism, this property will always return a positive value.</remarks>
internal Int32 ActualMaxDegreeOfParallelism
{
get { return (_maxDegreeOfParallelism == Unbounded) ? Int32.MaxValue : _maxDegreeOfParallelism; }
}
/// <summary>Gets whether these dataflow block options allow for parallel execution.</summary>
internal Boolean SupportsParallelExecution { get { return _maxDegreeOfParallelism == Unbounded || _maxDegreeOfParallelism > 1; } }
}
/// <summary>
/// Provides options used to configure the processing performed by dataflow blocks that
/// group together multiple messages, blocks such as <see cref="JoinBlock{T1,T2}"/> and
/// <see cref="BatchBlock{T}"/>.
/// </summary>
/// <remarks>
/// <see cref="GroupingDataflowBlockOptions"/> is mutable and can be configured through its properties.
/// When specific configuration options are not set, the following defaults are used:
/// <list type="table">
/// <listheader>
/// <term>Options</term>
/// <description>Default</description>
/// </listheader>
/// <item>
/// <term>TaskScheduler</term>
/// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
/// </item>
/// <item>
/// <term>CancellationToken</term>
/// <description><see cref="System.Threading.CancellationToken.None"/></description>
/// </item>
/// <item>
/// <term>MaxMessagesPerTask</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>BoundedCapacity</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>NameFormat</term>
/// <description>"{0} Id={1}"</description>
/// </item>
/// <item>
/// <term>MaxNumberOfGroups</term>
/// <description>GroupingDataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>Greedy</term>
/// <description>true</description>
/// </item>
/// </list>
/// Dataflow block capture the state of the options at their construction. Subsequent changes
/// to the provided <see cref="GroupingDataflowBlockOptions"/> instance should not affect the behavior
/// of a dataflow block.
/// </remarks>
[DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}, Greedy = {Greedy}, MaxNumberOfGroups = {MaxNumberOfGroups}")]
public class GroupingDataflowBlockOptions : DataflowBlockOptions
{
/// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
/// </remarks>
internal new static readonly GroupingDataflowBlockOptions Default = new GroupingDataflowBlockOptions();
/// <summary>Returns this <see cref="GroupingDataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
/// <returns>An instance of the options that may be cached by the block.</returns>
internal new GroupingDataflowBlockOptions DefaultOrClone()
{
return (this == Default) ?
this :
new GroupingDataflowBlockOptions
{
TaskScheduler = this.TaskScheduler,
CancellationToken = this.CancellationToken,
MaxMessagesPerTask = this.MaxMessagesPerTask,
BoundedCapacity = this.BoundedCapacity,
NameFormat = this.NameFormat,
Greedy = this.Greedy,
MaxNumberOfGroups = this.MaxNumberOfGroups
};
}
/// <summary>Whether the block should greedily consume offered messages.</summary>
private Boolean _greedy = true;
/// <summary>The maximum number of groups that should be generated by the block.</summary>
private Int64 _maxNumberOfGroups = Unbounded;
/// <summary>Initializes the <see cref="GroupingDataflowBlockOptions"/>.</summary>
public GroupingDataflowBlockOptions() { }
/// <summary>Gets or sets the Boolean value to use to determine whether to greedily consume offered messages.</summary>
public Boolean Greedy
{
get { return _greedy; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
_greedy = value;
}
}
/// <summary>Gets or sets the maximum number of groups that should be generated by the block.</summary>
public Int64 MaxNumberOfGroups
{
get { return _maxNumberOfGroups; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value <= 0 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
_maxNumberOfGroups = value;
}
}
/// <summary>Gets a MaxNumberOfGroups value that may be used for comparison purposes.</summary>
/// <returns>The maximum value, usable for comparison purposes.</returns>
/// <remarks>Unlike MaxNumberOfGroups, this property will always return a positive value.</remarks>
internal Int64 ActualMaxNumberOfGroups
{
get { return (_maxNumberOfGroups == Unbounded) ? Int64.MaxValue : _maxNumberOfGroups; }
}
}
}

View File

@ -1,113 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DataflowLinkOptions.cs
//
//
// DataflowLinkOptions type for configuring links between dataflow blocks
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>
/// Provides options used to configure a link between dataflow blocks.
/// </summary>
/// <remarks>
/// <see cref="DataflowLinkOptions"/> is mutable and can be configured through its properties.
/// When specific configuration options are not set, the following defaults are used:
/// <list type="table">
/// <listheader>
/// <term>Options</term>
/// <description>Default</description>
/// </listheader>
/// <item>
/// <term>PropagateCompletion</term>
/// <description>False</description>
/// </item>
/// <item>
/// <term>MaxMessages</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>Append</term>
/// <description>True</description>
/// </item>
/// </list>
/// Dataflow blocks capture the state of the options at linking. Subsequent changes to the provided
/// <see cref="DataflowLinkOptions"/> instance should not affect the behavior of a link.
/// </remarks>
[DebuggerDisplay("PropagateCompletion = {PropagateCompletion}, MaxMessages = {MaxMessages}, Append = {Append}")]
public class DataflowLinkOptions
{
/// <summary>
/// A constant used to specify an unlimited quantity for <see cref="DataflowLinkOptions"/> members
/// that provide an upper bound. This field is a constant tied to <see cref="DataflowLinkOptions.Unbounded"/>.
/// </summary>
internal const Int32 Unbounded = DataflowBlockOptions.Unbounded;
/// <summary>Whether the linked target will have completion and faulting notification propagated to it automatically.</summary>
private Boolean _propagateCompletion = false;
/// <summary>The maximum number of messages that may be consumed across the link.</summary>
private Int32 _maxNumberOfMessages = Unbounded;
/// <summary>Whether the link should be appended to the source’s list of links, or whether it should be prepended.</summary>
private Boolean _append = true;
/// <summary>A default instance of <see cref="DataflowLinkOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
/// </remarks>
internal static readonly DataflowLinkOptions Default = new DataflowLinkOptions();
/// <summary>A cached instance of <see cref="DataflowLinkOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks that need to unlink after one message has been consumed.
/// </remarks>
internal static readonly DataflowLinkOptions UnlinkAfterOneAndPropagateCompletion = new DataflowLinkOptions() { MaxMessages = 1, PropagateCompletion = true };
/// <summary>Initializes the <see cref="DataflowLinkOptions"/>.</summary>
public DataflowLinkOptions()
{
}
/// <summary>Gets or sets whether the linked target will have completion and faulting notification propagated to it automatically.</summary>
public Boolean PropagateCompletion
{
get { return _propagateCompletion; }
set
{
Debug.Assert(this != Default && this != UnlinkAfterOneAndPropagateCompletion, "Default and UnlinkAfterOneAndPropagateCompletion instances are supposed to be immutable.");
_propagateCompletion = value;
}
}
/// <summary>Gets or sets the maximum number of messages that may be consumed across the link.</summary>
public Int32 MaxMessages
{
get { return _maxNumberOfMessages; }
set
{
Debug.Assert(this != Default && this != UnlinkAfterOneAndPropagateCompletion, "Default and UnlinkAfterOneAndPropagateCompletion instances are supposed to be immutable.");
if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException("value");
_maxNumberOfMessages = value;
}
}
/// <summary>Gets or sets whether the link should be appended to the source’s list of links, or whether it should be prepended.</summary>
public Boolean Append
{
get { return _append; }
set
{
Debug.Assert(this != Default && this != UnlinkAfterOneAndPropagateCompletion, "Default and UnlinkAfterOneAndPropagateCompletion instances are supposed to be immutable.");
_append = value;
}
}
}
}

View File

@ -1,92 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DataflowMessageHeader.cs
//
//
// A container of data attributes passed between dataflow blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Threading.Tasks.Dataflow.Internal;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a container of data attributes for passing between dataflow blocks.</summary>
[DebuggerDisplay("Id = {Id}")]
public struct DataflowMessageHeader : IEquatable<DataflowMessageHeader>
{
/// <summary>The message ID. Needs to be unique within the source.</summary>
private readonly long _id;
/// <summary>Initializes the <see cref="DataflowMessageHeader"/> with the specified attributes.</summary>
/// <param name="id">The ID of the message. Must be unique within the originating source block. Need not be globally unique.</param>
public DataflowMessageHeader(Int64 id)
{
if (id == default(long)) throw new ArgumentException(SR.Argument_InvalidMessageId, "id");
Contract.EndContractBlock();
_id = id;
}
/// <summary>Gets the validity of the message.</summary>
/// <returns>True if the ID of the message is different from 0. False if the ID of the message is 0</returns>
public Boolean IsValid { get { return _id != default(long); } }
/// <summary>Gets the ID of the message within the source.</summary>
/// <returns>The ID contained in the <see cref="DataflowMessageHeader"/> instance.</returns>
public Int64 Id { get { return _id; } }
// These overrides are required by the FX API Guidelines.
// NOTE: When these overrides are present, the compiler doesn't complain about statements
// like 'if (struct == null) ...' which will result in incorrect behavior at runtime.
// The product code should not use them. Instead, it should compare the Id properties.
// To verify that, every once in a while, comment out this region and build the product.
#region Comparison Operators
/// <summary>Checks two <see cref="DataflowMessageHeader"/> instances for equality by ID without boxing.</summary>
/// <param name="other">Another <see cref="DataflowMessageHeader"/> instance.</param>
/// <returns>True if the instances are equal. False otherwise.</returns>
public bool Equals(DataflowMessageHeader other)
{
return this == other;
}
/// <summary>Checks boxed <see cref="DataflowMessageHeader"/> instances for equality by ID.</summary>
/// <param name="obj">A boxed <see cref="DataflowMessageHeader"/> instance.</param>
/// <returns>True if the instances are equal. False otherwise.</returns>
public override bool Equals(object obj)
{
return obj is DataflowMessageHeader && this == (DataflowMessageHeader)obj;
}
/// <summary>Generates a hash code for the <see cref="DataflowMessageHeader"/> instance.</summary>
/// <returns>Hash code.</returns>
public override int GetHashCode()
{
return (int)Id;
}
/// <summary>Checks two <see cref="DataflowMessageHeader"/> instances for equality by ID.</summary>
/// <param name="left">A <see cref="DataflowMessageHeader"/> instance.</param>
/// <param name="right">A <see cref="DataflowMessageHeader"/> instance.</param>
/// <returns>True if the instances are equal. False otherwise.</returns>
public static bool operator ==(DataflowMessageHeader left, DataflowMessageHeader right)
{
return left.Id == right.Id;
}
/// <summary>Checks two <see cref="DataflowMessageHeader"/> instances for non-equality by ID.</summary>
/// <param name="left">A <see cref="DataflowMessageHeader"/> instance.</param>
/// <param name="right">A <see cref="DataflowMessageHeader"/> instance.</param>
/// <returns>True if the instances are not equal. False otherwise.</returns>
public static bool operator !=(DataflowMessageHeader left, DataflowMessageHeader right)
{
return left.Id != right.Id;
}
#endregion
}
}

View File

@ -1,47 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DataflowMessageStatus.cs
//
//
// Status about the propagation of a message.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents the status of a <see cref="DataflowMessageHeader"/> when passed between dataflow blocks.</summary>
public enum DataflowMessageStatus
{
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> accepted the message. Once a target has accepted a message,
/// it is wholly owned by the target.
/// </summary>
Accepted = 0x0,
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> declined the message. The <see cref="ISourceBlock{TOutput}"/> still owns the message.
/// </summary>
Declined = 0x1,
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> postponed the message for potential consumption at a later time.
/// The <see cref="ISourceBlock{TOutput}"/> still owns the message.
/// </summary>
Postponed = 0x2,
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> tried to accept the message from the <see cref="ISourceBlock{TOutput}"/>, but the
/// message was no longer available.
/// </summary>
NotAvailable = 0x3,
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> declined the message. The <see cref="ISourceBlock{TOutput}"/> still owns the message.
/// Additionally, the <see cref="ITargetBlock{TInput}"/> will decline all future messages sent by the source.
/// </summary>
DecliningPermanently = 0x4
}
}

View File

@ -1,31 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IDataflowBlock.cs
//
//
// The base interface for all dataflow blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block.</summary>
public interface IDataflowBlock
{
// IMPLEMENT IMPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task Completion { get; }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
void Complete();
// IMPLEMENT EXPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void Fault(Exception exception);
}
}

View File

@ -1,22 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IPropagatorBlock.cs
//
//
// The base interface for all propagator blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block that is both a target for data and a source of data.</summary>
/// <typeparam name="TInput">Specifies the type of data accepted by the <see cref="IPropagatorBlock{TInput,TOutput}"/>.</typeparam>
/// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="IPropagatorBlock{TInput,TOutput}"/>.</typeparam>
public interface IPropagatorBlock<in TInput, out TOutput> : ITargetBlock<TInput>, ISourceBlock<TOutput>
{
// No additional members beyond those inherited from ITargetBlock<TInput> and ISourceBlock<TOutput>
}
}

View File

@ -1,31 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IReceivableSourceBlock.cs
//
//
// The base interface for all source blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block that supports receiving of messages without linking.</summary>
/// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="IReceivableSourceBlock{TOutput}"/>.</typeparam>
public interface IReceivableSourceBlock<TOutput> : ISourceBlock<TOutput>
{
// IMPLEMENT IMPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
bool TryReceive(Predicate<TOutput> filter, out TOutput item);
// IMPLEMENT IMPLICITLY IF BLOCK SUPPORTS RECEIVING MORE THAN ONE ITEM, OTHERWISE EXPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
bool TryReceiveAll(out IList<TOutput> items);
}
}

View File

@ -1,39 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ISourceBlock.cs
//
//
// The base interface for all source blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block that is a source of data.</summary>
/// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="ISourceBlock{TOutput}"/>.</typeparam>
public interface ISourceBlock<out TOutput> : IDataflowBlock
{
// IMPLEMENT IMPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions);
// IMPLEMENT EXPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
[SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "2#")]
TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
Boolean ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target);
}
}

View File

@ -1,24 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ITargetBlock.cs
//
//
// The base interface for all target blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block that is a target for data.</summary>
/// <typeparam name="TInput">Specifies the type of data accepted by the <see cref="ITargetBlock{TInput}"/>.</typeparam>
public interface ITargetBlock<in TInput> : IDataflowBlock
{
// IMPLEMENT EXPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept);
}
}

View File

@ -1,383 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ActionBlock.cs
//
//
// A target block that executes an action for each message.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Runtime.CompilerServices;
using System.Threading.Tasks.Dataflow.Internal;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a dataflow block that invokes a provided <see cref="System.Action{T}"/> delegate for every data element received.</summary>
/// <typeparam name="TInput">Specifies the type of data operated on by this <see cref="ActionBlock{T}"/>.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(ActionBlock<>.DebugView))]
public sealed class ActionBlock<TInput> : ITargetBlock<TInput>, IDebuggerDisplay
{
/// <summary>The core implementation of this message block when in default mode.</summary>
private readonly TargetCore<TInput> _defaultTarget;
/// <summary>The core implementation of this message block when in SPSC mode.</summary>
private readonly SpscTargetCore<TInput> _spscTarget;
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Action{T}"/>.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Action<TInput> action) :
this((Delegate)action, ExecutionDataflowBlockOptions.Default)
{ }
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Action{T}"/> and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Action<TInput> action, ExecutionDataflowBlockOptions dataflowBlockOptions) :
this((Delegate)action, dataflowBlockOptions)
{ }
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Func{T,Task}"/>.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Func<TInput, Task> action) :
this((Delegate)action, ExecutionDataflowBlockOptions.Default)
{ }
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Func{T,Task}"/> and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Func<TInput, Task> action, ExecutionDataflowBlockOptions dataflowBlockOptions) :
this((Delegate)action, dataflowBlockOptions)
{ }
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified delegate and options.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
private ActionBlock(Delegate action, ExecutionDataflowBlockOptions dataflowBlockOptions)
{
// Validate arguments
if (action == null) throw new ArgumentNullException("action");
if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
Contract.Ensures((_spscTarget != null) ^ (_defaultTarget != null), "One and only one of the two targets must be non-null after construction");
Contract.EndContractBlock();
// Ensure we have options that can't be changed by the caller
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Based on the mode, initialize the target. If the user specifies SingleProducerConstrained,
// we'll try to employ an optimized mode under a limited set of circumstances.
var syncAction = action as Action<TInput>;
if (syncAction != null &&
dataflowBlockOptions.SingleProducerConstrained &&
dataflowBlockOptions.MaxDegreeOfParallelism == 1 &&
!dataflowBlockOptions.CancellationToken.CanBeCanceled &&
dataflowBlockOptions.BoundedCapacity == DataflowBlockOptions.Unbounded)
{
// Initialize the SPSC fast target to handle the bulk of the processing.
// The SpscTargetCore is only supported when BoundedCapacity, CancellationToken,
// and MaxDOP are all their default values. It's also only supported for sync
// delegates and not for async delegates.
_spscTarget = new SpscTargetCore<TInput>(this, syncAction, dataflowBlockOptions);
}
else
{
// Initialize the TargetCore which handles the bulk of the processing.
// The default target core can handle all options and delegate flavors.
if (syncAction != null) // sync
{
_defaultTarget = new TargetCore<TInput>(this,
messageWithId => ProcessMessage(syncAction, messageWithId),
null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion);
}
else // async
{
var asyncAction = action as Func<TInput, Task>;
Debug.Assert(asyncAction != null, "action is of incorrect delegate type");
_defaultTarget = new TargetCore<TInput>(this,
messageWithId => ProcessMessageWithTask(asyncAction, messageWithId),
null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion | TargetCoreOptions.UsesAsyncCompletion);
}
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
}
#if FEATURE_TRACING
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
}
#endif
}
/// <summary>Processes the message with a user-provided action.</summary>
/// <param name="action">The action to use to process the message.</param>
/// <param name="messageWithId">The message to be processed.</param>
private void ProcessMessage(Action<TInput> action, KeyValuePair<TInput, long> messageWithId)
{
try
{
action(messageWithId.Key);
}
catch (Exception exc)
{
// If this exception represents cancellation, swallow it rather than shutting down the block.
if (!Common.IsCooperativeCancellation(exc)) throw;
}
finally
{
// We're done synchronously processing an element, so reduce the bounding count
// that was incrementing when this element was enqueued.
if (_defaultTarget.IsBounded) _defaultTarget.ChangeBoundingCount(-1);
}
}
/// <summary>Processes the message with a user-provided action that returns a task.</summary>
/// <param name="action">The action to use to process the message.</param>
/// <param name="messageWithId">The message to be processed.</param>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ProcessMessageWithTask(Func<TInput, Task> action, KeyValuePair<TInput, long> messageWithId)
{
Contract.Requires(action != null, "action needed for processing");
// Run the action to get the task that represents the operation's completion
Task task = null;
Exception caughtException = null;
try
{
task = action(messageWithId.Key);
}
catch (Exception exc) { caughtException = exc; }
// If no task is available, we're done.
if (task == null)
{
// If we didn't get a task because an exception occurred,
// store it (if the exception was cancellation, just ignore it).
if (caughtException != null && !Common.IsCooperativeCancellation(caughtException))
{
Common.StoreDataflowMessageValueIntoExceptionData(caughtException, messageWithId.Key);
_defaultTarget.Complete(caughtException, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
}
// Signal that we're done this async operation.
_defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
return;
}
else if (task.IsCompleted)
{
AsyncCompleteProcessMessageWithTask(task);
}
else
{
// Otherwise, join with the asynchronous operation when it completes.
task.ContinueWith((completed, state) =>
{
((ActionBlock<TInput>)state).AsyncCompleteProcessMessageWithTask(completed);
}, this, CancellationToken.None, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
}
}
/// <summary>Completes the processing of an asynchronous message.</summary>
/// <param name="completed">The completed task.</param>
private void AsyncCompleteProcessMessageWithTask(Task completed)
{
Contract.Requires(completed != null, "Need completed task for processing");
Contract.Requires(completed.IsCompleted, "The task to be processed must be completed by now.");
// If the task faulted, store its errors. We must add the exception before declining
// and signaling completion, as the exception is part of the operation, and the completion conditions
// depend on this.
if (completed.IsFaulted)
{
_defaultTarget.Complete(completed.Exception, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: true);
}
// Regardless of faults, note that we're done processing. There are
// no outputs to keep track of for action block, so we always decrement
// the bounding count here (the callee will handle checking whether
// we're actually in a bounded mode).
_defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
{
if (_defaultTarget != null)
{
_defaultTarget.Complete(exception: null, dropPendingMessages: false);
}
else
{
_spscTarget.Complete(exception: null);
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null) throw new ArgumentNullException("exception");
Contract.EndContractBlock();
if (_defaultTarget != null)
{
_defaultTarget.Complete(exception, dropPendingMessages: true);
}
else
{
_spscTarget.Complete(exception);
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion
{
get { return _defaultTarget != null ? _defaultTarget.Completion : _spscTarget.Completion; }
}
/// <summary>Posts an item to the <see cref="T:System.Threading.Tasks.Dataflow.ITargetBlock`1"/>.</summary>
/// <param name="item">The item being offered to the target.</param>
/// <returns>true if the item was accepted by the target block; otherwise, false.</returns>
/// <remarks>
/// This method will return once the target block has decided to accept or decline the item,
/// but unless otherwise dictated by special semantics of the target block, it does not wait
/// for the item to actually be processed (for example, <see cref="T:System.Threading.Tasks.Dataflow.ActionBlock`1"/>
/// will return from Post as soon as it has stored the posted item into its input queue). From the perspective
/// of the block's processing, Post is asynchronous. For target blocks that support postponing offered messages,
/// or for blocks that may do more processing in their Post implementation, consider using
/// <see cref="T:System.Threading.Tasks.Dataflow.DataflowBlock.SendAsync">SendAsync</see>,
/// which will return immediately and will enable the target to postpone the posted message and later consume it
/// after SendAsync returns.
/// </remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool Post(TInput item)
{
// Even though this method is available with the exact same functionality as an extension method
// on ITargetBlock, using that extension method goes through an interface call on ITargetBlock,
// which for very high-throughput scenarios shows up as noticeable overhead on certain architectures.
// We can eliminate that call for direct ActionBlock usage by providing the same method as an instance method.
return _defaultTarget != null ?
_defaultTarget.OfferMessage(Common.SingleMessageHeader, item, null, false) == DataflowMessageStatus.Accepted :
_spscTarget.Post(item);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
{
return _defaultTarget != null ?
_defaultTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept) :
_spscTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="InputCount"]/*' />
public int InputCount
{
get { return _defaultTarget != null ? _defaultTarget.InputCount : _spscTarget.InputCount; }
}
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger.</summary>
private int InputCountForDebugger
{
get { return _defaultTarget != null ? _defaultTarget.GetDebuggingInformation().InputCount : _spscTarget.InputCount; }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString()
{
return Common.GetNameForDebugger(this, _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions);
}
/// <summary>The data to display in the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
get
{
return string.Format("{0}, InputCount={1}",
Common.GetNameForDebugger(this, _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions),
InputCountForDebugger);
}
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the Call.</summary>
private sealed class DebugView
{
/// <summary>The action block being viewed.</summary>
private readonly ActionBlock<TInput> _actionBlock;
/// <summary>The action block's default target being viewed.</summary>
private readonly TargetCore<TInput>.DebuggingInformation _defaultDebugInfo;
/// <summary>The action block's SPSC target being viewed.</summary>
private readonly SpscTargetCore<TInput>.DebuggingInformation _spscDebugInfo;
/// <summary>Initializes the debug view.</summary>
/// <param name="actionBlock">The target being debugged.</param>
public DebugView(ActionBlock<TInput> actionBlock)
{
Contract.Requires(actionBlock != null, "Need a block with which to construct the debug view.");
_actionBlock = actionBlock;
if (_actionBlock._defaultTarget != null)
{
_defaultDebugInfo = actionBlock._defaultTarget.GetDebuggingInformation();
}
else
{
_spscDebugInfo = actionBlock._spscTarget.GetDebuggingInformation();
}
}
/// <summary>Gets the messages waiting to be processed.</summary>
public IEnumerable<TInput> InputQueue
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.InputQueue : _spscDebugInfo.InputQueue; }
}
/// <summary>Gets any postponed messages.</summary>
public QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.PostponedMessages : null; }
}
/// <summary>Gets the number of outstanding input operations.</summary>
public Int32 CurrentDegreeOfParallelism
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.CurrentDegreeOfParallelism : _spscDebugInfo.CurrentDegreeOfParallelism; }
}
/// <summary>Gets the ExecutionDataflowBlockOptions used to configure this block.</summary>
public ExecutionDataflowBlockOptions DataflowBlockOptions
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.DataflowBlockOptions : _spscDebugInfo.DataflowBlockOptions; }
}
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsDecliningPermanently : _spscDebugInfo.IsDecliningPermanently; }
}
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsCompleted : _spscDebugInfo.IsCompleted; }
}
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_actionBlock); } }
}
}
}

View File

@ -1,492 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BufferBlock.cs
//
//
// A propagator block that provides support for unbounded and bounded FIFO buffers.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Security;
using System.Threading.Tasks.Dataflow.Internal;
using System.Diagnostics.CodeAnalysis;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a buffer for storing data.</summary>
/// <typeparam name="T">Specifies the type of the data buffered by this dataflow block.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(BufferBlock<>.DebugView))]
public sealed class BufferBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>, IDebuggerDisplay
{
/// <summary>The core logic for the buffer block.</summary>
private readonly SourceCore<T> _source;
/// <summary>The bounding state for when in bounding mode; null if not bounding.</summary>
private readonly BoundingStateWithPostponedAndTask<T> _boundingState;
/// <summary>Whether all future messages should be declined on the target.</summary>
private bool _targetDecliningPermanently;
/// <summary>A task has reserved the right to run the target's completion routine.</summary>
private bool _targetCompletionReserved;
/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock { get { return _source; } }
/// <summary>Initializes the <see cref="BufferBlock{T}"/>.</summary>
public BufferBlock() :
this(DataflowBlockOptions.Default)
{ }
/// <summary>Initializes the <see cref="BufferBlock{T}"/> with the specified <see cref="DataflowBlockOptions"/>.</summary>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BufferBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public BufferBlock(DataflowBlockOptions dataflowBlockOptions)
{
if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
Contract.EndContractBlock();
// Ensure we have options that can't be changed by the caller
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Initialize bounding state if necessary
Action<ISourceBlock<T>, int> onItemsRemoved = null;
if (dataflowBlockOptions.BoundedCapacity > 0)
{
onItemsRemoved = (owningSource, count) => ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
_boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
}
// Initialize the source state
_source = new SourceCore<T>(this, dataflowBlockOptions,
owningSource => ((BufferBlock<T>)owningSource).Complete(),
onItemsRemoved);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
_source.Completion.ContinueWith((completed, state) =>
{
var thisBlock = ((BufferBlock<T>)state) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
thisBlock.Fault(completed.Exception);
}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, _source.Completion, owningSource => ((BufferBlock<T>)owningSource).Complete(), this);
#if FEATURE_TRACING
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
}
#endif
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
{
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, "messageHeader");
if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, "consumeToAccept");
Contract.EndContractBlock();
lock (IncomingLock)
{
// If we've already stopped accepting messages, decline permanently
if (_targetDecliningPermanently)
{
CompleteTargetIfPossible();
return DataflowMessageStatus.DecliningPermanently;
}
// We can directly accept the message if:
// 1) we are not bounding, OR
// 2) we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing.
// (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
// (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
// accepting new ones directly into the queue.)
if (_boundingState == null
||
(_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
{
// Consume the message from the source if necessary
if (consumeToAccept)
{
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
// Once consumed, pass it to the source
_source.AddMessage(messageValue);
if (_boundingState != null) _boundingState.CurrentCount++;
return DataflowMessageStatus.Accepted;
}
// Otherwise, we try to postpone if a source was provided
else if (source != null)
{
Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
"PostponedMessages must have been initialized during construction in bounding mode.");
_boundingState.PostponedMessages.Push(source, messageHeader);
return DataflowMessageStatus.Postponed;
}
// We can't do anything else about this message
return DataflowMessageStatus.Declined;
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete() { CompleteCore(exception: null, storeExceptionEvenIfAlreadyCompleting: false); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null) throw new ArgumentNullException("exception");
Contract.EndContractBlock();
CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: false);
}
private void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState = false)
{
Contract.Requires(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
"Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
Contract.EndContractBlock();
lock (IncomingLock)
{
// Faulting from outside is allowed until we start declining permanently.
// Faulting from inside is allowed at any time.
if (exception != null && (!_targetDecliningPermanently || storeExceptionEvenIfAlreadyCompleting))
{
_source.AddException(exception);
}
// Revert the dirty processing state if requested
if (revertProcessingState)
{
Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
"The processing state must be dirty when revertProcessingState==true.");
_boundingState.TaskForInputProcessing = null;
}
// Trigger completion
_targetDecliningPermanently = true;
CompleteTargetIfPossible();
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { return _source.LinkTo(target, linkOptions); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public Boolean TryReceive(Predicate<T> filter, out T item) { return _source.TryReceive(filter, out item); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
public Boolean TryReceiveAll(out IList<T> items) { return _source.TryReceiveAll(out items); }
/// <summary>Gets the number of items currently stored in the buffer.</summary>
public Int32 Count { get { return _source.OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion { get { return _source.Completion; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
{
return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
return _source.ReserveMessage(messageHeader, target);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
_source.ReleaseReservation(messageHeader, target);
}
/// <summary>Notifies the block that one or more items was removed from the queue.</summary>
/// <param name="numItemsRemoved">The number of items removed.</param>
private void OnItemsRemoved(int numItemsRemoved)
{
Contract.Requires(numItemsRemoved > 0, "A positive number of items to remove is required.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// If we're bounding, we need to know when an item is removed so that we
// can update the count that's mirroring the actual count in the source's queue,
// and potentially kick off processing to start consuming postponed messages.
if (_boundingState != null)
{
lock (IncomingLock)
{
// Decrement the count, which mirrors the count in the source half
Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
"It should be impossible to have a negative number of items.");
_boundingState.CurrentCount -= numItemsRemoved;
ConsumeAsyncIfNecessary();
CompleteTargetIfPossible();
}
}
}
/// <summary>Called when postponed messages may need to be consumed.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
internal void ConsumeAsyncIfNecessary(bool isReplacementReplica = false)
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
Debug.Assert(_boundingState != null, "Must be in bounded mode.");
if (!_targetDecliningPermanently &&
_boundingState.TaskForInputProcessing == null &&
_boundingState.PostponedMessages.Count > 0 &&
_boundingState.CountIsLessThanBound)
{
// Create task and store into _taskForInputProcessing prior to scheduling the task
// so that _taskForInputProcessing will be visibly set in the task loop.
_boundingState.TaskForInputProcessing =
new Task(state => ((BufferBlock<T>)state).ConsumeMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
#if FEATURE_TRACING
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.TaskLaunchedForMessageHandling(
this, _boundingState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
_boundingState.PostponedMessages.Count);
}
#endif
// Start the task handling scheduling exceptions
Exception exception = Common.StartTaskSafe(_boundingState.TaskForInputProcessing, _source.DataflowBlockOptions.TaskScheduler);
if (exception != null)
{
// Get out from under currently held locks. CompleteCore re-acquires the locks it needs.
Task.Factory.StartNew(exc => CompleteCore(exception: (Exception)exc, storeExceptionEvenIfAlreadyCompleting: true, revertProcessingState: true),
exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
}
/// <summary>Task body used to consume postponed messages.</summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ConsumeMessagesLoopCore()
{
Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
"May only be called in bounded mode and when a task is in flight.");
Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
"This must only be called from the in-flight processing task.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
try
{
int maxMessagesPerTask = _source.DataflowBlockOptions.ActualMaxMessagesPerTask;
for (int i = 0;
i < maxMessagesPerTask && ConsumeAndStoreOneMessageIfAvailable();
i++)
;
}
catch (Exception exc)
{
// Prevent the creation of new processing tasks
CompleteCore(exc, storeExceptionEvenIfAlreadyCompleting: true);
}
finally
{
lock (IncomingLock)
{
// We're no longer processing, so null out the processing task
_boundingState.TaskForInputProcessing = null;
// However, we may have given up early because we hit our own configured
// processing limits rather than because we ran out of work to do. If that's
// the case, make sure we spin up another task to keep going.
ConsumeAsyncIfNecessary(isReplacementReplica: true);
// If, however, we stopped because we ran out of work to do and we
// know we'll never get more, then complete.
CompleteTargetIfPossible();
}
}
}
/// <summary>
/// Retrieves one postponed message if there's room and if we can consume a postponed message.
/// Stores any consumed message into the source half.
/// </summary>
/// <returns>true if a message could be consumed and stored; otherwise, false.</returns>
/// <remarks>This must only be called from the asynchronous processing loop.</remarks>
private bool ConsumeAndStoreOneMessageIfAvailable()
{
Contract.Requires(_boundingState != null && _boundingState.TaskForInputProcessing != null,
"May only be called in bounded mode and when a task is in flight.");
Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
"This must only be called from the in-flight processing task.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// Loop through the postponed messages until we get one.
while (true)
{
// Get the next item to retrieve. If there are no more, bail.
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
lock (IncomingLock)
{
if (!_boundingState.CountIsLessThanBound) return false;
if (!_boundingState.PostponedMessages.TryPop(out sourceAndMessage)) return false;
// Optimistically assume we're going to get the item. This avoids taking the lock
// again if we're right. If we're wrong, we decrement it later under lock.
_boundingState.CurrentCount++;
}
// Consume the item
bool consumed = false;
try
{
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
if (consumed)
{
_source.AddMessage(consumedValue);
return true;
}
}
finally
{
// We didn't get the item, so decrement the count to counteract our optimistic assumption.
if (!consumed)
{
lock (IncomingLock) _boundingState.CurrentCount--;
}
}
}
}
/// <summary>Completes the target, notifying the source, once all completion conditions are met.</summary>
private void CompleteTargetIfPossible()
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
if (_targetDecliningPermanently &&
!_targetCompletionReserved &&
(_boundingState == null || _boundingState.TaskForInputProcessing == null))
{
_targetCompletionReserved = true;
// If we're in bounding mode and we have any postponed messages, we need to clear them,
// which means calling back to the source, which means we need to escape the incoming lock.
if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
{
Task.Factory.StartNew(state =>
{
var thisBufferBlock = (BufferBlock<T>)state;
// Release any postponed messages
List<Exception> exceptions = null;
if (thisBufferBlock._boundingState != null)
{
// Note: No locks should be held at this point
Common.ReleaseAllPostponedMessages(thisBufferBlock,
thisBufferBlock._boundingState.PostponedMessages,
ref exceptions);
}
if (exceptions != null)
{
// It is important to migrate these exceptions to the source part of the owning batch,
// because that is the completion task that is publically exposed.
thisBufferBlock._source.AddExceptions(exceptions);
}
thisBufferBlock._source.Complete();
}, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
// Otherwise, we can just decline the source directly.
else
{
_source.Complete();
}
}
}
/// <summary>Gets the number of messages in the buffer. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int CountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
/// <summary>The data to display in the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
get
{
return string.Format("{0}, Count={1}",
Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
CountForDebugger);
}
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the BufferBlock.</summary>
private sealed class DebugView
{
/// <summary>The buffer block.</summary>
private readonly BufferBlock<T> _bufferBlock;
/// <summary>The buffer's source half.</summary>
private readonly SourceCore<T>.DebuggingInformation _sourceDebuggingInformation;
/// <summary>Initializes the debug view.</summary>
/// <param name="bufferBlock">The BufferBlock being viewed.</param>
public DebugView(BufferBlock<T> bufferBlock)
{
Contract.Requires(bufferBlock != null, "Need a block with which to construct the debug view.");
_bufferBlock = bufferBlock;
_sourceDebuggingInformation = bufferBlock._source.GetDebuggingInformation();
}
/// <summary>Gets the collection of postponed message headers.</summary>
public QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages
{
get { return _bufferBlock._boundingState != null ? _bufferBlock._boundingState.PostponedMessages : null; }
}
/// <summary>Gets the messages in the buffer.</summary>
public IEnumerable<T> Queue { get { return _sourceDebuggingInformation.OutputQueue; } }
/// <summary>The task used to process messages.</summary>
public Task TaskForInputProcessing { get { return _bufferBlock._boundingState != null ? _bufferBlock._boundingState.TaskForInputProcessing : null; } }
/// <summary>Gets the task being used for output processing.</summary>
public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
public DataflowBlockOptions DataflowBlockOptions { get { return _sourceDebuggingInformation.DataflowBlockOptions; } }
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently { get { return _bufferBlock._targetDecliningPermanently; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_bufferBlock); } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<T> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public ITargetBlock<T> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}
}
}

View File

@ -1,427 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// TransformBlock.cs
//
//
// A propagator block that runs a function on each input to produce a single output.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Threading.Tasks.Dataflow.Internal;
using System.Diagnostics.CodeAnalysis;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a dataflow block that invokes a provided <see cref="System.Func{TInput,TOutput}"/> delegate for every data element received.</summary>
/// <typeparam name="TInput">Specifies the type of data received and operated on by this <see cref="TransformBlock{TInput,TOutput}"/>.</typeparam>
/// <typeparam name="TOutput">Specifies the type of data output by this <see cref="TransformBlock{TInput,TOutput}"/>.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(TransformBlock<,>.DebugView))]
public sealed class TransformBlock<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>, IDebuggerDisplay
{
/// <summary>The target side.</summary>
private readonly TargetCore<TInput> _target;
/// <summary>Buffer used to reorder outputs that may have completed out-of-order between the target half and the source half.</summary>
private readonly ReorderingBuffer<TOutput> _reorderingBuffer;
/// <summary>The source side.</summary>
private readonly SourceCore<TOutput> _source;
/// <summary>Initializes the <see cref="TransformBlock{TInput,TOutput}"/> with the specified <see cref="System.Func{TInput,TOutput}"/>.</summary>
/// <param name="transform">The function to invoke with each data element received.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="transform"/> is null (Nothing in Visual Basic).</exception>
public TransformBlock(Func<TInput, TOutput> transform) :
this(transform, null, ExecutionDataflowBlockOptions.Default)
{ }
/// <summary>
/// Initializes the <see cref="TransformBlock{TInput,TOutput}"/> with the specified <see cref="System.Func{TInput,TOutput}"/> and
/// <see cref="ExecutionDataflowBlockOptions"/>.
/// </summary>
/// <param name="transform">The function to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="TransformBlock{TInput,TOutput}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="transform"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public TransformBlock(Func<TInput, TOutput> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) :
this(transform, null, dataflowBlockOptions)
{ }
/// <summary>Initializes the <see cref="TransformBlock{TInput,TOutput}"/> with the specified <see cref="System.Func{TInput,TOutput}"/>.</summary>
/// <param name="transform">The function to invoke with each data element received.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="transform"/> is null (Nothing in Visual Basic).</exception>
[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
public TransformBlock(Func<TInput, Task<TOutput>> transform) :
this(null, transform, ExecutionDataflowBlockOptions.Default)
{ }
/// <summary>
/// Initializes the <see cref="TransformBlock{TInput,TOutput}"/> with the specified <see cref="System.Func{TInput,TOutput}"/>
/// and <see cref="ExecutionDataflowBlockOptions"/>.
/// </summary>
/// <param name="transform">The function to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="TransformBlock{TInput,TOutput}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="transform"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
[SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]
public TransformBlock(Func<TInput, Task<TOutput>> transform, ExecutionDataflowBlockOptions dataflowBlockOptions) :
this(null, transform, dataflowBlockOptions)
{ }
/// <summary>
/// Initializes the <see cref="TransformBlock{TInput,TOutput}"/> with the specified <see cref="System.Func{TInput,TOutput}"/>
/// and <see cref="DataflowBlockOptions"/>.
/// </summary>
/// <param name="transformSync">The synchronous function to invoke with each data element received.</param>
/// <param name="transformAsync">The asynchronous function to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="TransformBlock{TInput,TOutput}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="transformSync"/> and <paramref name="transformAsync"/> are both null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
private TransformBlock(Func<TInput, TOutput> transformSync, Func<TInput, Task<TOutput>> transformAsync, ExecutionDataflowBlockOptions dataflowBlockOptions)
{
if (transformSync == null && transformAsync == null) throw new ArgumentNullException("transform");
if (dataflowBlockOptions == null) throw new ArgumentNullException("dataflowBlockOptions");
Contract.Requires(transformSync == null ^ transformAsync == null, "Exactly one of transformSync and transformAsync must be null.");
Contract.EndContractBlock();
// Ensure we have options that can't be changed by the caller
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Initialize onItemsRemoved delegate if necessary
Action<ISourceBlock<TOutput>, int> onItemsRemoved = null;
if (dataflowBlockOptions.BoundedCapacity > 0)
onItemsRemoved = (owningSource, count) => ((TransformBlock<TInput, TOutput>)owningSource)._target.ChangeBoundingCount(-count);
// Initialize source component.
_source = new SourceCore<TOutput>(this, dataflowBlockOptions,
owningSource => ((TransformBlock<TInput, TOutput>)owningSource)._target.Complete(exception: null, dropPendingMessages: true),
onItemsRemoved);
// If parallelism is employed, we will need to support reordering messages that complete out-of-order
if (dataflowBlockOptions.SupportsParallelExecution)
{
_reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
}
// Create the underlying target
if (transformSync != null) // sync
{
_target = new TargetCore<TInput>(this,
messageWithId => ProcessMessage(transformSync, messageWithId),
_reorderingBuffer, dataflowBlockOptions, TargetCoreOptions.None);
}
else // async
{
Debug.Assert(transformAsync != null, "Incorrect delegate type.");
_target = new TargetCore<TInput>(this,
messageWithId => ProcessMessageWithTask(transformAsync, messageWithId),
_reorderingBuffer, dataflowBlockOptions, TargetCoreOptions.UsesAsyncCompletion);
}
// Link up the target half with the source half. In doing so,
// ensure exceptions are propagated, and let the source know no more messages will arrive.
// As the target has completed, and as the target synchronously pushes work
// through the reordering buffer when async processing completes,
// we know for certain that no more messages will need to be sent to the source.
_target.Completion.ContinueWith((completed, state) =>
{
var sourceCore = (SourceCore<TOutput>)state;
if (completed.IsFaulted) sourceCore.AddAndUnwrapAggregateException(completed.Exception);
sourceCore.Complete();
}, _source, CancellationToken.None, Common.GetContinuationOptions(), TaskScheduler.Default);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
_source.Completion.ContinueWith((completed, state) =>
{
var thisBlock = ((TransformBlock<TInput, TOutput>)state) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
thisBlock.Fault(completed.Exception);
}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state).Complete(exception: null, dropPendingMessages: true), _target);
#if FEATURE_TRACING
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
}
#endif
}
/// <summary>Processes the message with a user-provided transform function that returns a TOutput.</summary>
/// <param name="transform">The transform function to use to process the message.</param>
/// <param name="messageWithId">The message to be processed.</param>
private void ProcessMessage(Func<TInput, TOutput> transform, KeyValuePair<TInput, long> messageWithId)
{
// Process the input message to get the output message
TOutput outputItem = default(TOutput);
bool itemIsValid = false;
try
{
outputItem = transform(messageWithId.Key);
itemIsValid = true;
}
catch (Exception exc)
{
// If this exception represents cancellation, swallow it rather than shutting down the block.
if (!Common.IsCooperativeCancellation(exc)) throw;
}
finally
{
// If we were not successful in producing an item, update the bounding
// count to reflect that we're done with this input item.
if (!itemIsValid) _target.ChangeBoundingCount(-1);
// If there's no reordering buffer (because we're running sequentially),
// simply pass the output message through. Otherwise, there's a reordering buffer,
// so add to it instead (if a reordering buffer is used, we always need
// to output the message to it, even if the operation failed and outputMessage
// is null... this is because the reordering buffer cares about a strict sequence
// of IDs, and it needs to know when a particular ID has completed. It will eliminate
// null messages accordingly.)
if (_reorderingBuffer == null)
{
if (itemIsValid) _source.AddMessage(outputItem);
}
else _reorderingBuffer.AddItem(messageWithId.Value, outputItem, itemIsValid);
}
}
/// <summary>Processes the message with a user-provided transform function that returns a task of TOutput.</summary>
/// <param name="transform">The transform function to use to process the message.</param>
/// <param name="messageWithId">The message to be processed.</param>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ProcessMessageWithTask(Func<TInput, Task<TOutput>> transform, KeyValuePair<TInput, long> messageWithId)
{
Contract.Requires(transform != null, "Function to invoke is required.");
// Run the transform function to get the task that represents the operation's completion
Task<TOutput> task = null;
Exception caughtException = null;
try
{
task = transform(messageWithId.Key);
}
catch (Exception exc) { caughtException = exc; }
// If no task is available, we're done.
if (task == null)
{
// If we didn't get a task because an exception occurred,
// store it (if the exception was cancellation, just ignore it).
if (caughtException != null && !Common.IsCooperativeCancellation(caughtException))
{
Common.StoreDataflowMessageValueIntoExceptionData(caughtException, messageWithId.Key);
_target.Complete(caughtException, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
}
// If there's a reordering buffer, notify it that this message is done.
if (_reorderingBuffer != null) _reorderingBuffer.IgnoreItem(messageWithId.Value);
// Signal that we're done this async operation, and remove the bounding
// count for the input item that didn't yield any output.
_target.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
return;
}
// Otherwise, join with the asynchronous operation when it completes.
task.ContinueWith((completed, state) =>
{
var tuple = (Tuple<TransformBlock<TInput, TOutput>, KeyValuePair<TInput, long>>)state;
tuple.Item1.AsyncCompleteProcessMessageWithTask(completed, tuple.Item2);
}, Tuple.Create(this, messageWithId), CancellationToken.None,
Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
}
/// <summary>Completes the processing of an asynchronous message.</summary>
/// <param name="completed">The completed task storing the output data generated for an input message.</param>
/// <param name="messageWithId">The originating message</param>
private void AsyncCompleteProcessMessageWithTask(Task<TOutput> completed, KeyValuePair<TInput, long> messageWithId)
{
Contract.Requires(completed != null, "Completed task is required.");
Contract.Requires(completed.IsCompleted, "Task must be completed to be here.");
bool isBounded = _target.IsBounded;
bool gotOutputItem = false;
TOutput outputItem = default(TOutput);
switch (completed.Status)
{
case TaskStatus.RanToCompletion:
outputItem = completed.Result;
gotOutputItem = true;
break;
case TaskStatus.Faulted:
// We must add the exception before declining and signaling completion, as the exception
// is part of the operation, and the completion conditions depend on this.
AggregateException aggregate = completed.Exception;
Common.StoreDataflowMessageValueIntoExceptionData(aggregate, messageWithId.Key, targetInnerExceptions: true);
_target.Complete(aggregate, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: true);
break;
// Nothing special to do for cancellation
}
// Adjust the bounding count if necessary (we only need to decrement it for faulting
// and cancellation, since in the case of success we still have an item that's now in the output buffer).
// Even though this is more costly (again, only in the non-success case, we do this before we store the
// message, so that if there's a race to remove the element from the source buffer, the count is
// appropriately incremented before it's decremented.
if (!gotOutputItem && isBounded) _target.ChangeBoundingCount(-1);
// If there's no reordering buffer (because we're running sequentially),
// and we got a message, simply pass the output message through.
if (_reorderingBuffer == null)
{
if (gotOutputItem) _source.AddMessage(outputItem);
}
// Otherwise, there's a reordering buffer, so add to it instead.
// Even if something goes wrong, we need to update the
// reordering buffer, so it knows that an item isn't missing.
else _reorderingBuffer.AddItem(messageWithId.Value, outputItem, itemIsValid: gotOutputItem);
// Let the target know that one of the asynchronous operations it launched has completed.
_target.SignalOneAsyncMessageCompleted();
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete() { _target.Complete(exception: null, dropPendingMessages: false); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null) throw new ArgumentNullException("exception");
Contract.EndContractBlock();
_target.Complete(exception, dropPendingMessages: true);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
{
return _source.LinkTo(target, linkOptions);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public Boolean TryReceive(Predicate<TOutput> filter, out TOutput item)
{
return _source.TryReceive(filter, out item);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
public bool TryReceiveAll(out IList<TOutput> items) { return _source.TryReceiveAll(out items); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion { get { return _source.Completion; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="InputCount"]/*' />
public int InputCount { get { return _target.InputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="OutputCount"]/*' />
public int OutputCount { get { return _source.OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
{
return _target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
TOutput ISourceBlock<TOutput>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed)
{
return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<TOutput>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
return _source.ReserveMessage(messageHeader, target);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<TOutput>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
_source.ReleaseReservation(messageHeader, target);
}
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int InputCountForDebugger { get { return _target.GetDebuggingInformation().InputCount; } }
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int OutputCountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
/// <summary>The data to display in the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
get
{
return string.Format("{0}, InputCount={1}, OutputCount={2}",
Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
InputCountForDebugger,
OutputCountForDebugger);
}
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the TransformBlock.</summary>
private sealed class DebugView
{
/// <summary>The transform being viewed.</summary>
private readonly TransformBlock<TInput, TOutput> _transformBlock;
/// <summary>The target half of the block being viewed.</summary>
private readonly TargetCore<TInput>.DebuggingInformation _targetDebuggingInformation;
/// <summary>The source half of the block being viewed.</summary>
private readonly SourceCore<TOutput>.DebuggingInformation _sourceDebuggingInformation;
/// <summary>Initializes the debug view.</summary>
/// <param name="transformBlock">The transform being viewed.</param>
public DebugView(TransformBlock<TInput, TOutput> transformBlock)
{
Contract.Requires(transformBlock != null, "Need a block with which to construct the debug view.");
_transformBlock = transformBlock;
_targetDebuggingInformation = transformBlock._target.GetDebuggingInformation();
_sourceDebuggingInformation = transformBlock._source.GetDebuggingInformation();
}
/// <summary>Gets the messages waiting to be processed.</summary>
public IEnumerable<TInput> InputQueue { get { return _targetDebuggingInformation.InputQueue; } }
/// <summary>Gets any postponed messages.</summary>
public QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages { get { return _targetDebuggingInformation.PostponedMessages; } }
/// <summary>Gets the messages waiting to be received.</summary>
public IEnumerable<TOutput> OutputQueue { get { return _sourceDebuggingInformation.OutputQueue; } }
/// <summary>Gets the number of outstanding input operations.</summary>
public Int32 CurrentDegreeOfParallelism { get { return _targetDebuggingInformation.CurrentDegreeOfParallelism; } }
/// <summary>Gets the task being used for output processing.</summary>
public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
public ExecutionDataflowBlockOptions DataflowBlockOptions { get { return _targetDebuggingInformation.DataflowBlockOptions; } }
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently { get { return _targetDebuggingInformation.IsDecliningPermanently; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_transformBlock); } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<TOutput> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
/// <summary>Gets the target that holds a reservation on the next message, if any.</summary>
public ITargetBlock<TOutput> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}
}
}

View File

@ -1,142 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ActionOnDispose.cs
//
//
// Implemention of IDisposable that runs a delegate on Dispose.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
namespace System.Threading.Tasks.Dataflow.Internal
{
/// <summary>Provider of disposables that run actions.</summary>
internal sealed class Disposables
{
/// <summary>An IDisposable that does nothing.</summary>
internal readonly static IDisposable Nop = new NopDisposable();
/// <summary>Creates an IDisposable that runs an action when disposed.</summary>
/// <typeparam name="T1">Specifies the type of the first argument.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument.</typeparam>
/// <param name="action">The action to invoke.</param>
/// <param name="arg1">The first argument.</param>
/// <param name="arg2">The second argument.</param>
/// <returns>The created disposable.</returns>
internal static IDisposable Create<T1, T2>(Action<T1, T2> action, T1 arg1, T2 arg2)
{
Contract.Requires(action != null, "Non-null disposer action required.");
return new Disposable<T1, T2>(action, arg1, arg2);
}
/// <summary>Creates an IDisposable that runs an action when disposed.</summary>
/// <typeparam name="T1">Specifies the type of the first argument.</typeparam>
/// <typeparam name="T2">Specifies the type of the second argument.</typeparam>
/// <typeparam name="T3">Specifies the type of the third argument.</typeparam>
/// <param name="action">The action to invoke.</param>
/// <param name="arg1">The first argument.</param>
/// <param name="arg2">The second argument.</param>
/// <param name="arg3">The third argument.</param>
/// <returns>The created disposable.</returns>
internal static IDisposable Create<T1, T2, T3>(Action<T1, T2, T3> action, T1 arg1, T2 arg2, T3 arg3)
{
Contract.Requires(action != null, "Non-null disposer action required.");
return new Disposable<T1, T2, T3>(action, arg1, arg2, arg3);
}
/// <summary>A disposable that's a nop.</summary>
[DebuggerDisplay("Disposed = true")]
private sealed class NopDisposable : IDisposable
{
void IDisposable.Dispose() { }
}
/// <summary>An IDisposable that will run a delegate when disposed.</summary>
[DebuggerDisplay("Disposed = {Disposed}")]
private sealed class Disposable<T1, T2> : IDisposable
{
/// <summary>First state argument.</summary>
private readonly T1 _arg1;
/// <summary>Second state argument.</summary>
private readonly T2 _arg2;
/// <summary>The action to run when disposed. Null if disposed.</summary>
private Action<T1, T2> _action;
/// <summary>Initializes the ActionOnDispose.</summary>
/// <param name="action">The action to run when disposed.</param>
/// <param name="arg1">The first argument.</param>
/// <param name="arg2">The second argument.</param>
internal Disposable(Action<T1, T2> action, T1 arg1, T2 arg2)
{
Contract.Requires(action != null, "Non-null action needed for disposable");
_action = action;
_arg1 = arg1;
_arg2 = arg2;
}
/// <summary>Gets whether the IDisposable has been disposed.</summary>
[SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
private bool Disposed { get { return _action == null; } }
/// <summary>Invoke the action.</summary>
void IDisposable.Dispose()
{
Action<T1, T2> toRun = _action;
if (toRun != null &&
Interlocked.CompareExchange(ref _action, null, toRun) == toRun)
{
toRun(_arg1, _arg2);
}
}
}
/// <summary>An IDisposable that will run a delegate when disposed.</summary>
[DebuggerDisplay("Disposed = {Disposed}")]
private sealed class Disposable<T1, T2, T3> : IDisposable
{
/// <summary>First state argument.</summary>
private readonly T1 _arg1;
/// <summary>Second state argument.</summary>
private readonly T2 _arg2;
/// <summary>Third state argument.</summary>
private readonly T3 _arg3;
/// <summary>The action to run when disposed. Null if disposed.</summary>
private Action<T1, T2, T3> _action;
/// <summary>Initializes the ActionOnDispose.</summary>
/// <param name="action">The action to run when disposed.</param>
/// <param name="arg1">The first argument.</param>
/// <param name="arg2">The second argument.</param>
/// <param name="arg3">The third argument.</param>
internal Disposable(Action<T1, T2, T3> action, T1 arg1, T2 arg2, T3 arg3)
{
Contract.Requires(action != null, "Non-null action needed for disposable");
_action = action;
_arg1 = arg1;
_arg2 = arg2;
_arg3 = arg3;
}
/// <summary>Gets whether the IDisposable has been disposed.</summary>
[SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
private bool Disposed { get { return _action == null; } }
/// <summary>Invoke the action.</summary>
void IDisposable.Dispose()
{
Action<T1, T2, T3> toRun = _action;
if (toRun != null &&
Interlocked.CompareExchange(ref _action, null, toRun) == toRun)
{
toRun(_arg1, _arg2, _arg3);
}
}
}
}
}

Some files were not shown because too many files have changed in this diff Show More