Imported Upstream version 5.10.0.69

Former-commit-id: fc39669a0b707dd3c063977486506b6793da2890
This commit is contained in:
Xamarin Public Jenkins (auto-signing)
2018-01-29 19:03:06 +00:00
parent d8f8abd549
commit e2950ec768
6283 changed files with 453847 additions and 91879 deletions

View File

@@ -0,0 +1,60 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.25420.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Channels.Tests", "tests\System.Threading.Channels.Tests.csproj", "{9E984EB2-827E-4029-9647-FB5F8B67C553}"
ProjectSection(ProjectDependencies) = postProject
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977} = {1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Channels.Performance.Tests", "tests\Performance\System.Threading.Channels.Performance.Tests.csproj", "{11ABE2F8-4FB9-48AC-91AA-D04503059550}"
ProjectSection(ProjectDependencies) = postProject
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977} = {1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Channels", "src\System.Threading.Channels.csproj", "{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}"
ProjectSection(ProjectDependencies) = postProject
{9C524CA0-92FF-437B-B568-BCE8A794A69A} = {9C524CA0-92FF-437B-B568-BCE8A794A69A}
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "System.Threading.Channels", "ref\System.Threading.Channels.csproj", "{9C524CA0-92FF-437B-B568-BCE8A794A69A}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{1A2F9F4A-A032-433E-B914-ADD5992BB178}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{E107E9C1-E893-4E87-987E-04EF0DCEAEFD}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{2E666815-2EDB-464B-9DF6-380BF4789AD4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{9E984EB2-827E-4029-9647-FB5F8B67C553}.Debug|Any CPU.ActiveCfg = netstandard-Debug|Any CPU
{9E984EB2-827E-4029-9647-FB5F8B67C553}.Debug|Any CPU.Build.0 = netstandard-Debug|Any CPU
{9E984EB2-827E-4029-9647-FB5F8B67C553}.Release|Any CPU.ActiveCfg = netstandard-Release|Any CPU
{9E984EB2-827E-4029-9647-FB5F8B67C553}.Release|Any CPU.Build.0 = netstandard-Release|Any CPU
{11ABE2F8-4FB9-48AC-91AA-D04503059550}.Debug|Any CPU.ActiveCfg = netcoreapp-Debug|Any CPU
{11ABE2F8-4FB9-48AC-91AA-D04503059550}.Debug|Any CPU.Build.0 = netcoreapp-Debug|Any CPU
{11ABE2F8-4FB9-48AC-91AA-D04503059550}.Release|Any CPU.ActiveCfg = netcoreapp-Release|Any CPU
{11ABE2F8-4FB9-48AC-91AA-D04503059550}.Release|Any CPU.Build.0 = netcoreapp-Release|Any CPU
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Debug|Any CPU.ActiveCfg = netstandard-Debug|Any CPU
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Debug|Any CPU.Build.0 = netstandard-Debug|Any CPU
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Release|Any CPU.ActiveCfg = netstandard-Release|Any CPU
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}.Release|Any CPU.Build.0 = netstandard-Release|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.ActiveCfg = netstandard-Debug|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Debug|Any CPU.Build.0 = netstandard-Debug|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.ActiveCfg = netstandard-Release|Any CPU
{9C524CA0-92FF-437B-B568-BCE8A794A69A}.Release|Any CPU.Build.0 = netstandard-Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{9E984EB2-827E-4029-9647-FB5F8B67C553} = {1A2F9F4A-A032-433E-B914-ADD5992BB178}
{11ABE2F8-4FB9-48AC-91AA-D04503059550} = {1A2F9F4A-A032-433E-B914-ADD5992BB178}
{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977} = {E107E9C1-E893-4E87-987E-04EF0DCEAEFD}
{9C524CA0-92FF-437B-B568-BCE8A794A69A} = {2E666815-2EDB-464B-9DF6-380BF4789AD4}
EndGlobalSection
EndGlobal

View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="..\dir.props" />
<PropertyGroup>
<AssemblyVersion>4.0.0.0</AssemblyVersion>
<AssemblyKey>Open</AssemblyKey>
</PropertyGroup>
</Project>

View File

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.props))\dir.props" />
<ItemGroup>
<ProjectReference Include="..\src\System.Threading.Channels.csproj">
<SupportedFramework>net46;netcore50;netcoreapp1.0;$(UAPvNextTFM);$(AllXamarinFrameworks)</SupportedFramework>
</ProjectReference>
</ItemGroup>
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.targets))\dir.targets" />
</Project>

View File

@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<BuildConfigurations>
netstandard1.3;
netstandard;
</BuildConfigurations>
</PropertyGroup>
</Project>

View File

