Imported Upstream version 5.0.0.42

Former-commit-id: fd56571888259555122d8a0f58c68838229cea2b
This commit is contained in:
Xamarin Public Jenkins (auto-signing)
2017-04-10 11:41:01 +00:00
parent 1190d13a04
commit 6bdd276d05
19939 changed files with 3099680 additions and 93811 deletions

View File

@@ -0,0 +1,59 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 2013
VisualStudioVersion = 12.0.30723.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Tasks.Dataflow.Tests", "tests\System.Threading.Tasks.Dataflow.Tests.csproj", "{72E21903-0FBA-444E-9855-3B4F05DFC1F9}"
ProjectSection(ProjectDependencies) = postProject
{0C10C503-FD37-4990-BD0F-B79FE22203DD} = {0C10C503-FD37-4990-BD0F-B79FE22203DD}
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Tasks.Dataflow", "src\System.Threading.Tasks.Dataflow.csproj", "{1DD0FF15-6234-4BD6-850A-317F05479554}"
ProjectSection(ProjectDependencies) = postProject
{0C10C503-FD37-4990-BD0F-B79FE22203DD} = {0C10C503-FD37-4990-BD0F-B79FE22203DD}
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Tasks.Dataflow.WP8", "src\System.Threading.Tasks.Dataflow.WP8.csproj", "{0C10C503-FD37-4990-BD0F-B79FE22203DD}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{3A3E1C22-FF83-4D70-A94A-6C130805E6BF}"
ProjectSection(SolutionItems) = preProject
..\.nuget\packages.Windows_NT.config = ..\.nuget\packages.Windows_NT.config
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
DebugNETCoreAppnetstandard1.0netstandard1.3|AnyCPU = DebugNETCoreAppnetstandard1.0netstandard1.3|AnyCPU
ReleaseNETCoreAppnetstandard1.0netstandard1.3|AnyCPU = ReleaseNETCoreAppnetstandard1.0netstandard1.3|AnyCPU
DebugNETCoreAppnetstandard1.1netstandard1.3|AnyCPU = DebugNETCoreAppnetstandard1.1netstandard1.3|AnyCPU
ReleaseNETCoreAppnetstandard1.1netstandard1.3|AnyCPU = ReleaseNETCoreAppnetstandard1.1netstandard1.3|AnyCPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{72E21903-0FBA-444E-9855-3B4F05DFC1F9}.DebugNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.ActiveCfg = netstandard1.3-Debug|Any CPU
{72E21903-0FBA-444E-9855-3B4F05DFC1F9}.DebugNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.Build.0 = netstandard1.3-Debug|Any CPU
{72E21903-0FBA-444E-9855-3B4F05DFC1F9}.ReleaseNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.ActiveCfg = netstandard1.3-Release|Any CPU
{72E21903-0FBA-444E-9855-3B4F05DFC1F9}.ReleaseNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.Build.0 = netstandard1.3-Release|Any CPU
{72E21903-0FBA-444E-9855-3B4F05DFC1F9}.DebugNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.ActiveCfg = netstandard1.3-Debug|Any CPU
{72E21903-0FBA-444E-9855-3B4F05DFC1F9}.DebugNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.Build.0 = netstandard1.3-Debug|Any CPU
{72E21903-0FBA-444E-9855-3B4F05DFC1F9}.ReleaseNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.ActiveCfg = netstandard1.3-Release|Any CPU
{72E21903-0FBA-444E-9855-3B4F05DFC1F9}.ReleaseNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.Build.0 = netstandard1.3-Release|Any CPU
{1DD0FF15-6234-4BD6-850A-317F05479554}.DebugNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.ActiveCfg = netstandard1.0-Debug|Any CPU
{1DD0FF15-6234-4BD6-850A-317F05479554}.DebugNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.Build.0 = netstandard1.0-Debug|Any CPU
{1DD0FF15-6234-4BD6-850A-317F05479554}.ReleaseNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.ActiveCfg = netstandard1.0-Release|Any CPU
{1DD0FF15-6234-4BD6-850A-317F05479554}.ReleaseNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.Build.0 = netstandard1.0-Release|Any CPU
{1DD0FF15-6234-4BD6-850A-317F05479554}.DebugNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.ActiveCfg = netstandard1.1-Debug|Any CPU
{1DD0FF15-6234-4BD6-850A-317F05479554}.DebugNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.Build.0 = netstandard1.1-Debug|Any CPU
{1DD0FF15-6234-4BD6-850A-317F05479554}.ReleaseNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.ActiveCfg = netstandard1.1-Release|Any CPU
{1DD0FF15-6234-4BD6-850A-317F05479554}.ReleaseNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.Build.0 = netstandard1.1-Release|Any CPU
{0C10C503-FD37-4990-BD0F-B79FE22203DD}.DebugNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.ActiveCfg = netstandard1.0-Debug|Any CPU
{0C10C503-FD37-4990-BD0F-B79FE22203DD}.DebugNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.Build.0 = netstandard1.0-Debug|Any CPU
{0C10C503-FD37-4990-BD0F-B79FE22203DD}.ReleaseNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.ActiveCfg = netstandard1.0-Release|Any CPU
{0C10C503-FD37-4990-BD0F-B79FE22203DD}.ReleaseNETCoreAppnetstandard1.0netstandard1.3|AnyCPU.Build.0 = netstandard1.0-Release|Any CPU
{0C10C503-FD37-4990-BD0F-B79FE22203DD}.DebugNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.ActiveCfg = netstandard1.1-Debug|Any CPU
{0C10C503-FD37-4990-BD0F-B79FE22203DD}.DebugNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.Build.0 = netstandard1.1-Debug|Any CPU
{0C10C503-FD37-4990-BD0F-B79FE22203DD}.ReleaseNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.ActiveCfg = netstandard1.1-Release|Any CPU
{0C10C503-FD37-4990-BD0F-B79FE22203DD}.ReleaseNETCoreAppnetstandard1.1netstandard1.3|AnyCPU.Build.0 = netstandard1.1-Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal

View File

@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="..\dir.props" />
<PropertyGroup>
<PackageVersion>4.8.0</PackageVersion>
<AssemblyVersion>4.6.2.0</AssemblyVersion>
<IsNETCoreApp>true</IsNETCoreApp>
</PropertyGroup>
</Project>

View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.props))\dir.props" />
<ItemGroup>
<Project Include="System.Threading.Tasks.Dataflow.pkgproj" />
</ItemGroup>
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.traversal.targets))\dir.traversal.targets" />
</Project>

View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.props))\dir.props" />
<PropertyGroup>
<!-- we need to be supported on pre-nuget-3 platforms (Dev12, Dev11, etc) -->
<MinClientVersion>2.8.6</MinClientVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\src\System.Threading.Tasks.Dataflow.builds">
<SupportedFramework>net45;netcore45;wp8;wpa81;netcoreapp1.0;$(AllXamarinFrameworks)</SupportedFramework>
</ProjectReference>
</ItemGroup>
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.targets))\dir.targets" />
</Project>

View File

@@ -0,0 +1 @@
08e6b7ba7dac7dd1b7df4be119523a7db4f1e0de

View File

