| 
									
										
										
										
											2016-08-03 10:59:49 +00:00
										 |  |  |  | //---------------------------------------------------------------- | 
					
						
							|  |  |  |  | // Copyright (c) Microsoft Corporation.  All rights reserved. | 
					
						
							|  |  |  |  | //---------------------------------------------------------------- | 
					
						
							|  |  |  |  | namespace System.ServiceModel.Activities.Dispatcher | 
					
						
							|  |  |  |  | { | 
					
						
							|  |  |  |  |     using System.Activities.Hosting; | 
					
						
							|  |  |  |  |     using System.Collections.Generic; | 
					
						
							|  |  |  |  |     using System.Collections.ObjectModel; | 
					
						
							|  |  |  |  |     using System.Runtime; | 
					
						
							|  |  |  |  |     using System.Runtime.DurableInstancing; | 
					
						
							|  |  |  |  |     using System.ServiceModel.Channels; | 
					
						
							|  |  |  |  |     using System.Threading; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     sealed class BufferedReceiveManager : IExtension<ServiceHostBase> | 
					
						
							|  |  |  |  |     { | 
					
						
							|  |  |  |  |         static AsyncCallback onEndAbandon; | 
					
						
							|  |  |  |  |         Dictionary<InstanceKey, List<BufferedReceiveMessageProperty>> bufferedProperties; | 
					
						
							|  |  |  |  |         PendingMessageThrottle throttle; | 
					
						
							|  |  |  |  |         WorkflowServiceHost host; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         int initialized; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         [Fx.Tag.SynchronizationObject(Blocking = false)] | 
					
						
							|  |  |  |  |         object thisLock; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         public BufferedReceiveManager(int maxPendingMessagesPerChannel) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             this.throttle = new PendingMessageThrottle(maxPendingMessagesPerChannel); | 
					
						
							|  |  |  |  |             this.thisLock = new object(); | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         public bool BufferReceive(OperationContext operationContext, ReceiveContext receiveContext, string bookmarkName, BufferedReceiveState state, bool retry) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             Fx.Assert(receiveContext != null, "ReceiveContext must be present in order to perform buffering"); | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             bool success = false; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             BufferedReceiveMessageProperty property = null; | 
					
						
							|  |  |  |  |             if (BufferedReceiveMessageProperty.TryGet(operationContext.IncomingMessageProperties, out property)) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 CorrelationMessageProperty correlation = null; | 
					
						
							|  |  |  |  |                 if (CorrelationMessageProperty.TryGet(operationContext.IncomingMessageProperties, out correlation)) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     InstanceKey instanceKey = correlation.CorrelationKey; | 
					
						
							|  |  |  |  |                     int channelKey = operationContext.Channel.GetHashCode(); | 
					
						
							|  |  |  |  |                     if (this.throttle.Acquire(channelKey)) | 
					
						
							|  |  |  |  |                     { | 
					
						
							|  |  |  |  |                         try | 
					
						
							|  |  |  |  |                         { | 
					
						
							|  |  |  |  |                             // Tag the property with identifying data to be used during later processing | 
					
						
							|  |  |  |  |                             if (UpdateProperty(property, receiveContext, channelKey, bookmarkName, state)) | 
					
						
							|  |  |  |  |                             { | 
					
						
							|  |  |  |  |                                 // Cleanup if we are notified the ReceiveContext faulted underneath us | 
					
						
							|  |  |  |  |                                 receiveContext.Faulted += delegate(object sender, EventArgs e) | 
					
						
							|  |  |  |  |                                 { | 
					
						
							|  |  |  |  |                                     lock (this.thisLock) | 
					
						
							|  |  |  |  |                                     { | 
					
						
							|  |  |  |  |                                         if (this.bufferedProperties.ContainsKey(instanceKey)) | 
					
						
							|  |  |  |  |                                         { | 
					
						
							|  |  |  |  |                                             if (this.bufferedProperties[instanceKey].Remove(property)) | 
					
						
							|  |  |  |  |                                             { | 
					
						
							|  |  |  |  |                                                 try | 
					
						
							|  |  |  |  |                                                 { | 
					
						
							|  |  |  |  |                                                     property.RequestContext.DelayClose(false); | 
					
						
							|  |  |  |  |                                                     property.RequestContext.Abort(); | 
					
						
							|  |  |  |  |                                                 } | 
					
						
							|  |  |  |  |                                                 catch (Exception exception) | 
					
						
							|  |  |  |  |                                                 { | 
					
						
							|  |  |  |  |                                                     if (Fx.IsFatal(exception)) | 
					
						
							|  |  |  |  |                                                     { | 
					
						
							|  |  |  |  |                                                         throw; | 
					
						
							|  |  |  |  |                                                     } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                                                     // ---- these exceptions as we are already on the error path | 
					
						
							|  |  |  |  |                                                 } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                                                 this.throttle.Release(channelKey); | 
					
						
							|  |  |  |  |                                             } | 
					
						
							|  |  |  |  |                                         } | 
					
						
							|  |  |  |  |                                     } | 
					
						
							|  |  |  |  |                                 }; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                                 // Actual Buffering | 
					
						
							|  |  |  |  |                                 lock (this.thisLock) | 
					
						
							|  |  |  |  |                                 { | 
					
						
							| 
									
										
										
										
											2017-08-21 15:34:15 +00:00
										 |  |  |  |                                     // Optimistic state check in case we just raced with the receiveContext | 
					
						
							| 
									
										
										
										
											2016-08-03 10:59:49 +00:00
										 |  |  |  |                                     // faulting. If the receiveContext still faults after the state check, the above | 
					
						
							|  |  |  |  |                                     // cleanup routine will handle things correctly. In both cases, a double-release | 
					
						
							|  |  |  |  |                                     // of the throttle is protected. | 
					
						
							|  |  |  |  |                                     if (receiveContext.State == ReceiveContextState.Received) | 
					
						
							|  |  |  |  |                                     { | 
					
						
							|  |  |  |  |                                         bool found = false; | 
					
						
							|  |  |  |  |                                         // if the exception indicates retry-able (such as RetryException), | 
					
						
							|  |  |  |  |                                         // we will simply retry.  This happens when racing with abort and  | 
					
						
							|  |  |  |  |                                         // WF informing the client to retry (BufferedReceiveManager is a | 
					
						
							|  |  |  |  |                                         // client in this case). | 
					
						
							|  |  |  |  |                                         if (retry) | 
					
						
							|  |  |  |  |                                         { | 
					
						
							|  |  |  |  |                                             property.RequestContext.DelayClose(true); | 
					
						
							|  |  |  |  |                                             property.RegisterForReplay(operationContext); | 
					
						
							|  |  |  |  |                                             property.ReplayRequest(); | 
					
						
							|  |  |  |  |                                             property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext); | 
					
						
							|  |  |  |  |                                             found = true; | 
					
						
							|  |  |  |  |                                         } | 
					
						
							|  |  |  |  |                                         else | 
					
						
							|  |  |  |  |                                         { | 
					
						
							|  |  |  |  |                                             ReadOnlyCollection<BookmarkInfo> bookmarks = this.host.DurableInstanceManager.PersistenceProviderDirectory.GetBookmarksForInstance(instanceKey); | 
					
						
							|  |  |  |  |                                             // Retry in case match the existing bookmark | 
					
						
							|  |  |  |  |                                             if (bookmarks != null) | 
					
						
							|  |  |  |  |                                             { | 
					
						
							|  |  |  |  |                                                 for (int i = 0; i < bookmarks.Count; ++i) | 
					
						
							|  |  |  |  |                                                 { | 
					
						
							|  |  |  |  |                                                     BookmarkInfo bookmark = bookmarks[i]; | 
					
						
							|  |  |  |  |                                                     if (bookmark.BookmarkName == bookmarkName) | 
					
						
							|  |  |  |  |                                                     { | 
					
						
							|  |  |  |  |                                                         // Found it so retry... | 
					
						
							|  |  |  |  |                                                         property.RequestContext.DelayClose(true); | 
					
						
							|  |  |  |  |                                                         property.RegisterForReplay(operationContext); | 
					
						
							|  |  |  |  |                                                         property.ReplayRequest(); | 
					
						
							|  |  |  |  |                                                         property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext); | 
					
						
							|  |  |  |  |                                                         found = true; | 
					
						
							|  |  |  |  |                                                         break; | 
					
						
							|  |  |  |  |                                                     } | 
					
						
							|  |  |  |  |                                                 } | 
					
						
							|  |  |  |  |                                             } | 
					
						
							|  |  |  |  |                                         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                                         if (!found) | 
					
						
							|  |  |  |  |                                         { | 
					
						
							|  |  |  |  |                                             List<BufferedReceiveMessageProperty> properties; | 
					
						
							|  |  |  |  |                                             if (!this.bufferedProperties.TryGetValue(instanceKey, out properties)) | 
					
						
							|  |  |  |  |                                             { | 
					
						
							|  |  |  |  |                                                 properties = new List<BufferedReceiveMessageProperty>(); | 
					
						
							|  |  |  |  |                                                 this.bufferedProperties.Add(instanceKey, properties); | 
					
						
							|  |  |  |  |                                             } | 
					
						
							|  |  |  |  |                                             property.RequestContext.DelayClose(true); | 
					
						
							|  |  |  |  |                                             property.RegisterForReplay(operationContext); | 
					
						
							|  |  |  |  |                                             properties.Add(property); | 
					
						
							|  |  |  |  |                                         } | 
					
						
							|  |  |  |  |                                         else | 
					
						
							|  |  |  |  |                                         { | 
					
						
							|  |  |  |  |                                             this.throttle.Release(channelKey); | 
					
						
							|  |  |  |  |                                         } | 
					
						
							|  |  |  |  |                                         success = true; | 
					
						
							|  |  |  |  |                                     } | 
					
						
							|  |  |  |  |                                 } | 
					
						
							|  |  |  |  |                             } | 
					
						
							|  |  |  |  |                         } | 
					
						
							|  |  |  |  |                         finally | 
					
						
							|  |  |  |  |                         { | 
					
						
							|  |  |  |  |                             if (!success) | 
					
						
							|  |  |  |  |                             { | 
					
						
							|  |  |  |  |                                 this.throttle.Release(channelKey); | 
					
						
							|  |  |  |  |                             } | 
					
						
							|  |  |  |  |                         } | 
					
						
							|  |  |  |  |                     } | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             return success; | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         public void Retry(HashSet<InstanceKey> associatedInstances, ReadOnlyCollection<BookmarkInfo> availableBookmarks) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             List<BookmarkInfo> bookmarks = new List<BookmarkInfo>(availableBookmarks); | 
					
						
							|  |  |  |  |             foreach (InstanceKey instanceKey in associatedInstances) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 lock (this.thisLock) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     if (this.bufferedProperties.ContainsKey(instanceKey)) | 
					
						
							|  |  |  |  |                     { | 
					
						
							|  |  |  |  |                         List<BufferedReceiveMessageProperty> properties = this.bufferedProperties[instanceKey]; | 
					
						
							|  |  |  |  |                         int index = 0; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                         while (index < properties.Count && bookmarks.Count > 0) | 
					
						
							|  |  |  |  |                         { | 
					
						
							|  |  |  |  |                             BufferedReceiveMessageProperty property = properties[index]; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                             // Determine if this property is now ready to be processed | 
					
						
							|  |  |  |  |                             int channelKey = 0; | 
					
						
							|  |  |  |  |                             bool found = false; | 
					
						
							|  |  |  |  |                             for (int i = 0; i < bookmarks.Count; ++i) | 
					
						
							|  |  |  |  |                             { | 
					
						
							|  |  |  |  |                                 BookmarkInfo bookmark = (BookmarkInfo)bookmarks[i]; | 
					
						
							|  |  |  |  |                                 PropertyData data = (PropertyData)property.UserState; | 
					
						
							|  |  |  |  |                                 if (bookmark.BookmarkName == data.BookmarkName) | 
					
						
							|  |  |  |  |                                 { | 
					
						
							|  |  |  |  |                                     // Found it so retry... | 
					
						
							|  |  |  |  |                                     bookmarks.RemoveAt(i); | 
					
						
							|  |  |  |  |                                     channelKey = data.ChannelKey; | 
					
						
							|  |  |  |  |                                     property.ReplayRequest(); | 
					
						
							|  |  |  |  |                                     property.Notification.NotifyInvokeReceived(property.RequestContext.InnerRequestContext); | 
					
						
							|  |  |  |  |                                     found = true; | 
					
						
							|  |  |  |  |                                     break; | 
					
						
							|  |  |  |  |                                 } | 
					
						
							|  |  |  |  |                             } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                             if (!found) | 
					
						
							|  |  |  |  |                             { | 
					
						
							|  |  |  |  |                                 index++; | 
					
						
							|  |  |  |  |                             } | 
					
						
							|  |  |  |  |                             else | 
					
						
							|  |  |  |  |                             { | 
					
						
							|  |  |  |  |                                 properties.RemoveAt(index); | 
					
						
							|  |  |  |  |                                 this.throttle.Release(channelKey); | 
					
						
							|  |  |  |  |                             } | 
					
						
							|  |  |  |  |                         } | 
					
						
							|  |  |  |  |                     } | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                 if (bookmarks.Count == 0) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     break; | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         public void AbandonBufferedReceives(HashSet<InstanceKey> associatedInstances) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             foreach (InstanceKey instanceKey in associatedInstances) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 lock (this.thisLock) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     if (this.bufferedProperties.ContainsKey(instanceKey)) | 
					
						
							|  |  |  |  |                     { | 
					
						
							|  |  |  |  |                         foreach (BufferedReceiveMessageProperty property in this.bufferedProperties[instanceKey]) | 
					
						
							|  |  |  |  |                         { | 
					
						
							|  |  |  |  |                             PropertyData data = (PropertyData)property.UserState; | 
					
						
							|  |  |  |  |                             AbandonReceiveContext(data.ReceiveContext); | 
					
						
							|  |  |  |  |                             this.throttle.Release(data.ChannelKey); | 
					
						
							|  |  |  |  |                         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                         this.bufferedProperties.Remove(instanceKey); | 
					
						
							|  |  |  |  |                     } | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         // clean up any remaining buffered receives as part of ServiceHost close. | 
					
						
							|  |  |  |  |         internal void AbandonBufferedReceives() | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             lock (this.thisLock) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 foreach (List<BufferedReceiveMessageProperty> value in this.bufferedProperties.Values) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     foreach (BufferedReceiveMessageProperty property in value) | 
					
						
							|  |  |  |  |                     { | 
					
						
							|  |  |  |  |                         PropertyData data = (PropertyData)property.UserState; | 
					
						
							|  |  |  |  |                         AbandonReceiveContext(data.ReceiveContext); | 
					
						
							|  |  |  |  |                         this.throttle.Release(data.ChannelKey); | 
					
						
							|  |  |  |  |                     } | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  |                 this.bufferedProperties.Clear(); | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         // Best-effort to abandon the receiveContext | 
					
						
							|  |  |  |  |         internal static void AbandonReceiveContext(ReceiveContext receiveContext) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             if (receiveContext != null) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 if (onEndAbandon == null) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     onEndAbandon = Fx.ThunkCallback(new AsyncCallback(OnEndAbandon)); | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                 try | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     IAsyncResult result = receiveContext.BeginAbandon( | 
					
						
							|  |  |  |  |                         TimeSpan.MaxValue, onEndAbandon, receiveContext); | 
					
						
							|  |  |  |  |                     if (result.CompletedSynchronously) | 
					
						
							|  |  |  |  |                     { | 
					
						
							|  |  |  |  |                         HandleEndAbandon(result); | 
					
						
							|  |  |  |  |                     } | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  |                 catch (Exception exception) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     if (Fx.IsFatal(exception)) | 
					
						
							|  |  |  |  |                     { | 
					
						
							|  |  |  |  |                         throw; | 
					
						
							|  |  |  |  |                     } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                     // We ---- any Abandon exception - best effort. | 
					
						
							|  |  |  |  |                     FxTrace.Exception.AsWarning(exception); | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         static bool HandleEndAbandon(IAsyncResult result) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             ReceiveContext receiveContext = (ReceiveContext)result.AsyncState; | 
					
						
							|  |  |  |  |             receiveContext.EndAbandon(result); | 
					
						
							|  |  |  |  |             return true; | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         static void OnEndAbandon(IAsyncResult result) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             if (result.CompletedSynchronously) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 return; | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             try | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 HandleEndAbandon(result); | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  |             catch (Exception exception) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 if (Fx.IsFatal(exception)) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     throw; | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                 // We ---- any Abandon exception - best effort. | 
					
						
							|  |  |  |  |                 FxTrace.Exception.AsWarning(exception); | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         void IExtension<ServiceHostBase>.Attach(ServiceHostBase owner) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             if (owner == null) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 throw FxTrace.Exception.AsError(new ArgumentNullException("owner")); | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             if (Interlocked.CompareExchange(ref this.initialized, 1, 0) != 0) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 throw FxTrace.Exception.AsError( | 
					
						
							|  |  |  |  |                     new InvalidOperationException(SR.BufferedReceiveBehaviorMultipleUse)); | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             owner.ThrowIfClosedOrOpened(); | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             Fx.Assert(owner is WorkflowServiceHost, "owner must be of WorkflowServiceHost type!"); | 
					
						
							|  |  |  |  |             this.host = (WorkflowServiceHost)owner; | 
					
						
							|  |  |  |  |             Initialize(); | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         void IExtension<ServiceHostBase>.Detach(ServiceHostBase owner) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         bool UpdateProperty(BufferedReceiveMessageProperty property, ReceiveContext receiveContext, int channelKey, string bookmarkName, BufferedReceiveState state) | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             // If there's data already there make sure the state is allowed | 
					
						
							|  |  |  |  |             if (property.UserState == null) | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 property.UserState = new PropertyData() | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     ReceiveContext = receiveContext, | 
					
						
							|  |  |  |  |                     ChannelKey = channelKey, | 
					
						
							|  |  |  |  |                     BookmarkName = bookmarkName, | 
					
						
							|  |  |  |  |                     State = state | 
					
						
							|  |  |  |  |                 }; | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  |             else | 
					
						
							|  |  |  |  |             { | 
					
						
							|  |  |  |  |                 PropertyData data = (PropertyData)property.UserState; | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                 // We should not buffer twice at the same state | 
					
						
							|  |  |  |  |                 if (data.State == state) | 
					
						
							|  |  |  |  |                 { | 
					
						
							|  |  |  |  |                     return false; | 
					
						
							|  |  |  |  |                 } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |                 data.State = state; | 
					
						
							|  |  |  |  |             } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             return true; | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         void Initialize() | 
					
						
							|  |  |  |  |         { | 
					
						
							|  |  |  |  |             this.bufferedProperties = new Dictionary<InstanceKey, List<BufferedReceiveMessageProperty>>(); | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  | 
 | 
					
						
							// Counts pending messages per channel key and refuses further acquisitions
							// once a channel holds maxPendingMessagesPerChannel of them. All state is
							// guarded by locking the pendingMessages dictionary.
							class PendingMessageThrottle
							{
							    // Per-channel counters. The dictionary instance is also the lock object,
							    // so it must never be reassigned (hence readonly).
							    [Fx.Tag.SynchronizationObject(Blocking = false)]
							    readonly Dictionary<int, ThrottleEntry> pendingMessages;

							    readonly int maxPendingMessagesPerChannel;

							    // Once the "exceeded" trace has been emitted for a channel it is re-armed
							    // only after that channel's count falls below this value
							    // (70% of the maximum, rounded down).
							    readonly int warningRestoreLimit;

							    public PendingMessageThrottle(int maxPendingMessagesPerChannel)
							    {
							        this.maxPendingMessagesPerChannel = maxPendingMessagesPerChannel;
							        this.warningRestoreLimit = (int)Math.Floor(0.7 * maxPendingMessagesPerChannel);
							        this.pendingMessages = new Dictionary<int, ThrottleEntry>();
							    }

							    // Tries to reserve one pending-message slot for the given channel.
							    // Returns true and increments the channel's count when the count is below
							    // the maximum; otherwise returns false and leaves the count unchanged.
							    public bool Acquire(int channelKey)
							    {
							        lock (this.pendingMessages)
							        {
							            // One hash lookup instead of ContainsKey + Add + indexer.
							            ThrottleEntry entry;
							            if (!this.pendingMessages.TryGetValue(channelKey, out entry))
							            {
							                entry = new ThrottleEntry();
							                this.pendingMessages.Add(channelKey, entry);
							            }

							            if (entry.Count >= this.maxPendingMessagesPerChannel)
							            {
							                // Trace the overflow only once per channel until Release re-arms it.
							                // WarningIssued is only updated while the trace is enabled.
							                if (TD.MaxPendingMessagesPerChannelExceededIsEnabled() && !entry.WarningIssued)
							                {
							                    TD.MaxPendingMessagesPerChannelExceeded(this.maxPendingMessagesPerChannel);
							                    entry.WarningIssued = true;
							                }

							                return false;
							            }

							            entry.Count++;
							            if (TD.PendingMessagesPerChannelRatioIsEnabled())
							            {
							                TD.PendingMessagesPerChannelRatio(entry.Count, this.maxPendingMessagesPerChannel);
							            }

							            return true;
							        }
							    }

							    // Gives back one slot previously obtained through Acquire for the channel.
							    // The dictionary indexer throws KeyNotFoundException when the channel has
							    // no entry, i.e. when there is nothing to release.
							    public void Release(int channelKey)
							    {
							        lock (this.pendingMessages)
							        {
							            ThrottleEntry entry = this.pendingMessages[channelKey];
							            Fx.Assert(entry.Count > 0, "The pending message throttle was released too many times");

							            entry.Count--;
							            if (TD.PendingMessagesPerChannelRatioIsEnabled())
							            {
							                TD.PendingMessagesPerChannelRatio(entry.Count, this.maxPendingMessagesPerChannel);
							            }

							            if (entry.Count == 0)
							            {
							                // Drop idle channels so the dictionary does not grow without bound;
							                // a later Acquire starts over with a fresh entry.
							                this.pendingMessages.Remove(channelKey);
							            }
							            else if (entry.Count < this.warningRestoreLimit)
							            {
							                // Enough slots were freed: allow the overflow trace to fire again.
							                entry.WarningIssued = false;
							            }
							        }
							    }

							    // Mutable per-channel state: the number of slots currently held and
							    // whether the overflow trace has already been emitted.
							    class ThrottleEntry
							    {
							        public ThrottleEntry()
							        {
							        }

							        public bool WarningIssued { get; set; }

							        public int Count { get; set; }
							    }
							}
					
						
							|  |  |  |  | 
 | 
					
						
							// Simple mutable holder for four values: a ReceiveContext, a channel key,
							// a bookmark name and a BufferedReceiveState. It carries no logic of its own.
							// NOTE(review): presumably the per-message bookkeeping attached to a buffered
							// receive, with ChannelKey matching the key given to PendingMessageThrottle;
							// confirm against the callers earlier in this file.
							class PropertyData
							{
							    // The compiler-supplied public parameterless constructor is relied upon.

							    public ReceiveContext ReceiveContext { get; set; }

							    public int ChannelKey { get; set; }

							    public string BookmarkName { get; set; }

							    public BufferedReceiveState State { get; set; }
							}
					
						
							|  |  |  |  |     } | 
					
						
							|  |  |  |  | } |