//----------------------------------------------------------------------------- // Copyright (c) Microsoft Corporation. All rights reserved. //----------------------------------------------------------------------------- namespace System.ServiceModel.Dispatcher { using System; using System.Collections.Generic; using System.ServiceModel.Diagnostics; using System.Runtime; using System.ServiceModel.Channels; using System.Threading; class MultipleReceiveBinder : IChannelBinder { internal static class MultipleReceiveDefaults { internal const int MaxPendingReceives = 1; } static AsyncCallback onInnerReceiveCompleted = Fx.ThunkCallback(OnInnerReceiveCompleted); MultipleReceiveAsyncResult outstanding; IChannelBinder channelBinder; ReceiveScopeQueue pendingResults; bool ordered; public MultipleReceiveBinder(IChannelBinder channelBinder, int size, bool ordered) { this.ordered = ordered; this.channelBinder = channelBinder; this.pendingResults = new ReceiveScopeQueue(size); } public IChannel Channel { get { return this.channelBinder.Channel; } } public bool HasSession { get { return this.channelBinder.HasSession; } } public Uri ListenUri { get { return this.channelBinder.ListenUri; } } public EndpointAddress LocalAddress { get { return this.channelBinder.LocalAddress; } } public EndpointAddress RemoteAddress { get { return this.channelBinder.RemoteAddress; } } public void Abort() { this.channelBinder.Abort(); } public void CloseAfterFault(TimeSpan timeout) { this.channelBinder.CloseAfterFault(timeout); } public bool TryReceive(TimeSpan timeout, out RequestContext requestContext) { return this.channelBinder.TryReceive(timeout, out requestContext); } public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) { // At anytime there can be only one thread in BeginTryReceive and the // outstanding AsyncResult should have completed before the next one. // There should be no pending oustanding result here. Fx.AssertAndThrow(this.outstanding == null, "BeginTryReceive should not have a pending result."); MultipleReceiveAsyncResult multipleReceiveResult = new MultipleReceiveAsyncResult(callback, state); this.outstanding = multipleReceiveResult; EnsurePump(timeout); IAsyncResult innerResult; if (this.pendingResults.TryDequeueHead(out innerResult)) { HandleReceiveRequestComplete(innerResult, true); } return multipleReceiveResult; } void EnsurePump(TimeSpan timeout) { // ensure we're running at full throttle, the BeginTryReceive calls we make below on the // IChannelBinder will typically complete future calls to BeginTryReceive made by CannelHandler // corollary to that is that most times these calls will be completed sycnhronously while (!this.pendingResults.IsFull) { ReceiveScopeSignalGate receiveScope = new ReceiveScopeSignalGate(this); // Enqueue the result without locks since this is the pump. // BeginTryReceive can be called only from one thread and // the head is not yet unlocked so no items can proceed. this.pendingResults.Enqueue(receiveScope); IAsyncResult result = this.channelBinder.BeginTryReceive(timeout, onInnerReceiveCompleted, receiveScope); if (result.CompletedSynchronously) { this.SignalReceiveCompleted(result); } } } static void OnInnerReceiveCompleted(IAsyncResult nestedResult) { if (nestedResult.CompletedSynchronously) { return; } ReceiveScopeSignalGate thisPtr = nestedResult.AsyncState as ReceiveScopeSignalGate; thisPtr.Binder.HandleReceiveAndSignalCompletion(nestedResult, false); } void HandleReceiveAndSignalCompletion(IAsyncResult nestedResult, bool completedSynchronosly) { if (SignalReceiveCompleted(nestedResult)) { HandleReceiveRequestComplete(nestedResult, completedSynchronosly); } } private bool SignalReceiveCompleted(IAsyncResult nestedResult) { if (this.ordered) { // Ordered recevies can proceed only if its own gate has // been unlocked. Head is the only gate unlocked and only the // result that owns the is the gate at the head can proceed. return this.pendingResults.TrySignal((ReceiveScopeSignalGate)nestedResult.AsyncState, nestedResult); } else { // Unordered receives can proceed with any gate. If the is head // is not unlocked by BeginTryReceive then the result will // be put on the last pending gate. return this.pendingResults.TrySignalPending(nestedResult); } } void HandleReceiveRequestComplete(IAsyncResult innerResult, bool completedSynchronously) { MultipleReceiveAsyncResult receiveResult = this.outstanding; Exception completionException = null; try { Fx.AssertAndThrow(receiveResult != null, "HandleReceive invoked without an outstanding result"); // Cleanup states this.outstanding = null; // set the context on the outer result for the ChannelHandler. RequestContext context; receiveResult.Valid = this.channelBinder.EndTryReceive(innerResult, out context); receiveResult.RequestContext = context; } catch (Exception ex) { if (Fx.IsFatal(ex)) { throw; } completionException = ex; } receiveResult.Complete(completedSynchronously, completionException); } public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext) { return MultipleReceiveAsyncResult.End(result, out requestContext); } public RequestContext CreateRequestContext(Message message) { return this.channelBinder.CreateRequestContext(message); } public void Send(Message message, TimeSpan timeout) { this.channelBinder.Send(message, timeout); } public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) { return this.channelBinder.BeginSend(message, timeout, callback, state); } public void EndSend(IAsyncResult result) { this.channelBinder.EndSend(result); } public Message Request(Message message, TimeSpan timeout) { return this.channelBinder.Request(message, timeout); } public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state) { return this.channelBinder.BeginRequest(message, timeout, callback, state); } public Message EndRequest(IAsyncResult result) { return this.channelBinder.EndRequest(result); } public bool WaitForMessage(TimeSpan timeout) { return this.channelBinder.WaitForMessage(timeout); } public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) { return this.channelBinder.BeginWaitForMessage(timeout, callback, state); } public bool EndWaitForMessage(IAsyncResult result) { return this.channelBinder.EndWaitForMessage(result); } class MultipleReceiveAsyncResult : AsyncResult { public MultipleReceiveAsyncResult(AsyncCallback callback, object state) : base(callback, state) { } public bool Valid { get; set; } public RequestContext RequestContext { get; set; } public new void Complete(bool completedSynchronously, Exception completionException) { base.Complete(completedSynchronously, completionException); } public static bool End(IAsyncResult result, out RequestContext context) { MultipleReceiveAsyncResult thisPtr = AsyncResult.End(result); context = thisPtr.RequestContext; return thisPtr.Valid; } } class ReceiveScopeSignalGate : SignalGate { public ReceiveScopeSignalGate(MultipleReceiveBinder binder) { this.Binder = binder; } public MultipleReceiveBinder Binder { get; private set; } } class ReceiveScopeQueue { // This class is a circular queue with 2 pointers for pending items and head. // Ordered Receives : The head is unlocked by BeginTryReceive. The ReceiveGate can signal only the // the gate that it owns. If the gate is the head then it will proceed. // Unordered Receives: Any pending item can be signalled. The pending index keeps track // of results that haven't been completed. If the head is unlocked then it will proceed. int pending; int head; int count; readonly int size; ReceiveScopeSignalGate[] items; public ReceiveScopeQueue(int size) { this.size = size; this.head = 0; this.count = 0; this.pending = 0; items = new ReceiveScopeSignalGate[size]; } internal bool IsFull { get { return this.count == this.size; } } internal void Enqueue(ReceiveScopeSignalGate receiveScope) { // This should only be called from EnsurePump which itself should only be // BeginTryReceive. This makes sure that we don't need locks to enqueue an item. Fx.AssertAndThrow(this.count < this.size, "Cannot Enqueue into a full queue."); this.items[(this.head + this.count) % this.size] = receiveScope; count++; } void Dequeue() { // Dequeue should not be called outside a signal/unlock boundary. // There are no locks as this boundary ensures that only one thread // Tries to dequeu an item either in the unlock or Signal thread. Fx.AssertAndThrow(this.count > 0, "Cannot Dequeue and empty queue."); this.items[head] = null; this.head = (head + 1) % this.size; this.count--; } internal bool TryDequeueHead(out IAsyncResult result) { // Invoked only from BeginTryReceive as only the main thread can // dequeue the head and is Successful only if it's already been signaled and completed. Fx.AssertAndThrow(this.count > 0, "Cannot unlock item when queue is empty"); if (this.items[head].Unlock(out result)) { this.Dequeue(); return true; } return false; } public bool TrySignal(ReceiveScopeSignalGate scope, IAsyncResult nestedResult) { // Ordered receives can only signal the gate that the AsyncResult owns. // If the head has already been unlocked then it can proceed. if (scope.Signal(nestedResult)) { Dequeue(); return true; } return false; } public bool TrySignalPending(IAsyncResult result) { // free index will wrap around and always return the next free index; // Only the head of the queue can proceed as the head would be unlocked by // BeginTryReceive. All other requests will just submit their completed result. int nextPending = GetNextPending(); if (this.items[nextPending].Signal(result)) { Dequeue(); return true; } return false; } int GetNextPending() { int slot = this.pending; while (true) { if (slot == (slot = Interlocked.CompareExchange(ref this.pending, (slot + 1) % this.size, slot))) { return slot; } } } } } }