@@ -0,0 +1,83 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
// ------------------------------------------------------------------------------
// Changes to this file must follow the http://aka.ms/api-review process.
// ------------------------------------------------------------------------------
namespace System.Threading.Channels
{
public enum BoundedChannelFullMode
{
DropNewest = 1,
DropOldest = 2,
DropWrite = 3,
Wait = 0,
}
public sealed partial class BoundedChannelOptions : System.Threading.Channels.ChannelOptions
{
public BoundedChannelOptions(int capacity) { }
public int Capacity { get { throw null; } set { } }
public System.Threading.Channels.BoundedChannelFullMode FullMode { get { throw null; } set { } }
}
public static partial class Channel
{
public static System.Threading.Channels.Channel<T> CreateBounded<T>(int capacity) { throw null; }
public static System.Threading.Channels.Channel<T> CreateBounded<T>(System.Threading.Channels.BoundedChannelOptions options) { throw null; }
public static System.Threading.Channels.Channel<T> CreateUnbounded<T>() { throw null; }
public static System.Threading.Channels.Channel<T> CreateUnbounded<T>(System.Threading.Channels.UnboundedChannelOptions options) { throw null; }
public static System.Threading.Channels.Channel<T> CreateUnbuffered<T>() { throw null; }
public static System.Threading.Channels.Channel<T> CreateUnbuffered<T>(System.Threading.Channels.UnbufferedChannelOptions options) { throw null; }
}
public partial class ChannelClosedException : System.InvalidOperationException
{
public ChannelClosedException() { }
public ChannelClosedException(System.Exception innerException) { }
public ChannelClosedException(string message) { }
public ChannelClosedException(string message, System.Exception innerException) { }
}
public abstract partial class ChannelOptions
{
protected ChannelOptions() { }
public bool AllowSynchronousContinuations { get { throw null; } set { } }
public bool SingleReader { get { throw null; } set { } }
public bool SingleWriter { get { throw null; } set { } }
}
public abstract partial class ChannelReader<T>
{
protected ChannelReader() { }
public virtual System.Threading.Tasks.Task Completion { get { throw null; } }
public virtual System.Threading.Tasks.ValueTask<T> ReadAsync(CancellationToken cancellationToken = default) { throw null; }
public abstract bool TryRead(out T item);
public abstract System.Threading.Tasks.Task<bool> WaitToReadAsync(System.Threading.CancellationToken cancellationToken=default);
}
public abstract partial class ChannelWriter<T>
{
protected ChannelWriter() { }
public void Complete(System.Exception error=null) { }
public virtual bool TryComplete(System.Exception error=null) { throw null; }
public abstract bool TryWrite(T item);
public abstract System.Threading.Tasks.Task<bool> WaitToWriteAsync(System.Threading.CancellationToken cancellationToken=default);
public virtual System.Threading.Tasks.Task WriteAsync(T item, System.Threading.CancellationToken cancellationToken=default) { throw null; }
}
public abstract partial class Channel<T> : System.Threading.Channels.Channel<T, T>
{
protected Channel() { }
}
public abstract partial class Channel<TWrite, TRead>
{
protected Channel() { }
public System.Threading.Channels.ChannelReader<TRead> Reader { get { throw null; } protected set { } }
public System.Threading.Channels.ChannelWriter<TWrite> Writer { get { throw null; } protected set { } }
public static implicit operator System.Threading.Channels.ChannelReader<TRead> (System.Threading.Channels.Channel<TWrite, TRead> channel) { throw null; }
public static implicit operator System.Threading.Channels.ChannelWriter<TWrite> (System.Threading.Channels.Channel<TWrite, TRead> channel) { throw null; }
}
public sealed partial class UnboundedChannelOptions : System.Threading.Channels.ChannelOptions
{
public UnboundedChannelOptions() { }
}
public sealed partial class UnbufferedChannelOptions : System.Threading.Channels.ChannelOptions
{
public UnbufferedChannelOptions() { }
}
}

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.props))\dir.props" />
<PropertyGroup>
<ProjectGuid>{9C524CA0-92FF-437B-B568-BCE8A794A69A}</ProjectGuid>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'netstandard-Debug|AnyCPU'" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'netstandard-Release|AnyCPU'" />
<ItemGroup>
<Compile Include="System.Threading.Channels.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)' == 'netstandard'">
<Reference Include="netstandard" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)' == 'netstandard1.3'">
<Reference Include="System.Runtime" />
<Reference Include="System.Threading.Tasks" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\System.Threading.Tasks.Extensions\ref\System.Threading.Tasks.Extensions.csproj" />
</ItemGroup>
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.targets))\dir.targets" />
</Project>

View File

@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<BuildConfigurations>
netstandard1.3;
netstandard;
</BuildConfigurations>
</PropertyGroup>
</Project>

View File

@@ -0,0 +1,123 @@
<?xml version="1.0" encoding="utf-8"?>
<root>
<!--
Microsoft ResX Schema
Version 2.0
The primary goals of this format is to allow a simple XML format
that is mostly human readable. The generation and parsing of the
various data types are done through the TypeConverter classes
associated with the data types.
Example:
... ado.net/XML headers & schema ...
<resheader name="resmimetype">text/microsoft-resx</resheader>
<resheader name="version">2.0</resheader>
<resheader name="reader">System.Resources.ResXResourceReader, System.Windows.Forms, ...</resheader>
<resheader name="writer">System.Resources.ResXResourceWriter, System.Windows.Forms, ...</resheader>
<data name="Name1"><value>this is my long string</value><comment>this is a comment</comment></data>
<data name="Color1" type="System.Drawing.Color, System.Drawing">Blue</data>
<data name="Bitmap1" mimetype="application/x-microsoft.net.object.binary.base64">
<value>[base64 mime encoded serialized .NET Framework object]</value>
</data>
<data name="Icon1" type="System.Drawing.Icon, System.Drawing" mimetype="application/x-microsoft.net.object.bytearray.base64">
<value>[base64 mime encoded string representing a byte array form of the .NET Framework object]</value>
<comment>This is a comment</comment>
</data>
There are any number of "resheader" rows that contain simple
name/value pairs.
Each data row contains a name, and value. The row also contains a
type or mimetype. Type corresponds to a .NET class that support
text/value conversion through the TypeConverter architecture.
Classes that don't support this are serialized and stored with the
mimetype set.
The mimetype is used for serialized objects, and tells the
ResXResourceReader how to depersist the object. This is currently not
extensible. For a given mimetype the value must be set accordingly:
Note - application/x-microsoft.net.object.binary.base64 is the format
that the ResXResourceWriter will generate, however the reader can
read any of the formats listed below.
mimetype: application/x-microsoft.net.object.binary.base64
value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Binary.BinaryFormatter
: and then encoded with base64 encoding.
mimetype: application/x-microsoft.net.object.soap.base64
value : The object must be serialized with
: System.Runtime.Serialization.Formatters.Soap.SoapFormatter
: and then encoded with base64 encoding.
mimetype: application/x-microsoft.net.object.bytearray.base64
value : The object must be serialized into a byte array
: using a System.ComponentModel.TypeConverter
: and then encoded with base64 encoding.
-->
<xsd:schema id="root" xmlns="" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:msdata="urn:schemas-microsoft-com:xml-msdata">
<xsd:import namespace="http://www.w3.org/XML/1998/namespace" />
<xsd:element name="root" msdata:IsDataSet="true">
<xsd:complexType>
<xsd:choice maxOccurs="unbounded">
<xsd:element name="metadata">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="value" type="xsd:string" minOccurs="0" />
</xsd:sequence>
<xsd:attribute name="name" use="required" type="xsd:string" />
<xsd:attribute name="type" type="xsd:string" />
<xsd:attribute name="mimetype" type="xsd:string" />
<xsd:attribute ref="xml:space" />
</xsd:complexType>
</xsd:element>
<xsd:element name="assembly">
<xsd:complexType>
<xsd:attribute name="alias" type="xsd:string" />
<xsd:attribute name="name" type="xsd:string" />
</xsd:complexType>
</xsd:element>
<xsd:element name="data">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="value" type="xsd:string" minOccurs="0" msdata:Ordinal="1" />
<xsd:element name="comment" type="xsd:string" minOccurs="0" msdata:Ordinal="2" />
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required" msdata:Ordinal="1" />
<xsd:attribute name="type" type="xsd:string" msdata:Ordinal="3" />
<xsd:attribute name="mimetype" type="xsd:string" msdata:Ordinal="4" />
<xsd:attribute ref="xml:space" />
</xsd:complexType>
</xsd:element>
<xsd:element name="resheader">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="value" type="xsd:string" minOccurs="0" msdata:Ordinal="1" />
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required" />
</xsd:complexType>
</xsd:element>
</xsd:choice>
</xsd:complexType>
</xsd:element>
</xsd:schema>
<resheader name="resmimetype">
<value>text/microsoft-resx</value>
</resheader>
<resheader name="version">
<value>2.0</value>
</resheader>
<resheader name="reader">
<value>System.Resources.ResXResourceReader, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
</resheader>
<resheader name="writer">
<value>System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
</resheader>
<data name="ChannelClosedException_DefaultMessage" xml:space="preserve">
<value>The channel has been closed.</value>
</data>
</root>

