// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
/*============================================================
**
** Class: Stream
**
** gpaperin
**
**
** Purpose: Abstract base class for all Streams. Provides
** default implementations of asynchronous reads & writes, in
** terms of the synchronous reads & writes (and vice versa).
**
**
===========================================================*/
using System;
using System.Threading;
#if FEATURE_ASYNC_IO
using System.Threading.Tasks;
#endif
using System.Runtime;
using System.Runtime.InteropServices;
#if NEW_EXPERIMENTAL_ASYNC_IO
using System.Runtime.CompilerServices;
#endif
using System.Runtime.ExceptionServices;
using System.Security;
using System.Security.Permissions;
using System.Diagnostics.Contracts;
using System.Reflection;
namespace System.IO {
[Serializable]
[ComVisible(true)]
#if CONTRACTS_FULL
[ContractClass(typeof(StreamContract))]
#endif
#if FEATURE_REMOTING
public abstract class Stream : MarshalByRefObject, IDisposable {
#else // FEATURE_REMOTING
public abstract class Stream : IDisposable {
#endif // FEATURE_REMOTING
public static readonly Stream Null = new NullStream();
//We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
// The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
// improvement in Copy performance.
private const int _DefaultCopyBufferSize = 81920;
#if NEW_EXPERIMENTAL_ASYNC_IO
// To implement Async IO operations on streams that don't support async IO
[NonSerialized]
private ReadWriteTask _activeReadWriteTask;
[NonSerialized]
private SemaphoreSlim _asyncActiveSemaphore;
internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized()
{
// Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
// WaitHandle, we don't need to worry about Disposing it.
return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1));
}
#endif
public abstract bool CanRead {
[Pure]
get;
}
// If CanSeek is false, Position, Seek, Length, and SetLength should throw.
public abstract bool CanSeek {
[Pure]
get;
}
[ComVisible(false)]
public virtual bool CanTimeout {
[Pure]
get {
return false;
}
}
public abstract bool CanWrite {
[Pure]
get;
}
public abstract long Length {
get;
}
public abstract long Position {
get;
set;
}
[ComVisible(false)]
public virtual int ReadTimeout {
get {
Contract.Ensures(Contract.Result() >= 0);
throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
}
set {
throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
}
}
[ComVisible(false)]
public virtual int WriteTimeout {
get {
Contract.Ensures(Contract.Result() >= 0);
throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
}
set {
throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_TimeoutsNotSupported"));
}
}
#if FEATURE_ASYNC_IO
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task CopyToAsync(Stream destination)
{
return CopyToAsync(destination, _DefaultCopyBufferSize);
}
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task CopyToAsync(Stream destination, Int32 bufferSize)
{
return CopyToAsync(destination, bufferSize, CancellationToken.None);
}
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public virtual Task CopyToAsync(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
if (destination == null)
throw new ArgumentNullException("destination");
if (bufferSize <= 0)
throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
if (!CanRead && !CanWrite)
throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
if (!destination.CanRead && !destination.CanWrite)
throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
if (!CanRead)
throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
if (!destination.CanWrite)
throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
Contract.EndContractBlock();
return CopyToAsyncInternal(destination, bufferSize, cancellationToken);
}
private async Task CopyToAsyncInternal(Stream destination, Int32 bufferSize, CancellationToken cancellationToken)
{
Contract.Requires(destination != null);
Contract.Requires(bufferSize > 0);
Contract.Requires(CanRead);
Contract.Requires(destination.CanWrite);
byte[] buffer = new byte[bufferSize];
int bytesRead;
while ((bytesRead = await ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
}
}
#endif // FEATURE_ASYNC_IO
// Reads the bytes from the current stream and writes the bytes to
// the destination stream until all bytes are read, starting at
// the current position.
public void CopyTo(Stream destination)
{
if (destination == null)
throw new ArgumentNullException("destination");
if (!CanRead && !CanWrite)
throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
if (!destination.CanRead && !destination.CanWrite)
throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
if (!CanRead)
throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
if (!destination.CanWrite)
throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
Contract.EndContractBlock();
InternalCopyTo(destination, _DefaultCopyBufferSize);
}
public void CopyTo(Stream destination, int bufferSize)
{
if (destination == null)
throw new ArgumentNullException("destination");
if (bufferSize <= 0)
throw new ArgumentOutOfRangeException("bufferSize",
Environment.GetResourceString("ArgumentOutOfRange_NeedPosNum"));
if (!CanRead && !CanWrite)
throw new ObjectDisposedException(null, Environment.GetResourceString("ObjectDisposed_StreamClosed"));
if (!destination.CanRead && !destination.CanWrite)
throw new ObjectDisposedException("destination", Environment.GetResourceString("ObjectDisposed_StreamClosed"));
if (!CanRead)
throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnreadableStream"));
if (!destination.CanWrite)
throw new NotSupportedException(Environment.GetResourceString("NotSupported_UnwritableStream"));
Contract.EndContractBlock();
InternalCopyTo(destination, bufferSize);
}
private void InternalCopyTo(Stream destination, int bufferSize)
{
Contract.Requires(destination != null);
Contract.Requires(CanRead);
Contract.Requires(destination.CanWrite);
Contract.Requires(bufferSize > 0);
byte[] buffer = new byte[bufferSize];
int read;
while ((read = Read(buffer, 0, buffer.Length)) != 0)
destination.Write(buffer, 0, read);
}
// Stream used to require that all cleanup logic went into Close(),
// which was thought up before we invented IDisposable. However, we
// need to follow the IDisposable pattern so that users can write
// sensible subclasses without needing to inspect all their base
// classes, and without worrying about version brittleness, from a
// base class switching to the Dispose pattern. We're moving
// Stream to the Dispose(bool) pattern - that's where all subclasses
// should put their cleanup starting in V2.
public virtual void Close()
{
/* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
Contract.Ensures(CanRead == false);
Contract.Ensures(CanWrite == false);
Contract.Ensures(CanSeek == false);
*/
Dispose(true);
GC.SuppressFinalize(this);
}
public void Dispose()
{
/* These are correct, but we'd have to fix PipeStream & NetworkStream very carefully.
Contract.Ensures(CanRead == false);
Contract.Ensures(CanWrite == false);
Contract.Ensures(CanSeek == false);
*/
Close();
}
protected virtual void Dispose(bool disposing)
{
// Note: Never change this to call other virtual methods on Stream
// like Write, since the state on subclasses has already been
// torn down. This is the last code to run on cleanup for a stream.
}
public abstract void Flush();
#if FEATURE_ASYNC_IO
[HostProtection(ExternalThreading=true)]
[ComVisible(false)]
public Task FlushAsync()
{
return FlushAsync(CancellationToken.None);
}
[HostProtection(ExternalThreading=true)]
[ComVisible(false)]
public virtual Task FlushAsync(CancellationToken cancellationToken)
{
return Task.Factory.StartNew(state => ((Stream)state).Flush(), this,
cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}
#endif // FEATURE_ASYNC_IO
[Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
protected virtual WaitHandle CreateWaitHandle()
{
Contract.Ensures(Contract.Result() != null);
return new ManualResetEvent(false);
}
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
Contract.Ensures(Contract.Result() != null);
return BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
}
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginReadInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
Contract.Ensures(Contract.Result() != null);
if (!CanRead) __Error.ReadNotSupported();
#if !NEW_EXPERIMENTAL_ASYNC_IO
return BlockingBeginRead(buffer, offset, count, callback, state);
#else
// Mango did not do Async IO.
if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
{
return BlockingBeginRead(buffer, offset, count, callback, state);
}
// To avoid a race with a stream's position pointer & generating ----
// conditions with internal buffer indexes in our own streams that
// don't natively support async IO operations when there are multiple
// async requests outstanding, we will block the application's main
// thread if it does a second IO request until the first one completes.
var semaphore = EnsureAsyncActiveSemaphoreInitialized();
Task semaphoreTask = null;
if (serializeAsynchronously)
{
semaphoreTask = semaphore.WaitAsync();
}
else
{
semaphore.Wait();
}
// Create the task to asynchronously do a Read. This task serves both
// as the asynchronous work item and as the IAsyncResult returned to the user.
var asyncResult = new ReadWriteTask(true /*isRead*/, delegate
{
// The ReadWriteTask stores all of the parameters to pass to Read.
// As we're currently inside of it, we can get the current task
// and grab the parameters from it.
var thisTask = Task.InternalCurrent as ReadWriteTask;
Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
// Do the Read and return the number of bytes read
var bytesRead = thisTask._stream.Read(thisTask._buffer, thisTask._offset, thisTask._count);
thisTask.ClearBeginState(); // just to help alleviate some memory pressure
return bytesRead;
}, state, this, buffer, offset, count, callback);
// Schedule it
if (semaphoreTask != null)
RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
else
RunReadWriteTask(asyncResult);
return asyncResult; // return it
#endif
}
public virtual int EndRead(IAsyncResult asyncResult)
{
if (asyncResult == null)
throw new ArgumentNullException("asyncResult");
Contract.Ensures(Contract.Result() >= 0);
Contract.EndContractBlock();
#if !NEW_EXPERIMENTAL_ASYNC_IO
return BlockingEndRead(asyncResult);
#else
// Mango did not do async IO.
if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
{
return BlockingEndRead(asyncResult);
}
var readTask = _activeReadWriteTask;
if (readTask == null)
{
throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
}
else if (readTask != asyncResult)
{
throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
}
else if (!readTask._isRead)
{
throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple"));
}
try
{
return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception
}
finally
{
_activeReadWriteTask = null;
Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
_asyncActiveSemaphore.Release();
}
#endif
}
#if FEATURE_ASYNC_IO
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public Task ReadAsync(Byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count, CancellationToken.None);
}
[HostProtection(ExternalThreading = true)]
[ComVisible(false)]
public virtual Task ReadAsync(Byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// If cancellation was requested, bail early with an already completed task.
// Otherwise, return a task that represents the Begin/End methods.
return cancellationToken.IsCancellationRequested
? Task.FromCancellation(cancellationToken)
: BeginEndReadAsync(buffer, offset, count);
}
private Task BeginEndReadAsync(Byte[] buffer, Int32 offset, Int32 count)
{
return TaskFactory.FromAsyncTrim(
this, new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count },
(stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
(stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
}
private struct ReadWriteParameters // struct for arguments to Read and Write calls
{
internal byte[] Buffer;
internal int Offset;
internal int Count;
}
#endif //FEATURE_ASYNC_IO
[HostProtection(ExternalThreading=true)]
public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state)
{
Contract.Ensures(Contract.Result() != null);
return BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false);
}
[HostProtection(ExternalThreading = true)]
internal IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, Object state, bool serializeAsynchronously)
{
Contract.Ensures(Contract.Result() != null);
if (!CanWrite) __Error.WriteNotSupported();
#if !NEW_EXPERIMENTAL_ASYNC_IO
return BlockingBeginWrite(buffer, offset, count, callback, state);
#else
// Mango did not do Async IO.
if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
{
return BlockingBeginWrite(buffer, offset, count, callback, state);
}
// To avoid a race with a stream's position pointer & generating ----
// conditions with internal buffer indexes in our own streams that
// don't natively support async IO operations when there are multiple
// async requests outstanding, we will block the application's main
// thread if it does a second IO request until the first one completes.
var semaphore = EnsureAsyncActiveSemaphoreInitialized();
Task semaphoreTask = null;
if (serializeAsynchronously)
{
semaphoreTask = semaphore.WaitAsync(); // kick off the asynchronous wait, but don't block
}
else
{
semaphore.Wait(); // synchronously wait here
}
// Create the task to asynchronously do a Write. This task serves both
// as the asynchronous work item and as the IAsyncResult returned to the user.
var asyncResult = new ReadWriteTask(false /*isRead*/, delegate
{
// The ReadWriteTask stores all of the parameters to pass to Write.
// As we're currently inside of it, we can get the current task
// and grab the parameters from it.
var thisTask = Task.InternalCurrent as ReadWriteTask;
Contract.Assert(thisTask != null, "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask");
// Do the Write
thisTask._stream.Write(thisTask._buffer, thisTask._offset, thisTask._count);
thisTask.ClearBeginState(); // just to help alleviate some memory pressure
return 0; // not used, but signature requires a value be returned
}, state, this, buffer, offset, count, callback);
// Schedule it
if (semaphoreTask != null)
RunReadWriteTaskWhenReady(semaphoreTask, asyncResult);
else
RunReadWriteTask(asyncResult);
return asyncResult; // return it
#endif
}
#if NEW_EXPERIMENTAL_ASYNC_IO
private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
{
Contract.Assert(readWriteTask != null); // Should be Contract.Requires, but CCRewrite is doing a poor job with
// preconditions in async methods that await. Mike & Manuel are aware. (10/6/2011, bug 290222)
Contract.Assert(asyncWaiter != null); // Ditto
// If the wait has already complete, run the task.
if (asyncWaiter.IsCompleted)
{
Contract.Assert(asyncWaiter.IsRanToCompletion, "The semaphore wait should always complete successfully.");
RunReadWriteTask(readWriteTask);
}
else // Otherwise, wait for our turn, and then run the task.
{
asyncWaiter.ContinueWith((t, state) =>
{
Contract.Assert(t.IsRanToCompletion, "The semaphore wait should always complete successfully.");
var tuple = (Tuple)state;
tuple.Item1.RunReadWriteTask(tuple.Item2); // RunReadWriteTask(readWriteTask);
}, Tuple.Create(this, readWriteTask),
default(CancellationToken),
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
}
private void RunReadWriteTask(ReadWriteTask readWriteTask)
{
Contract.Requires(readWriteTask != null);
Contract.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");
// Schedule the task. ScheduleAndStart must happen after the write to _activeReadWriteTask to avoid a race.
// Internally, we're able to directly call ScheduleAndStart rather than Start, avoiding
// two interlocked operations. However, if ReadWriteTask is ever changed to use
// a cancellation token, this should be changed to use Start.
_activeReadWriteTask = readWriteTask; // store the task so that EndXx can validate it's given the right one
readWriteTask.m_taskScheduler = TaskScheduler.Default;
readWriteTask.ScheduleAndStart(needsProtection: false);
}
#endif
public virtual void EndWrite(IAsyncResult asyncResult)
{
if (asyncResult==null)
throw new ArgumentNullException("asyncResult");
Contract.EndContractBlock();
#if !NEW_EXPERIMENTAL_ASYNC_IO
BlockingEndWrite(asyncResult);
#else
// Mango did not do Async IO.
if(CompatibilitySwitches.IsAppEarlierThanWindowsPhone8)
{
BlockingEndWrite(asyncResult);
return;
}
var writeTask = _activeReadWriteTask;
if (writeTask == null)
{
throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
}
else if (writeTask != asyncResult)
{
throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
}
else if (writeTask._isRead)
{
throw new ArgumentException(Environment.GetResourceString("InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple"));
}
try
{
writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions
Contract.Assert(writeTask.Status == TaskStatus.RanToCompletion);
}
finally
{
_activeReadWriteTask = null;
Contract.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
_asyncActiveSemaphore.Release();
}
#endif
}
#if NEW_EXPERIMENTAL_ASYNC_IO
// Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
// A single instance of this task serves four purposes:
// 1. The work item scheduled to run the Read / Write operation
// 2. The state holding the arguments to be passed to Read / Write
// 3. The IAsyncResult returned from BeginRead / BeginWrite
// 4. The completion action that runs to invoke the user-provided callback.
// This last item is a bit tricky. Before the AsyncCallback is invoked, the
// IAsyncResult must have completed, so we can't just invoke the handler
// from within the task, since it is the IAsyncResult, and thus it's not
// yet completed. Instead, we use AddCompletionAction to install this
// task as its own completion handler. That saves the need to allocate
// a separate completion handler, it guarantees that the task will
// have completed by the time the handler is invoked, and it allows
// the handler to be invoked synchronously upon the completion of the
// task. This all enables BeginRead / BeginWrite to be implemented
// with a single allocation.
private sealed class ReadWriteTask : Task, ITaskCompletionAction
{
internal readonly bool _isRead;
internal Stream _stream;
internal byte [] _buffer;
internal int _offset;
internal int _count;
private AsyncCallback _callback;
private ExecutionContext _context;
internal void ClearBeginState() // Used to allow the args to Read/Write to be made available for GC
{
_stream = null;
_buffer = null;
}
[SecuritySafeCritical] // necessary for EC.Capture
[MethodImpl(MethodImplOptions.NoInlining)]
public ReadWriteTask(
bool isRead,
Func