//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace System.ServiceModel.Channels
{
    using System;
    using System.Collections.Generic;
    using System.Globalization;
    using System.IO;
    using System.Runtime;
    using System.Runtime.Diagnostics;
    using System.ServiceModel;
    using System.ServiceModel.Diagnostics.Application;
    using System.Threading;

    /// <summary>
    /// BufferedOutputAsyncStream is used for writing streamed response.
    /// </summary>
    /// <remarks>
    /// For performance reasons, the behavior we want is chunk, chunk, chunk,.. terminating chunk without a delay.
    /// We call BeginWrite,BeginWrite,BeginWrite and Close() (close sends the terminating chunk) without
    /// waiting for all outstanding BeginWrites to complete.
    ///
    /// BufferedOutputAsyncStream is not a general-purpose stream wrapper, it requires that the base stream
    ///     1. allow concurrent IO (for multiple BeginWrite calls)
    ///     2. support the BeginWrite,BeginWrite,BeginWrite,.. Close() calling pattern.
    ///
    /// Currently BufferedOutputAsyncStream is only used to wrap the System.Net.HttpResponseStream, which satisfies both requirements.
    ///
    /// BufferedOutputAsyncStream can also be used when doing asynchronous operations. Sync operations are not allowed when an async
    /// operation is in-flight. If a sync operation is in progress (i.e., data exists in our CurrentBuffer) and we issue an async operation,
    /// we flush everything in the buffers (and block while doing so) before the async operation is allowed to proceed.
    /// </remarks>
    class BufferedOutputAsyncStream : Stream
    {
        readonly Stream stream;
        readonly int bufferSize;
        readonly int bufferLimit;
        readonly BufferQueue buffers;
        ByteBuffer currentByteBuffer;
        int availableBufferCount;
        static AsyncEventArgsCallback onFlushComplete = new AsyncEventArgsCallback(OnFlushComplete);

        int asyncWriteCount;
        WriteAsyncState writeState;
        WriteAsyncArgs writeArgs;
        static AsyncEventArgsCallback onAsyncFlushComplete;
        static AsyncEventArgsCallback onWriteCallback;
        EventTraceActivity activity;
        bool closed;

        /// <summary>
        /// Wraps <paramref name="stream"/> and starts with a single buffer of <paramref name="bufferSize"/> bytes.
        /// </summary>
        /// <param name="stream">The underlying stream that buffered data is written to.</param>
        /// <param name="bufferSize">Size in bytes of each buffer.</param>
        /// <param name="bufferLimit">Maximum number of buffers this stream will allocate.</param>
        internal BufferedOutputAsyncStream(Stream stream, int bufferSize, int bufferLimit)
        {
            this.stream = stream;
            this.bufferSize = bufferSize;
            this.bufferLimit = bufferLimit;
            this.buffers = new BufferQueue(this.bufferLimit);
            this.buffers.Add(new ByteBuffer(this, this.bufferSize, stream));
            this.availableBufferCount = 1;
        }

        /// <summary>Always false; this stream is write-only.</summary>
        public override bool CanRead
        {
            get { return false; }
        }

        /// <summary>Always false; seeking is not supported.</summary>
        public override bool CanSeek
        {
            get { return false; }
        }

        /// <summary>True while the underlying stream is writable and this stream has not been closed.</summary>
        public override bool CanWrite
        {
            get { return stream.CanWrite && (!this.closed); }
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override long Length
        {
            get
            {
#pragma warning suppress 56503 // Microsoft, required by the Stream.Length contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
            }
        }

        /// <summary>Not supported; the getter and the setter always throw <see cref="NotSupportedException"/>.</summary>
        public override long Position
        {
            get
            {
#pragma warning suppress 56503 // Microsoft, required by the Stream.Position contract
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
            }
            set
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
            }
        }

        /// <summary>
        /// The trace activity passed to the buffered-write trace events. It is created lazily, and only
        /// while the BufferedAsyncWriteStart trace is enabled; otherwise the current value (possibly null) is returned.
        /// </summary>
        internal EventTraceActivity EventTraceActivity
        {
            get
            {
                if (TD.BufferedAsyncWriteStartIsEnabled())
                {
                    if (this.activity == null)
                    {
                        this.activity = EventTraceActivity.GetFromThreadOrCreate();
                    }
                }

                return this.activity;
            }
        }

        /// <summary>
        /// Returns the cached current buffer, fetching the head of the queue when none is cached.
        /// May return null when the head slot holds no buffer. Throws if a previous write has failed.
        /// </summary>
        ByteBuffer GetCurrentBuffer()
        {
            // Dequeue will null out the buffer
            this.ThrowOnException();
            if (this.currentByteBuffer == null)
            {
                this.currentByteBuffer = this.buffers.CurrentBuffer();
            }

            return this.currentByteBuffer;
        }

        /// <summary>
        /// Flushes the pending buffer, closes the underlying stream and then waits for the outstanding
        /// buffer writes. The stream is marked closed even when one of those steps throws.
        /// </summary>
        public override void Close()
        {
            try
            {
                if (!this.closed)
                {
                    this.FlushPendingBuffer();
                    stream.Close();
                    this.WaitForAllWritesToComplete();
                }
            }
            finally
            {
                this.closed = true;
            }
        }

        /// <summary>Flushes the pending buffer and then flushes the underlying stream.</summary>
        public override void Flush()
        {
            FlushPendingBuffer();
            stream.Flush();
        }

        /// <summary>Starts a write of the buffer at the head of the queue, if there is one.</summary>
        void FlushPendingBuffer()
        {
            ByteBuffer asyncBuffer = this.buffers.CurrentBuffer();
            if (asyncBuffer != null)
            {
                this.DequeueAndFlush(asyncBuffer, onFlushComplete);
            }
        }

        /// <summary>Records the start of an async write; throws if one is already pending.</summary>
        void IncrementAsyncWriteCount()
        {
            if (Interlocked.Increment(ref this.asyncWriteCount) > 1)
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.GetString(SR.WriterAsyncWritePending)));
            }
        }

        /// <summary>Records the end of an async write; throws if the counter does not return to zero.</summary>
        void DecrementAsyncWriteCount()
        {
            if (Interlocked.Decrement(ref this.asyncWriteCount) != 0)
            {
                throw FxTrace.Exception.AsError(new InvalidOperationException(SR.GetString(SR.NoAsyncWritePending)));
            }
        }

        /// <summary>Throws when an async write is in flight; used to guard the synchronous write paths.</summary>
        void EnsureNoAsyncWritePending()
        {
            if (this.asyncWriteCount != 0)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.WriterAsyncWritePending)));
            }
        }

        /// <summary>Throws <see cref="InvalidOperationException"/> when the stream has been closed.</summary>
        void EnsureOpened()
        {
            if (this.closed)
            {
                throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.StreamClosed)));
            }
        }

        /// <summary>
        /// Obtains a buffer for the synchronous write path: allocates a new buffer while below the
        /// limit, otherwise blocks until a buffer is returned to the queue.
        /// </summary>
        ByteBuffer NextBuffer()
        {
            if (!this.AdjustBufferSize())
            {
                this.buffers.WaitForAny();
            }

            return this.GetCurrentBuffer();
        }

        /// <summary>
        /// Adds one more buffer to the queue when fewer than bufferLimit buffers have been allocated.
        /// </summary>
        /// <returns>True when a buffer was added; false when the limit has been reached.</returns>
        bool AdjustBufferSize()
        {
            if (this.availableBufferCount < this.bufferLimit)
            {
                buffers.Add(new ByteBuffer(this, bufferSize, stream));
                this.availableBufferCount++;
                return true;
            }

            return false;
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override int ReadByte()
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.ReadNotSupported)));
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public override void SetLength(long value)
        {
            throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
        }

        void WaitForAllWritesToComplete()
        {
            // Complete all outstanding writes
            this.buffers.WaitForAllWritesToComplete();
        }

        /// <summary>
        /// Synchronously copies <paramref name="count"/> bytes into the buffers, starting a write of
        /// each buffer as it fills. Blocks in NextBuffer when no buffer is free and the limit is reached.
        /// </summary>
        /// <exception cref="InvalidOperationException">The stream is closed or an async write is pending.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            this.EnsureOpened();
            this.EnsureNoAsyncWritePending();

            while (count > 0)
            {
                ByteBuffer currentBuffer = this.GetCurrentBuffer();
                if (currentBuffer == null)
                {
                    currentBuffer = this.NextBuffer();
                }

                int freeBytes = currentBuffer.FreeBytes; // space left in the CurrentBuffer
                if (freeBytes > 0)
                {
                    if (freeBytes > count)
                    {
                        freeBytes = count;
                    }

                    currentBuffer.CopyData(buffer, offset, freeBytes);
                    offset += freeBytes;
                    count -= freeBytes;
                }

                if (currentBuffer.FreeBytes == 0)
                {
                    this.DequeueAndFlush(currentBuffer, onFlushComplete);
                }
            }
        }

        /// <summary>
        /// Starts an asynchronous write. Only one async write may be outstanding at a time.
        /// When all data could be buffered immediately the callback is invoked inline and the
        /// completed-synchronously result is returned; otherwise the pending result is returned.
        /// </summary>
        /// <exception cref="InvalidOperationException">The stream is closed or an async write is already pending.</exception>
        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        {
            this.EnsureOpened();
            this.IncrementAsyncWriteCount();

            Fx.Assert(this.writeState == null ||
                this.writeState.Arguments == null ||
                this.writeState.Arguments.Count <= 0,
                "All data has not been written yet.");

            // Initialize each static callback under its own null check. A single check on
            // onWriteCallback guarding both assignments lets another thread observe onWriteCallback
            // as set while onAsyncFlushComplete is still null, and then flush with a null callback.
            if (onAsyncFlushComplete == null)
            {
                onAsyncFlushComplete = new AsyncEventArgsCallback(OnAsyncFlushComplete);
            }

            if (onWriteCallback == null)
            {
                onWriteCallback = new AsyncEventArgsCallback(OnWriteCallback);
            }

            if (this.writeState == null)
            {
                this.writeState = new WriteAsyncState();
                this.writeArgs = new WriteAsyncArgs();
            }
            else
            {
                // Since writeState != null, check if the stream has an
                // exception as the async path has already been invoked.
                this.ThrowOnException();
            }

            this.writeArgs.Set(buffer, offset, count, callback, state);
            this.writeState.Set(onWriteCallback, this.writeArgs, this);

            if (this.WriteAsync(this.writeState) == AsyncCompletionResult.Completed)
            {
                this.writeState.Complete(true);
                if (callback != null)
                {
                    callback(this.writeState.CompletedSynchronouslyAsyncResult);
                }

                return this.writeState.CompletedSynchronouslyAsyncResult;
            }

            return this.writeState.PendingAsyncResult;
        }

        /// <summary>Ends the pending async write and rethrows any failure recorded by the buffers or the write state.</summary>
        public override void EndWrite(IAsyncResult asyncResult)
        {
            this.DecrementAsyncWriteCount();
            this.ThrowOnException();
        }

        /// <summary>
        /// Synchronously buffers a single byte, starting a write of the buffer when it becomes full.
        /// </summary>
        /// <exception cref="InvalidOperationException">The stream is closed or an async write is pending.</exception>
        public override void WriteByte(byte value)
        {
            // Same closed-stream guard as Write and BeginWrite.
            this.EnsureOpened();
            this.EnsureNoAsyncWritePending();

            ByteBuffer currentBuffer = this.GetCurrentBuffer();
            if (currentBuffer == null)
            {
                currentBuffer = NextBuffer();
            }

            currentBuffer.CopyData(value);

            if (currentBuffer.FreeBytes == 0)
            {
                this.DequeueAndFlush(currentBuffer, onFlushComplete);
            }
        }

        /// <summary>
        /// Checks the buffer out of the queue and starts writing it to the underlying stream.
        /// When the write completes synchronously the buffer is returned to the queue here;
        /// otherwise <paramref name="callback"/> is responsible for that.
        /// </summary>
        void DequeueAndFlush(ByteBuffer currentBuffer, AsyncEventArgsCallback callback)
        {
            // Dequeue does a checkout of the buffer from its slot.
            // the callback for the sync path only enqueues the buffer.
            // The WriteAsync callback needs to enqueue and also complete.
            this.currentByteBuffer = null;
            ByteBuffer dequeued = this.buffers.Dequeue();
            Fx.Assert(dequeued == currentBuffer, "Buffer queue in an inconsistent state.");

            WriteFlushAsyncEventArgs writeflushState = (WriteFlushAsyncEventArgs)currentBuffer.FlushAsyncArgs;
            if (writeflushState == null)
            {
                writeflushState = new WriteFlushAsyncEventArgs();
                currentBuffer.FlushAsyncArgs = writeflushState;
            }

            writeflushState.Set(callback, null, this);
            if (currentBuffer.FlushAsync() == AsyncCompletionResult.Completed)
            {
                this.buffers.Enqueue(currentBuffer);
                writeflushState.Complete(true);
            }
        }

        /// <summary>Flush completion for the synchronous path: returns the flushed buffer to the queue.</summary>
        static void OnFlushComplete(IAsyncEventArgs state)
        {
            BufferedOutputAsyncStream thisPtr = (BufferedOutputAsyncStream)state.AsyncState;
            WriteFlushAsyncEventArgs flushState = (WriteFlushAsyncEventArgs)state;
            ByteBuffer byteBuffer = flushState.Result;
            thisPtr.buffers.Enqueue(byteBuffer);
        }

        /// <summary>
        /// Copies as much of the pending async write as possible into the buffers, flushing each
        /// buffer as it fills and saving the remaining offset/count back into <paramref name="state"/>.
        /// </summary>
        /// <returns>
        /// Completed when all data has been buffered; Queued when no buffer is free and a flush
        /// completion has been asked to resume the write.
        /// </returns>
        AsyncCompletionResult WriteAsync(WriteAsyncState state)
        {
            Fx.Assert(state != null && state.Arguments != null, "Invalid WriteAsyncState parameter.");

            if (state.Arguments.Count == 0)
            {
                return AsyncCompletionResult.Completed;
            }

            byte[] buffer = state.Arguments.Buffer;
            int offset = state.Arguments.Offset;
            int count = state.Arguments.Count;

            ByteBuffer currentBuffer = this.GetCurrentBuffer();

            while (count > 0)
            {
                if (currentBuffer == null)
                {
                    throw FxTrace.Exception.AsError(new InvalidOperationException(SR.GetString(SR.WriteAsyncWithoutFreeBuffer)));
                }

                int freeBytes = currentBuffer.FreeBytes; // space left in the CurrentBuffer
                if (freeBytes > 0)
                {
                    if (freeBytes > count)
                    {
                        freeBytes = count;
                    }

                    currentBuffer.CopyData(buffer, offset, freeBytes);
                    offset += freeBytes;
                    count -= freeBytes;
                }

                if (currentBuffer.FreeBytes == 0)
                {
                    this.DequeueAndFlush(currentBuffer, onAsyncFlushComplete);

                    // We might need to increase the number of buffers available
                    // if there is more data to be written or no buffer is available.
                    if (count > 0 || this.buffers.Count == 0)
                    {
                        this.AdjustBufferSize();
                    }
                }

                // Update state for any pending writes.
                state.Arguments.Offset = offset;
                state.Arguments.Count = count;

                // We can complete synchronously only
                // if there a buffer available for writes.
                currentBuffer = this.GetCurrentBuffer();
                if (currentBuffer == null)
                {
                    if (this.buffers.TryUnlock())
                    {
                        return AsyncCompletionResult.Queued;
                    }

                    // TryUnlock failed, i.e. a buffer was enqueued in the meantime; pick it up.
                    currentBuffer = this.GetCurrentBuffer();
                }
            }

            return AsyncCompletionResult.Completed;
        }

        /// <summary>
        /// Flush completion for the async path: returns the buffer to the queue and, when the
        /// writer has signalled a pending completion, resumes the write or completes it with the failure.
        /// </summary>
        static void OnAsyncFlushComplete(IAsyncEventArgs state)
        {
            BufferedOutputAsyncStream thisPtr = (BufferedOutputAsyncStream)state.AsyncState;
            Exception completionException = null;
            bool completeSelf = false;

            try
            {
                OnFlushComplete(state);

                if (thisPtr.buffers.TryAcquireLock())
                {
                    WriteFlushAsyncEventArgs flushState = (WriteFlushAsyncEventArgs)state;
                    if (flushState.Exception != null)
                    {
                        completeSelf = true;
                        completionException = flushState.Exception;
                    }
                    else
                    {
                        if (thisPtr.WriteAsync(thisPtr.writeState) == AsyncCompletionResult.Completed)
                        {
                            completeSelf = true;
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                if (Fx.IsFatal(exception))
                {
                    throw;
                }

                if (completionException == null)
                {
                    completionException = exception;
                }

                completeSelf = true;
            }

            if (completeSelf)
            {
                thisPtr.writeState.Complete(false, completionException);
            }
        }

        /// <summary>
        /// Completion callback of the write state: invokes the user's callback (at most once, since
        /// the stored callback is cleared first) with the pending async result.
        /// </summary>
        static void OnWriteCallback(IAsyncEventArgs state)
        {
            BufferedOutputAsyncStream thisPtr = (BufferedOutputAsyncStream)state.AsyncState;
            IAsyncResult returnResult = thisPtr.writeState.PendingAsyncResult;
            AsyncCallback callback = thisPtr.writeState.Arguments.Callback;
            thisPtr.writeState.Arguments.Callback = null;
            if (callback != null)
            {
                callback(returnResult);
            }
        }

        void ThrowOnException()
        {
            // if any of the buffers or the write state has an
            // exception the stream is not usable anymore.
            this.buffers.ThrowOnException();
            if (this.writeState != null)
            {
                this.writeState.ThrowOnException();
            }
        }

        /// <summary>
        /// Fixed-size circular queue of buffers that are free for writing. Buffers are checked out
        /// with Dequeue while they are being flushed and returned with Enqueue afterwards.
        /// </summary>
        class BufferQueue
        {
            // Every buffer ever added, regardless of whether it is currently checked out.
            readonly List<ByteBuffer> refBufferList;
            readonly int size;
            readonly Slot[] buffers;
            Exception completionException;
            int head;
            int count;
            bool waiting;
            bool pendingCompletion;

            internal BufferQueue(int queueSize)
            {
                this.head = 0;
                this.count = 0;
                this.size = queueSize;
                this.buffers = new Slot[size];
                this.refBufferList = new List<ByteBuffer>();
                for (int i = 0; i < queueSize; i++)
                {
                    Slot s = new Slot();
                    s.checkedOut = true; //Start with all buffers checkedout.
                    this.buffers[i] = s;
                }
            }

            object ThisLock
            {
                get { return this.buffers; }
            }

            /// <summary>Number of buffers currently in the queue.</summary>
            internal int Count
            {
                get
                {
                    lock (ThisLock)
                    {
                        return count;
                    }
                }
            }

            /// <summary>Checks the head buffer out of the queue; returns null when the queue is empty.</summary>
            internal ByteBuffer Dequeue()
            {
                Fx.Assert(!this.pendingCompletion, "Dequeue cannot be invoked when there is a pending completion");

                lock (ThisLock)
                {
                    if (count == 0)
                    {
                        return null;
                    }

                    Slot s = buffers[head];
                    Fx.Assert(!s.checkedOut, "This buffer is already in use.");

                    this.head = (this.head + 1) % size;
                    this.count--;
                    ByteBuffer buffer = s.buffer;
                    s.buffer = null;
                    s.checkedOut = true;
                    return buffer;
                }
            }

            /// <summary>Registers a new buffer and enqueues it, provided the queue has not reached its size.</summary>
            internal void Add(ByteBuffer buffer)
            {
                lock (ThisLock)
                {
                    Fx.Assert(this.refBufferList.Count < size, "Bufferlist is already full.");

                    if (this.refBufferList.Count < this.size)
                    {
                        this.refBufferList.Add(buffer);
                        this.Enqueue(buffer);
                    }
                }
            }

            /// <summary>
            /// Returns a buffer to the tail of the queue, records the first buffer failure seen,
            /// and wakes a thread blocked in WaitForAny.
            /// </summary>
            internal void Enqueue(ByteBuffer buffer)
            {
                lock (ThisLock)
                {
                    this.completionException = this.completionException ?? buffer.CompletionException;

                    Fx.Assert(count < size, "The queue is already full.");
                    int tail = (this.head + this.count) % size;
                    Slot s = this.buffers[tail];
                    this.count++;
                    Fx.Assert(s.checkedOut, "Current buffer is still free.");
                    s.checkedOut = false;
                    s.buffer = buffer;

                    if (this.waiting)
                    {
                        Monitor.Pulse(this.ThisLock);
                    }
                }
            }

            /// <summary>
            /// Returns the buffer stored in the head slot without removing it; null when that slot
            /// is checked out. Throws if a buffer failure has been recorded.
            /// </summary>
            internal ByteBuffer CurrentBuffer()
            {
                lock (ThisLock)
                {
                    ThrowOnException();
                    Slot s = this.buffers[head];
                    return s.buffer;
                }
            }

            /// <summary>Waits on every registered buffer in turn until its pending write has finished.</summary>
            internal void WaitForAllWritesToComplete()
            {
                for (int i = 0; i < this.refBufferList.Count; i++)
                {
                    this.refBufferList[i].WaitForWriteComplete();
                }
            }

            /// <summary>
            /// Blocks while the queue is empty until Enqueue pulses, then throws if a failure was recorded.
            /// </summary>
            internal void WaitForAny()
            {
                lock (ThisLock)
                {
                    if (this.count == 0)
                    {
                        this.waiting = true;
                        Monitor.Wait(ThisLock);
                        this.waiting = false;
                    }
                }

                this.ThrowOnException();
            }

            /// <summary>Throws the first recorded buffer failure, if any.</summary>
            internal void ThrowOnException()
            {
                if (this.completionException != null)
                {
                    throw FxTrace.Exception.AsError(this.completionException);
                }
            }

            internal bool TryUnlock()
            {
                // The main thread tries to indicate a pending completion
                // if there aren't any free buffers for the next write.
                // The callback should try to complete() through TryAcquireLock.
                lock (ThisLock)
                {
                    Fx.Assert(!this.pendingCompletion, "There is already a completion pending.");

                    if (this.count == 0)
                    {
                        this.pendingCompletion = true;
                        return true;
                    }
                }

                return false;
            }

            internal bool TryAcquireLock()
            {
                // The callback tries to acquire the lock if there is a pending completion and a free buffer.
                // Buffers might get dequeued by the main writing thread as soon as they are enqueued.
                lock (ThisLock)
                {
                    if (this.pendingCompletion && this.count > 0)
                    {
                        this.pendingCompletion = false;
                        return true;
                    }
                }

                return false;
            }

            /// <summary>One position of the circular queue.</summary>
            class Slot
            {
                internal bool checkedOut;
                internal ByteBuffer buffer;
            }
        }

        /// <summary>
        /// AsyncEventArgs used to invoke the FlushAsync() on the ByteBuffer.
        /// </summary>
        // NOTE(review): the type arguments were missing in the text this file was recovered from.
        // They are reconstructed from usage here (Set is called with a null argument and Result is
        // consumed as a ByteBuffer) - confirm against System.Runtime.AsyncEventArgs.
        class WriteFlushAsyncEventArgs : AsyncEventArgs<object, ByteBuffer>
        {
        }

        /// <summary>
        /// A fixed-size byte array plus the state needed to write it to the underlying stream asynchronously.
        /// </summary>
        class ByteBuffer
        {
            byte[] bytes;
            int position;
            Stream stream;
            bool writePending;
            bool waiting;
            Exception completionException;
            BufferedOutputAsyncStream parent;

            static AsyncCallback writeCallback = Fx.ThunkCallback(new AsyncCallback(WriteCallback));
            static AsyncCallback flushCallback;

            internal ByteBuffer(BufferedOutputAsyncStream parent, int bufferSize, Stream stream)
            {
                this.waiting = false;
                this.writePending = false;
                this.position = 0;
                this.bytes = DiagnosticUtility.Utility.AllocateByteArray(bufferSize);
                this.stream = stream;
                this.parent = parent;
            }

            object ThisLock
            {
                get { return this; }
            }

            /// <summary>The exception recorded by a failed write of this buffer, or null.</summary>
            internal Exception CompletionException
            {
                get { return this.completionException; }
            }

            /// <summary>Number of bytes that can still be copied into this buffer.</summary>
            internal int FreeBytes
            {
                get
                {
                    return this.bytes.Length - this.position;
                }
            }

            // NOTE(review): type arguments reconstructed from usage (see WriteFlushAsyncEventArgs) - confirm.
            internal AsyncEventArgs<object, ByteBuffer> FlushAsyncArgs
            {
                get;
                set;
            }

            /// <summary>
            /// Ends an asynchronously completed write, records any non-fatal failure on the buffer,
            /// clears the pending flag and wakes a thread blocked in WaitForWriteComplete.
            /// </summary>
            static void WriteCallback(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                // Fetch our state information: ByteBuffer
                ByteBuffer buffer = (ByteBuffer)result.AsyncState;
                try
                {
                    if (TD.BufferedAsyncWriteStopIsEnabled())
                    {
                        TD.BufferedAsyncWriteStop(buffer.parent.EventTraceActivity);
                    }

                    buffer.stream.EndWrite(result);
                }
#pragma warning suppress 56500 // Microsoft, transferring exception to another thread
                catch (Exception e)
                {
                    if (Fx.IsFatal(e))
                    {
                        throw;
                    }

                    buffer.completionException = e;
                }

                // Tell the main thread we've finished.
                lock (buffer.ThisLock)
                {
                    buffer.writePending = false;

                    // Do not Pulse if no one is waiting, to avoid the overhead of Pulse
                    if (!buffer.waiting)
                    {
                        return;
                    }

                    Monitor.Pulse(buffer.ThisLock);
                }
            }

            /// <summary>
            /// Blocks while a write of this buffer is pending, then throws the recorded failure, if any.
            /// </summary>
            internal void WaitForWriteComplete()
            {
                lock (ThisLock)
                {
                    if (this.writePending)
                    {
                        // Wait until the async write of this buffer is finished.
                        this.waiting = true;
                        Monitor.Wait(ThisLock);
                        this.waiting = false;
                    }
                }

                // Raise exception if necessary
                if (this.completionException != null)
                {
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(completionException);
                }
            }

            /// <summary>Appends <paramref name="count"/> bytes from <paramref name="buffer"/>; the caller ensures they fit.</summary>
            internal void CopyData(byte[] buffer, int offset, int count)
            {
                Fx.Assert(this.position + count <= this.bytes.Length, string.Format(CultureInfo.InvariantCulture, "Chunk is too big to fit in this buffer. Chunk size={0}, free space={1}", count, this.bytes.Length - this.position));
                Fx.Assert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                Buffer.BlockCopy(buffer, offset, this.bytes, this.position, count);
                this.position += count;
            }

            /// <summary>Appends a single byte; the caller ensures the buffer is not full.</summary>
            internal void CopyData(byte value)
            {
                Fx.Assert(this.position < this.bytes.Length, "Buffer is full");
                Fx.Assert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));

                this.bytes[this.position++] = value;
            }

            /// <summary>
            /// Starts writing the buffered bytes to the underlying stream and resets the buffer position.
            /// Set the ByteBuffer's FlushAsyncArgs before invoking FlushAsync().
            /// </summary>
            /// <returns>
            /// Completed when the buffer was empty or the write finished synchronously; Queued when
            /// the write will complete through the flush callback.
            /// </returns>
            internal AsyncCompletionResult FlushAsync()
            {
                if (this.position <= 0)
                {
                    return AsyncCompletionResult.Completed;
                }

                Fx.Assert(this.FlushAsyncArgs != null, "FlushAsyncArgs not set.");

                if (flushCallback == null)
                {
                    flushCallback = new AsyncCallback(OnAsyncFlush);
                }

                int bytesToWrite = this.position;
                this.SetWritePending();
                this.position = 0;

                if (TD.BufferedAsyncWriteStartIsEnabled())
                {
                    TD.BufferedAsyncWriteStart(this.parent.EventTraceActivity, this.GetHashCode(), bytesToWrite);
                }

                // If BeginWrite throws, no callback will ever clear writePending, and a later
                // WaitForWriteComplete (reached from Close) would block forever. Clear it here.
                IAsyncResult asyncResult;
                bool writeStarted = false;
                try
                {
                    asyncResult = this.stream.BeginWrite(this.bytes, 0, bytesToWrite, flushCallback, this);
                    writeStarted = true;
                }
                finally
                {
                    if (!writeStarted)
                    {
                        this.ResetWritePending();
                    }
                }

                if (asyncResult.CompletedSynchronously)
                {
                    // The callbacks ignore synchronous completions, so this path owns the pending
                    // flag; clear it even when EndWrite throws.
                    try
                    {
                        if (TD.BufferedAsyncWriteStopIsEnabled())
                        {
                            TD.BufferedAsyncWriteStop(this.parent.EventTraceActivity);
                        }

                        this.stream.EndWrite(asyncResult);
                    }
                    finally
                    {
                        this.ResetWritePending();
                    }

                    return AsyncCompletionResult.Completed;
                }

                // From here on the callback owns the pending flag; it must not be touched on this thread.
                return AsyncCompletionResult.Queued;
            }

            /// <summary>
            /// Callback for writes started by FlushAsync that complete asynchronously: ends the write
            /// and completes the buffer's FlushAsyncArgs with the buffer's recorded failure, if any.
            /// </summary>
            static void OnAsyncFlush(IAsyncResult result)
            {
                if (result.CompletedSynchronously)
                {
                    return;
                }

                ByteBuffer thisPtr = (ByteBuffer)result.AsyncState;
                AsyncEventArgs<object, ByteBuffer> asyncEventArgs = thisPtr.FlushAsyncArgs;

                try
                {
                    ByteBuffer.WriteCallback(result);
                    asyncEventArgs.Result = thisPtr;
                }
                catch (Exception exception)
                {
                    if (Fx.IsFatal(exception))
                    {
                        throw;
                    }

                    if (thisPtr.completionException == null)
                    {
                        thisPtr.completionException = exception;
                    }
                }

                asyncEventArgs.Complete(false, thisPtr.completionException);
            }

            void ResetWritePending()
            {
                lock (ThisLock)
                {
                    this.writePending = false;
                }
            }

            /// <summary>Marks the buffer as being written; throws if a write is already pending.</summary>
            void SetWritePending()
            {
                lock (ThisLock)
                {
                    if (this.writePending)
                    {
                        throw FxTrace.Exception.AsError(new InvalidOperationException(SR.GetString(SR.FlushBufferAlreadyInUse)));
                    }

                    this.writePending = true;
                }
            }
        }

        /// <summary>
        /// Used to hold the users callback and state and arguments when BeginWrite is invoked.
        /// </summary>
        class WriteAsyncArgs
        {
            internal byte[] Buffer { get; set; }

            internal int Offset { get; set; }

            internal int Count { get; set; }

            internal AsyncCallback Callback { get; set; }

            internal object AsyncState { get; set; }

            internal void Set(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
            {
                this.Buffer = buffer;
                this.Offset = offset;
                this.Count = count;
                this.Callback = callback;
                this.AsyncState = state;
            }
        }

        /// <summary>
        /// State of the single outstanding BeginWrite; owns two IAsyncResult objects that are created once and reused.
        /// </summary>
        // NOTE(review): the type argument was missing in the text this file was recovered from; it is
        // reconstructed from usage (Arguments is consumed as WriteAsyncArgs) - confirm against
        // System.Runtime.AsyncEventArgs.
        class WriteAsyncState : AsyncEventArgs<WriteAsyncArgs>
        {
            PooledAsyncResult pooledAsyncResult;
            PooledAsyncResult completedSynchronouslyResult;

            /// <summary>The result handed out when the write completes asynchronously.</summary>
            internal IAsyncResult PendingAsyncResult
            {
                get
                {
                    if (this.pooledAsyncResult == null)
                    {
                        this.pooledAsyncResult = new PooledAsyncResult(this, false);
                    }

                    return this.pooledAsyncResult;
                }
            }

            /// <summary>The result handed out when the write completes synchronously.</summary>
            internal IAsyncResult CompletedSynchronouslyAsyncResult
            {
                get
                {
                    if (this.completedSynchronouslyResult == null)
                    {
                        this.completedSynchronouslyResult = new PooledAsyncResult(this, true);
                    }

                    return completedSynchronouslyResult;
                }
            }

            /// <summary>Throws the exception this state was completed with, if any.</summary>
            internal void ThrowOnException()
            {
                if (this.Exception != null)
                {
                    throw FxTrace.Exception.AsError(this.Exception);
                }
            }

            /// <summary>
            /// Minimal IAsyncResult over the write state. Only AsyncState and CompletedSynchronously
            /// are implemented; AsyncWaitHandle and IsCompleted throw NotImplementedException.
            /// </summary>
            class PooledAsyncResult : IAsyncResult
            {
                readonly WriteAsyncState writeState;
                readonly bool completedSynchronously;

                internal PooledAsyncResult(WriteAsyncState parentState, bool completedSynchronously)
                {
                    this.writeState = parentState;
                    this.completedSynchronously = completedSynchronously;
                }

                public object AsyncState
                {
                    get
                    {
                        return this.writeState.Arguments != null ? this.writeState.Arguments.AsyncState : null;
                    }
                }

                public WaitHandle AsyncWaitHandle
                {
                    get { throw FxTrace.Exception.AsError(new NotImplementedException()); }
                }

                public bool CompletedSynchronously
                {
                    get { return this.completedSynchronously; }
                }

                public bool IsCompleted
                {
                    get { throw FxTrace.Exception.AsError(new NotImplementedException()); }
                }
            }
        }
    }
}