View File

@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.props))\dir.props" />
<PropertyGroup>
<ProjectGuid>{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}</ProjectGuid>
<RootNamespace>System.Threading.Channels</RootNamespace>
<DocumentationFile>$(OutputPath)$(MSBuildProjectName).xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'netstandard-Debug|AnyCPU'" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'netstandard-Release|AnyCPU'" />
<ItemGroup>
<Compile Include="System\VoidResult.cs" />
<Compile Include="System\Collections\Generic\Dequeue.cs" />
<Compile Include="System\Threading\Channels\BoundedChannel.cs" />
<Compile Include="System\Threading\Channels\BoundedChannelFullMode.cs" />
<Compile Include="System\Threading\Channels\Channel.cs" />
<Compile Include="System\Threading\Channels\ChannelClosedException.cs" />
<Compile Include="System\Threading\Channels\ChannelOptions.cs" />
<Compile Include="System\Threading\Channels\ChannelReader.cs" />
<Compile Include="System\Threading\Channels\ChannelUtilities.cs" />
<Compile Include="System\Threading\Channels\ChannelWriter.cs" />
<Compile Include="System\Threading\Channels\Channel_1.cs" />
<Compile Include="System\Threading\Channels\Channel_2.cs" />
<Compile Include="System\Threading\Channels\IDebugEnumerator.cs" />
<Compile Include="System\Threading\Channels\Interactor.cs" />
<Compile Include="System\Threading\Channels\SingleConsumerUnboundedChannel.cs" />
<Compile Include="System\Threading\Channels\UnboundedChannel.cs" />
<Compile Include="System\Threading\Channels\UnbufferedChannel.cs" />
<Compile Include="$(CommonPath)\System\Collections\Concurrent\SingleProducerConsumerQueue.cs">
<Link>Common\System\Collections\Concurrent\SingleProducerConsumerQueue.cs</Link>
</Compile>
</ItemGroup>
<ItemGroup>
<Reference Include="System.Collections" />
<Reference Include="System.Collections.Concurrent" />
<Reference Include="System.Diagnostics.Debug" />
<Reference Include="System.Resources.ResourceManager" />
<Reference Include="System.Runtime" />
<Reference Include="System.Runtime.Extensions" />
<Reference Include="System.Threading" />
<Reference Include="System.Threading.Tasks" />
<Reference Include="System.Threading.Tasks.Extensions" />
</ItemGroup>
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.targets))\dir.targets" />
</Project>

View File

@@ -0,0 +1,124 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Diagnostics;
namespace System.Collections.Generic
{
/// <summary>Provides a double-ended queue data structure.</summary>
/// <typeparam name="T">Type of the data stored in the dequeue.</typeparam>
[DebuggerDisplay("Count = {_size}")]
internal sealed class Dequeue<T>
{
private T[] _array = Array.Empty<T>();
private int _head; // First valid element in the queue
private int _tail; // First open slot in the dequeue, unless the dequeue is full
private int _size; // Number of elements.
public int Count => _size;
public bool IsEmpty => _size == 0;
public void EnqueueTail(T item)
{
if (_size == _array.Length)
{
Grow();
}
_array[_tail] = item;
if (++_tail == _array.Length)
{
_tail = 0;
}
_size++;
}
//// Uncomment if/when enqueueing at the head is needed
//public void EnqueueHead(T item)
//{
// if (_size == _array.Length)
// {
// Grow();
// }
//
// _head = (_head == 0 ? _array.Length : _head) - 1;
// _array[_head] = item;
// _size++;
//}
public T DequeueHead()
{
Debug.Assert(!IsEmpty); // caller's responsibility to make sure there are elements remaining
T item = _array[_head];
_array[_head] = default;
if (++_head == _array.Length)
{
_head = 0;
}
_size--;
return item;
}
public T DequeueTail()
{
Debug.Assert(!IsEmpty); // caller's responsibility to make sure there are elements remaining
if (--_tail == -1)
{
_tail = _array.Length - 1;
}
T item = _array[_tail];
_array[_tail] = default;
_size--;
return item;
}
public IEnumerator<T> GetEnumerator() // meant for debug purposes only
{
int pos = _head;
int count = _size;
while (count-- > 0)
{
yield return _array[pos];
pos = (pos + 1) % _array.Length;
}
}
private void Grow()
{
Debug.Assert(_size == _array.Length);
Debug.Assert(_head == _tail);
const int MinimumGrow = 4;
int capacity = (int)(_array.Length * 2L);
if (capacity < _array.Length + MinimumGrow)
{
capacity = _array.Length + MinimumGrow;
}
T[] newArray = new T[capacity];
if (_head == 0)
{
Array.Copy(_array, 0, newArray, 0, _size);
}
else
{
Array.Copy(_array, _head, newArray, 0, _array.Length - _head);
Array.Copy(_array, 0, newArray, _array.Length - _head, _tail);
}
_array = newArray;
_head = 0;
_tail = _size;
}
}
}