@@ -0,0 +1,449 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DataflowBlockOptions.cs
//
//
// DataflowBlockOptions types for configuring dataflow blocks
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>
/// Provides options used to configure the processing performed by dataflow blocks.
/// </summary>
/// <remarks>
/// <see cref="DataflowBlockOptions"/> is mutable and can be configured through its properties.
/// When specific configuration options are not set, the following defaults are used:
/// <list type="table">
/// <listheader>
/// <term>Options</term>
/// <description>Default</description>
/// </listheader>
/// <item>
/// <term>TaskScheduler</term>
/// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
/// </item>
/// <item>
/// <term>MaxMessagesPerTask</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>CancellationToken</term>
/// <description><see cref="System.Threading.CancellationToken.None"/></description>
/// </item>
/// <item>
/// <term>BoundedCapacity</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>NameFormat</term>
/// <description>"{0} Id={1}"</description>
/// </item>
/// <item>
/// <term>EnsureOrdered</term>
/// <description>true</description>
/// </item>
/// </list>
/// Dataflow blocks capture the state of the options at their construction. Subsequent changes
/// to the provided <see cref="DataflowBlockOptions"/> instance should not affect the behavior
/// of a dataflow block.
/// </remarks>
[DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}")]
public class DataflowBlockOptions
{
/// <summary>
/// A constant used to specify an unlimited quantity for <see cref="DataflowBlockOptions"/> members
/// that provide an upper bound. This field is constant.
/// </summary>
public const Int32 Unbounded = -1;
/// <summary>The scheduler to use for scheduling tasks to process messages.</summary>
private TaskScheduler _taskScheduler = TaskScheduler.Default;
/// <summary>The cancellation token to monitor for cancellation requests.</summary>
private CancellationToken _cancellationToken = CancellationToken.None;
/// <summary>The maximum number of messages that may be processed per task.</summary>
private Int32 _maxMessagesPerTask = Unbounded;
/// <summary>The maximum number of messages that may be buffered by the block.</summary>
private Int32 _boundedCapacity = Unbounded;
/// <summary>The name format to use for creating a name for a block.</summary>
private string _nameFormat = "{0} Id={1}"; // see NameFormat property for a description of format items
/// <summary>Whether to force ordered processing of messages.</summary>
private bool _ensureOrdered = true;
/// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
/// </remarks>
internal static readonly DataflowBlockOptions Default = new DataflowBlockOptions();
/// <summary>Returns this <see cref="DataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
/// <returns>An instance of the options that may be cached by the block.</returns>
internal DataflowBlockOptions DefaultOrClone()
{
return (this == Default) ?
this :
new DataflowBlockOptions
{
TaskScheduler = this.TaskScheduler,
CancellationToken = this.CancellationToken,
MaxMessagesPerTask = this.MaxMessagesPerTask,
BoundedCapacity = this.BoundedCapacity,
NameFormat = this.NameFormat,
EnsureOrdered = this.EnsureOrdered
};
}
/// <summary>Initializes the <see cref="DataflowBlockOptions"/>.</summary>
public DataflowBlockOptions() { }
/// <summary>Gets or sets the <see cref="System.Threading.Tasks.TaskScheduler"/> to use for scheduling tasks.</summary>
public TaskScheduler TaskScheduler
{
get { return _taskScheduler; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value == null) throw new ArgumentNullException(nameof(value));
_taskScheduler = value;
}
}
/// <summary>Gets or sets the <see cref="System.Threading.CancellationToken"/> to monitor for cancellation requests.</summary>
public CancellationToken CancellationToken
{
get { return _cancellationToken; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
_cancellationToken = value;
}
}
/// <summary>Gets or sets the maximum number of messages that may be processed per task.</summary>
public Int32 MaxMessagesPerTask
{
get { return _maxMessagesPerTask; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException(nameof(value));
_maxMessagesPerTask = value;
}
}
/// <summary>Gets a MaxMessagesPerTask value that may be used for comparison purposes.</summary>
/// <returns>The maximum value, usable for comparison purposes.</returns>
/// <remarks>Unlike MaxMessagesPerTask, this property will always return a positive value.</remarks>
internal Int32 ActualMaxMessagesPerTask
{
get { return (_maxMessagesPerTask == Unbounded) ? Int32.MaxValue : _maxMessagesPerTask; }
}
/// <summary>Gets or sets the maximum number of messages that may be buffered by the block.</summary>
public Int32 BoundedCapacity
{
get { return _boundedCapacity; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException(nameof(value));
_boundedCapacity = value;
}
}
/// <summary>
/// Gets or sets the format string to use when a block is queried for its name.
/// </summary>
/// <remarks>
/// The name format may contain up to two format items. {0} will be substituted
/// with the block's name. {1} will be substituted with the block's Id, as is
/// returned from the block's Completion.Id property.
/// </remarks>
public string NameFormat
{
get { return _nameFormat; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value == null) throw new ArgumentNullException(nameof(value));
_nameFormat = value;
}
}
/// <summary>Gets or sets whether ordered processing should be enforced on a block's handling of messages.</summary>
/// <remarks>
/// By default, dataflow blocks enforce ordering on the processing of messages. This means that a
/// block like <see cref="TransformBlock{TInput, TOutput}"/> will ensure that messages are output in the same
/// order they were input, even if parallelism is employed by the block and the processing of a message N finishes
/// after the processing of a subsequent message N+1 (the block will reorder the results to maintain the input
/// ordering prior to making those results available to a consumer). Some blocks may allow this to be relaxed,
/// however. Setting <see cref="EnsureOrdered"/> to false tells a block that it may relax this ordering if
/// it's able to do so. This can be beneficial if the immediacy of a processed result being made available
/// is more important than the input-to-output ordering being maintained.
/// </remarks>
public bool EnsureOrdered
{
get { return _ensureOrdered; }
set { _ensureOrdered = value; }
}
}
/// <summary>
/// Provides options used to configure the processing performed by dataflow blocks that
/// process each message through the invocation of a user-provided delegate, blocks such
/// as <see cref="ActionBlock{T}"/> and <see cref="TransformBlock{TInput,TOutput}"/>.
/// </summary>
/// <remarks>
/// <see cref="ExecutionDataflowBlockOptions"/> is mutable and can be configured through its properties.
/// When specific configuration options are not set, the following defaults are used:
/// <list type="table">
/// <listheader>
/// <term>Options</term>
/// <description>Default</description>
/// </listheader>
/// <item>
/// <term>TaskScheduler</term>
/// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
/// </item>
/// <item>
/// <term>CancellationToken</term>
/// <description><see cref="System.Threading.CancellationToken.None"/></description>
/// </item>
/// <item>
/// <term>MaxMessagesPerTask</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>BoundedCapacity</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>NameFormat</term>
/// <description>"{0} Id={1}"</description>
/// </item>
/// <item>
/// <term>EnsureOrdered</term>
/// <description>true</description>
/// </item>
/// <item>
/// <term>MaxDegreeOfParallelism</term>
/// <description>1</description>
/// </item>
/// <item>
/// <term>SingleProducerConstrained</term>
/// <description>false</description>
/// </item>
/// </list>
/// Dataflow block captures the state of the options at their construction. Subsequent changes
/// to the provided <see cref="ExecutionDataflowBlockOptions"/> instance should not affect the behavior
/// of a dataflow block.
/// </remarks>
[DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}, MaxDegreeOfParallelism = {MaxDegreeOfParallelism}")]
public class ExecutionDataflowBlockOptions : DataflowBlockOptions
{
/// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
/// </remarks>
internal new static readonly ExecutionDataflowBlockOptions Default = new ExecutionDataflowBlockOptions();
/// <summary>Returns this <see cref="ExecutionDataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
/// <returns>An instance of the options that may be cached by the block.</returns>
internal new ExecutionDataflowBlockOptions DefaultOrClone()
{
return (this == Default) ?
this :
new ExecutionDataflowBlockOptions
{
TaskScheduler = this.TaskScheduler,
CancellationToken = this.CancellationToken,
MaxMessagesPerTask = this.MaxMessagesPerTask,
BoundedCapacity = this.BoundedCapacity,
NameFormat = this.NameFormat,
EnsureOrdered = this.EnsureOrdered,
MaxDegreeOfParallelism = this.MaxDegreeOfParallelism,
SingleProducerConstrained = this.SingleProducerConstrained
};
}
/// <summary>The maximum number of tasks that may be used concurrently to process messages.</summary>
private Int32 _maxDegreeOfParallelism = 1;
/// <summary>Whether the code using this block will only ever have a single producer accessing the block at any given time.</summary>
private Boolean _singleProducerConstrained = false;
/// <summary>Initializes the <see cref="ExecutionDataflowBlockOptions"/>.</summary>
public ExecutionDataflowBlockOptions() { }
/// <summary>Gets the maximum number of messages that may be processed by the block concurrently.</summary>
public Int32 MaxDegreeOfParallelism
{
get { return _maxDegreeOfParallelism; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException(nameof(value));
_maxDegreeOfParallelism = value;
}
}
/// <summary>
/// Gets whether code using the dataflow block is constrained to one producer at a time.
/// </summary>
/// <remarks>
/// This property defaults to false, such that the block may be used by multiple
/// producers concurrently. This property should only be set to true if the code
/// using the block can guarantee that it will only ever be used by one producer
/// (e.g. a source linked to the block) at a time, meaning that methods like Post,
/// Complete, Fault, and OfferMessage will never be called concurrently. Some blocks
/// may choose to capitalize on the knowledge that there will only be one producer at a time
/// in order to provide better performance.
/// </remarks>
public Boolean SingleProducerConstrained
{
get { return _singleProducerConstrained; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
_singleProducerConstrained = value;
}
}
/// <summary>Gets a MaxDegreeOfParallelism value that may be used for comparison purposes.</summary>
/// <returns>The maximum value, usable for comparison purposes.</returns>
/// <remarks>Unlike MaxDegreeOfParallelism, this property will always return a positive value.</remarks>
internal Int32 ActualMaxDegreeOfParallelism
{
get { return (_maxDegreeOfParallelism == Unbounded) ? Int32.MaxValue : _maxDegreeOfParallelism; }
}
/// <summary>Gets whether these dataflow block options allow for parallel execution.</summary>
internal Boolean SupportsParallelExecution { get { return _maxDegreeOfParallelism == Unbounded || _maxDegreeOfParallelism > 1; } }
}
/// <summary>
/// Provides options used to configure the processing performed by dataflow blocks that
/// group together multiple messages, blocks such as <see cref="JoinBlock{T1,T2}"/> and
/// <see cref="BatchBlock{T}"/>.
/// </summary>
/// <remarks>
/// <see cref="GroupingDataflowBlockOptions"/> is mutable and can be configured through its properties.
/// When specific configuration options are not set, the following defaults are used:
/// <list type="table">
/// <listheader>
/// <term>Options</term>
/// <description>Default</description>
/// </listheader>
/// <item>
/// <term>TaskScheduler</term>
/// <description><see cref="System.Threading.Tasks.TaskScheduler.Default"/></description>
/// </item>
/// <item>
/// <term>CancellationToken</term>
/// <description><see cref="System.Threading.CancellationToken.None"/></description>
/// </item>
/// <item>
/// <term>MaxMessagesPerTask</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>BoundedCapacity</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>NameFormat</term>
/// <description>"{0} Id={1}"</description>
/// </item>
/// <item>
/// <term>EnsureOrdered</term>
/// <description>true</description>
/// </item>
/// <item>
/// <term>MaxNumberOfGroups</term>
/// <description>GroupingDataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>Greedy</term>
/// <description>true</description>
/// </item>
/// </list>
/// Dataflow block capture the state of the options at their construction. Subsequent changes
/// to the provided <see cref="GroupingDataflowBlockOptions"/> instance should not affect the behavior
/// of a dataflow block.
/// </remarks>
[DebuggerDisplay("TaskScheduler = {TaskScheduler}, MaxMessagesPerTask = {MaxMessagesPerTask}, BoundedCapacity = {BoundedCapacity}, Greedy = {Greedy}, MaxNumberOfGroups = {MaxNumberOfGroups}")]
public class GroupingDataflowBlockOptions : DataflowBlockOptions
{
/// <summary>A default instance of <see cref="DataflowBlockOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
/// </remarks>
internal new static readonly GroupingDataflowBlockOptions Default = new GroupingDataflowBlockOptions();
/// <summary>Returns this <see cref="GroupingDataflowBlockOptions"/> instance if it's the default instance or else a cloned instance.</summary>
/// <returns>An instance of the options that may be cached by the block.</returns>
internal new GroupingDataflowBlockOptions DefaultOrClone()
{
return (this == Default) ?
this :
new GroupingDataflowBlockOptions
{
TaskScheduler = this.TaskScheduler,
CancellationToken = this.CancellationToken,
MaxMessagesPerTask = this.MaxMessagesPerTask,
BoundedCapacity = this.BoundedCapacity,
NameFormat = this.NameFormat,
EnsureOrdered = this.EnsureOrdered,
Greedy = this.Greedy,
MaxNumberOfGroups = this.MaxNumberOfGroups
};
}
/// <summary>Whether the block should greedily consume offered messages.</summary>
private Boolean _greedy = true;
/// <summary>The maximum number of groups that should be generated by the block.</summary>
private Int64 _maxNumberOfGroups = Unbounded;
/// <summary>Initializes the <see cref="GroupingDataflowBlockOptions"/>.</summary>
public GroupingDataflowBlockOptions() { }
/// <summary>Gets or sets the Boolean value to use to determine whether to greedily consume offered messages.</summary>
public Boolean Greedy
{
get { return _greedy; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
_greedy = value;
}
}
/// <summary>Gets or sets the maximum number of groups that should be generated by the block.</summary>
public Int64 MaxNumberOfGroups
{
get { return _maxNumberOfGroups; }
set
{
Debug.Assert(this != Default, "Default instance is supposed to be immutable.");
if (value <= 0 && value != Unbounded) throw new ArgumentOutOfRangeException(nameof(value));
_maxNumberOfGroups = value;
}
}
/// <summary>Gets a MaxNumberOfGroups value that may be used for comparison purposes.</summary>
/// <returns>The maximum value, usable for comparison purposes.</returns>
/// <remarks>Unlike MaxNumberOfGroups, this property will always return a positive value.</remarks>
internal Int64 ActualMaxNumberOfGroups
{
get { return (_maxNumberOfGroups == Unbounded) ? Int64.MaxValue : _maxNumberOfGroups; }
}
}
}

