You've already forked linux-packaging-mono
							
							
		
			
				
	
	
		
			1496 lines
		
	
	
		
			62 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			1496 lines
		
	
	
		
			62 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| //----------------------------------------------------------------
 | |
| // Copyright (c) Microsoft Corporation.  All rights reserved.
 | |
| //----------------------------------------------------------------
 | |
| 
 | |
| namespace System.ServiceModel.Activities.Dispatcher
 | |
| {
 | |
|     using System.Activities;
 | |
|     using System.Activities.DurableInstancing;
 | |
|     using System.Activities.Persistence;
 | |
|     using System.Collections.Generic;
 | |
|     using System.ComponentModel;
 | |
|     using System.Linq;
 | |
|     using System.Runtime;
 | |
|     using System.Runtime.DurableInstancing;
 | |
|     using System.ServiceModel.Activities.Description;
 | |
|     using System.ServiceModel.Channels;
 | |
|     using System.ServiceModel.Description;
 | |
|     using System.Threading;
 | |
|     using System.Transactions;
 | |
|     using System.Xml.Linq;
 | |
|     using System.ServiceModel.Activation;
 | |
| 
 | |
|     sealed class DurableInstanceManager
 | |
|     {
 | |
|         static AsyncCallback waitAndHandleStoreEventsCallback = Fx.ThunkCallback(new AsyncCallback(WaitAndHandleStoreEventsCallback));
 | |
| 
 | |
|         int state;
 | |
|         InstanceStore store;
 | |
|         InstanceHandle handle;
 | |
|         InstanceOwner owner;
 | |
|         IDictionary<XName, InstanceValue> instanceOwnerMetadata;
 | |
|         object thisLock;
 | |
|         IDictionary<XName, InstanceValue> instanceMetadataChanges;
 | |
|         AsyncWaitHandle waitForStoreEventsLoop;
 | |
|         WorkflowDefinitionProvider workflowDefinitionProvider;
 | |
| 
 | |
|         internal DurableInstanceManager(WorkflowServiceHost host)
 | |
|         {
 | |
|             DurableInstancingOptions = new DurableInstancingOptions(this);
 | |
|             this.instanceOwnerMetadata = new Dictionary<XName, InstanceValue>();
 | |
|             this.instanceMetadataChanges = new Dictionary<XName, InstanceValue>();
 | |
|             this.thisLock = new object();
 | |
| 
 | |
|             // This is for collision detection.  Will replace with the real service name prior to executing.
 | |
|             InstanceValue sentinel = new InstanceValue(XNamespace.Get("http://tempuri.org").GetName("Sentinel"));
 | |
|             this.instanceOwnerMetadata.Add(WorkflowNamespace.WorkflowHostType, sentinel);
 | |
|             this.instanceMetadataChanges.Add(WorkflowNamespace.WorkflowHostType, sentinel);
 | |
|             this.instanceMetadataChanges.Add(PersistenceMetadataNamespace.InstanceType, new InstanceValue(WorkflowNamespace.WorkflowHostType, InstanceValueOptions.WriteOnly));
 | |
| 
 | |
|             this.Host = host;
 | |
|         }
 | |
| 
 | |
|         WorkflowServiceHost Host { get; set; }
 | |
| 
 | |
|         internal PersistenceProviderDirectory PersistenceProviderDirectory { get; set; }
 | |
| 
 | |
|         public DurableInstancingOptions DurableInstancingOptions { get; private set; }
 | |
| 
 | |
|         public InstanceStore InstanceStore
 | |
|         {
 | |
|             get
 | |
|             {
 | |
|                 return this.store;
 | |
|             }
 | |
|             set
 | |
|             {
 | |
|                 ThrowIfDisposedOrImmutable(this.state);
 | |
|                 this.store = value;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         public void AddInstanceOwnerValues(IDictionary<XName, object> readWriteValues, IDictionary<XName, object> writeOnlyValues)
 | |
|         {
 | |
|             ThrowIfDisposedOrImmutable(this.state);
 | |
| 
 | |
|             if (readWriteValues != null)
 | |
|             {
 | |
|                 foreach (KeyValuePair<XName, object> property in readWriteValues)
 | |
|                 {
 | |
|                     if (this.instanceOwnerMetadata.ContainsKey(property.Key))
 | |
|                     {
 | |
|                         throw FxTrace.Exception.Argument("readWriteValues", SR.ConflictingValueName(property.Key));
 | |
|                     }
 | |
|                     this.instanceOwnerMetadata.Add(property.Key, new InstanceValue(property.Value));
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             if (writeOnlyValues != null)
 | |
|             {
 | |
|                 foreach (KeyValuePair<XName, object> property in writeOnlyValues)
 | |
|                 {
 | |
|                     if (this.instanceOwnerMetadata.ContainsKey(property.Key))
 | |
|                     {
 | |
|                         throw FxTrace.Exception.Argument("writeOnlyValues", SR.ConflictingValueName(property.Key));
 | |
|                     }
 | |
|                     this.instanceOwnerMetadata.Add(property.Key, new InstanceValue(property.Value,
 | |
|                         InstanceValueOptions.Optional | InstanceValueOptions.WriteOnly));
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         public void AddInitialInstanceValues(IDictionary<XName, object> writeOnlyValues)
 | |
|         {
 | |
|             ThrowIfDisposedOrImmutable(this.state);
 | |
| 
 | |
|             if (writeOnlyValues != null)
 | |
|             {
 | |
|                 foreach (KeyValuePair<XName, object> pair in writeOnlyValues)
 | |
|                 {
 | |
|                     if (this.instanceMetadataChanges.ContainsKey(pair.Key))
 | |
|                     {
 | |
|                         throw FxTrace.Exception.Argument("writeOnlyValues", SR.ConflictingValueName(pair.Key));
 | |
|                     }
 | |
|                     this.instanceMetadataChanges.Add(pair.Key, new InstanceValue(pair.Value, InstanceValueOptions.Optional | InstanceValueOptions.WriteOnly));
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         static void ThrowIfDisposedOrImmutable(int state)
 | |
|         {
 | |
|             if (state == States.Aborted)
 | |
|             {
 | |
|                 throw FxTrace.Exception.AsError(new CommunicationObjectAbortedException(SR.ServiceHostExtensionAborted));
 | |
|             }
 | |
|             if (state == States.Closed)
 | |
|             {
 | |
|                 throw FxTrace.Exception.AsError(new ObjectDisposedException(typeof(DurableInstanceManager).Name));
 | |
|             }
 | |
|             if (state == States.Opened)
 | |
|             {
 | |
|                 throw FxTrace.Exception.AsError(new InvalidOperationException(SR.ServiceHostExtensionImmutable));
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         static void ThrowIfClosedOrAborted(int state)
 | |
|         {
 | |
|             if (state == States.Aborted)
 | |
|             {
 | |
|                 throw FxTrace.Exception.AsError(new CommunicationObjectAbortedException(SR.ServiceHostExtensionAborted));
 | |
|             }
 | |
|             if (state == States.Closed)
 | |
|             {
 | |
|                 throw FxTrace.Exception.AsError(new ObjectDisposedException(typeof(DurableInstanceManager).Name));
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         void InitializePersistenceProviderDirectory()
 | |
|         {   
 | |
|             int maxInstances = ServiceThrottlingBehavior.DefaultMaxConcurrentInstances;
 | |
|             ServiceThrottlingBehavior serviceThrottlingBehavior = Host.Description.Behaviors.Find<ServiceThrottlingBehavior>();
 | |
|             if (serviceThrottlingBehavior != null)
 | |
|             {
 | |
|                 maxInstances = serviceThrottlingBehavior.MaxConcurrentInstances;
 | |
|             }
 | |
| 
 | |
|             if (InstanceStore != null)
 | |
|             {
 | |
|                 PersistenceProviderDirectory = new PersistenceProviderDirectory(InstanceStore, this.owner, this.instanceMetadataChanges, this.workflowDefinitionProvider, Host, DurableConsistencyScope.Global, maxInstances);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 PersistenceProviderDirectory = new PersistenceProviderDirectory(this.workflowDefinitionProvider, Host, maxInstances);
 | |
|             }
 | |
| 
 | |
|             bool aborted;
 | |
|             lock (this.thisLock)
 | |
|             {
 | |
|                 aborted = this.state == States.Aborted;
 | |
|             }
 | |
| 
 | |
|             if (aborted)
 | |
|             {
 | |
|                 if (this.handle != null)
 | |
|                 {
 | |
|                     this.handle.Free();
 | |
|                 }
 | |
| 
 | |
|                 PersistenceProviderDirectory.Abort();
 | |
|             }
 | |
| 
 | |
|             // Start listening to store event
 | |
|             if (InstanceStore != null && !aborted)
 | |
|             {
 | |
|                 this.waitForStoreEventsLoop = new AsyncWaitHandle(EventResetMode.ManualReset);
 | |
|                 BeginWaitAndHandleStoreEvents(waitAndHandleStoreEventsCallback, this);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         IAsyncResult BeginWaitAndHandleStoreEvents(AsyncCallback callback, object state)
 | |
|         {
 | |
|             return new WaitAndHandleStoreEventsAsyncResult(this, callback, state);
 | |
|         }
 | |
| 
 | |
|         void EndWaitAndHandleStoreEvents(IAsyncResult result)
 | |
|         {
 | |
|             WaitAndHandleStoreEventsAsyncResult.End(result);
 | |
|         }
 | |
| 
 | |
|         static void WaitAndHandleStoreEventsCallback(IAsyncResult result)
 | |
|         {
 | |
|             DurableInstanceManager thisPtr = (DurableInstanceManager)result.AsyncState;
 | |
|             bool stop = false;
 | |
|             try
 | |
|             {
 | |
|                 thisPtr.EndWaitAndHandleStoreEvents(result);
 | |
|             }
 | |
|             catch (OperationCanceledException exception)
 | |
|             {
 | |
|                 FxTrace.Exception.AsWarning(exception);
 | |
| 
 | |
|                 // The OCE, bubbled to this layer, is only from store.BeginWaitForEvents.
 | |
|                 // This indicates handle is freed by 1) normal closing sequence 2) store
 | |
|                 // is dead (eg. lock owner expired).  We will fault the host as well as 
 | |
|                 // cease the loop.
 | |
|                 if (thisPtr.Host.State == CommunicationState.Opening || thisPtr.Host.State == CommunicationState.Opened)
 | |
|                 {
 | |
|                     thisPtr.Host.Fault(exception);
 | |
|                 }
 | |
|                 stop = true;
 | |
|             }
 | |
|             catch (Exception exception)
 | |
|             {
 | |
|                 if (Fx.IsFatal(exception) || !thisPtr.HandleException(exception))
 | |
|                 {
 | |
|                     throw;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             // Continue
 | |
|             if (!stop && thisPtr.state == States.Opened)
 | |
|             {
 | |
|                 thisPtr.BeginWaitAndHandleStoreEvents(waitAndHandleStoreEventsCallback, thisPtr);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 thisPtr.waitForStoreEventsLoop.Set();
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         bool HandleException(Exception exception)
 | |
|         {
 | |
|             if (exception is TimeoutException ||
 | |
|                 exception is OperationCanceledException ||
 | |
|                 exception is TransactionException ||
 | |
|                 exception is CommunicationObjectAbortedException ||
 | |
|                 // When abort raised by WorkflowServiceInstance
 | |
|                 exception is FaultException ||
 | |
|                 exception is InstancePersistenceException)
 | |
|             {
 | |
|                 FxTrace.Exception.AsWarning(exception);
 | |
|                 this.Host.FaultServiceHostIfNecessary(exception);
 | |
|                 return true;
 | |
|             }
 | |
|             return false;
 | |
|         }
 | |
| 
 | |
|         void CheckPersistenceProviderBehavior()
 | |
|         {
 | |
|             foreach (IServiceBehavior behavior in Host.Description.Behaviors)
 | |
|             {
 | |
|                 if (behavior.GetType().FullName == "System.ServiceModel.Description.PersistenceProviderBehavior")
 | |
|                 {
 | |
|                     throw FxTrace.Exception.AsError(new CommunicationException(SR.UseInstanceStoreInsteadOfPersistenceProvider));
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         internal IAsyncResult BeginGetInstance(InstanceKey instanceKey, ICollection<InstanceKey> additionalKeys, WorkflowGetInstanceContext parameters, TimeSpan timeout, AsyncCallback callback, object state)
 | |
|         {
 | |
|             ThrowIfClosedOrAborted(this.state);
 | |
|             return new GetInstanceAsyncResult(this, instanceKey, additionalKeys, parameters, timeout, callback, state);
 | |
|         }
 | |
| 
 | |
|         internal IAsyncResult BeginGetInstance(Guid instanceId, WorkflowGetInstanceContext parameters,
 | |
|             WorkflowIdentityKey updatedIdentity, TimeSpan timeout, AsyncCallback callback, object state)
 | |
|         {
 | |
|             ThrowIfClosedOrAborted(this.state);
 | |
|             return new GetInstanceAsyncResult(this, instanceId, parameters, updatedIdentity, timeout, callback, state);
 | |
|         }
 | |
| 
 | |
|         internal WorkflowServiceInstance EndGetInstance(IAsyncResult result)
 | |
|         {
 | |
|             return GetInstanceAsyncResult.End(result);
 | |
|         }
 | |
| 
 | |
|         void AbortDirectory()
 | |
|         {
 | |
|             lock (this.thisLock)
 | |
|             {
 | |
|                 if (this.state == States.Aborted)
 | |
|                 {
 | |
|                     return;
 | |
|                 }
 | |
|                 this.state = States.Aborted;
 | |
|             }
 | |
| 
 | |
|             if (this.handle != null)
 | |
|             {
 | |
|                 this.handle.Free();
 | |
|             }
 | |
| 
 | |
|             // PersistenceProviderDirectory is assigned on opened.  Abort could happen before (eg. after created)
 | |
|             if (PersistenceProviderDirectory != null)
 | |
|             {
 | |
|                 PersistenceProviderDirectory.Abort();
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         void SetDefaultOwnerMetadata()
 | |
|         {
 | |
|             // Replace the sentinal value with the real scoping name here.
 | |
|             this.instanceOwnerMetadata[WorkflowNamespace.WorkflowHostType] = new InstanceValue(Host.DurableInstancingOptions.ScopeName);
 | |
|             this.instanceMetadataChanges[WorkflowNamespace.WorkflowHostType] = new InstanceValue(Host.DurableInstancingOptions.ScopeName);
 | |
| 
 | |
|             this.workflowDefinitionProvider.GetDefinitionIdentityMetadata(this.instanceOwnerMetadata);
 | |
| 
 | |
|             if (!this.instanceMetadataChanges.ContainsKey(WorkflowServiceNamespace.Service))
 | |
|             {
 | |
|                 this.instanceMetadataChanges[WorkflowServiceNamespace.Service] = new InstanceValue(Host.ServiceName, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
 | |
|             }
 | |
| 
 | |
|             // add instance metadata about all of our endpoints
 | |
|             foreach (ServiceEndpoint endpoint in this.Host.Description.Endpoints)
 | |
|             {
 | |
|                 if (endpoint.Name != null)
 | |
|                 {
 | |
|                     // treat the control endpoint as special
 | |
|                     if (endpoint is WorkflowControlEndpoint)
 | |
|                     {
 | |
|                         if (!this.instanceOwnerMetadata.ContainsKey(WorkflowServiceNamespace.ControlEndpoint))
 | |
|                         {
 | |
|                             this.instanceOwnerMetadata.Add(WorkflowServiceNamespace.ControlEndpoint, new InstanceValue(endpoint.ListenUri));
 | |
|                         }
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         XName endpointName = WorkflowServiceNamespace.EndpointsPath.GetName(endpoint.Name);
 | |
|                         if (!this.instanceOwnerMetadata.ContainsKey(endpointName))
 | |
|                         {
 | |
|                             this.instanceOwnerMetadata.Add(endpointName, new InstanceValue(endpoint.ListenUri));
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             // as well as additional metadata if we're hosted
 | |
|             VirtualPathExtension virtualPathExtension = this.Host.Extensions.Find<VirtualPathExtension>();
 | |
|             if (virtualPathExtension != null && !this.instanceMetadataChanges.ContainsKey(PersistenceMetadataNamespace.ActivationType))
 | |
|             {
 | |
|                 // Example values for various web-host properties
 | |
|                 // SiteName: "Default Website"
 | |
|                 // RelativeApplicationPath/ApplicationVirtualPath: "/myApp1"
 | |
|                 // Virtual Path: "~/ShoppingCartService/ShoppingCartService.xaml"
 | |
|                 // Relative Service Path: "/myApp1/ShoppingCartService/ShoppingCartService.xaml"
 | |
|                 this.instanceMetadataChanges.Add(PersistenceMetadataNamespace.ActivationType, new InstanceValue(PersistenceMetadataNamespace.ActivationTypes.WAS, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional));
 | |
| 
 | |
|                 string siteName = this.Host.OverrideSiteName ? this.Host.Description.Name : virtualPathExtension.SiteName;
 | |
|                 
 | |
|                 // The remaining properties will get overritten if the user set them manually.  To control activation, the user should also set ActivationType, even if just to WAS.
 | |
|                 this.instanceMetadataChanges[WorkflowServiceNamespace.SiteName] = new InstanceValue(siteName, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
 | |
|                 this.instanceMetadataChanges[WorkflowServiceNamespace.RelativeApplicationPath] = new InstanceValue(virtualPathExtension.ApplicationVirtualPath, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
 | |
| 
 | |
|                 string virtualPath = virtualPathExtension.VirtualPath.Substring(1);
 | |
|                 string relativePath = ("/" == virtualPathExtension.ApplicationVirtualPath) ? virtualPath : virtualPathExtension.ApplicationVirtualPath + virtualPath;
 | |
|                 
 | |
|                 this.instanceMetadataChanges[WorkflowServiceNamespace.RelativeServicePath] = new InstanceValue(relativePath, InstanceValueOptions.WriteOnly | InstanceValueOptions.Optional);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         public void Open(TimeSpan timeout)
 | |
|         {
 | |
|             Fx.Assert(Host != null, "Extension should have been attached in WorkflowServiceHost constructor.");
 | |
| 
 | |
|             lock (this.thisLock)
 | |
|             {
 | |
|                 ThrowIfDisposedOrImmutable(this.state);
 | |
|                 this.state = States.Opened;
 | |
|             }
 | |
|             InitializeDefinitionProvider();
 | |
| 
 | |
|             CheckPersistenceProviderBehavior();
 | |
| 
 | |
|             SetDefaultOwnerMetadata();
 | |
| 
 | |
| 
 | |
|             if (InstanceStore != null)
 | |
|             {
 | |
|                 using (new TransactionScope(TransactionScopeOption.Suppress))
 | |
|                 {
 | |
|                     TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
 | |
|                     InstanceHandle handle = null;
 | |
|                     try
 | |
|                     {
 | |
|                         handle = InstanceStore.CreateInstanceHandle(null);
 | |
|                         this.owner = InstanceStore.Execute(handle, GetCreateOwnerCommand(), timeoutHelper.RemainingTime()).InstanceOwner;
 | |
|                         this.handle = handle;
 | |
|                         handle = null;
 | |
|                     }
 | |
|                     catch (InstancePersistenceException exception)
 | |
|                     {
 | |
|                         throw FxTrace.Exception.AsError(new CommunicationException(SR.UnableToOpenAndRegisterStore, exception));
 | |
|                     }
 | |
|                     finally
 | |
|                     {
 | |
|                         if (handle != null)
 | |
|                         {
 | |
|                             handle.Free();
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             InitializePersistenceProviderDirectory();
 | |
|         }
 | |
| 
 | |
|         void InitializeDefinitionProvider()
 | |
|         {
 | |
|             WorkflowServiceBehavior workflowServiceBehavior = Host.Description.Behaviors.Find<WorkflowServiceBehavior>();
 | |
|             Fx.Assert(workflowServiceBehavior != null && workflowServiceBehavior.WorkflowDefinitionProvider != null,
 | |
|                 "WorkflowServiceBehavior must be present on WorkflowServiceHost and WorkflowDefinitionProvider must be present on WorkflowServiceBehavior.");
 | |
| 
 | |
|             this.workflowDefinitionProvider = workflowServiceBehavior.WorkflowDefinitionProvider;
 | |
|         }
 | |
| 
 | |
|         public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
 | |
|         {
 | |
|             Fx.Assert(Host != null, "Extension should have been attached in WorkflowServiceHost constructor.");
 | |
| 
 | |
|             using (new TransactionScope(TransactionScopeOption.Suppress))
 | |
|             {
 | |
|                 return new OpenInstanceStoreAsyncResult(this, timeout, callback, state);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         public void EndOpen(IAsyncResult result)
 | |
|         {
 | |
|             OpenInstanceStoreAsyncResult.End(result);
 | |
|         }
 | |
| 
 | |
|         public void Close(TimeSpan timeout)
 | |
|         {
 | |
|             // We normally would have a purely synchronous path for our synchronous
 | |
|             // overload, but PersistenceIOParticipant.OnBeginSave() doesn't have a synchronous counterpart.
 | |
|             // Given that, at the very least we'd have to do PersistencePipeline.EndSave(PersistencePipeline.BeginSave).
 | |
|             // Therefore we resign ourselves to End(Begin) and take comfort in the unification of logic by not having two codepaths
 | |
|             CloseAsyncResult.End(new CloseAsyncResult(this, timeout, null, null));
 | |
|         }
 | |
| 
 | |
|         public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
 | |
|         {
 | |
|             return new CloseAsyncResult(this, timeout, callback, state);
 | |
|         }
 | |
| 
 | |
|         public void EndClose(IAsyncResult result)
 | |
|         {
 | |
|             CloseAsyncResult.End(result);
 | |
|         }
 | |
| 
 | |
|         public void Abort()
 | |
|         {
 | |
|             AbortDirectory();
 | |
|         }
 | |
| 
 | |
|         InstancePersistenceCommand GetCreateOwnerCommand()
 | |
|         {
 | |
|             InstancePersistenceCommand command;
 | |
|             IDictionary<XName, InstanceValue> commandMetadata;
 | |
|             if (this.instanceOwnerMetadata.ContainsKey(Workflow45Namespace.DefinitionIdentities))
 | |
|             {
 | |
|                 CreateWorkflowOwnerWithIdentityCommand withIdentity = new CreateWorkflowOwnerWithIdentityCommand();
 | |
|                 command = withIdentity;
 | |
|                 commandMetadata = withIdentity.InstanceOwnerMetadata;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 CreateWorkflowOwnerCommand withoutIdentity = new CreateWorkflowOwnerCommand();
 | |
|                 command = withoutIdentity;
 | |
|                 commandMetadata = withoutIdentity.InstanceOwnerMetadata;
 | |
|             }
 | |
| 
 | |
|             foreach (KeyValuePair<XName, InstanceValue> metadata in this.instanceOwnerMetadata)
 | |
|             {
 | |
|                 commandMetadata.Add(metadata);
 | |
|             }
 | |
| 
 | |
|             return command;
 | |
|         }
 | |
| 
 | |
|         static class States
 | |
|         {
 | |
|             public const int Created = 0;
 | |
|             public const int Opened = 1;
 | |
|             public const int Closed = 2;
 | |
|             public const int Aborted = 3;
 | |
|         }
 | |
| 
 | |
|         class OpenInstanceStoreAsyncResult : AsyncResult
 | |
|         {
 | |
|             static AsyncCompletion handleEndExecute = new AsyncCompletion(HandleEndExecute);
 | |
|             static Action<AsyncResult, Exception> onFinally = new Action<AsyncResult, Exception>(OnFinally);
 | |
| 
 | |
|             DurableInstanceManager instanceManager;
 | |
|             TimeoutHelper timeoutHelper;
 | |
|             InstanceHandle handle;
 | |
| 
 | |
|             public OpenInstanceStoreAsyncResult(DurableInstanceManager instanceManager, TimeSpan timeout, AsyncCallback callback, object state)
 | |
|                 : base(callback, state)
 | |
|             {
 | |
|                 this.instanceManager = instanceManager;
 | |
|                 this.timeoutHelper = new TimeoutHelper(timeout);
 | |
| 
 | |
|                 lock (this.instanceManager.thisLock)
 | |
|                 {
 | |
|                     DurableInstanceManager.ThrowIfDisposedOrImmutable(this.instanceManager.state);
 | |
|                     this.instanceManager.state = States.Opened;
 | |
|                 }
 | |
| 
 | |
|                 this.instanceManager.InitializeDefinitionProvider();
 | |
| 
 | |
|                 instanceManager.CheckPersistenceProviderBehavior();
 | |
| 
 | |
|                 this.instanceManager.SetDefaultOwnerMetadata();
 | |
| 
 | |
|                 this.OnCompleting = OpenInstanceStoreAsyncResult.onFinally;
 | |
| 
 | |
|                 bool completeSelf;
 | |
|                 Exception completionException = null;
 | |
|                 try
 | |
|                 {
 | |
|                     if (instanceManager.InstanceStore == null)
 | |
|                     {
 | |
|                         completeSelf = CreateDirectory();
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         this.handle = this.instanceManager.InstanceStore.CreateInstanceHandle(null);
 | |
|                         IAsyncResult executeResult = this.instanceManager.InstanceStore.BeginExecute(this.handle,
 | |
|                             this.instanceManager.GetCreateOwnerCommand(), this.timeoutHelper.RemainingTime(),
 | |
|                             this.PrepareAsyncCompletion(OpenInstanceStoreAsyncResult.handleEndExecute), this);
 | |
|                         completeSelf = SyncContinue(executeResult);
 | |
|                     }
 | |
|                 }
 | |
|                 catch (Exception exception)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(exception))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
| 
 | |
|                     completionException = exception;
 | |
|                     completeSelf = true;
 | |
|                 }
 | |
|                 if (completeSelf)
 | |
|                 {
 | |
|                     Complete(true, completionException);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             static bool HandleEndExecute(IAsyncResult result)
 | |
|             {
 | |
|                 OpenInstanceStoreAsyncResult thisPtr = (OpenInstanceStoreAsyncResult)result.AsyncState;
 | |
| 
 | |
|                 thisPtr.instanceManager.owner = thisPtr.instanceManager.InstanceStore.EndExecute(result).InstanceOwner;
 | |
| 
 | |
|                 return thisPtr.CreateDirectory();
 | |
|             }
 | |
| 
 | |
|             static void OnFinally(AsyncResult result, Exception exception)
 | |
|             {
 | |
|                 if (exception != null)
 | |
|                 {
 | |
|                     try
 | |
|                     {
 | |
|                         if (exception is InstancePersistenceException)
 | |
|                         {
 | |
|                             throw FxTrace.Exception.AsError(new CommunicationException(SR.UnableToOpenAndRegisterStore, exception));
 | |
|                         }
 | |
|                     }
 | |
|                     finally
 | |
|                     {
 | |
|                         OpenInstanceStoreAsyncResult thisPtr = (OpenInstanceStoreAsyncResult)result;
 | |
|                         if (thisPtr.handle != null)
 | |
|                         {
 | |
|                             thisPtr.handle.Free();
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             public static void End(IAsyncResult result)
 | |
|             {
 | |
|                 AsyncResult.End<OpenInstanceStoreAsyncResult>(result);
 | |
|             }
 | |
| 
 | |
|             bool CreateDirectory()
 | |
|             {
 | |
|                 this.instanceManager.InitializePersistenceProviderDirectory();
 | |
|                 this.instanceManager.handle = this.handle;
 | |
|                 this.handle = null;
 | |
|                 return true;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         class CloseAsyncResult : AsyncResult
 | |
|         {
 | |
|             static AsyncCallback handleEndReleaseInstanceWrapperCallback = Fx.ThunkCallback(new AsyncCallback(HandleEndReleaseInstanceWrapperCallback));
 | |
|             static AsyncCompletion handleEndExecute = new AsyncCompletion(HandleEndExecute);
 | |
|             static Action<object, TimeoutException> handleWaitForStoreEvents = new Action<object, TimeoutException>(HandleWaitForStoreEvents);
 | |
|             static int outstandingUnloadCapacity = 10;
 | |
| 
 | |
|             TimeoutHelper timeoutHelper;
 | |
|             DurableInstanceManager instanceManager;
 | |
|             IEnumerator<PersistenceContext> workflowServiceInstances;
 | |
|             int instanceCount;
 | |
|             InstanceHandle handle;
 | |
| 
 | |
|             object instanceQueueLock;
 | |
|             int completedUnloadCount;
 | |
|             bool allReleaseInstancesCompletedSynchronously;
 | |
| 
 | |
|             public CloseAsyncResult(DurableInstanceManager instanceManager, TimeSpan timeout, AsyncCallback callback, object state)
 | |
|                 : base(callback, state)
 | |
|             {
 | |
|                 this.instanceManager = instanceManager;
 | |
|                 this.timeoutHelper = new TimeoutHelper(timeout);
 | |
|                 this.instanceQueueLock = new object();
 | |
|                 this.allReleaseInstancesCompletedSynchronously = true;
 | |
| 
 | |
|                 if (this.instanceManager.state == States.Opened && this.instanceManager.handle != null)
 | |
|                 {
 | |
|                     // Note: since we change state before actual openning, this may 
 | |
|                     // get NullRef (---- already exists in other places) if Close 
 | |
|                     // is called on an unsuccessful or incompleted opened DIM.  
 | |
|                     // Assuming it is a non supported scenario.  
 | |
|                     this.instanceManager.handle.Free();
 | |
|                     if (WaitForStoreEventsLoop())
 | |
|                     {
 | |
|                         Complete(true);
 | |
|                     }
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     if (PerformClose())
 | |
|                     {
 | |
|                         Complete(true);
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             bool PerformClose()
 | |
|             {
 | |
|                 bool closed;
 | |
|                 bool opened;
 | |
|                 bool aborted;
 | |
| 
 | |
|                 lock (this.instanceManager.thisLock)
 | |
|                 {
 | |
|                     closed = this.instanceManager.state == States.Closed;
 | |
|                     opened = this.instanceManager.state == States.Opened;
 | |
|                     aborted = this.instanceManager.state == States.Aborted;
 | |
|                     if (opened)
 | |
|                     {
 | |
|                         this.instanceManager.state = States.Closed;
 | |
|                     }
 | |
|                 }
 | |
| 
 | |
|                 if (closed)
 | |
|                 {
 | |
|                     return true;
 | |
|                 }
 | |
|                 if (!opened)
 | |
|                 {
 | |
|                     if (!aborted)
 | |
|                     {
 | |
|                         this.instanceManager.AbortDirectory();
 | |
|                     }
 | |
| 
 | |
|                     // We cannot throw here if the DurableInstanceManager is already aborted since service host could 
 | |
|                     // be aborted due to a timeout exception. Simply return here
 | |
|                     return true;
 | |
|                 }
 | |
| 
 | |
|                 IEnumerable<PersistenceContext> contexts = this.instanceManager.PersistenceProviderDirectory.GetContexts();
 | |
|                 this.instanceCount = contexts.Count<PersistenceContext>();
 | |
|                 this.workflowServiceInstances = contexts.GetEnumerator();
 | |
|                 // We only call StartProcess if we actually have instances to release.
 | |
|                 if (this.instanceCount > 0)
 | |
|                 {
 | |
|                     StartProcess();
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     // No instances to release. Do the post processing.
 | |
|                     return PostProcess();
 | |
|                 }
 | |
| 
 | |
|                 return false;
 | |
|             }
 | |
| 
 | |
|             bool WaitForStoreEventsLoop()
 | |
|             {
 | |
|                 // Event never get initialized, meaning we have not started the WaitForStoreEvents loop
 | |
|                 if (this.instanceManager.waitForStoreEventsLoop == null
 | |
|                     || this.instanceManager.waitForStoreEventsLoop.WaitAsync(handleWaitForStoreEvents, this, this.timeoutHelper.RemainingTime()))
 | |
|                 {
 | |
|                     return PerformClose();
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     return false;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             static void HandleWaitForStoreEvents(object state, TimeoutException exception)
 | |
|             {
 | |
|                 CloseAsyncResult thisPtr = (CloseAsyncResult)state;
 | |
|                 if (exception != null)
 | |
|                 {
 | |
|                     thisPtr.Complete(false, exception);
 | |
|                     return;
 | |
|                 }
 | |
| 
 | |
|                 bool completeSelf = false;
 | |
|                 Exception completionException = null;
 | |
| 
 | |
|                 try
 | |
|                 {
 | |
|                     completeSelf = thisPtr.PerformClose();
 | |
|                 }
 | |
|                 catch (Exception e)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(e))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
|                     completionException = exception;
 | |
|                     completeSelf = true;
 | |
|                 }
 | |
| 
 | |
|                 if (completeSelf)
 | |
|                 {
 | |
|                     thisPtr.Complete(false, completionException);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             void StartProcess()
 | |
|             {
 | |
|                 for (int i = 0; i < outstandingUnloadCapacity; i++)
 | |
|                 {
 | |
|                     if (!Process())
 | |
|                     {
 | |
|                         break;
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             bool Process()
 | |
|             {
 | |
|                 bool shouldContinueProcess;
 | |
|                 WorkflowServiceInstance currentInstance = null;
 | |
| 
 | |
|                 lock (this.instanceQueueLock)
 | |
|                 {
 | |
|                     if (this.workflowServiceInstances.MoveNext())
 | |
|                     {
 | |
|                         currentInstance = this.workflowServiceInstances.Current.GetInstance(null);
 | |
|                         shouldContinueProcess = true;
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         shouldContinueProcess = false;
 | |
|                     }
 | |
|                 }
 | |
| 
 | |
|                 if (shouldContinueProcess)
 | |
|                 {
 | |
|                     if (currentInstance != null)
 | |
|                     {
 | |
|                         try
 | |
|                         {
 | |
|                             // Our own wrapper callback will invoke the inner callback even when result is completed synchronously
 | |
|                             IAsyncResult result = currentInstance.BeginReleaseInstance(
 | |
|                                 false,
 | |
|                                 this.timeoutHelper.RemainingTime(),
 | |
|                                 CloseAsyncResult.handleEndReleaseInstanceWrapperCallback,
 | |
|                                 this);
 | |
|                         }
 | |
|                         catch (Exception e)
 | |
|                         {
 | |
|                             if (Fx.IsFatal(e))
 | |
|                             {
 | |
|                                 throw;
 | |
|                             }
 | |
| 
 | |
|                             // Ignore exception thrown from BeginReleaseInstance.
 | |
|                             // We do not complete CloseAsyncResult with this exception.
 | |
|                             // Instead, we want to keep this thread running so that it can clean up other instances.
 | |
|                             FxTrace.Exception.AsWarning(e);
 | |
|                         }
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         if (Interlocked.Increment(ref this.completedUnloadCount) == this.instanceCount)
 | |
|                         {
 | |
|                             // We are done with the instances, so do post-processing. If that completes
 | |
|                             // synchronously, we need to call Complete. We completed synchronously if all
 | |
|                             // of the ReleaseInstance invocations completed synchronously.
 | |
|                             // The return value from this method only indicates
 | |
|                             // if there are more instances to deal with, not if we are Complete.
 | |
|                             if (PostProcess())
 | |
|                             {
 | |
|                                 Complete(this.allReleaseInstancesCompletedSynchronously);
 | |
|                             }
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
| 
 | |
|                 return shouldContinueProcess;
 | |
|             }
 | |
| 
 | |
|             bool PostProcess()
 | |
|             {
 | |
|                 //cleanup any buffered receives unassociated with workflowServiceInstances
 | |
|                 BufferedReceiveManager bufferedReceiveManager = this.instanceManager.Host.Extensions.Find<BufferedReceiveManager>();
 | |
|                 if (bufferedReceiveManager != null)
 | |
|                 {
 | |
|                     bufferedReceiveManager.AbandonBufferedReceives();
 | |
|                 }
 | |
| 
 | |
|                 // Send the DeleteWorkflowOwner command to the instance store.
 | |
|                 if (this.instanceManager.InstanceStore != null)
 | |
|                 {
 | |
|                     IAsyncResult executeResult = null;
 | |
|                     this.handle = this.instanceManager.InstanceStore.CreateInstanceHandle(this.instanceManager.owner);
 | |
|                     try
 | |
|                     {
 | |
|                         executeResult = this.instanceManager.InstanceStore.BeginExecute(this.handle,
 | |
|                             new DeleteWorkflowOwnerCommand(), this.timeoutHelper.RemainingTime(),
 | |
|                             this.PrepareAsyncCompletion(CloseAsyncResult.handleEndExecute), this);
 | |
|                         return (SyncContinue(executeResult));
 | |
|                     }
 | |
|                     // Ignore some exceptions because DeleteWorkflowOwner is best effort.
 | |
|                     catch (InstancePersistenceCommandException) { }
 | |
|                     catch (InstanceOwnerException) { }
 | |
|                     catch (OperationCanceledException) { }
 | |
|                     finally
 | |
|                     {
 | |
|                         if (executeResult == null)
 | |
|                         {
 | |
|                             this.handle.Free();
 | |
|                             this.handle = null;
 | |
|                         }
 | |
|                     }
 | |
|                     return this.SyncContinue(executeResult);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     CloseProviderDirectory();
 | |
|                     return true;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             static void HandleEndReleaseInstance(IAsyncResult result)
 | |
|             {
 | |
|                 CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState;
 | |
|                 thisPtr.allReleaseInstancesCompletedSynchronously = thisPtr.allReleaseInstancesCompletedSynchronously && result.CompletedSynchronously;
 | |
|                 try
 | |
|                 {
 | |
|                     WorkflowServiceInstance.EndReleaseInstanceForClose(result);
 | |
|                 }
 | |
|                 catch (Exception e)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(e))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
| 
 | |
|                     // Ignore exception thrown from ReleaseInstanceAsyncResult.End.
 | |
|                     // We do not complete CloseAsyncResult with this exception.
 | |
|                     // Instead, we want to keep this thread running so that it can clean up other instances.
 | |
|                     FxTrace.Exception.AsWarning(e);
 | |
|                 }
 | |
| 
 | |
|                 if (Interlocked.Increment(ref thisPtr.completedUnloadCount) == thisPtr.instanceCount)
 | |
|                 {
 | |
|                     if (thisPtr.PostProcess())
 | |
|                     {
 | |
|                         // If PostProcess completed synchronously, then the entire CloseAsyncResult is complete.
 | |
|                         // Whether or not we completed syncrhonously depends on if all the ReleaseInstance invocations completed
 | |
|                         // synchronously.
 | |
|                         thisPtr.Complete(thisPtr.allReleaseInstancesCompletedSynchronously);
 | |
|                     }
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     thisPtr.Process();
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             void CloseProviderDirectory()
 | |
|             {
 | |
|                 bool success = false;
 | |
|                 try
 | |
|                 {
 | |
|                     this.instanceManager.PersistenceProviderDirectory.Close();
 | |
|                     success = true;
 | |
|                 }
 | |
|                 finally
 | |
|                 {
 | |
|                     if (!success)
 | |
|                     {
 | |
|                         this.instanceManager.AbortDirectory();
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             static bool HandleEndExecute(IAsyncResult result)
 | |
|             {
 | |
|                 CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState;
 | |
| 
 | |
|                 try
 | |
|                 {
 | |
|                     thisPtr.instanceManager.owner = thisPtr.instanceManager.InstanceStore.EndExecute(result).InstanceOwner;
 | |
|                 }
 | |
|                 // Ignore some exceptions because DeleteWorkflowOwner is best effort.
 | |
|                 catch (InstancePersistenceCommandException) { }
 | |
|                 catch (InstanceOwnerException) { }
 | |
|                 catch (OperationCanceledException) { }
 | |
|                 finally
 | |
|                 {
 | |
|                     thisPtr.handle.Free();
 | |
|                     thisPtr.handle = null;
 | |
|                 }
 | |
| 
 | |
|                 thisPtr.CloseProviderDirectory();
 | |
|                 return true;
 | |
|             }
 | |
| 
 | |
|             public static void End(IAsyncResult result)
 | |
|             {
 | |
|                 AsyncResult.End<CloseAsyncResult>(result);
 | |
|             }
 | |
| 
 | |
|             static void HandleEndReleaseInstanceWrapperCallback(IAsyncResult result)
 | |
|             {
 | |
|                 Fx.Assert(result != null, "Async result cannot be null!");
 | |
| 
 | |
|                 CloseAsyncResult thisPtr = (CloseAsyncResult)result.AsyncState;
 | |
| 
 | |
|                 Exception completionException = null;
 | |
|                 try
 | |
|                 {
 | |
|                     HandleEndReleaseInstance(result);
 | |
|                 }
 | |
|                 catch (Exception e)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(e))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
| 
 | |
|                     completionException = e;
 | |
|                 }
 | |
| 
 | |
|                 // Exceptions thrown from Process and process callback should be handled in those methods respectively.
 | |
|                 // The only exception that can get here should be exception thrown from PostProcess.
 | |
|                 // PostProcess is guaranteed to be called only once.
 | |
|                 if (completionException != null)
 | |
|                 {
 | |
|                     thisPtr.Complete(false, completionException);
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         // Need to ensure that any failure in the methods of GetInstanceAsyncResult after a WorkflowServiceInstance has been acquired
 | |
|         // results in one of three outcomes, namely :
 | |
|         // - the WorkflowServiceInstance is set to null
 | |
|         // - the WorkflowServiceInstance is aborted
 | |
|         // - ReleaseReference is called on the WorkflowServiceInstance to ensure that unload happens 
 | |
|         //   (ultimately resulting in the WorkflowServiceInstance being aborted)
 | |
|         // This is to prevent leaking WorkflowServiceInstances since nothing else has a handle to the WorkflowServiceInstance in those
 | |
|         // scenarios.
 | |
| 
 | |
|         class GetInstanceAsyncResult : TransactedAsyncResult
 | |
|         {
 | |
|             static AsyncCompletion handleEndAcquireReference = new AsyncCompletion(HandleEndAcquireReference);
 | |
|             static AsyncCompletion handleEndLoad = new AsyncCompletion(HandleEndLoad);
 | |
|             static AsyncCompletion handleAssociateInfrastructureKeys = new AsyncCompletion(HandleAssociateInfrastructureKeys);
 | |
|             static AsyncCompletion handleCommit = new AsyncCompletion(HandleCommit);
 | |
|             static AsyncCompletion handleEndEnlistContext = new AsyncCompletion(HandleEndEnlistContext);
 | |
|             static Action<AsyncResult, Exception> onCompleting = new Action<AsyncResult, Exception>(Finally);
 | |
| 
 | |
|             DurableInstanceManager instanceManager;
 | |
|             Guid instanceId;
 | |
|             InstanceKey instanceKey;
 | |
|             ICollection<InstanceKey> additionalKeys;
 | |
|             TimeSpan timeout;
 | |
|             WorkflowServiceInstance durableInstance;
 | |
|             bool referenceAcquired;
 | |
|             PersistenceContext persistenceContext;
 | |
|             WorkflowGetInstanceContext parameters;
 | |
|             DependentTransaction transaction;
 | |
|             CommittableTransaction committableTransaction;
 | |
|             bool loadAny;
 | |
|             WorkflowIdentityKey updatedIdentity;
 | |
| 
 | |
|             public GetInstanceAsyncResult(DurableInstanceManager instanceManager, InstanceKey instanceKey, ICollection<InstanceKey> additionalKeys, WorkflowGetInstanceContext parameters,
 | |
|                 TimeSpan timeout, AsyncCallback callback, object state)
 | |
|                 : this(instanceManager, parameters, timeout, callback, state)
 | |
|             {
 | |
|                 Fx.Assert(instanceKey != null, "Instance key must be set.");
 | |
| 
 | |
|                 this.instanceKey = instanceKey;
 | |
|                 this.additionalKeys = additionalKeys;
 | |
| 
 | |
|                 if (this.GetInstance())
 | |
|                 {
 | |
|                     this.Complete(true);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             public GetInstanceAsyncResult(DurableInstanceManager instanceManager, Guid instanceId, WorkflowGetInstanceContext parameters, WorkflowIdentityKey updatedIdentity,
 | |
|                 TimeSpan timeout, AsyncCallback callback, object state)
 | |
|                 : this(instanceManager, parameters, timeout, callback, state)
 | |
|             {
 | |
|                 this.instanceId = instanceId;
 | |
|                 this.updatedIdentity = updatedIdentity;
 | |
| 
 | |
|                 if (this.GetInstance())
 | |
|                 {
 | |
|                     this.Complete(true);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             GetInstanceAsyncResult(DurableInstanceManager instanceManager, WorkflowGetInstanceContext parameters,
 | |
|                 TimeSpan timeout, AsyncCallback callback, object state)
 | |
|                 : base(callback, state)
 | |
|             {
 | |
|                 this.instanceManager = instanceManager;
 | |
|                 this.parameters = parameters;
 | |
|                 this.timeout = timeout;
 | |
|                 this.loadAny = parameters == null;
 | |
|                 this.OnCompleting = onCompleting;
 | |
| 
 | |
|                 Transaction currentTransaction = Transaction.Current;
 | |
|                 if (currentTransaction == null && this.instanceManager.Host.IsLoadTransactionRequired)
 | |
|                 {
 | |
|                     this.committableTransaction = new CommittableTransaction(this.timeout);
 | |
|                     currentTransaction = committableTransaction;
 | |
|                 }
 | |
|                 if (currentTransaction != null)
 | |
|                 {
 | |
|                     this.transaction = currentTransaction.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             public static WorkflowServiceInstance End(IAsyncResult result)
 | |
|             {
 | |
|                 return AsyncResult.End<GetInstanceAsyncResult>(result).durableInstance;
 | |
|             }
 | |
| 
 | |
|             bool TryAcquire(bool fromCache)
 | |
|             {
 | |
|                 this.durableInstance = this.persistenceContext.GetInstance(this.parameters);
 | |
| 
 | |
|                 if (!fromCache)
 | |
|                 {
 | |
|                     this.referenceAcquired = true;
 | |
|                     return AssociateKeys();
 | |
|                 }
 | |
| 
 | |
|                 IAsyncResult nextResult = this.durableInstance.BeginTryAcquireReference(this.timeout, this.PrepareAsyncCompletion(handleEndAcquireReference), this);
 | |
|                 return SyncContinue(nextResult);
 | |
|             }
 | |
| 
 | |
|             static bool HandleEndAcquireReference(IAsyncResult result)
 | |
|             {
 | |
|                 GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState;
 | |
| 
 | |
|                 if (thisPtr.durableInstance.EndTryAcquireReference(result))
 | |
|                 {
 | |
|                     thisPtr.referenceAcquired = true;
 | |
|                     return thisPtr.TryEnlistContext();
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     //We have to re-dispense this Durable Instance this is not usable.
 | |
|                     thisPtr.referenceAcquired = false;
 | |
|                     thisPtr.durableInstance = null;
 | |
|                     return thisPtr.GetInstance();
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             bool TryEnlistContext()
 | |
|             {
 | |
|                 IAsyncResult enlistResult = null;
 | |
|                 bool tryAgain = false;
 | |
| 
 | |
|                 // We need to enlist for the transaction. This call will wait until
 | |
|                 // we obtain the transaction lock on the PersistenceContext, too. If there is no current transaction, this call
 | |
|                 // will still wait to get the transaction lock, but we not create an enlistment.
 | |
|                 using (PrepareTransactionalCall(this.transaction))
 | |
|                 {
 | |
|                     try
 | |
|                     {
 | |
|                         enlistResult = this.persistenceContext.BeginEnlist(this.timeout, PrepareAsyncCompletion(handleEndEnlistContext), this);
 | |
|                     }
 | |
|                     catch (ObjectDisposedException)
 | |
|                     {
 | |
|                         tryAgain = true;
 | |
|                     }
 | |
|                     catch (CommunicationObjectAbortedException)
 | |
|                     {
 | |
|                         throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason));
 | |
|                     }
 | |
|                 }
 | |
| 
 | |
|                 if (tryAgain)
 | |
|                 {
 | |
|                     this.referenceAcquired = false;
 | |
|                     this.durableInstance = null;
 | |
|                     return this.GetInstance();
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     return SyncContinue(enlistResult);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             static bool HandleEndEnlistContext(IAsyncResult result)
 | |
|             {
 | |
|                 GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState;
 | |
| 
 | |
|                 // 
 | |
| 
 | |
|                 try
 | |
|                 {
 | |
|                     thisPtr.persistenceContext.EndEnlist(result);
 | |
|                 }
 | |
|                 catch (ObjectDisposedException)
 | |
|                 {
 | |
|                     // It's possible that the PersistenceContext was closed and removed from the cache
 | |
|                     // while we were queued up for it. In that situation, this call to EndEnlist will
 | |
|                     // throw an ObjectDisposedException because the PersistenceContext is in the closed
 | |
|                     // state. If that happens, we need to try the load again from the beginning.
 | |
|                     thisPtr.referenceAcquired = false;
 | |
|                     thisPtr.durableInstance = null;
 | |
|                     return thisPtr.GetInstance();
 | |
|                 }
 | |
|                 catch (CommunicationObjectAbortedException)
 | |
|                 {
 | |
|                     throw FxTrace.Exception.AsError(new OperationCanceledException(SR.DefaultAbortReason));
 | |
|                 }
 | |
| 
 | |
|                 return thisPtr.AssociateKeys();
 | |
|             }
 | |
| 
 | |
|             bool GetInstance()
 | |
|             {
 | |
|                 IAsyncResult nextResult = null;
 | |
| 
 | |
|                 if (!this.loadAny && this.parameters.CanCreateInstance)
 | |
|                 {
 | |
|                     Fx.Assert(this.updatedIdentity == null, "Update() can never create instance. Enable this path if we ever support updating via user-defined operation.");
 | |
|                     if (this.instanceKey != null && this.instanceKey.IsValid)
 | |
|                     {
 | |
|                         nextResult = this.instanceManager.PersistenceProviderDirectory.BeginLoadOrCreate(
 | |
|                             this.instanceKey, Guid.Empty, this.additionalKeys, this.transaction,
 | |
|                             this.timeout, PrepareAsyncCompletion(handleEndLoad), this);
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         // Either invalid key (new instance) or lookup by instance ID.
 | |
|                         nextResult = this.instanceManager.PersistenceProviderDirectory.BeginLoadOrCreate(
 | |
|                             this.instanceId, this.additionalKeys, this.transaction,
 | |
|                             this.timeout, PrepareAsyncCompletion(handleEndLoad), this);
 | |
|                     }
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     if (this.instanceKey != null)
 | |
|                     {
 | |
|                         Fx.Assert(this.updatedIdentity == null, "Update() always has the instance ID. Enable this path if we ever support updating via user-defined operation that relies on correlation.");
 | |
|                         nextResult = this.instanceManager.PersistenceProviderDirectory.BeginLoad(
 | |
|                             this.instanceKey, this.additionalKeys, this.transaction,
 | |
|                             this.timeout, PrepareAsyncCompletion(handleEndLoad), this);
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         nextResult = this.instanceManager.PersistenceProviderDirectory.BeginLoad(
 | |
|                             this.instanceId, null, this.transaction, this.loadAny, this.updatedIdentity,
 | |
|                             this.timeout, PrepareAsyncCompletion(handleEndLoad), this);
 | |
|                     }
 | |
|                 }
 | |
|                 return SyncContinue(nextResult);
 | |
|             }
 | |
| 
 | |
|             bool AssociateKeys()
 | |
|             {
 | |
|                 if (this.additionalKeys != null && this.additionalKeys.Count > 0)
 | |
|                 {
 | |
|                     IAsyncResult result;
 | |
|                     try
 | |
|                     {
 | |
|                         result = this.durableInstance.BeginAssociateInfrastructureKeys(this.additionalKeys, this.transaction, this.timeout,
 | |
|                                 PrepareAsyncCompletion(handleAssociateInfrastructureKeys), this);
 | |
|                     }
 | |
|                     catch (Exception exception)
 | |
|                     {
 | |
|                         if (Fx.IsFatal(exception))
 | |
|                         {
 | |
|                             throw;
 | |
|                         }
 | |
|                         this.persistenceContext.Abort();
 | |
|                         throw;
 | |
|                     }
 | |
| 
 | |
|                     return SyncContinue(result);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     return CommitTransaction();
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             static bool HandleEndLoad(IAsyncResult result)
 | |
|             {
 | |
|                 GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState;
 | |
| 
 | |
|                 PersistenceContext previousPersistenceContext = thisPtr.persistenceContext;
 | |
|                 bool fromCache;
 | |
|                 if (!thisPtr.loadAny && thisPtr.parameters.CanCreateInstance)
 | |
|                 {
 | |
|                     thisPtr.persistenceContext = thisPtr.instanceManager.PersistenceProviderDirectory.EndLoadOrCreate(result, out fromCache);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     thisPtr.persistenceContext = thisPtr.instanceManager.PersistenceProviderDirectory.EndLoad(result, out fromCache);
 | |
|                 }
 | |
|                 Fx.AssertAndThrow(previousPersistenceContext != thisPtr.persistenceContext, "PPD should not load same PersistenceContext for the same GetInstanceAsyncResult!");
 | |
|                 return thisPtr.TryAcquire(fromCache);
 | |
|             }
 | |
| 
 | |
|             static bool HandleAssociateInfrastructureKeys(IAsyncResult result)
 | |
|             {
 | |
|                 GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState;
 | |
| 
 | |
|                 try
 | |
|                 {
 | |
|                     thisPtr.durableInstance.EndAssociateInfrastructureKeys(result);
 | |
|                 }
 | |
|                 catch (Exception exception)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(exception))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
|                     thisPtr.persistenceContext.Abort();
 | |
|                     throw;
 | |
|                 }
 | |
| 
 | |
|                 return thisPtr.CommitTransaction();
 | |
|             }
 | |
| 
 | |
|             bool CommitTransaction()
 | |
|             {
 | |
|                 if (this.transaction != null)
 | |
|                 {
 | |
|                     this.transaction.Complete();
 | |
|                 }
 | |
|                 if (this.committableTransaction != null)
 | |
|                 {
 | |
|                     IAsyncResult result = this.committableTransaction.BeginCommit(PrepareAsyncCompletion(handleCommit), this);
 | |
|                     return SyncContinue(result);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     return true;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             static bool HandleCommit(IAsyncResult result)
 | |
|             {
 | |
|                 GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result.AsyncState;
 | |
|                 thisPtr.committableTransaction.EndCommit(result);
 | |
|                 thisPtr.committableTransaction = null;
 | |
|                 return true;
 | |
|             }
 | |
| 
 | |
|             static void Finally(AsyncResult result, Exception exception)
 | |
|             {
 | |
|                 GetInstanceAsyncResult thisPtr = (GetInstanceAsyncResult)result;
 | |
| 
 | |
|                 if (thisPtr.committableTransaction != null)
 | |
|                 {
 | |
|                     Fx.Assert(exception != null, "Shouldn't get here in the success case.");
 | |
| 
 | |
|                     try
 | |
|                     {
 | |
|                         thisPtr.committableTransaction.Rollback(exception);
 | |
|                     }
 | |
|                     catch (Exception rollbackException)
 | |
|                     {
 | |
|                         if (Fx.IsFatal(rollbackException))
 | |
|                         {
 | |
|                             throw;
 | |
|                         }
 | |
|                         FxTrace.Exception.AsWarning(rollbackException);
 | |
|                     }
 | |
|                 }
 | |
| 
 | |
|                 // Reference is acquired on an instance but we fail perform subsequent task before
 | |
|                 // return an instance to the client (Tx Enlist timeout).  We are responsible to 
 | |
|                 // release the reference.  We don't need to worry about Aborted or other State (has
 | |
|                 // no effect on ref counting).
 | |
|                 if (thisPtr.referenceAcquired && exception != null)
 | |
|                 {
 | |
|                     Fx.Assert(thisPtr.durableInstance != null, "durableInstance must not be null!");
 | |
|                     thisPtr.durableInstance.ReleaseReference();
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         // This async result waits for store events and handle them (currently only support HasRunnableWorkflowEvent).
 | |
|         // It is intended to always complete async to simplify caller usage. 
 | |
|         // 1) no code to handle sync completion. 
 | |
|         // 2) recursive call will be safe from StackOverflow.
 | |
|         // For simplicity, we handle (load/run) each event one-by-one.
 | |
|         // We ---- certain set of exception (see HandleException).  Other will crash the process.
 | |
|         // InvalidOperation is also handled due to TryLoadRunnableWorkflowCommand could fail if ---- with other hosts.
 | |
|         class WaitAndHandleStoreEventsAsyncResult : AsyncResult
 | |
|         {
 | |
|             static Action<object> waitAndHandleStoreEvents = new Action<object>(WaitAndHandleStoreEvents);
 | |
|             static AsyncCompletion handleEndWaitForStoreEvents = new AsyncCompletion(HandleEndWaitForStoreEvents);
 | |
|             static AsyncCompletion handleEndGetInstance = new AsyncCompletion(HandleEndGetInstance);
 | |
|             static AsyncCompletion handleEndRunInstance = new AsyncCompletion(HandleEndRunInstance);
 | |
| 
 | |
|             DurableInstanceManager instanceManager;
 | |
|             IEnumerator<InstancePersistenceEvent> events;
 | |
|             WorkflowServiceInstance currentInstance;
 | |
| 
 | |
|             public WaitAndHandleStoreEventsAsyncResult(DurableInstanceManager instanceManager, AsyncCallback callback, object state)
 | |
|                 : base(callback, state)
 | |
|             {
 | |
|                 this.instanceManager = instanceManager;
 | |
|                 ActionItem.Schedule(waitAndHandleStoreEvents, this);
 | |
|             }
 | |
| 
 | |
|             public static void End(IAsyncResult result)
 | |
|             {
 | |
|                 AsyncResult.End<WaitAndHandleStoreEventsAsyncResult>(result);
 | |
|             }
 | |
| 
 | |
|             static void WaitAndHandleStoreEvents(object state)
 | |
|             {
 | |
|                 WaitAndHandleStoreEventsAsyncResult thisPtr = (WaitAndHandleStoreEventsAsyncResult)state;
 | |
| 
 | |
|                 bool completeSelf;
 | |
|                 Exception completionException = null;
 | |
|                 try
 | |
|                 {
 | |
|                     completeSelf = thisPtr.WaitForStoreEvents();
 | |
|                 }
 | |
|                 catch (Exception exception)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(exception))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
| 
 | |
|                     completionException = exception;
 | |
|                     completeSelf = true;
 | |
|                 }
 | |
| 
 | |
|                 if (completeSelf)
 | |
|                 {
 | |
|                     thisPtr.Complete(false, completionException);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             bool WaitForStoreEvents()
 | |
|             {
 | |
|                 // Defense in depth with a predefined timeout
 | |
|                 IAsyncResult result = this.instanceManager.InstanceStore.BeginWaitForEvents(this.instanceManager.handle, 
 | |
|                     TimeSpan.FromSeconds(600), PrepareAsyncCompletion(handleEndWaitForStoreEvents), this);
 | |
|                 return SyncContinue(result);
 | |
|             }
 | |
| 
 | |
|             static bool HandleEndWaitForStoreEvents(IAsyncResult result)
 | |
|             {
 | |
|                 WaitAndHandleStoreEventsAsyncResult thisPtr = (WaitAndHandleStoreEventsAsyncResult)result.AsyncState;
 | |
|                 thisPtr.events = thisPtr.instanceManager.InstanceStore.EndWaitForEvents(result).GetEnumerator();
 | |
|                 return thisPtr.HandleStoreEvents();
 | |
|             }
 | |
| 
 | |
|             bool HandleStoreEvents()
 | |
|             {
 | |
|                 if (!this.events.MoveNext())
 | |
|                 {
 | |
|                     return true;
 | |
|                 }
 | |
| 
 | |
|                 InstancePersistenceEvent currentEvent = this.events.Current;
 | |
|                 if (currentEvent.Name == HasRunnableWorkflowEvent.Value.Name)
 | |
|                 {
 | |
|                     try
 | |
|                     {
 | |
|                         IAsyncResult result = this.instanceManager.BeginGetInstance(Guid.Empty, null, null, this.instanceManager.Host.PersistTimeout,
 | |
|                             PrepareAsyncCompletion(handleEndGetInstance), this);
 | |
|                         return SyncContinue(result);
 | |
|                     }
 | |
|                     catch (Exception exception)
 | |
|                     {
 | |
|                         if (Fx.IsFatal(exception) || !this.instanceManager.HandleException(exception))
 | |
|                         {
 | |
|                             throw;
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     Fx.AssertAndThrow("Unknown InstancePersistenceEvent (" + currentEvent.Name + ")!");
 | |
|                 }
 | |
| 
 | |
|                 return HandleStoreEvents();
 | |
|             }
 | |
| 
 | |
|             static bool HandleEndGetInstance(IAsyncResult result)
 | |
|             {
 | |
|                 WaitAndHandleStoreEventsAsyncResult thisPtr = (WaitAndHandleStoreEventsAsyncResult)result.AsyncState;
 | |
|                 try
 | |
|                 {
 | |
|                     thisPtr.currentInstance = thisPtr.instanceManager.EndGetInstance(result);
 | |
|                     return thisPtr.RunInstance();
 | |
|                 }
 | |
|                 catch (Exception exception)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(exception) || !thisPtr.instanceManager.HandleException(exception))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
|                 }
 | |
|                 return thisPtr.HandleStoreEvents();
 | |
|             }
 | |
| 
 | |
|             bool RunInstance()
 | |
|             {
 | |
|                 try
 | |
|                 {
 | |
|                     IAsyncResult result = this.currentInstance.BeginRun(null, TimeSpan.MaxValue, PrepareAsyncCompletion(handleEndRunInstance), this);
 | |
|                     return SyncContinue(result);
 | |
|                 }
 | |
|                 catch (Exception exception)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(exception))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
|                     if (this.currentInstance != null)
 | |
|                     {
 | |
|                         this.currentInstance.ReleaseReference();
 | |
|                         this.currentInstance = null;
 | |
|                     }
 | |
|                     if (!this.instanceManager.HandleException(exception))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
|                 }
 | |
|                 return HandleStoreEvents();
 | |
|             }
 | |
| 
 | |
|             static bool HandleEndRunInstance(IAsyncResult result)
 | |
|             {
 | |
|                 WaitAndHandleStoreEventsAsyncResult thisPtr = (WaitAndHandleStoreEventsAsyncResult)result.AsyncState;
 | |
|                 try
 | |
|                 {
 | |
|                     thisPtr.currentInstance.EndRun(result);
 | |
|                 }
 | |
|                 catch (Exception exception)
 | |
|                 {
 | |
|                     if (Fx.IsFatal(exception) || !thisPtr.instanceManager.HandleException(exception))
 | |
|                     {
 | |
|                         throw;
 | |
|                     }
 | |
|                 }
 | |
|                 finally
 | |
|                 {
 | |
|                     thisPtr.currentInstance.ReleaseReference();
 | |
|                     thisPtr.currentInstance = null;
 | |
|                 }
 | |
|                 return thisPtr.HandleStoreEvents();
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 |