View File

@@ -0,0 +1,411 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
namespace System.Threading.Channels
{
/// <summary>Provides a channel with a bounded capacity.</summary>
[DebuggerDisplay("Items={ItemsCountForDebugger}, Capacity={_bufferedCapacity}")]
[DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
{
/// <summary>The mode used when the channel hits its bound.</summary>
private readonly BoundedChannelFullMode _mode;
/// <summary>Task signaled when the channel has completed.</summary>
private readonly TaskCompletionSource<VoidResult> _completion;
/// <summary>The maximum capacity of the channel.</summary>
private readonly int _bufferedCapacity;
/// <summary>Items currently stored in the channel waiting to be read.</summary>
private readonly Dequeue<T> _items = new Dequeue<T>();
/// <summary>Writers waiting to write to the channel.</summary>
private readonly Dequeue<WriterInteractor<T>> _blockedWriters = new Dequeue<WriterInteractor<T>>();
/// <summary>Task signaled when any WaitToReadAsync waiters should be woken up.</summary>
private ReaderInteractor<bool> _waitingReaders;
/// <summary>Task signaled when any WaitToWriteAsync waiters should be woken up.</summary>
private ReaderInteractor<bool> _waitingWriters;
/// <summary>Whether to force continuations to be executed asynchronously from producer writes.</summary>
private readonly bool _runContinuationsAsynchronously;
/// <summary>Set to non-null once Complete has been called.</summary>
private Exception _doneWriting;
/// <summary>Gets an object used to synchronize all state on the instance.</summary>
private object SyncObj => _items;
/// <summary>Initializes the <see cref="BoundedChannel{T}"/>.</summary>
/// <param name="bufferedCapacity">The positive bounded capacity for the channel.</param>
/// <param name="mode">The mode used when writing to a full channel.</param>
/// <param name="runContinuationsAsynchronously">Whether to force continuations to be executed asynchronously.</param>
internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously)
{
Debug.Assert(bufferedCapacity > 0);
_bufferedCapacity = bufferedCapacity;
_mode = mode;
_runContinuationsAsynchronously = runContinuationsAsynchronously;
_completion = new TaskCompletionSource<VoidResult>(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
Reader = new BoundedChannelReader(this);
Writer = new BoundedChannelWriter(this);
}
private sealed class BoundedChannelReader : ChannelReader<T>
{
internal readonly BoundedChannel<T> _parent;
internal BoundedChannelReader(BoundedChannel<T> parent) => _parent = parent;
public override Task Completion => _parent._completion.Task;
public override bool TryRead(out T item)
{
BoundedChannel<T> parent = _parent;
lock (parent.SyncObj)
{
parent.AssertInvariants();
// Get an item if there is one.
if (!parent._items.IsEmpty)
{
item = DequeueItemAndPostProcess();
return true;
}
}
item = default;
return false;
}
public override Task<bool> WaitToReadAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<bool>(cancellationToken);
}
BoundedChannel<T> parent = _parent;
lock (parent.SyncObj)
{
parent.AssertInvariants();
// If there are any items available, a read is possible.
if (!parent._items.IsEmpty)
{
return ChannelUtilities.s_trueTask;
}
// There were no items available, so if we're done writing, a read will never be possible.
if (parent._doneWriting != null)
{
return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ?
Task.FromException<bool>(parent._doneWriting) :
ChannelUtilities.s_falseTask;
}
// There were no items available, but there could be in the future, so ensure
// there's a blocked reader task and return it.
return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingReaders, parent._runContinuationsAsynchronously, cancellationToken);
}
}
/// <summary>Dequeues an item, and then fixes up our state around writers and completion.</summary>
/// <returns>The dequeued item.</returns>
private T DequeueItemAndPostProcess()
{
BoundedChannel<T> parent = _parent;
Debug.Assert(Monitor.IsEntered(parent.SyncObj));
// Dequeue an item.
T item = parent._items.DequeueHead();
// If we're now empty and we're done writing, complete the channel.
if (parent._doneWriting != null && parent._items.IsEmpty)
{
ChannelUtilities.Complete(parent._completion, parent._doneWriting);
}
// If there are any writers blocked, there's now room for at least one
// to be promoted to have its item moved into the items queue. We need
// to loop while trying to complete the writer in order to find one that
// hasn't yet been canceled (canceled writers transition to canceled but
// remain in the physical queue).
while (!parent._blockedWriters.IsEmpty)
{
WriterInteractor<T> w = parent._blockedWriters.DequeueHead();
if (w.Success(default))
{
parent._items.EnqueueTail(w.Item);
return item;
}
}
// There was no blocked writer, so see if there's a WaitToWriteAsync
// we should wake up.
ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: true);
// Return the item
return item;
}
}
private sealed class BoundedChannelWriter : ChannelWriter<T>
{
internal readonly BoundedChannel<T> _parent;
internal BoundedChannelWriter(BoundedChannel<T> parent) => _parent = parent;
public override bool TryComplete(Exception error)
{
BoundedChannel<T> parent = _parent;
bool completeTask;
lock (parent.SyncObj)
{
parent.AssertInvariants();
// If we've already marked the channel as completed, bail.
if (parent._doneWriting != null)
{
return false;
}
// Mark that we're done writing.
parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel;
completeTask = parent._items.IsEmpty;
}
// If there are no items in the queue, complete the channel's task,
// as no more data can possibly arrive at this point. We do this outside
// of the lock in case we'll be running synchronous completions, and we
// do it before completing blocked/waiting readers, so that when they
// wake up they'll see the task as being completed.
if (completeTask)
{
ChannelUtilities.Complete(parent._completion, error);
}
// At this point, _blockedWriters and _waitingReaders/Writers will not be mutated:
// they're only mutated by readers/writers while holding the lock, and only if _doneWriting is null.
// We also know that only one thread (this one) will ever get here, as only that thread
// will be the one to transition from _doneWriting false to true. As such, we can
// freely manipulate them without any concurrency concerns.
ChannelUtilities.FailInteractors<WriterInteractor<T>, VoidResult>(parent._blockedWriters, ChannelUtilities.CreateInvalidCompletionException(error));
ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: false, error: error);
ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: false, error: error);
// Successfully transitioned to completed.
return true;
}
public override bool TryWrite(T item)
{
ReaderInteractor<bool> waitingReaders = null;
BoundedChannel<T> parent = _parent;
lock (parent.SyncObj)
{
parent.AssertInvariants();
// If we're done writing, nothing more to do.
if (parent._doneWriting != null)
{
return false;
}
// Get the number of items in the channel currently.
int count = parent._items.Count;
if (count == 0)
{
// There are no items in the channel, which means we may have waiting readers.
// Store the item.
parent._items.EnqueueTail(item);
waitingReaders = parent._waitingReaders;
if (waitingReaders == null)
{
// If no one's waiting to be notified about a 0-to-1 transition, we're done.
return true;
}
parent._waitingReaders = null;
}
else if (count < parent._bufferedCapacity)
{
// There's room in the channel. Since we're not transitioning from 0-to-1 and
// since there's room, we can simply store the item and exit without having to
// worry about blocked/waiting readers.
parent._items.EnqueueTail(item);
return true;
}
else if (parent._mode == BoundedChannelFullMode.Wait)
{
// The channel is full and we're in a wait mode.
// Simply exit and let the caller know we didn't write the data.
return false;
}
else if (parent._mode == BoundedChannelFullMode.DropWrite)
{
// The channel is full. Just ignore the item being added
// but say we added it.
return true;
}
else
{
// The channel is full, and we're in a dropping mode.
// Drop either the oldest or the newest and write the new item.
T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
parent._items.DequeueTail() :
parent._items.DequeueHead();
parent._items.EnqueueTail(item);
return true;
}
}
// We stored an item bringing the count up from 0 to 1. Alert
// any waiting readers that there may be something for them to consume.
// Since we're no longer holding the lock, it's possible we'll end up
// waking readers that have since come in.
waitingReaders.Success(item: true);
return true;
}
public override Task<bool> WaitToWriteAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<bool>(cancellationToken);
}
BoundedChannel<T> parent = _parent;
lock (parent.SyncObj)
{
parent.AssertInvariants();
// If we're done writing, no writes will ever succeed.
if (parent._doneWriting != null)
{
return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ?
Task.FromException<bool>(parent._doneWriting) :
ChannelUtilities.s_falseTask;
}
// If there's space to write, a write is possible.
// And if the mode involves dropping/ignoring, we can always write, as even if it's
// full we'll just drop an element to make room.
if (parent._items.Count < parent._bufferedCapacity || parent._mode != BoundedChannelFullMode.Wait)
{
return ChannelUtilities.s_trueTask;
}
// We're still allowed to write, but there's no space, so ensure a waiter is queued and return it.
return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingWriters, runContinuationsAsynchronously: true, cancellationToken);
}
}
public override Task WriteAsync(T item, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}
ReaderInteractor<bool> waitingReaders = null;
BoundedChannel<T> parent = _parent;
lock (parent.SyncObj)
{
parent.AssertInvariants();
// If we're done writing, trying to write is an error.
if (parent._doneWriting != null)
{
return Task.FromException(ChannelUtilities.CreateInvalidCompletionException(parent._doneWriting));
}
// Get the number of items in the channel currently.
int count = parent._items.Count;
if (count == 0)
{
// There are no items in the channel, which means we may have waiting readers.
// Store the item.
parent._items.EnqueueTail(item);
waitingReaders = parent._waitingReaders;
if (waitingReaders == null)
{
// If no one's waiting to be notified about a 0-to-1 transition, we're done.
return ChannelUtilities.s_trueTask;
}
parent._waitingReaders = null;
}
else if (count < parent._bufferedCapacity)
{
// There's room in the channel. Since we're not transitioning from 0-to-1 and
// since there's room, we can simply store the item and exit without having to
// worry about blocked/waiting readers.
parent._items.EnqueueTail(item);
return ChannelUtilities.s_trueTask;
}
else if (parent._mode == BoundedChannelFullMode.Wait)
{
// The channel is full and we're in a wait mode.
// Queue the writer.
var writer = WriterInteractor<T>.Create(runContinuationsAsynchronously: true, item, cancellationToken);
parent._blockedWriters.EnqueueTail(writer);
return writer.Task;
}
else if (parent._mode == BoundedChannelFullMode.DropWrite)
{
// The channel is full and we're in ignore mode.
// Ignore the item but say we accepted it.
return ChannelUtilities.s_trueTask;
}
else
{
// The channel is full, and we're in a dropping mode.
// Drop either the oldest or the newest and write the new item.
T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
parent._items.DequeueTail() :
parent._items.DequeueHead();
parent._items.EnqueueTail(item);
return ChannelUtilities.s_trueTask;
}
}
// We stored an item bringing the count up from 0 to 1. Alert
// any waiting readers that there may be something for them to consume.
// Since we're no longer holding the lock, it's possible we'll end up
// waking readers that have since come in.
waitingReaders.Success(item: true);
return ChannelUtilities.s_trueTask;
}
}
[Conditional("DEBUG")]
private void AssertInvariants()
{
Debug.Assert(SyncObj != null, "The sync obj must not be null.");
Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock.");
if (!_items.IsEmpty)
{
Debug.Assert(_waitingReaders == null, "There are items available, so there shouldn't be any waiting readers.");
}
if (_items.Count < _bufferedCapacity)
{
Debug.Assert(_blockedWriters.IsEmpty, "There's space available, so there shouldn't be any blocked writers.");
Debug.Assert(_waitingWriters == null, "There's space available, so there shouldn't be any waiting writers.");
}
if (!_blockedWriters.IsEmpty)
{
Debug.Assert(_items.Count == _bufferedCapacity, "We should have a full buffer if there's a blocked writer.");
}
if (_completion.Task.IsCompleted)
{
Debug.Assert(_doneWriting != null, "We can only complete if we're done writing.");
}
}
/// <summary>Gets the number of items in the channel. This should only be used by the debugger.</summary>
private int ItemsCountForDebugger => _items.Count;
/// <summary>Gets an enumerator the debugger can use to show the contents of the channel.</summary>
IEnumerator<T> IDebugEnumerable<T>.GetEnumerator() => _items.GetEnumerator();
}
}