View File

@@ -0,0 +1,114 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DataflowLinkOptions.cs
//
//
// DataflowLinkOptions type for configuring links between dataflow blocks
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>
/// Provides options used to configure a link between dataflow blocks.
/// </summary>
/// <remarks>
/// <see cref="DataflowLinkOptions"/> is mutable and can be configured through its properties.
/// When specific configuration options are not set, the following defaults are used:
/// <list type="table">
/// <listheader>
/// <term>Options</term>
/// <description>Default</description>
/// </listheader>
/// <item>
/// <term>PropagateCompletion</term>
/// <description>False</description>
/// </item>
/// <item>
/// <term>MaxMessages</term>
/// <description>DataflowBlockOptions.Unbounded (-1)</description>
/// </item>
/// <item>
/// <term>Append</term>
/// <description>True</description>
/// </item>
/// </list>
/// Dataflow blocks capture the state of the options at linking. Subsequent changes to the provided
/// <see cref="DataflowLinkOptions"/> instance should not affect the behavior of a link.
/// </remarks>
[DebuggerDisplay("PropagateCompletion = {PropagateCompletion}, MaxMessages = {MaxMessages}, Append = {Append}")]
public class DataflowLinkOptions
{
/// <summary>
/// A constant used to specify an unlimited quantity for <see cref="DataflowLinkOptions"/> members
/// that provide an upper bound. This field is a constant tied to <see cref="DataflowLinkOptions.Unbounded"/>.
/// </summary>
internal const Int32 Unbounded = DataflowBlockOptions.Unbounded;
/// <summary>Whether the linked target will have completion and faulting notification propagated to it automatically.</summary>
private Boolean _propagateCompletion = false;
/// <summary>The maximum number of messages that may be consumed across the link.</summary>
private Int32 _maxNumberOfMessages = Unbounded;
/// <summary>Whether the link should be appended to the source?s list of links, or whether it should be prepended.</summary>
private Boolean _append = true;
/// <summary>A default instance of <see cref="DataflowLinkOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks when no options are provided by the user.
/// </remarks>
internal static readonly DataflowLinkOptions Default = new DataflowLinkOptions();
/// <summary>A cached instance of <see cref="DataflowLinkOptions"/>.</summary>
/// <remarks>
/// Do not change the values of this instance. It is shared by all of our blocks that need to unlink after one message has been consumed.
/// </remarks>
internal static readonly DataflowLinkOptions UnlinkAfterOneAndPropagateCompletion = new DataflowLinkOptions() { MaxMessages = 1, PropagateCompletion = true };
/// <summary>Initializes the <see cref="DataflowLinkOptions"/>.</summary>
public DataflowLinkOptions()
{
}
/// <summary>Gets or sets whether the linked target will have completion and faulting notification propagated to it automatically.</summary>
public Boolean PropagateCompletion
{
get { return _propagateCompletion; }
set
{
Debug.Assert(this != Default && this != UnlinkAfterOneAndPropagateCompletion, "Default and UnlinkAfterOneAndPropagateCompletion instances are supposed to be immutable.");
_propagateCompletion = value;
}
}
/// <summary>Gets or sets the maximum number of messages that may be consumed across the link.</summary>
public Int32 MaxMessages
{
get { return _maxNumberOfMessages; }
set
{
Debug.Assert(this != Default && this != UnlinkAfterOneAndPropagateCompletion, "Default and UnlinkAfterOneAndPropagateCompletion instances are supposed to be immutable.");
if (value < 1 && value != Unbounded) throw new ArgumentOutOfRangeException(nameof(value));
_maxNumberOfMessages = value;
}
}
/// <summary>Gets or sets whether the link should be appended to the source?s list of links, or whether it should be prepended.</summary>
public Boolean Append
{
get { return _append; }
set
{
Debug.Assert(this != Default && this != UnlinkAfterOneAndPropagateCompletion, "Default and UnlinkAfterOneAndPropagateCompletion instances are supposed to be immutable.");
_append = value;
}
}
}
}