View File

@@ -0,0 +1,19 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace System.Threading.Channels
{
/// <summary>Specifies the behavior to use when writing to a bounded channel that is already full.</summary>
public enum BoundedChannelFullMode
{
/// <summary>Wait for space to be available in order to complete the write operation.</summary>
Wait,
/// <summary>Remove and ignore the newest item in the channel in order to make room for the item being written.</summary>
DropNewest,
/// <summary>Remove and ignore the oldest item in the channel in order to make room for the item being written.</summary>
DropOldest,
/// <summary>Drop the item being written.</summary>
DropWrite
}
}

View File

@@ -0,0 +1,76 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace System.Threading.Channels
{
/// <summary>Provides static methods for creating channels.</summary>
public static class Channel
{
/// <summary>Creates an unbounded channel usable by any number of readers and writers concurrently.</summary>
/// <returns>The created channel.</returns>
public static Channel<T> CreateUnbounded<T>() =>
new UnboundedChannel<T>(runContinuationsAsynchronously: true);
/// <summary>Creates an unbounded channel subject to the provided options.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <returns>The created channel.</returns>
public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options) =>
options == null ? throw new ArgumentNullException(nameof(options)) :
options.SingleReader ? new SingleConsumerUnboundedChannel<T>(!options.AllowSynchronousContinuations) :
(Channel<T>)new UnboundedChannel<T>(!options.AllowSynchronousContinuations);
/// <summary>Creates a channel with the specified maximum capacity.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="capacity">The maximum number of items the channel may store.</param>
/// <returns>The created channel.</returns>
/// <remarks>
/// Channels created with this method apply the <see cref="BoundedChannelFullMode.Wait"/>
/// behavior and prohibit continuations from running synchronously.
/// </remarks>
public static Channel<T> CreateBounded<T>(int capacity)
{
if (capacity < 1)
{
throw new ArgumentOutOfRangeException(nameof(capacity));
}
return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true);
}
/// <summary>Creates a channel with the specified maximum capacity.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <returns>The created channel.</returns>
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations);
}
/// <summary>Creates a channel that doesn't buffer any items.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <returns>The created channel.</returns>
public static Channel<T> CreateUnbuffered<T>() =>
new UnbufferedChannel<T>();
/// <summary>Creates a channel that doesn't buffer any items.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <returns>The created channel.</returns>
public static Channel<T> CreateUnbuffered<T>(UnbufferedChannelOptions options)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
return new UnbufferedChannel<T>();
}
}
}

View File

@@ -0,0 +1,28 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace System.Threading.Channels
{
/// <summary>Exception thrown when a channel is used after it's been closed.</summary>
public class ChannelClosedException : InvalidOperationException
{
/// <summary>Initializes a new instance of the <see cref="ChannelClosedException"/> class.</summary>
public ChannelClosedException() :
base(SR.ChannelClosedException_DefaultMessage) { }
/// <summary>Initializes a new instance of the <see cref="ChannelClosedException"/> class.</summary>
/// <param name="message">The message that describes the error.</param>
public ChannelClosedException(string message) : base(message) { }
/// <summary>Initializes a new instance of the <see cref="ChannelClosedException"/> class.</summary>
/// <param name="innerException">The exception that is the cause of this exception.</param>
public ChannelClosedException(Exception innerException) :
base(SR.ChannelClosedException_DefaultMessage, innerException) { }
/// <summary>Initializes a new instance of the <see cref="ChannelClosedException"/> class.</summary>
/// <param name="message">The message that describes the error.</param>
/// <param name="innerException">The exception that is the cause of this exception.</param>
public ChannelClosedException(string message, Exception innerException) : base(message, innerException) { }
}
}

View File

@@ -0,0 +1,107 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace System.Threading.Channels
{
/// <summary>Provides options that control the behavior of channel instances.</summary>
public abstract class ChannelOptions
{
/// <summary>
/// <code>true</code> if writers to the channel guarantee that there will only ever be at most one write operation
/// at a time; <code>false</code> if no such constraint is guaranteed.
/// </summary>
/// <remarks>
/// If true, the channel may be able to optimize certain operations based on knowing about the single-writer guarantee.
/// The default is false.
/// </remarks>
public bool SingleWriter { get; set; }
/// <summary>
/// <code>true</code> readers from the channel guarantee that there will only ever be at most one read operation at a time;
/// <code>false</code> if no such constraint is guaranteed.
/// </summary>
/// <remarks>
/// If true, the channel may be able to optimize certain operations based on knowing about the single-reader guarantee.
/// The default is false.
/// </remarks>
public bool SingleReader { get; set; }
/// <summary>
/// <code>true</code> if operations performed on a channel may synchronously invoke continuations subscribed to
/// notifications of pending async operations; <code>false</code> if all continuations should be invoked asynchronously.
/// </summary>
/// <remarks>
/// Setting this option to <code>true</code> can provide measurable throughput improvements by avoiding
/// scheduling additional work items. However, it may come at the cost of reduced parallelism, as for example a producer
/// may then be the one to execute work associated with a consumer, and if not done thoughtfully, this can lead
/// to unexpected interactions. The default is false.
/// </remarks>
public bool AllowSynchronousContinuations { get; set; }
}
/// <summary>Provides options that control the behavior of <see cref="BoundedChannel{T}"/> instances.</summary>
public sealed class BoundedChannelOptions : ChannelOptions
{
/// <summary>The maximum number of items the bounded channel may store.</summary>
private int _capacity;
/// <summary>The behavior incurred by write operations when the channel is full.</summary>
private BoundedChannelFullMode _mode = BoundedChannelFullMode.Wait;
/// <summary>Initializes the options.</summary>
/// <param name="capacity">The maximum number of items the bounded channel may store.</param>
public BoundedChannelOptions(int capacity)
{
if (capacity < 1)
{
throw new ArgumentOutOfRangeException(nameof(capacity));
}
Capacity = capacity;
}
/// <summary>Gets or sets the maximum number of items the bounded channel may store.</summary>
public int Capacity
{
get => _capacity;
set
{
if (value < 1)
{
throw new ArgumentOutOfRangeException(nameof(value));
}
_capacity = value;
}
}
/// <summary>Gets or sets the behavior incurred by write operations when the channel is full.</summary>
public BoundedChannelFullMode FullMode
{
get => _mode;
set
{
switch (value)
{
case BoundedChannelFullMode.Wait:
case BoundedChannelFullMode.DropNewest:
case BoundedChannelFullMode.DropOldest:
case BoundedChannelFullMode.DropWrite:
_mode = value;
break;
default:
throw new ArgumentOutOfRangeException(nameof(value));
}
}
}
}
/// <summary>Provides options that control the behavior of <see cref="UnboundedChannel{T}"/> instances.</summary>
public sealed class UnboundedChannelOptions : ChannelOptions
{
}
/// <summary>Provides options that control the behavior of <see cref="UnbufferedChannel{T}"/> instances.</summary>
public sealed class UnbufferedChannelOptions : ChannelOptions
{
}
}