View File

@@ -0,0 +1,93 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DataflowMessageHeader.cs
//
//
// A container of data attributes passed between dataflow blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Threading.Tasks.Dataflow.Internal;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a container of data attributes for passing between dataflow blocks.</summary>
[DebuggerDisplay("Id = {Id}")]
public struct DataflowMessageHeader : IEquatable<DataflowMessageHeader>
{
/// <summary>The message ID. Needs to be unique within the source.</summary>
private readonly long _id;
/// <summary>Initializes the <see cref="DataflowMessageHeader"/> with the specified attributes.</summary>
/// <param name="id">The ID of the message. Must be unique within the originating source block. Need not be globally unique.</param>
public DataflowMessageHeader(Int64 id)
{
if (id == default(long)) throw new ArgumentException(SR.Argument_InvalidMessageId, nameof(id));
Contract.EndContractBlock();
_id = id;
}
/// <summary>Gets the validity of the message.</summary>
/// <returns>True if the ID of the message is different from 0. False if the ID of the message is 0</returns>
public Boolean IsValid { get { return _id != default(long); } }
/// <summary>Gets the ID of the message within the source.</summary>
/// <returns>The ID contained in the <see cref="DataflowMessageHeader"/> instance.</returns>
public Int64 Id { get { return _id; } }
// These overrides are required by the FX API Guidelines.
// NOTE: When these overrides are present, the compiler doesn't complain about statements
// like 'if (struct == null) ...' which will result in incorrect behavior at runtime.
// The product code should not use them. Instead, it should compare the Id properties.
// To verify that, every once in a while, comment out this region and build the product.
#region Comparison Operators
/// <summary>Checks two <see cref="DataflowMessageHeader"/> instances for equality by ID without boxing.</summary>
/// <param name="other">Another <see cref="DataflowMessageHeader"/> instance.</param>
/// <returns>True if the instances are equal. False otherwise.</returns>
public bool Equals(DataflowMessageHeader other)
{
return this == other;
}
/// <summary>Checks boxed <see cref="DataflowMessageHeader"/> instances for equality by ID.</summary>
/// <param name="obj">A boxed <see cref="DataflowMessageHeader"/> instance.</param>
/// <returns>True if the instances are equal. False otherwise.</returns>
public override bool Equals(object obj)
{
return obj is DataflowMessageHeader && this == (DataflowMessageHeader)obj;
}
/// <summary>Generates a hash code for the <see cref="DataflowMessageHeader"/> instance.</summary>
/// <returns>Hash code.</returns>
public override int GetHashCode()
{
return (int)Id;
}
/// <summary>Checks two <see cref="DataflowMessageHeader"/> instances for equality by ID.</summary>
/// <param name="left">A <see cref="DataflowMessageHeader"/> instance.</param>
/// <param name="right">A <see cref="DataflowMessageHeader"/> instance.</param>
/// <returns>True if the instances are equal. False otherwise.</returns>
public static bool operator ==(DataflowMessageHeader left, DataflowMessageHeader right)
{
return left.Id == right.Id;
}
/// <summary>Checks two <see cref="DataflowMessageHeader"/> instances for non-equality by ID.</summary>
/// <param name="left">A <see cref="DataflowMessageHeader"/> instance.</param>
/// <param name="right">A <see cref="DataflowMessageHeader"/> instance.</param>
/// <returns>True if the instances are not equal. False otherwise.</returns>
public static bool operator !=(DataflowMessageHeader left, DataflowMessageHeader right)
{
return left.Id != right.Id;
}
#endregion
}
}

View File

@@ -0,0 +1,48 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// DataflowMessageStatus.cs
//
//
// Status about the propagation of a message.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents the status of a <see cref="DataflowMessageHeader"/> when passed between dataflow blocks.</summary>
public enum DataflowMessageStatus
{
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> accepted the message. Once a target has accepted a message,
/// it is wholly owned by the target.
/// </summary>
Accepted = 0x0,
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> declined the message. The <see cref="ISourceBlock{TOutput}"/> still owns the message.
/// </summary>
Declined = 0x1,
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> postponed the message for potential consumption at a later time.
/// The <see cref="ISourceBlock{TOutput}"/> still owns the message.
/// </summary>
Postponed = 0x2,
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> tried to accept the message from the <see cref="ISourceBlock{TOutput}"/>, but the
/// message was no longer available.
/// </summary>
NotAvailable = 0x3,
/// <summary>
/// Indicates that the <see cref="ITargetBlock{TInput}"/> declined the message. The <see cref="ISourceBlock{TOutput}"/> still owns the message.
/// Additionally, the <see cref="ITargetBlock{TInput}"/> will decline all future messages sent by the source.
/// </summary>
DecliningPermanently = 0x4
}
}

View File

@@ -0,0 +1,32 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IDataflowBlock.cs
//
//
// The base interface for all dataflow blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block.</summary>
public interface IDataflowBlock
{
// IMPLEMENT IMPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task Completion { get; }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
void Complete();
// IMPLEMENT EXPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void Fault(Exception exception);
}
}

View File

@@ -0,0 +1,23 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IPropagatorBlock.cs
//
//
// The base interface for all propagator blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block that is both a target for data and a source of data.</summary>
/// <typeparam name="TInput">Specifies the type of data accepted by the <see cref="IPropagatorBlock{TInput,TOutput}"/>.</typeparam>
/// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="IPropagatorBlock{TInput,TOutput}"/>.</typeparam>
public interface IPropagatorBlock<in TInput, out TOutput> : ITargetBlock<TInput>, ISourceBlock<TOutput>
{
// No additional members beyond those inherited from ITargetBlock<TInput> and ISourceBlock<TOutput>
}
}

View File