View File

@@ -0,0 +1,82 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Threading.Tasks;
namespace System.Threading.Channels
{
/// <summary>
/// Provides a base class for reading from a channel.
/// </summary>
/// <typeparam name="T">Specifies the type of data that may be read from the channel.</typeparam>
public abstract class ChannelReader<T>
{
/// <summary>
/// Gets a <see cref="Task"/> that completes when no more data will ever
/// be available to be read from this channel.
/// </summary>
public virtual Task Completion => ChannelUtilities.s_neverCompletingTask;
/// <summary>Attempts to read an item to the channel.</summary>
/// <param name="item">The read item, or a default value if no item could be read.</param>
/// <returns>true if an item was read; otherwise, false if no item was read.</returns>
public abstract bool TryRead(out T item);
/// <summary>Returns a <see cref="Task{Boolean}"/> that will complete when data is available to read.</summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the wait operation.</param>
/// <returns>
/// A <see cref="Task{Boolean}"/> that will complete with a <c>true</c> result when data is available to read
/// or with a <c>false</c> result when no further data will ever be available to be read.
/// </returns>
public abstract Task<bool> WaitToReadAsync(CancellationToken cancellationToken = default);
/// <summary>Asynchronously reads an item from the channel.</summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the read operation.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that represents the asynchronous read operation.</returns>
public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
{
return new ValueTask<T>(Task.FromCanceled<T>(cancellationToken));
}
try
{
if (TryRead(out T fastItem))
{
return new ValueTask<T>(fastItem);
}
}
catch (Exception exc) when (!(exc is ChannelClosedException || exc is OperationCanceledException))
{
return new ValueTask<T>(Task.FromException<T>(exc));
}
return ReadAsyncCore(cancellationToken);
async ValueTask<T> ReadAsyncCore(CancellationToken ct)
{
try
{
while (true)
{
if (!await WaitToReadAsync(ct))
{
throw new ChannelClosedException();
}
if (TryRead(out T item))
{
return item;
}
}
}
catch (Exception exc) when (!(exc is ChannelClosedException || exc is OperationCanceledException))
{
throw new ChannelClosedException(exc);
}
}
}
}
}

View File

@@ -0,0 +1,123 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
namespace System.Threading.Channels
{
/// <summary>Provides internal helper methods for implementing channels.</summary>
internal static class ChannelUtilities
{
/// <summary>Sentinel object used to indicate being done writing.</summary>
internal static readonly Exception s_doneWritingSentinel = new Exception(nameof(s_doneWritingSentinel));
/// <summary>A cached task with a Boolean true result.</summary>
internal static readonly Task<bool> s_trueTask = Task.FromResult(result: true);
/// <summary>A cached task with a Boolean false result.</summary>
internal static readonly Task<bool> s_falseTask = Task.FromResult(result: false);
/// <summary>A cached task that never completes.</summary>
internal static readonly Task s_neverCompletingTask = new TaskCompletionSource<bool>().Task;
/// <summary>Completes the specified TaskCompletionSource.</summary>
/// <param name="tcs">The source to complete.</param>
/// <param name="error">
/// The optional exception with which to complete.
/// If this is null or the DoneWritingSentinel, the source will be completed successfully.
/// If this is an OperationCanceledException, it'll be completed with the exception's token.
/// Otherwise, it'll be completed as faulted with the exception.
/// </param>
internal static void Complete(TaskCompletionSource<VoidResult> tcs, Exception error = null)
{
if (error is OperationCanceledException oce)
{
tcs.TrySetCanceled(oce.CancellationToken);
}
else if (error != null && error != s_doneWritingSentinel)
{
tcs.TrySetException(error);
}
else
{
tcs.TrySetResult(default);
}
}
/// <summary>Wake up all of the waiters and null out the field.</summary>
/// <param name="waiters">The waiters.</param>
/// <param name="result">The value with which to complete each waiter.</param>
internal static void WakeUpWaiters(ref ReaderInteractor<bool> waiters, bool result)
{
ReaderInteractor<bool> w = waiters;
if (w != null)
{
w.Success(result);
waiters = null;
}
}
/// <summary>Wake up all of the waiters and null out the field.</summary>
/// <param name="waiters">The waiters.</param>
/// <param name="result">The success value with which to complete each waiter if <paramref name="error">error</paramref> is null.</param>
/// <param name="error">The failure with which to cmplete each waiter, if non-null.</param>
internal static void WakeUpWaiters(ref ReaderInteractor<bool> waiters, bool result, Exception error = null)
{
ReaderInteractor<bool> w = waiters;
if (w != null)
{
if (error != null)
{
w.Fail(error);
}
else
{
w.Success(result);
}
waiters = null;
}
}
/// <summary>Removes all interactors from the queue, failing each.</summary>
/// <param name="interactors">The queue of interactors to complete.</param>
/// <param name="error">The error with which to complete each interactor.</param>
internal static void FailInteractors<T, TInner>(Dequeue<T> interactors, Exception error) where T : Interactor<TInner>
{
Debug.Assert(error != null);
while (!interactors.IsEmpty)
{
interactors.DequeueHead().Fail(error);
}
}
/// <summary>Gets or creates a "waiter" (e.g. WaitForRead/WriteAsync) interactor.</summary>
/// <param name="waiter">The field storing the waiter interactor.</param>
/// <param name="runContinuationsAsynchronously">true to force continuations to run asynchronously; otherwise, false.</param>
/// <param name="cancellationToken">The token to use to cancel the wait.</param>
internal static Task<bool> GetOrCreateWaiter(ref ReaderInteractor<bool> waiter, bool runContinuationsAsynchronously, CancellationToken cancellationToken)
{
// Get the existing waiters interactor.
ReaderInteractor<bool> w = waiter;
// If there isn't one, create one. This explicitly does not include the cancellation token,
// as we reuse it for any number of waiters that overlap.
if (w == null)
{
waiter = w = ReaderInteractor<bool>.Create(runContinuationsAsynchronously);
}
// If the cancellation token can't be canceled, then just return the waiter task.
// If it can, we need to return a task that will complete when the waiter task does but that can also be canceled.
// Easiest way to do that is with a cancelable continuation.
return cancellationToken.CanBeCanceled ?
w.Task.ContinueWith(t => t.Result, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) :
w.Task;
}
/// <summary>Creates and returns an exception object to indicate that a channel has been closed.</summary>
internal static Exception CreateInvalidCompletionException(Exception inner = null) =>
inner is OperationCanceledException ? inner :
inner != null && inner != s_doneWritingSentinel ? new ChannelClosedException(inner) :
new ChannelClosedException();
}
}