@@ -0,0 +1,32 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IReceivableSourceBlock.cs
//
//
// The base interface for all source blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block that supports receiving of messages without linking.</summary>
/// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="IReceivableSourceBlock{TOutput}"/>.</typeparam>
public interface IReceivableSourceBlock<TOutput> : ISourceBlock<TOutput>
{
// IMPLEMENT IMPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
bool TryReceive(Predicate<TOutput> filter, out TOutput item);
// IMPLEMENT IMPLICITLY IF BLOCK SUPPORTS RECEIVING MORE THAN ONE ITEM, OTHERWISE EXPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
bool TryReceiveAll(out IList<TOutput> items);
}
}

View File

@@ -0,0 +1,40 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ISourceBlock.cs
//
//
// The base interface for all source blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block that is a source of data.</summary>
/// <typeparam name="TOutput">Specifies the type of data supplied by the <see cref="ISourceBlock{TOutput}"/>.</typeparam>
public interface ISourceBlock<out TOutput> : IDataflowBlock
{
// IMPLEMENT IMPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions);
// IMPLEMENT EXPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
[SuppressMessage("Microsoft.Design", "CA1021:AvoidOutParameters", MessageId = "2#")]
TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out Boolean messageConsumed);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
Boolean ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target);
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target);
}
}

View File

@@ -0,0 +1,25 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ITargetBlock.cs
//
//
// The base interface for all target blocks.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Represents a dataflow block that is a target for data.</summary>
/// <typeparam name="TInput">Specifies the type of data accepted by the <see cref="ITargetBlock{TInput}"/>.</typeparam>
public interface ITargetBlock<in TInput> : IDataflowBlock
{
// IMPLEMENT EXPLICITLY
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept);
}
}

View File

@@ -0,0 +1,384 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ActionBlock.cs
//
//
// A target block that executes an action for each message.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
using System.Runtime.CompilerServices;
using System.Threading.Tasks.Dataflow.Internal;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a dataflow block that invokes a provided <see cref="System.Action{T}"/> delegate for every data element received.</summary>
/// <typeparam name="TInput">Specifies the type of data operated on by this <see cref="ActionBlock{T}"/>.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(ActionBlock<>.DebugView))]
public sealed class ActionBlock<TInput> : ITargetBlock<TInput>, IDebuggerDisplay
{
/// <summary>The core implementation of this message block when in default mode.</summary>
private readonly TargetCore<TInput> _defaultTarget;
/// <summary>The core implementation of this message block when in SPSC mode.</summary>
private readonly SpscTargetCore<TInput> _spscTarget;
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Action{T}"/>.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Action<TInput> action) :
this((Delegate)action, ExecutionDataflowBlockOptions.Default)
{ }
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Action{T}"/> and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Action<TInput> action, ExecutionDataflowBlockOptions dataflowBlockOptions) :
this((Delegate)action, dataflowBlockOptions)
{ }
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Func{T,Task}"/>.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Func<TInput, Task> action) :
this((Delegate)action, ExecutionDataflowBlockOptions.Default)
{ }
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified <see cref="System.Func{T,Task}"/> and <see cref="ExecutionDataflowBlockOptions"/>.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public ActionBlock(Func<TInput, Task> action, ExecutionDataflowBlockOptions dataflowBlockOptions) :
this((Delegate)action, dataflowBlockOptions)
{ }
/// <summary>Initializes the <see cref="ActionBlock{T}"/> with the specified delegate and options.</summary>
/// <param name="action">The action to invoke with each data element received.</param>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="ActionBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="action"/> is null (Nothing in Visual Basic).</exception>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
private ActionBlock(Delegate action, ExecutionDataflowBlockOptions dataflowBlockOptions)
{
// Validate arguments
if (action == null) throw new ArgumentNullException(nameof(action));
if (dataflowBlockOptions == null) throw new ArgumentNullException(nameof(dataflowBlockOptions));
Contract.Ensures((_spscTarget != null) ^ (_defaultTarget != null), "One and only one of the two targets must be non-null after construction");
Contract.EndContractBlock();
// Ensure we have options that can't be changed by the caller
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Based on the mode, initialize the target. If the user specifies SingleProducerConstrained,
// we'll try to employ an optimized mode under a limited set of circumstances.
var syncAction = action as Action<TInput>;
if (syncAction != null &&
dataflowBlockOptions.SingleProducerConstrained &&
dataflowBlockOptions.MaxDegreeOfParallelism == 1 &&
!dataflowBlockOptions.CancellationToken.CanBeCanceled &&
dataflowBlockOptions.BoundedCapacity == DataflowBlockOptions.Unbounded)
{
// Initialize the SPSC fast target to handle the bulk of the processing.
// The SpscTargetCore is only supported when BoundedCapacity, CancellationToken,
// and MaxDOP are all their default values. It's also only supported for sync
// delegates and not for async delegates.
_spscTarget = new SpscTargetCore<TInput>(this, syncAction, dataflowBlockOptions);
}
else
{
// Initialize the TargetCore which handles the bulk of the processing.
// The default target core can handle all options and delegate flavors.
if (syncAction != null) // sync
{
_defaultTarget = new TargetCore<TInput>(this,
messageWithId => ProcessMessage(syncAction, messageWithId),
null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion);
}
else // async
{
var asyncAction = action as Func<TInput, Task>;
Debug.Assert(asyncAction != null, "action is of incorrect delegate type");
_defaultTarget = new TargetCore<TInput>(this,
messageWithId => ProcessMessageWithTask(asyncAction, messageWithId),
null, dataflowBlockOptions, TargetCoreOptions.RepresentsBlockCompletion | TargetCoreOptions.UsesAsyncCompletion);
}
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, Completion, state => ((TargetCore<TInput>)state).Complete(exception: null, dropPendingMessages: true), _defaultTarget);
}
#if FEATURE_TRACING
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
}
#endif
}
/// <summary>Processes the message with a user-provided action.</summary>
/// <param name="action">The action to use to process the message.</param>
/// <param name="messageWithId">The message to be processed.</param>
private void ProcessMessage(Action<TInput> action, KeyValuePair<TInput, long> messageWithId)
{
try
{
action(messageWithId.Key);
}
catch (Exception exc)
{
// If this exception represents cancellation, swallow it rather than shutting down the block.
if (!Common.IsCooperativeCancellation(exc)) throw;
}
finally
{
// We're done synchronously processing an element, so reduce the bounding count
// that was incrementing when this element was enqueued.
if (_defaultTarget.IsBounded) _defaultTarget.ChangeBoundingCount(-1);
}
}
/// <summary>Processes the message with a user-provided action that returns a task.</summary>
/// <param name="action">The action to use to process the message.</param>
/// <param name="messageWithId">The message to be processed.</param>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ProcessMessageWithTask(Func<TInput, Task> action, KeyValuePair<TInput, long> messageWithId)
{
Debug.Assert(action != null, "action needed for processing");
// Run the action to get the task that represents the operation's completion
Task task = null;
Exception caughtException = null;
try
{
task = action(messageWithId.Key);
}
catch (Exception exc) { caughtException = exc; }
// If no task is available, we're done.
if (task == null)
{
// If we didn't get a task because an exception occurred,
// store it (if the exception was cancellation, just ignore it).
if (caughtException != null && !Common.IsCooperativeCancellation(caughtException))
{
Common.StoreDataflowMessageValueIntoExceptionData(caughtException, messageWithId.Key);
_defaultTarget.Complete(caughtException, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: false);
}
// Signal that we're done this async operation.
_defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
return;
}
else if (task.IsCompleted)
{
AsyncCompleteProcessMessageWithTask(task);
}
else
{
// Otherwise, join with the asynchronous operation when it completes.
task.ContinueWith((completed, state) =>
{
((ActionBlock<TInput>)state).AsyncCompleteProcessMessageWithTask(completed);
}, this, CancellationToken.None, Common.GetContinuationOptions(TaskContinuationOptions.ExecuteSynchronously), TaskScheduler.Default);
}
}
/// <summary>Completes the processing of an asynchronous message.</summary>
/// <param name="completed">The completed task.</param>
private void AsyncCompleteProcessMessageWithTask(Task completed)
{
Debug.Assert(completed != null, "Need completed task for processing");
Debug.Assert(completed.IsCompleted, "The task to be processed must be completed by now.");
// If the task faulted, store its errors. We must add the exception before declining
// and signaling completion, as the exception is part of the operation, and the completion conditions
// depend on this.
if (completed.IsFaulted)
{
_defaultTarget.Complete(completed.Exception, dropPendingMessages: true, storeExceptionEvenIfAlreadyCompleting: true, unwrapInnerExceptions: true);
}
// Regardless of faults, note that we're done processing. There are
// no outputs to keep track of for action block, so we always decrement
// the bounding count here (the callee will handle checking whether
// we're actually in a bounded mode).
_defaultTarget.SignalOneAsyncMessageCompleted(boundingCountChange: -1);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete()
{
if (_defaultTarget != null)
{
_defaultTarget.Complete(exception: null, dropPendingMessages: false);
}
else
{
_spscTarget.Complete(exception: null);
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null) throw new ArgumentNullException(nameof(exception));
Contract.EndContractBlock();
if (_defaultTarget != null)
{
_defaultTarget.Complete(exception, dropPendingMessages: true);
}
else
{
_spscTarget.Complete(exception);
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion
{
get { return _defaultTarget != null ? _defaultTarget.Completion : _spscTarget.Completion; }
}
/// <summary>Posts an item to the <see cref="T:System.Threading.Tasks.Dataflow.ITargetBlock`1"/>.</summary>
/// <param name="item">The item being offered to the target.</param>
/// <returns>true if the item was accepted by the target block; otherwise, false.</returns>
/// <remarks>
/// This method will return once the target block has decided to accept or decline the item,
/// but unless otherwise dictated by special semantics of the target block, it does not wait
/// for the item to actually be processed (for example, <see cref="T:System.Threading.Tasks.Dataflow.ActionBlock`1"/>
/// will return from Post as soon as it has stored the posted item into its input queue). From the perspective
/// of the block's processing, Post is asynchronous. For target blocks that support postponing offered messages,
/// or for blocks that may do more processing in their Post implementation, consider using
/// <see cref="T:System.Threading.Tasks.Dataflow.DataflowBlock.SendAsync">SendAsync</see>,
/// which will return immediately and will enable the target to postpone the posted message and later consume it
/// after SendAsync returns.
/// </remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool Post(TInput item)
{
// Even though this method is available with the exact same functionality as an extension method
// on ITargetBlock, using that extension method goes through an interface call on ITargetBlock,
// which for very high-throughput scenarios shows up as noticeable overhead on certain architectures.
// We can eliminate that call for direct ActionBlock usage by providing the same method as an instance method.
return _defaultTarget != null ?
_defaultTarget.OfferMessage(Common.SingleMessageHeader, item, null, false) == DataflowMessageStatus.Accepted :
_spscTarget.Post(item);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<TInput>.OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, Boolean consumeToAccept)
{
return _defaultTarget != null ?
_defaultTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept) :
_spscTarget.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="InputCount"]/*' />
public int InputCount
{
get { return _defaultTarget != null ? _defaultTarget.InputCount : _spscTarget.InputCount; }
}
/// <summary>Gets the number of messages waiting to be processed. This must only be used from the debugger.</summary>
private int InputCountForDebugger
{
get { return _defaultTarget != null ? _defaultTarget.GetDebuggingInformation().InputCount : _spscTarget.InputCount; }
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString()
{
return Common.GetNameForDebugger(this, _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions);
}
/// <summary>The data to display in the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
get
{
return string.Format("{0}, InputCount={1}",
Common.GetNameForDebugger(this, _defaultTarget != null ? _defaultTarget.DataflowBlockOptions : _spscTarget.DataflowBlockOptions),
InputCountForDebugger);
}
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the Call.</summary>
private sealed class DebugView
{
/// <summary>The action block being viewed.</summary>
private readonly ActionBlock<TInput> _actionBlock;
/// <summary>The action block's default target being viewed.</summary>
private readonly TargetCore<TInput>.DebuggingInformation _defaultDebugInfo;
/// <summary>The action block's SPSC target being viewed.</summary>
private readonly SpscTargetCore<TInput>.DebuggingInformation _spscDebugInfo;
/// <summary>Initializes the debug view.</summary>
/// <param name="actionBlock">The target being debugged.</param>
public DebugView(ActionBlock<TInput> actionBlock)
{
Debug.Assert(actionBlock != null, "Need a block with which to construct the debug view.");
_actionBlock = actionBlock;
if (_actionBlock._defaultTarget != null)
{
_defaultDebugInfo = actionBlock._defaultTarget.GetDebuggingInformation();
}
else
{
_spscDebugInfo = actionBlock._spscTarget.GetDebuggingInformation();
}
}
/// <summary>Gets the messages waiting to be processed.</summary>
public IEnumerable<TInput> InputQueue
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.InputQueue : _spscDebugInfo.InputQueue; }
}
/// <summary>Gets any postponed messages.</summary>
public QueuedMap<ISourceBlock<TInput>, DataflowMessageHeader> PostponedMessages
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.PostponedMessages : null; }
}
/// <summary>Gets the number of outstanding input operations.</summary>
public Int32 CurrentDegreeOfParallelism
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.CurrentDegreeOfParallelism : _spscDebugInfo.CurrentDegreeOfParallelism; }
}
/// <summary>Gets the ExecutionDataflowBlockOptions used to configure this block.</summary>
public ExecutionDataflowBlockOptions DataflowBlockOptions
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.DataflowBlockOptions : _spscDebugInfo.DataflowBlockOptions; }
}
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsDecliningPermanently : _spscDebugInfo.IsDecliningPermanently; }
}
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted
{
get { return _defaultDebugInfo != null ? _defaultDebugInfo.IsCompleted : _spscDebugInfo.IsCompleted; }
}
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_actionBlock); } }
}
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,494 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// BufferBlock.cs
//
//
// A propagator block that provides support for unbounded and bounded FIFO buffers.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Security;
using System.Threading.Tasks.Dataflow.Internal;
using System.Diagnostics.CodeAnalysis;
namespace System.Threading.Tasks.Dataflow
{
/// <summary>Provides a buffer for storing data.</summary>
/// <typeparam name="T">Specifies the type of the data buffered by this dataflow block.</typeparam>
[DebuggerDisplay("{DebuggerDisplayContent,nq}")]
[DebuggerTypeProxy(typeof(BufferBlock<>.DebugView))]
public sealed class BufferBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>, IDebuggerDisplay
{
/// <summary>The core logic for the buffer block.</summary>
private readonly SourceCore<T> _source;
/// <summary>The bounding state for when in bounding mode; null if not bounding.</summary>
private readonly BoundingStateWithPostponedAndTask<T> _boundingState;
/// <summary>Whether all future messages should be declined on the target.</summary>
private bool _targetDecliningPermanently;
/// <summary>A task has reserved the right to run the target's completion routine.</summary>
private bool _targetCompletionReserved;
/// <summary>Gets the lock object used to synchronize incoming requests.</summary>
private object IncomingLock { get { return _source; } }
/// <summary>Initializes the <see cref="BufferBlock{T}"/>.</summary>
public BufferBlock() :
this(DataflowBlockOptions.Default)
{ }
/// <summary>Initializes the <see cref="BufferBlock{T}"/> with the specified <see cref="DataflowBlockOptions"/>.</summary>
/// <param name="dataflowBlockOptions">The options with which to configure this <see cref="BufferBlock{T}"/>.</param>
/// <exception cref="System.ArgumentNullException">The <paramref name="dataflowBlockOptions"/> is null (Nothing in Visual Basic).</exception>
public BufferBlock(DataflowBlockOptions dataflowBlockOptions)
{
if (dataflowBlockOptions == null) throw new ArgumentNullException(nameof(dataflowBlockOptions));
Contract.EndContractBlock();
// Ensure we have options that can't be changed by the caller
dataflowBlockOptions = dataflowBlockOptions.DefaultOrClone();
// Initialize bounding state if necessary
Action<ISourceBlock<T>, int> onItemsRemoved = null;
if (dataflowBlockOptions.BoundedCapacity > 0)
{
onItemsRemoved = (owningSource, count) => ((BufferBlock<T>)owningSource).OnItemsRemoved(count);
_boundingState = new BoundingStateWithPostponedAndTask<T>(dataflowBlockOptions.BoundedCapacity);
}
// Initialize the source state
_source = new SourceCore<T>(this, dataflowBlockOptions,
owningSource => ((BufferBlock<T>)owningSource).Complete(),
onItemsRemoved);
// It is possible that the source half may fault on its own, e.g. due to a task scheduler exception.
// In those cases we need to fault the target half to drop its buffered messages and to release its
// reservations. This should not create an infinite loop, because all our implementations are designed
// to handle multiple completion requests and to carry over only one.
_source.Completion.ContinueWith((completed, state) =>
{
var thisBlock = ((BufferBlock<T>)state) as IDataflowBlock;
Debug.Assert(completed.IsFaulted, "The source must be faulted in order to trigger a target completion.");
thisBlock.Fault(completed.Exception);
}, this, CancellationToken.None, Common.GetContinuationOptions() | TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
// Handle async cancellation requests by declining on the target
Common.WireCancellationToComplete(
dataflowBlockOptions.CancellationToken, _source.Completion, owningSource => ((BufferBlock<T>)owningSource).Complete(), this);
#if FEATURE_TRACING
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.DataflowBlockCreated(this, dataflowBlockOptions);
}
#endif
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Targets/Member[@name="OfferMessage"]/*' />
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, Boolean consumeToAccept)
{
// Validate arguments
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (source == null && consumeToAccept) throw new ArgumentException(SR.Argument_CantConsumeFromANullSource, nameof(consumeToAccept));
Contract.EndContractBlock();
lock (IncomingLock)
{
// If we've already stopped accepting messages, decline permanently
if (_targetDecliningPermanently)
{
CompleteTargetIfPossible();
return DataflowMessageStatus.DecliningPermanently;
}
// We can directly accept the message if:
// 1) we are not bounding, OR
// 2) we are bounding AND there is room available AND there are no postponed messages AND we are not currently processing.
// (If there were any postponed messages, we would need to postpone so that ordering would be maintained.)
// (We should also postpone if we are currently processing, because there may be a race between consuming postponed messages and
// accepting new ones directly into the queue.)
if (_boundingState == null
||
(_boundingState.CountIsLessThanBound && _boundingState.PostponedMessages.Count == 0 && _boundingState.TaskForInputProcessing == null))
{
// Consume the message from the source if necessary
if (consumeToAccept)
{
Debug.Assert(source != null, "We must have thrown if source == null && consumeToAccept == true.");
bool consumed;
messageValue = source.ConsumeMessage(messageHeader, this, out consumed);
if (!consumed) return DataflowMessageStatus.NotAvailable;
}
// Once consumed, pass it to the source
_source.AddMessage(messageValue);
if (_boundingState != null) _boundingState.CurrentCount++;
return DataflowMessageStatus.Accepted;
}
// Otherwise, we try to postpone if a source was provided
else if (source != null)
{
Debug.Assert(_boundingState != null && _boundingState.PostponedMessages != null,
"PostponedMessages must have been initialized during construction in bounding mode.");
_boundingState.PostponedMessages.Push(source, messageHeader);
return DataflowMessageStatus.Postponed;
}
// We can't do anything else about this message
return DataflowMessageStatus.Declined;
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
public void Complete() { CompleteCore(exception: null, storeExceptionEvenIfAlreadyCompleting: false); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null) throw new ArgumentNullException(nameof(exception));
Contract.EndContractBlock();
CompleteCore(exception, storeExceptionEvenIfAlreadyCompleting: false);
}
private void CompleteCore(Exception exception, bool storeExceptionEvenIfAlreadyCompleting, bool revertProcessingState = false)
{
Debug.Assert(storeExceptionEvenIfAlreadyCompleting || !revertProcessingState,
"Indicating dirty processing state may only come with storeExceptionEvenIfAlreadyCompleting==true.");
Contract.EndContractBlock();
lock (IncomingLock)
{
// Faulting from outside is allowed until we start declining permanently.
// Faulting from inside is allowed at any time.
if (exception != null && (!_targetDecliningPermanently || storeExceptionEvenIfAlreadyCompleting))
{
_source.AddException(exception);
}
// Revert the dirty processing state if requested
if (revertProcessingState)
{
Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
"The processing state must be dirty when revertProcessingState==true.");
_boundingState.TaskForInputProcessing = null;
}
// Trigger completion
_targetDecliningPermanently = true;
CompleteTargetIfPossible();
}
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="LinkTo"]/*' />
public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions) { return _source.LinkTo(target, linkOptions); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceive"]/*' />
public Boolean TryReceive(Predicate<T> filter, out T item) { return _source.TryReceive(filter, out item); }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="TryReceiveAll"]/*' />
public Boolean TryReceiveAll(out IList<T> items) { return _source.TryReceiveAll(out items); }
/// <summary>Gets the number of items currently stored in the buffer.</summary>
public Int32 Count { get { return _source.OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
public Task Completion { get { return _source.Completion; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ConsumeMessage"]/*' />
T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out Boolean messageConsumed)
{
return _source.ConsumeMessage(messageHeader, target, out messageConsumed);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReserveMessage"]/*' />
bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
return _source.ReserveMessage(messageHeader, target);
}
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Sources/Member[@name="ReleaseReservation"]/*' />
void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
{
_source.ReleaseReservation(messageHeader, target);
}
/// <summary>Notifies the block that one or more items was removed from the queue.</summary>
/// <param name="numItemsRemoved">The number of items removed.</param>
private void OnItemsRemoved(int numItemsRemoved)
{
Debug.Assert(numItemsRemoved > 0, "A positive number of items to remove is required.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// If we're bounding, we need to know when an item is removed so that we
// can update the count that's mirroring the actual count in the source's queue,
// and potentially kick off processing to start consuming postponed messages.
if (_boundingState != null)
{
lock (IncomingLock)
{
// Decrement the count, which mirrors the count in the source half
Debug.Assert(_boundingState.CurrentCount - numItemsRemoved >= 0,
"It should be impossible to have a negative number of items.");
_boundingState.CurrentCount -= numItemsRemoved;
ConsumeAsyncIfNecessary();
CompleteTargetIfPossible();
}
}
}
/// <summary>Called when postponed messages may need to be consumed.</summary>
/// <param name="isReplacementReplica">Whether this call is the continuation of a previous message loop.</param>
internal void ConsumeAsyncIfNecessary(bool isReplacementReplica = false)
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
Debug.Assert(_boundingState != null, "Must be in bounded mode.");
if (!_targetDecliningPermanently &&
_boundingState.TaskForInputProcessing == null &&
_boundingState.PostponedMessages.Count > 0 &&
_boundingState.CountIsLessThanBound)
{
// Create task and store into _taskForInputProcessing prior to scheduling the task
// so that _taskForInputProcessing will be visibly set in the task loop.
_boundingState.TaskForInputProcessing =
new Task(state => ((BufferBlock<T>)state).ConsumeMessagesLoopCore(), this,
Common.GetCreationOptionsForTask(isReplacementReplica));
#if FEATURE_TRACING
DataflowEtwProvider etwLog = DataflowEtwProvider.Log;
if (etwLog.IsEnabled())
{
etwLog.TaskLaunchedForMessageHandling(
this, _boundingState.TaskForInputProcessing, DataflowEtwProvider.TaskLaunchedReason.ProcessingInputMessages,
_boundingState.PostponedMessages.Count);
}
#endif
// Start the task handling scheduling exceptions
Exception exception = Common.StartTaskSafe(_boundingState.TaskForInputProcessing, _source.DataflowBlockOptions.TaskScheduler);
if (exception != null)
{
// Get out from under currently held locks. CompleteCore re-acquires the locks it needs.
Task.Factory.StartNew(exc => CompleteCore(exception: (Exception)exc, storeExceptionEvenIfAlreadyCompleting: true, revertProcessingState: true),
exception, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
}
}
/// <summary>Task body used to consume postponed messages.</summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
private void ConsumeMessagesLoopCore()
{
Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
"May only be called in bounded mode and when a task is in flight.");
Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
"This must only be called from the in-flight processing task.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
try
{
int maxMessagesPerTask = _source.DataflowBlockOptions.ActualMaxMessagesPerTask;
for (int i = 0;
i < maxMessagesPerTask && ConsumeAndStoreOneMessageIfAvailable();
i++)
;
}
catch (Exception exc)
{
// Prevent the creation of new processing tasks
CompleteCore(exc, storeExceptionEvenIfAlreadyCompleting: true);
}
finally
{
lock (IncomingLock)
{
// We're no longer processing, so null out the processing task
_boundingState.TaskForInputProcessing = null;
// However, we may have given up early because we hit our own configured
// processing limits rather than because we ran out of work to do. If that's
// the case, make sure we spin up another task to keep going.
ConsumeAsyncIfNecessary(isReplacementReplica: true);
// If, however, we stopped because we ran out of work to do and we
// know we'll never get more, then complete.
CompleteTargetIfPossible();
}
}
}
/// <summary>
/// Retrieves one postponed message if there's room and if we can consume a postponed message.
/// Stores any consumed message into the source half.
/// </summary>
/// <returns>true if a message could be consumed and stored; otherwise, false.</returns>
/// <remarks>This must only be called from the asynchronous processing loop.</remarks>
private bool ConsumeAndStoreOneMessageIfAvailable()
{
Debug.Assert(_boundingState != null && _boundingState.TaskForInputProcessing != null,
"May only be called in bounded mode and when a task is in flight.");
Debug.Assert(_boundingState.TaskForInputProcessing.Id == Task.CurrentId,
"This must only be called from the in-flight processing task.");
Common.ContractAssertMonitorStatus(IncomingLock, held: false);
// Loop through the postponed messages until we get one.
while (true)
{
// Get the next item to retrieve. If there are no more, bail.
KeyValuePair<ISourceBlock<T>, DataflowMessageHeader> sourceAndMessage;
lock (IncomingLock)
{
if (_targetDecliningPermanently) return false;
if (!_boundingState.CountIsLessThanBound) return false;
if (!_boundingState.PostponedMessages.TryPop(out sourceAndMessage)) return false;
// Optimistically assume we're going to get the item. This avoids taking the lock
// again if we're right. If we're wrong, we decrement it later under lock.
_boundingState.CurrentCount++;
}
// Consume the item
bool consumed = false;
try
{
T consumedValue = sourceAndMessage.Key.ConsumeMessage(sourceAndMessage.Value, this, out consumed);
if (consumed)
{
_source.AddMessage(consumedValue);
return true;
}
}
finally
{
// We didn't get the item, so decrement the count to counteract our optimistic assumption.
if (!consumed)
{
lock (IncomingLock) _boundingState.CurrentCount--;
}
}
}
}
/// <summary>Completes the target, notifying the source, once all completion conditions are met.</summary>
private void CompleteTargetIfPossible()
{
Common.ContractAssertMonitorStatus(IncomingLock, held: true);
if (_targetDecliningPermanently &&
!_targetCompletionReserved &&
(_boundingState == null || _boundingState.TaskForInputProcessing == null))
{
_targetCompletionReserved = true;
// If we're in bounding mode and we have any postponed messages, we need to clear them,
// which means calling back to the source, which means we need to escape the incoming lock.
if (_boundingState != null && _boundingState.PostponedMessages.Count > 0)
{
Task.Factory.StartNew(state =>
{
var thisBufferBlock = (BufferBlock<T>)state;
// Release any postponed messages
List<Exception> exceptions = null;
if (thisBufferBlock._boundingState != null)
{
// Note: No locks should be held at this point
Common.ReleaseAllPostponedMessages(thisBufferBlock,
thisBufferBlock._boundingState.PostponedMessages,
ref exceptions);
}
if (exceptions != null)
{
// It is important to migrate these exceptions to the source part of the owning batch,
// because that is the completion task that is publically exposed.
thisBufferBlock._source.AddExceptions(exceptions);
}
thisBufferBlock._source.Complete();
}, this, CancellationToken.None, Common.GetCreationOptionsForTask(), TaskScheduler.Default);
}
// Otherwise, we can just decline the source directly.
else
{
_source.Complete();
}
}
}
/// <summary>Gets the number of messages in the buffer. This must only be used from the debugger as it avoids taking necessary locks.</summary>
private int CountForDebugger { get { return _source.GetDebuggingInformation().OutputCount; } }
/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="ToString"]/*' />
public override string ToString() { return Common.GetNameForDebugger(this, _source.DataflowBlockOptions); }
/// <summary>The data to display in the debugger display attribute.</summary>
[SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider")]
private object DebuggerDisplayContent
{
get
{
return string.Format("{0}, Count={1}",
Common.GetNameForDebugger(this, _source.DataflowBlockOptions),
CountForDebugger);
}
}
/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
/// <summary>Provides a debugger type proxy for the BufferBlock.</summary>
private sealed class DebugView
{
/// <summary>The buffer block.</summary>
private readonly BufferBlock<T> _bufferBlock;
/// <summary>The buffer's source half.</summary>
private readonly SourceCore<T>.DebuggingInformation _sourceDebuggingInformation;
/// <summary>Initializes the debug view.</summary>
/// <param name="bufferBlock">The BufferBlock being viewed.</param>
public DebugView(BufferBlock<T> bufferBlock)
{
Debug.Assert(bufferBlock != null, "Need a block with which to construct the debug view.");
_bufferBlock = bufferBlock;
_sourceDebuggingInformation = bufferBlock._source.GetDebuggingInformation();
}
/// <summary>Gets the collection of postponed message headers.</summary>
public QueuedMap<ISourceBlock<T>, DataflowMessageHeader> PostponedMessages
{
get { return _bufferBlock._boundingState != null ? _bufferBlock._boundingState.PostponedMessages : null; }
}
/// <summary>Gets the messages in the buffer.</summary>
public IEnumerable<T> Queue { get { return _sourceDebuggingInformation.OutputQueue; } }
/// <summary>The task used to process messages.</summary>
public Task TaskForInputProcessing { get { return _bufferBlock._boundingState != null ? _bufferBlock._boundingState.TaskForInputProcessing : null; } }
/// <summary>Gets the task being used for output processing.</summary>
public Task TaskForOutputProcessing { get { return _sourceDebuggingInformation.TaskForOutputProcessing; } }
/// <summary>Gets the DataflowBlockOptions used to configure this block.</summary>
public DataflowBlockOptions DataflowBlockOptions { get { return _sourceDebuggingInformation.DataflowBlockOptions; } }
/// <summary>Gets whether the block is declining further messages.</summary>
public bool IsDecliningPermanently { get { return _bufferBlock._targetDecliningPermanently; } }
/// <summary>Gets whether the block is completed.</summary>
public bool IsCompleted { get { return _sourceDebuggingInformation.IsCompleted; } }
/// <summary>Gets the block's Id.</summary>
public int Id { get { return Common.GetBlockId(_bufferBlock); } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public TargetRegistry<T> LinkedTargets { get { return _sourceDebuggingInformation.LinkedTargets; } }
/// <summary>Gets the set of all targets linked from this block.</summary>
public ITargetBlock<T> NextMessageReservedFor { get { return _sourceDebuggingInformation.NextMessageReservedFor; } }
}
}
}

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More