View File

@@ -0,0 +1,79 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Threading.Tasks;
namespace System.Threading.Channels
{
/// <summary>
/// Provides a base class for writing to a channel.
/// </summary>
/// <typeparam name="T">Specifies the type of data that may be written to the channel.</typeparam>
public abstract class ChannelWriter<T>
{
/// <summary>Attempts to mark the channel as being completed, meaning no more data will be written to it.</summary>
/// <param name="error">An <see cref="Exception"/> indicating the failure causing no more data to be written, or null for success.</param>
/// <returns>
/// true if this operation successfully completes the channel; otherwise, false if the channel could not be marked for completion,
/// for example due to having already been marked as such, or due to not supporting completion.
/// </returns>
public virtual bool TryComplete(Exception error = null) => false;
/// <summary>Attempts to write the specified item to the channel.</summary>
/// <param name="item">The item to write.</param>
/// <returns>true if the item was written; otherwise, false if it wasn't written.</returns>
public abstract bool TryWrite(T item);
/// <summary>Returns a <see cref="Task{Boolean}"/> that will complete when space is available to write an item.</summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the wait operation.</param>
/// <returns>
/// A <see cref="Task{Boolean}"/> that will complete with a <c>true</c> result when space is available to write an item
/// or with a <c>false</c> result when no further writing will be permitted.
/// </returns>
public abstract Task<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);
/// <summary>Asynchronously writes an item to the channel.</summary>
/// <param name="item">The value to write to the channel.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the write operation.</param>
/// <returns>A <see cref="Task"/> that represents the asynchronous write operation.</returns>
public virtual Task WriteAsync(T item, CancellationToken cancellationToken = default)
{
try
{
return
cancellationToken.IsCancellationRequested ? Task.FromCanceled<T>(cancellationToken) :
TryWrite(item) ? Task.CompletedTask :
WriteAsyncCore(item, cancellationToken);
}
catch (Exception e)
{
return Task.FromException(e);
}
async Task WriteAsyncCore(T innerItem, CancellationToken ct)
{
while (await WaitToWriteAsync(ct).ConfigureAwait(false))
{
if (TryWrite(innerItem))
{
return;
}
}
throw ChannelUtilities.CreateInvalidCompletionException();
}
}
/// <summary>Mark the channel as being complete, meaning no more items will be written to it.</summary>
/// <param name="error">Optional Exception indicating a failure that's causing the channel to complete.</param>
/// <exception cref="InvalidOperationException">The channel has already been marked as complete.</exception>
public void Complete(Exception error = null)
{
if (!TryComplete(error))
{
throw ChannelUtilities.CreateInvalidCompletionException();
}
}
}
}

View File

@@ -0,0 +1,10 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace System.Threading.Channels
{
/// <summary>Provides a base class for channels that support reading and writing elements of type <typeparamref name="T"/>.</summary>
/// <typeparam name="T">Specifies the type of data readable and writable in the channel.</typeparam>
public abstract class Channel<T> : Channel<T, T> { }
}

View File

@@ -0,0 +1,29 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
namespace System.Threading.Channels
{
/// <summary>
/// Provides a base class for channels that support reading elements of type <typeparamref name="TRead"/>
/// and writing elements of type <typeparamref name="TWrite"/>.
/// </summary>
/// <typeparam name="TWrite">Specifies the type of data that may be written to the channel.</typeparam>
/// <typeparam name="TRead">Specifies the type of data that may be read from the channel.</typeparam>
public abstract class Channel<TWrite, TRead>
{
/// <summary>Gets the readable half of this channel.</summary>
public ChannelReader<TRead> Reader { get; protected set; }
/// <summary>Gets the writable half of this channel.</summary>
public ChannelWriter<TWrite> Writer { get; protected set; }
/// <summary>Implicit cast from a channel to its readable half.</summary>
/// <param name="channel">The channel being cast.</param>
public static implicit operator ChannelReader<TRead>(Channel<TWrite, TRead> channel) => channel.Reader;
/// <summary>Implicit cast from a channel to its writable half.</summary>
/// <param name="channel">The channel being cast.</param>
public static implicit operator ChannelWriter<TWrite>(Channel<TWrite, TRead> channel) => channel.Writer;
}
}

Some files were not shown because too many files have changed in this diff Show More