1286 lines
58 KiB
C#
1286 lines
58 KiB
C#
|
//-----------------------------------------------------------------------------
|
|||
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
|||
|
//-----------------------------------------------------------------------------
|
|||
|
|
|||
|
namespace System.ServiceModel.Activities
|
|||
|
{
|
|||
|
using System;
|
|||
|
using System.Activities;
|
|||
|
using System.Activities.DynamicUpdate;
|
|||
|
using System.Activities.Tracking;
|
|||
|
using System.Collections;
|
|||
|
using System.Collections.Generic;
|
|||
|
using System.Collections.ObjectModel;
|
|||
|
using System.ComponentModel;
|
|||
|
using System.Diagnostics;
|
|||
|
using System.Globalization;
|
|||
|
using System.Runtime;
|
|||
|
using System.Runtime.Diagnostics;
|
|||
|
using System.Runtime.Serialization;
|
|||
|
using System.ServiceModel;
|
|||
|
using System.ServiceModel.Activities.Description;
|
|||
|
using System.ServiceModel.Activities.Tracking;
|
|||
|
using System.ServiceModel.Channels;
|
|||
|
using System.ServiceModel.Diagnostics;
|
|||
|
using System.Transactions;
|
|||
|
using System.Xml.Linq;
|
|||
|
using SR2 = System.ServiceModel.Activities.SR;
|
|||
|
using System.Runtime.DurableInstancing;
|
|||
|
using System.Security;
|
|||
|
using System.ServiceModel.Description;
|
|||
|
using System.Xml;
|
|||
|
|
|||
|
|
|||
|
sealed class InternalReceiveMessage : NativeActivity
|
|||
|
{
|
|||
|
const string OperationNamePropertyName = "OperationName";
|
|||
|
const string ServiceContractNamePropertyName = "ServiceContractName";
|
|||
|
const string WSContextInstanceIdName = "wsc-instanceId";
|
|||
|
const string InstanceIdKey = ContextMessageProperty.InstanceIdKey;
|
|||
|
|
|||
|
static string runtimeTransactionHandlePropertyName = typeof(RuntimeTransactionHandle).FullName;
|
|||
|
|
|||
|
Collection<CorrelationInitializer> correlationInitializers;
|
|||
|
BookmarkCallback onMessageBookmarkCallback;
|
|||
|
ServiceDescriptionData additionalData;
|
|||
|
|
|||
|
string operationBookmarkName;
|
|||
|
|
|||
|
Variable<VolatileReceiveMessageInstance> receiveMessageInstance;
|
|||
|
WaitForReply waitForReply;
|
|||
|
CompletionCallback onClientReceiveMessageComplete;
|
|||
|
Variable<Bookmark> extensionReceiveBookmark;
|
|||
|
|
|||
|
public InternalReceiveMessage()
|
|||
|
{
|
|||
|
this.CorrelatesWith = new InArgument<CorrelationHandle>(context => (CorrelationHandle)null);
|
|||
|
|
|||
|
this.receiveMessageInstance = new Variable<VolatileReceiveMessageInstance>();
|
|||
|
this.waitForReply = new WaitForReply { Instance = this.receiveMessageInstance };
|
|||
|
this.onClientReceiveMessageComplete = new CompletionCallback(ClientScheduleOnReceiveMessageCallback);
|
|||
|
this.extensionReceiveBookmark = new Variable<Bookmark>();
|
|||
|
}
|
|||
|
|
|||
|
public string Action
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
public bool CanCreateInstance
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
public Collection<CorrelationInitializer> CorrelationInitializers
|
|||
|
{
|
|||
|
get
|
|||
|
{
|
|||
|
if (this.correlationInitializers == null)
|
|||
|
{
|
|||
|
this.correlationInitializers = new Collection<CorrelationInitializer>();
|
|||
|
}
|
|||
|
return this.correlationInitializers;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
public InArgument<CorrelationHandle> CorrelatesWith
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
public OutArgument<Message> Message
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
public InArgument<NoPersistHandle> NoPersistHandle
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
public string OperationName
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
protected override bool CanInduceIdle
|
|||
|
{
|
|||
|
get
|
|||
|
{
|
|||
|
return true;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
internal bool IsOneWay
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
// Added becuase there isn't a good way to distinguish
|
|||
|
// between the Receive and ReceiveReply modes of execution of this activity.
|
|||
|
// The Execute method distinguishes between those modes by predicating
|
|||
|
// on followingCorrelation not being null and being able to
|
|||
|
// acquire the RequestContext off the handle. We do not use RequestContext
|
|||
|
// for the SendReceiveExtension based code-paths hence need
|
|||
|
// another mechanism.
|
|||
|
internal bool IsReceiveReply
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
internal ServiceDescriptionData AdditionalData
|
|||
|
{
|
|||
|
get
|
|||
|
{
|
|||
|
if (this.additionalData == null)
|
|||
|
{
|
|||
|
this.additionalData = new ServiceDescriptionData();
|
|||
|
}
|
|||
|
|
|||
|
return this.additionalData;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
public XName ServiceContractName
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
// Used by CreateProtocolBookmark and WorkflowOperationBehavior
|
|||
|
internal string OperationBookmarkName
|
|||
|
{
|
|||
|
get
|
|||
|
{
|
|||
|
if (this.operationBookmarkName == null)
|
|||
|
{
|
|||
|
this.operationBookmarkName = BookmarkNameHelper.CreateBookmarkName(this.OperationName, this.ServiceContractName);
|
|||
|
}
|
|||
|
|
|||
|
return this.operationBookmarkName;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
internal string OwnerDisplayName { get; set; }
|
|||
|
|
|||
|
protected override void OnCreateDynamicUpdateMap(NativeActivityUpdateMapMetadata metadata, Activity originalActivity)
|
|||
|
{
|
|||
|
InternalReceiveMessage originalInternalReceive = (InternalReceiveMessage)originalActivity;
|
|||
|
|
|||
|
if (this.ServiceContractName != originalInternalReceive.ServiceContractName)
|
|||
|
{
|
|||
|
metadata.SaveOriginalValue(ServiceContractNamePropertyName, originalInternalReceive.ServiceContractName);
|
|||
|
}
|
|||
|
|
|||
|
if (this.OperationName != originalInternalReceive.OperationName)
|
|||
|
{
|
|||
|
metadata.SaveOriginalValue(OperationNamePropertyName, originalInternalReceive.OperationName);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
protected override void UpdateInstance(NativeActivityUpdateContext updateContext)
|
|||
|
{
|
|||
|
// we only care about the server-side Receive since the client-side Receive is not persistable.
|
|||
|
// only valid instance update condition is when a bookmark with OperationBookmarkName is found.
|
|||
|
|
|||
|
CorrelationHandle followingCorrelation = (this.CorrelatesWith == null) ? null : (CorrelationHandle)updateContext.GetValue(this.CorrelatesWith);
|
|||
|
if (followingCorrelation == null)
|
|||
|
{
|
|||
|
followingCorrelation = updateContext.FindExecutionProperty(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
|
|||
|
}
|
|||
|
|
|||
|
BookmarkScope bookmarkScope;
|
|||
|
if (followingCorrelation != null && followingCorrelation.Scope != null)
|
|||
|
{
|
|||
|
bookmarkScope = followingCorrelation.Scope;
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
bookmarkScope = updateContext.DefaultBookmarkScope;
|
|||
|
}
|
|||
|
|
|||
|
string savedOriginalOperationName = (string)updateContext.GetSavedOriginalValue(OperationNamePropertyName);
|
|||
|
XName savedOriginalServiceContractName = (XName)updateContext.GetSavedOriginalValue(ServiceContractNamePropertyName);
|
|||
|
if ((savedOriginalOperationName == null && savedOriginalServiceContractName == null) || (savedOriginalOperationName == this.OperationName && savedOriginalServiceContractName == this.ServiceContractName))
|
|||
|
{
|
|||
|
// neither ServiceContractName nor OperationName have changed
|
|||
|
// nothing to do, so exit early.
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
string originalOperationBookmarkName = BookmarkNameHelper.CreateBookmarkName(savedOriginalOperationName ?? this.OperationName, savedOriginalServiceContractName ?? this.ServiceContractName);
|
|||
|
if (updateContext.RemoveBookmark(originalOperationBookmarkName, bookmarkScope))
|
|||
|
{
|
|||
|
// if we are here, it means Receive is on the server-side and waiting for a request message to arrive
|
|||
|
updateContext.CreateBookmark(this.OperationBookmarkName, new BookmarkCallback(this.OnMessage), bookmarkScope);
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
// this means Receive is in a state DU is not allowed.
|
|||
|
updateContext.DisallowUpdate(SR.InvalidReceiveStateForDU);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
ReceiveSettings GetReceiveSettings()
|
|||
|
{
|
|||
|
string actionName = null;
|
|||
|
|
|||
|
if (!string.IsNullOrWhiteSpace(this.Action))
|
|||
|
{
|
|||
|
actionName = this.Action;
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
// These values are null in the ReceiveReply configuration
|
|||
|
if (this.ServiceContractName != null && !string.IsNullOrWhiteSpace(this.OperationName))
|
|||
|
{
|
|||
|
actionName = NamingHelper.GetMessageAction(new XmlQualifiedName(this.ServiceContractName.ToString()), this.OperationName, null, false);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
ReceiveSettings receiveSettings = new ReceiveSettings
|
|||
|
{
|
|||
|
Action = actionName,
|
|||
|
CanCreateInstance = this.CanCreateInstance,
|
|||
|
OwnerDisplayName = this.OwnerDisplayName
|
|||
|
};
|
|||
|
|
|||
|
return receiveSettings;
|
|||
|
}
|
|||
|
|
|||
|
protected override void Abort(NativeActivityAbortContext context)
|
|||
|
{
|
|||
|
SendReceiveExtension sendReceiveExtension = context.GetExtension<SendReceiveExtension>();
|
|||
|
if (sendReceiveExtension != null)
|
|||
|
{
|
|||
|
Bookmark pendingBookmark = this.extensionReceiveBookmark.Get(context);
|
|||
|
if (pendingBookmark != null)
|
|||
|
{
|
|||
|
sendReceiveExtension.Cancel(pendingBookmark);
|
|||
|
}
|
|||
|
}
|
|||
|
base.Abort(context);
|
|||
|
}
|
|||
|
|
|||
|
protected override void Cancel(NativeActivityContext context)
|
|||
|
{
|
|||
|
SendReceiveExtension sendReceiveExtension = context.GetExtension<SendReceiveExtension>();
|
|||
|
if (sendReceiveExtension != null)
|
|||
|
{
|
|||
|
Bookmark pendingBookmark = this.extensionReceiveBookmark.Get(context);
|
|||
|
if (pendingBookmark != null)
|
|||
|
{
|
|||
|
sendReceiveExtension.Cancel(pendingBookmark);
|
|||
|
context.RemoveBookmark(pendingBookmark);
|
|||
|
}
|
|||
|
}
|
|||
|
base.Cancel(context);
|
|||
|
}
|
|||
|
|
|||
|
// Activity Entry point: Phase 1: Execute
|
|||
|
// A separate code-path for extension based execution least impacts
|
|||
|
// the existing workflow hosts. In the future we will add an extension from
|
|||
|
// workflowservicehost and always use the extension.
|
|||
|
protected override void Execute(NativeActivityContext executionContext)
|
|||
|
{
|
|||
|
SendReceiveExtension sendReceiveExtension = executionContext.GetExtension<SendReceiveExtension>();
|
|||
|
if (sendReceiveExtension != null)
|
|||
|
{
|
|||
|
this.ExecuteUsingExtension(sendReceiveExtension, executionContext);
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
|
|||
|
// this activity's runtime DU particpation(UpdateInstance) is dependent on
|
|||
|
// the following server side logic for resolving CorrelationHandle and creating a protocol bookmark.
|
|||
|
|
|||
|
CorrelationHandle followingCorrelation = (this.CorrelatesWith == null) ? null : this.CorrelatesWith.Get(executionContext);
|
|||
|
bool triedAmbientCorrelation = false;
|
|||
|
CorrelationHandle ambientCorrelation = null;
|
|||
|
|
|||
|
if (followingCorrelation == null)
|
|||
|
{
|
|||
|
ambientCorrelation = executionContext.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
|
|||
|
triedAmbientCorrelation = true;
|
|||
|
if (ambientCorrelation != null)
|
|||
|
{
|
|||
|
followingCorrelation = ambientCorrelation;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
CorrelationRequestContext requestContext;
|
|||
|
if (followingCorrelation != null && followingCorrelation.TryAcquireRequestContext(executionContext, out requestContext))
|
|||
|
{
|
|||
|
// Client receive that is following a send.
|
|||
|
ReceiveMessageInstanceData instance = new ReceiveMessageInstanceData(requestContext);
|
|||
|
|
|||
|
// for perf, cache the ambient correlation information
|
|||
|
if (triedAmbientCorrelation)
|
|||
|
{
|
|||
|
instance.SetAmbientCorrelation(ambientCorrelation);
|
|||
|
}
|
|||
|
|
|||
|
ClientScheduleOnReceivedMessage(executionContext, instance);
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
// Server side receive
|
|||
|
|
|||
|
// Validation of correlatesWithHandle
|
|||
|
if (ambientCorrelation == null)
|
|||
|
{
|
|||
|
ambientCorrelation = executionContext.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
|
|||
|
}
|
|||
|
if (!this.IsOneWay && ambientCorrelation == null)
|
|||
|
{
|
|||
|
CorrelationHandle channelCorrelationHandle = CorrelationHandle.GetExplicitRequestReplyCorrelation(executionContext, this.correlationInitializers);
|
|||
|
if (channelCorrelationHandle == null)
|
|||
|
{
|
|||
|
// With a two-way contract, we require a request/reply correlation handle
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(
|
|||
|
SR2.ReceiveMessageNeedsToPairWithSendMessageForTwoWayContract(this.OperationName)));
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
BookmarkScope bookmarkScope = (followingCorrelation != null) ? followingCorrelation.EnsureBookmarkScope(executionContext) : executionContext.DefaultBookmarkScope;
|
|||
|
|
|||
|
if (this.onMessageBookmarkCallback == null)
|
|||
|
{
|
|||
|
this.onMessageBookmarkCallback = new BookmarkCallback(this.OnMessage);
|
|||
|
}
|
|||
|
|
|||
|
executionContext.CreateBookmark(this.OperationBookmarkName, this.onMessageBookmarkCallback, bookmarkScope);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// Phase 2a: server side message has arrived and resumed the protocol bookmark
|
|||
|
void OnMessage(NativeActivityContext executionContext, Bookmark bookmark, object state)
|
|||
|
{
|
|||
|
WorkflowOperationContext workflowContext = state as WorkflowOperationContext;
|
|||
|
|
|||
|
if (workflowContext == null)
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.WorkflowMustBeHosted));
|
|||
|
}
|
|||
|
|
|||
|
ReceiveMessageInstanceData instance = new ReceiveMessageInstanceData(
|
|||
|
new CorrelationResponseContext
|
|||
|
{
|
|||
|
WorkflowOperationContext = workflowContext,
|
|||
|
});
|
|||
|
|
|||
|
SetupTransaction(executionContext, instance);
|
|||
|
}
|
|||
|
|
|||
|
// Phase 3: Setup Transaction for server receive case.
|
|||
|
//
|
|||
|
void SetupTransaction(NativeActivityContext executionContext, ReceiveMessageInstanceData instance)
|
|||
|
{
|
|||
|
WorkflowOperationContext workflowContext = instance.CorrelationResponseContext.WorkflowOperationContext;
|
|||
|
if (workflowContext.CurrentTransaction != null)
|
|||
|
{
|
|||
|
//get the RuntimeTransactionHandle from the ambient
|
|||
|
RuntimeTransactionHandle handle = null;
|
|||
|
handle = executionContext.Properties.Find(runtimeTransactionHandlePropertyName) as RuntimeTransactionHandle;
|
|||
|
if (handle != null)
|
|||
|
{
|
|||
|
//You are probably inside a TransactedReceiveScope
|
|||
|
//TransactedReceiveData is used to pass information about the Initiating Transaction to the TransactedReceiveScope
|
|||
|
//so that it can subsequently call Complete or Commit on it at the end of the scope
|
|||
|
TransactedReceiveData transactedReceiveData = executionContext.Properties.Find(TransactedReceiveData.TransactedReceiveDataExecutionPropertyName) as TransactedReceiveData;
|
|||
|
if (transactedReceiveData != null)
|
|||
|
{
|
|||
|
if (this.AdditionalData.IsFirstReceiveOfTransactedReceiveScopeTree)
|
|||
|
{
|
|||
|
Fx.Assert(workflowContext.OperationContext != null, "InternalReceiveMessage.SetupTransaction - Operation Context was null");
|
|||
|
Fx.Assert(workflowContext.OperationContext.TransactionFacet != null, "InternalReceiveMessage.SetupTransaction - Transaction Facet was null");
|
|||
|
transactedReceiveData.InitiatingTransaction = workflowContext.OperationContext.TransactionFacet.Current;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
Transaction currentTransaction = handle.GetCurrentTransaction(executionContext);
|
|||
|
if (currentTransaction != null)
|
|||
|
{
|
|||
|
if (!currentTransaction.Equals(workflowContext.CurrentTransaction))
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.FlowedTransactionDifferentFromAmbient));
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
ServerScheduleOnReceivedMessage(executionContext, instance);
|
|||
|
return;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
ReceiveMessageState receiveMessageState = new ReceiveMessageState
|
|||
|
{
|
|||
|
CurrentTransaction = workflowContext.CurrentTransaction.Clone(),
|
|||
|
Instance = instance
|
|||
|
};
|
|||
|
|
|||
|
handle.RequireTransactionContext(executionContext, RequireContextCallback, receiveMessageState);
|
|||
|
|
|||
|
return;
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
//Receive was probably not used within a TransactionFlowScope since no ambient transaction handle was found
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.ReceiveNotWithinATransactedReceiveScope));
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
ServerScheduleOnReceivedMessage(executionContext, instance);
|
|||
|
}
|
|||
|
|
|||
|
internal static Guid TraceCorrelationActivityId
|
|||
|
{
|
|||
|
[Fx.Tag.SecurityNote(Critical = "Critical because Trace.CorrelationManager has a Link demand for UnmanagedCode.",
|
|||
|
Safe = "Safe because we aren't leaking a critical resource.")]
|
|||
|
[SecuritySafeCritical]
|
|||
|
get
|
|||
|
{
|
|||
|
return Trace.CorrelationManager.ActivityId;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
void ProcessReceiveMessageTrace(NativeActivityContext executionContext, ReceiveMessageInstanceData instance)
|
|||
|
{
|
|||
|
if (TraceUtility.MessageFlowTracing)
|
|||
|
{
|
|||
|
if (TraceUtility.ActivityTracing)
|
|||
|
{
|
|||
|
instance.AmbientActivityId = InternalReceiveMessage.TraceCorrelationActivityId;
|
|||
|
}
|
|||
|
|
|||
|
Guid receivedActivityId = Guid.Empty;
|
|||
|
if (instance.CorrelationRequestContext != null)
|
|||
|
{
|
|||
|
//client side reply
|
|||
|
receivedActivityId = TraceUtility.GetReceivedActivityId(instance.CorrelationRequestContext.OperationContext);
|
|||
|
}
|
|||
|
else if (instance.CorrelationResponseContext != null)
|
|||
|
{
|
|||
|
//server side receive
|
|||
|
receivedActivityId = instance.CorrelationResponseContext.WorkflowOperationContext.E2EActivityId;
|
|||
|
}
|
|||
|
|
|||
|
ProcessReceiveMessageTrace(executionContext, receivedActivityId);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
void ProcessReceiveMessageTrace(NativeActivityContext executionContext, Guid receivedActivityId)
|
|||
|
{
|
|||
|
if (TraceUtility.MessageFlowTracing)
|
|||
|
{
|
|||
|
try
|
|||
|
{
|
|||
|
//
|
|||
|
ReceiveMessageRecord messageFlowTrackingRecord = new ReceiveMessageRecord(MessagingActivityHelper.MessageCorrelationReceiveRecord)
|
|||
|
{
|
|||
|
E2EActivityId = receivedActivityId
|
|||
|
};
|
|||
|
executionContext.Track(messageFlowTrackingRecord);
|
|||
|
|
|||
|
if (receivedActivityId != Guid.Empty && DiagnosticTraceBase.ActivityId != receivedActivityId)
|
|||
|
{
|
|||
|
DiagnosticTraceBase.ActivityId = receivedActivityId;
|
|||
|
}
|
|||
|
|
|||
|
FxTrace.Trace.SetAndTraceTransfer(executionContext.WorkflowInstanceId, true);
|
|||
|
|
|||
|
if (TraceUtility.ActivityTracing)
|
|||
|
{
|
|||
|
if (TD.StartSignpostEventIsEnabled())
|
|||
|
{
|
|||
|
TD.StartSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3) {
|
|||
|
{ MessagingActivityHelper.ActivityName, this.DisplayName },
|
|||
|
{ MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
|
|||
|
{ MessagingActivityHelper.ActivityInstanceId, executionContext.ActivityInstanceId }
|
|||
|
}));
|
|||
|
}
|
|||
|
}
|
|||
|
else if (TD.WfMessageReceivedIsEnabled())
|
|||
|
{
|
|||
|
TD.WfMessageReceived(new EventTraceActivity(receivedActivityId), executionContext.WorkflowInstanceId);
|
|||
|
}
|
|||
|
}
|
|||
|
catch (Exception ex)
|
|||
|
{
|
|||
|
if (Fx.IsFatal(ex))
|
|||
|
{
|
|||
|
throw;
|
|||
|
}
|
|||
|
FxTrace.Exception.AsInformation(ex);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
void RequireContextCallback(NativeActivityTransactionContext transactionContext, object state)
|
|||
|
{
|
|||
|
Fx.Assert(transactionContext != null, "TransactionContext is null");
|
|||
|
|
|||
|
ReceiveMessageState receiveMessageState = state as ReceiveMessageState;
|
|||
|
Fx.Assert(receiveMessageState != null, "ReceiveMessageState is null");
|
|||
|
|
|||
|
transactionContext.SetRuntimeTransaction(receiveMessageState.CurrentTransaction);
|
|||
|
|
|||
|
NativeActivityContext executionContext = transactionContext as NativeActivityContext;
|
|||
|
Fx.Assert(executionContext != null, "Failed to cast ActivityTransactionContext to NativeActivityContext");
|
|||
|
ServerScheduleOnReceivedMessage(executionContext, receiveMessageState.Instance);
|
|||
|
}
|
|||
|
|
|||
|
// Phase 4: Set up the Message as OutArgument and invoke the OnReceivedMessage activity action
|
|||
|
void ServerScheduleOnReceivedMessage(NativeActivityContext executionContext, ReceiveMessageInstanceData instance)
|
|||
|
{
|
|||
|
Fx.Assert(instance.CorrelationResponseContext != null, "Server side receive must have CorrelationResponseContext");
|
|||
|
|
|||
|
// if we infer the contract as Message the first input parameter will be the requestMessage from the client
|
|||
|
Message request = instance.CorrelationResponseContext.WorkflowOperationContext.Inputs[0] as Message;
|
|||
|
Fx.Assert(request != null, "WorkflowOperationContext.Inputs[0] must be of type Message");
|
|||
|
Fx.Assert(request.State == MessageState.Created, "The request message must be in Created state");
|
|||
|
this.Message.Set(executionContext, request);
|
|||
|
|
|||
|
// update instance->CorrelationResponseContext with the MessageVersion information, this is later used by
|
|||
|
// ToReply formatter to construct the reply message
|
|||
|
instance.CorrelationResponseContext.MessageVersion = ((Message)instance.CorrelationResponseContext.WorkflowOperationContext.Inputs[0]).Version;
|
|||
|
|
|||
|
// initialize the relevant correlation handle(s) with the 'anonymous' response context
|
|||
|
CorrelationHandle ambientHandle = instance.GetAmbientCorrelation(executionContext);
|
|||
|
CorrelationHandle correlatesWithHandle = (this.CorrelatesWith == null) ? null : this.CorrelatesWith.Get(executionContext);
|
|||
|
|
|||
|
// populate instance keys first
|
|||
|
MessagingActivityHelper.InitializeCorrelationHandles(executionContext, correlatesWithHandle, ambientHandle, this.correlationInitializers,
|
|||
|
instance.CorrelationResponseContext.WorkflowOperationContext.OperationContext.IncomingMessageProperties);
|
|||
|
|
|||
|
// for the request/reply handle
|
|||
|
// then store the response context in the designated correlation handle
|
|||
|
// first check for an explicit association
|
|||
|
CorrelationHandle channelCorrelationHandle = CorrelationHandle.GetExplicitRequestReplyCorrelation(executionContext, this.correlationInitializers);
|
|||
|
|
|||
|
|
|||
|
if (this.IsOneWay)
|
|||
|
{
|
|||
|
// this is one way, verify that the channelHandle is null
|
|||
|
if (channelCorrelationHandle != null)
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.RequestReplyHandleShouldNotBePresentForOneWay));
|
|||
|
}
|
|||
|
|
|||
|
// we need to enter the nopersistzone using the NoPersistHandle and exit it in the formatter
|
|||
|
if (this.NoPersistHandle != null)
|
|||
|
{
|
|||
|
NoPersistHandle noPersistHandle = this.NoPersistHandle.Get(executionContext);
|
|||
|
if (noPersistHandle != null)
|
|||
|
{
|
|||
|
noPersistHandle.Enter(executionContext);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
// first check for an explicit association
|
|||
|
if (channelCorrelationHandle != null)
|
|||
|
{
|
|||
|
if (!channelCorrelationHandle.TryRegisterResponseContext(executionContext, instance.CorrelationResponseContext))
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.TryRegisterRequestContextFailed));
|
|||
|
}
|
|||
|
}
|
|||
|
else// if that fails, use ambient handle. we should never initialize CorrelatesWith with response context
|
|||
|
{
|
|||
|
Fx.Assert(ambientHandle != null, "Ambient handle should not be null for two-way server side receive/sendReply");
|
|||
|
if (!ambientHandle.TryRegisterResponseContext(executionContext, instance.CorrelationResponseContext))
|
|||
|
{
|
|||
|
// With a two-way contract, the request context must be initialized
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(
|
|||
|
SR2.ReceiveMessageNeedsToPairWithSendMessageForTwoWayContract(this.OperationName)));
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// validate that NoPersistHandle is null, we should have nulled it out in Receive->SetIsOneWay during ContractInference
|
|||
|
Fx.Assert(this.NoPersistHandle == null, "NoPersistHandle should be null in case of two-way");
|
|||
|
}
|
|||
|
|
|||
|
// for the duplex handle: we want to save the callback context in the correlation handle
|
|||
|
if (instance.CorrelationCallbackContext != null)
|
|||
|
{
|
|||
|
// Pass the CorrelationCallbackContext to correlation handle.
|
|||
|
CorrelationHandle callbackHandle = CorrelationHandle.GetExplicitCallbackCorrelation(executionContext, this.correlationInitializers);
|
|||
|
|
|||
|
// if that is not set, then try the ambientHandle, we will not use the CorrelatesWith handle to store callback context
|
|||
|
if (callbackHandle == null)
|
|||
|
{
|
|||
|
callbackHandle = ambientHandle;
|
|||
|
}
|
|||
|
if (callbackHandle != null)
|
|||
|
{
|
|||
|
callbackHandle.CallbackContext = instance.CorrelationCallbackContext;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
FinalizeScheduleOnReceivedMessage(executionContext, instance);
|
|||
|
}
|
|||
|
|
|||
|
void ClientScheduleOnReceivedMessage(NativeActivityContext executionContext, ReceiveMessageInstanceData instance)
|
|||
|
{
|
|||
|
Fx.Assert(instance.CorrelationRequestContext != null, "Client side receive must have CorrelationRequestContext");
|
|||
|
|
|||
|
// client side: retrieve the reply from the request context
|
|||
|
if (instance.CorrelationRequestContext.TryGetReply())
|
|||
|
{
|
|||
|
// Reply has already come back because one of the following happened:
|
|||
|
// (1) Receive reply completed synchronously
|
|||
|
// (2) Async receive reply completed very quickly and channel callback already happened by now
|
|||
|
ClientScheduleOnReceiveMessageCore(executionContext, instance);
|
|||
|
FinalizeScheduleOnReceivedMessage(executionContext, instance);
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
// Async path: wait for reply to come back
|
|||
|
VolatileReceiveMessageInstance volatileInstance = new VolatileReceiveMessageInstance { Instance = instance };
|
|||
|
this.receiveMessageInstance.Set(executionContext, volatileInstance);
|
|||
|
|
|||
|
if (onClientReceiveMessageComplete == null)
|
|||
|
{
|
|||
|
onClientReceiveMessageComplete = new CompletionCallback(ClientScheduleOnReceiveMessageCallback);
|
|||
|
}
|
|||
|
|
|||
|
executionContext.ScheduleActivity(this.waitForReply, onClientReceiveMessageComplete);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
void ClientScheduleOnReceiveMessageCallback(NativeActivityContext executionContext, ActivityInstance completedInstance)
|
|||
|
{
|
|||
|
VolatileReceiveMessageInstance volatileInstance = this.receiveMessageInstance.Get(executionContext);
|
|||
|
ReceiveMessageInstanceData instance = volatileInstance.Instance;
|
|||
|
|
|||
|
if (instance.CorrelationRequestContext.TryGetReply())
|
|||
|
{
|
|||
|
ClientScheduleOnReceiveMessageCore(executionContext, instance);
|
|||
|
}
|
|||
|
FinalizeScheduleOnReceivedMessage(executionContext, instance);
|
|||
|
}
|
|||
|
|
|||
|
void ClientScheduleOnReceiveMessageCore(NativeActivityContext executionContext, ReceiveMessageInstanceData instance)
|
|||
|
{
|
|||
|
Fx.Assert(instance.CorrelationRequestContext.Reply != null, "Reply message cannot be null!");
|
|||
|
|
|||
|
// Initialize CorrelationContext and CorrelationCallbackContext
|
|||
|
instance.InitializeContextAndCallbackContext();
|
|||
|
|
|||
|
CorrelationHandle ambientHandle = instance.GetAmbientCorrelation(executionContext);
|
|||
|
|
|||
|
if (instance.CorrelationRequestContext.CorrelationKeyCalculator != null)
|
|||
|
{
|
|||
|
// Client side reply do not use CorrelatesWith to initialize correlation
|
|||
|
instance.CorrelationRequestContext.Reply = MessagingActivityHelper.InitializeCorrelationHandles(executionContext,
|
|||
|
null, ambientHandle, this.correlationInitializers,
|
|||
|
instance.CorrelationRequestContext.CorrelationKeyCalculator, instance.CorrelationRequestContext.Reply);
|
|||
|
}
|
|||
|
|
|||
|
// for the duplex-case
|
|||
|
// we would receive the Server Context in the Request-Reply message, we have to save the Server Context so that subsequent sends from the client to
|
|||
|
// the server can use this context to reach the correct Server instance
|
|||
|
if (instance.CorrelationContext != null)
|
|||
|
{
|
|||
|
// Pass the CorrelationContext to correlation handle.
|
|||
|
// Correlation handle will have to be in the correlation Initializers collection
|
|||
|
CorrelationHandle contextHandle = CorrelationHandle.GetExplicitContextCorrelation(executionContext, this.correlationInitializers);
|
|||
|
|
|||
|
// if that is not set, then try the ambient handle
|
|||
|
if (contextHandle == null)
|
|||
|
{
|
|||
|
// get the cached ambient handle, we only use explicit handle or ambient handle to store the context
|
|||
|
contextHandle = ambientHandle;
|
|||
|
}
|
|||
|
if (contextHandle != null)
|
|||
|
{
|
|||
|
contextHandle.Context = instance.CorrelationContext;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// set the Message with what is in the correlationRequestContext
|
|||
|
// this Message needs to be closed later by the formatter
|
|||
|
Message request = instance.CorrelationRequestContext.Reply;
|
|||
|
this.Message.Set(executionContext, request);
|
|||
|
}
|
|||
|
|
|||
|
void FinalizeScheduleOnReceivedMessage(NativeActivityContext executionContext, ReceiveMessageInstanceData instance)
|
|||
|
{
|
|||
|
ProcessReceiveMessageTrace(executionContext, instance);
|
|||
|
|
|||
|
IList<IReceiveMessageCallback> receiveMessageCallbacks = MessagingActivityHelper.GetCallbacks<IReceiveMessageCallback>(executionContext.Properties);
|
|||
|
if (receiveMessageCallbacks != null && receiveMessageCallbacks.Count > 0)
|
|||
|
{
|
|||
|
OperationContext operationContext = instance.GetOperationContext();
|
|||
|
// invoke the callback that user might have added in the AEC in the previous activity
|
|||
|
// e.g. distributed compensation activity will add this so that they can convert a message back to
|
|||
|
// an execution property
|
|||
|
foreach (IReceiveMessageCallback receiveMessageCallback in receiveMessageCallbacks)
|
|||
|
{
|
|||
|
receiveMessageCallback.OnReceiveMessage(operationContext, executionContext.Properties);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// call this method with or without callback
|
|||
|
this.FinalizeReceiveMessageCore(executionContext, instance);
|
|||
|
}
|
|||
|
|
|||
|
protected override void CacheMetadata(NativeActivityMetadata metadata)
|
|||
|
{
|
|||
|
RuntimeArgument correlatesWithArgument = new RuntimeArgument(Constants.CorrelatesWith, Constants.CorrelationHandleType, ArgumentDirection.In);
|
|||
|
if (this.CorrelatesWith == null)
|
|||
|
{
|
|||
|
this.CorrelatesWith = new InArgument<CorrelationHandle>();
|
|||
|
}
|
|||
|
metadata.Bind(this.CorrelatesWith, correlatesWithArgument);
|
|||
|
metadata.AddArgument(correlatesWithArgument);
|
|||
|
|
|||
|
if (this.correlationInitializers != null)
|
|||
|
{
|
|||
|
int count = 0;
|
|||
|
foreach (CorrelationInitializer correlation in this.correlationInitializers)
|
|||
|
{
|
|||
|
if (correlation.CorrelationHandle != null)
|
|||
|
{
|
|||
|
RuntimeArgument argument = new RuntimeArgument(Constants.Parameter + count,
|
|||
|
correlation.CorrelationHandle.ArgumentType, correlation.CorrelationHandle.Direction, true);
|
|||
|
metadata.Bind(correlation.CorrelationHandle, argument);
|
|||
|
metadata.AddArgument(argument);
|
|||
|
count++;
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
RuntimeArgument receiveMessageArgument = new RuntimeArgument(Constants.Message, Constants.MessageType, ArgumentDirection.Out);
|
|||
|
if (this.Message == null)
|
|||
|
{
|
|||
|
this.Message = new OutArgument<Message>();
|
|||
|
}
|
|||
|
metadata.Bind(this.Message, receiveMessageArgument);
|
|||
|
metadata.AddArgument(receiveMessageArgument);
|
|||
|
|
|||
|
RuntimeArgument noPersistHandleArgument = new RuntimeArgument(Constants.NoPersistHandle, Constants.NoPersistHandleType, ArgumentDirection.In);
|
|||
|
if (this.NoPersistHandle == null)
|
|||
|
{
|
|||
|
this.NoPersistHandle = new InArgument<NoPersistHandle>();
|
|||
|
}
|
|||
|
metadata.Bind(this.NoPersistHandle, noPersistHandleArgument);
|
|||
|
metadata.AddArgument(noPersistHandleArgument);
|
|||
|
|
|||
|
metadata.AddImplementationVariable(this.receiveMessageInstance);
|
|||
|
metadata.AddImplementationVariable(this.extensionReceiveBookmark);
|
|||
|
|
|||
|
metadata.AddImplementationChild(this.waitForReply);
|
|||
|
}
|
|||
|
|
|||
|
// Phase 5: Useful for the both client and server side receive. It passes down the response context if it is two way or
|
|||
|
// throw the exception right back to the workflow if it is not expected.
|
|||
|
void FinalizeReceiveMessageCore(NativeActivityContext executionContext, ReceiveMessageInstanceData instance)
|
|||
|
{
|
|||
|
if (instance != null)
|
|||
|
{
|
|||
|
if (instance.CorrelationRequestContext != null && instance.CorrelationRequestContext.Reply != null)
|
|||
|
{
|
|||
|
// This should be closed by the formatter after desrializing the message
|
|||
|
// clean this reply message up for a following receive
|
|||
|
//instance.CorrelationRequestContext.Reply.Close();
|
|||
|
}
|
|||
|
else if (instance.CorrelationResponseContext != null)
|
|||
|
{
|
|||
|
// this is only for the server side
|
|||
|
if (this.IsOneWay)
|
|||
|
{
|
|||
|
// mark this workflow service operation as complete
|
|||
|
instance.CorrelationResponseContext.WorkflowOperationContext.SetOperationCompleted();
|
|||
|
|
|||
|
if (instance.CorrelationResponseContext.Exception != null)
|
|||
|
{
|
|||
|
// We got an unexpected exception while running the OnReceivedMessage action
|
|||
|
throw FxTrace.Exception.AsError(instance.CorrelationResponseContext.Exception);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
//reset the trace
|
|||
|
this.ResetTrace(executionContext, instance);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
void ResetTrace(NativeActivityContext executionContext, ReceiveMessageInstanceData instance)
|
|||
|
{
|
|||
|
this.ResetTrace(executionContext, instance.AmbientActivityId);
|
|||
|
|
|||
|
if (TraceUtility.ActivityTracing)
|
|||
|
{
|
|||
|
instance.AmbientActivityId = Guid.Empty;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
void ResetTrace(NativeActivityContext executionContext, Guid ambientActivityId)
|
|||
|
{
|
|||
|
if (TraceUtility.ActivityTracing)
|
|||
|
{
|
|||
|
if (TD.StopSignpostEventIsEnabled())
|
|||
|
{
|
|||
|
TD.StopSignpostEvent(new DictionaryTraceRecord(new Dictionary<string, string>(3) {
|
|||
|
{ MessagingActivityHelper.ActivityName, this.DisplayName },
|
|||
|
{ MessagingActivityHelper.ActivityType, MessagingActivityHelper.MessagingActivityTypeActivityExecution },
|
|||
|
{ MessagingActivityHelper.ActivityInstanceId, executionContext.ActivityInstanceId }
|
|||
|
}));
|
|||
|
}
|
|||
|
FxTrace.Trace.SetAndTraceTransfer(ambientActivityId, true);
|
|||
|
}
|
|||
|
else if (TD.WfMessageReceivedIsEnabled())
|
|||
|
{
|
|||
|
TD.WfMessageReceived(new EventTraceActivity(executionContext.WorkflowInstanceId), ambientActivityId);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
void ExecuteUsingExtension(SendReceiveExtension sendReceiveExtension, NativeActivityContext executionContext)
|
|||
|
{
|
|||
|
Fx.Assert(sendReceiveExtension != null, "SendReceiveExtension should be available here.");
|
|||
|
|
|||
|
CorrelationHandle followingCorrelation = null;
|
|||
|
if (!this.TryGetCorrelatesWithHandle(executionContext, out followingCorrelation))
|
|||
|
{
|
|||
|
followingCorrelation = CorrelationHandle.GetAmbientCorrelation(executionContext);
|
|||
|
if (followingCorrelation == null)
|
|||
|
{
|
|||
|
if (!this.IsOneWay)
|
|||
|
{
|
|||
|
if (!this.correlationInitializers.TryGetRequestReplyCorrelationHandle(executionContext, out followingCorrelation))
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(
|
|||
|
SR2.ReceiveMessageNeedsToPairWithSendMessageForTwoWayContract(this.OperationName)));
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
Bookmark bookmark = executionContext.CreateBookmark(this.OnReceiveMessageFromExtension);
|
|||
|
this.extensionReceiveBookmark.Set(executionContext, bookmark);
|
|||
|
|
|||
|
InstanceKey correlatesWithValue = null;
|
|||
|
if (followingCorrelation != null)
|
|||
|
{
|
|||
|
if (this.IsReceiveReply && followingCorrelation.TransientInstanceKey != null)
|
|||
|
{
|
|||
|
correlatesWithValue = followingCorrelation.TransientInstanceKey;
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
correlatesWithValue = followingCorrelation.InstanceKey;
|
|||
|
}
|
|||
|
}
|
|||
|
sendReceiveExtension.RegisterReceive(this.GetReceiveSettings(), correlatesWithValue, bookmark);
|
|||
|
}
|
|||
|
|
|||
|
void OnReceiveMessageFromExtension(NativeActivityContext executionContext, Bookmark bookmark, object state)
|
|||
|
{
|
|||
|
SendReceiveExtension sendReceiveExtension = executionContext.GetExtension<SendReceiveExtension>();
|
|||
|
if (sendReceiveExtension == null)
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.SendReceiveExtensionNotFound));
|
|||
|
}
|
|||
|
|
|||
|
// Now that the bookmark has been resumed, clear out the workflow variable holding its value.
|
|||
|
this.extensionReceiveBookmark.Set(executionContext, null);
|
|||
|
|
|||
|
MessageContext messageContext = state as MessageContext;
|
|||
|
if (messageContext == null)
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.InvalidDataFromReceiveBookmarkState(this.OperationName)));
|
|||
|
}
|
|||
|
|
|||
|
this.Message.Set(executionContext, messageContext.Message);
|
|||
|
this.ProcessReceiveMessageTrace(executionContext, messageContext.EndToEndTracingId);
|
|||
|
this.InitializeCorrelationHandles(executionContext, messageContext.Message.Properties, messageContext.EndToEndTracingId);
|
|||
|
this.ResetTrace(executionContext, InternalReceiveMessage.TraceCorrelationActivityId);
|
|||
|
}
|
|||
|
|
|||
|
void InitializeCorrelationHandles(NativeActivityContext executionContext, MessageProperties messageProperties, Guid e2eTracingId)
|
|||
|
{
|
|||
|
CorrelationHandle ambientHandle = CorrelationHandle.GetAmbientCorrelation(executionContext);
|
|||
|
HostSettings hostSettings = executionContext.GetExtension<SendReceiveExtension>().HostSettings;
|
|||
|
|
|||
|
if (this.IsReceiveReply)
|
|||
|
{
|
|||
|
// Client side ReceiveReply.
|
|||
|
|
|||
|
MessagingActivityHelper.InitializeCorrelationHandles(executionContext, null, ambientHandle, this.correlationInitializers, messageProperties);
|
|||
|
|
|||
|
// Set InstanceKey on ContextCorrelation/Ambient handle.
|
|||
|
InstanceKey contextCorrelationInstanceKey;
|
|||
|
if (this.TryGetContextCorrelationInstanceKey(hostSettings, messageProperties, out contextCorrelationInstanceKey))
|
|||
|
{
|
|||
|
CorrelationHandle contextCorrelationHandle = CorrelationHandle.GetExplicitContextCorrelation(executionContext, this.correlationInitializers);
|
|||
|
MessagingActivityHelper.InitializeCorrelationHandles(executionContext, contextCorrelationHandle, ambientHandle, null, contextCorrelationInstanceKey, null);
|
|||
|
}
|
|||
|
|
|||
|
// ensure we clear the transient correlation handle so that it can be reused by subsequent request-reply pairs
|
|||
|
if (ambientHandle != null)
|
|||
|
{
|
|||
|
ambientHandle.TransientInstanceKey = null;
|
|||
|
}
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
// Server side receive. Can be a one-way Receive or a Receive-SendReply.
|
|||
|
CorrelationHandle requestReplyHandle = CorrelationHandle.GetExplicitRequestReplyCorrelation(executionContext, this.correlationInitializers);
|
|||
|
|
|||
|
if (requestReplyHandle == null && ambientHandle == null && !this.IsOneWay)
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(
|
|||
|
SR2.ReceiveMessageNeedsToPairWithSendMessageForTwoWayContract(this.OperationName)));
|
|||
|
}
|
|||
|
|
|||
|
if (requestReplyHandle != null && this.IsOneWay)
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.RequestReplyHandleShouldNotBePresentForOneWay));
|
|||
|
}
|
|||
|
|
|||
|
CorrelationHandle correlatesWithHandle;
|
|||
|
this.TryGetCorrelatesWithHandle(executionContext, out correlatesWithHandle);
|
|||
|
|
|||
|
MessagingActivityHelper.InitializeCorrelationHandles(executionContext, correlatesWithHandle, ambientHandle, this.correlationInitializers, messageProperties);
|
|||
|
|
|||
|
if (!this.IsOneWay)
|
|||
|
{
|
|||
|
InstanceKey requestReplyCorrelationInstanceKey;
|
|||
|
if (this.TryGetRequestReplyCorrelationInstanceKey(messageProperties, out requestReplyCorrelationInstanceKey))
|
|||
|
{
|
|||
|
MessagingActivityHelper.InitializeCorrelationHandles(executionContext, requestReplyHandle, ambientHandle, null, requestReplyCorrelationInstanceKey, null);
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
if (requestReplyHandle != null)
|
|||
|
{
|
|||
|
throw FxTrace.Exception.AsError(new InvalidOperationException(SR2.FailedToInitializeRequestReplyCorrelationHandle(this.OperationName)));
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
this.UpdateE2ETracingId(e2eTracingId, correlatesWithHandle, ambientHandle, requestReplyHandle);
|
|||
|
|
|||
|
// Set InstanceKey on CallbackCorrelation/Ambient handle.
|
|||
|
InstanceKey callbackContextCorrelationInstanceKey;
|
|||
|
if (this.TryGetCallbackContextCorrelationInstanceKey(hostSettings, messageProperties, out callbackContextCorrelationInstanceKey))
|
|||
|
{
|
|||
|
CorrelationHandle callbackContextCorrelationHandle = CorrelationHandle.GetExplicitCallbackCorrelation(executionContext, this.correlationInitializers);
|
|||
|
MessagingActivityHelper.InitializeCorrelationHandles(executionContext, callbackContextCorrelationHandle, ambientHandle, null, callbackContextCorrelationInstanceKey, null);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
void UpdateE2ETracingId(Guid e2eTracingId, CorrelationHandle correlatesWith, CorrelationHandle ambientHandle, CorrelationHandle requestReplyHandle)
|
|||
|
{
|
|||
|
if (correlatesWith != null)
|
|||
|
{
|
|||
|
correlatesWith.E2ETraceId = e2eTracingId;
|
|||
|
}
|
|||
|
else if (ambientHandle != null)
|
|||
|
{
|
|||
|
ambientHandle.E2ETraceId = e2eTracingId;
|
|||
|
}
|
|||
|
else if (requestReplyHandle != null)
|
|||
|
{
|
|||
|
requestReplyHandle.E2ETraceId = e2eTracingId;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
bool TryGetCallbackContextCorrelationInstanceKey(HostSettings hostSettings, MessageProperties messageProperties, out InstanceKey callbackContextCorrelationInstanceKey)
|
|||
|
{
|
|||
|
callbackContextCorrelationInstanceKey = null;
|
|||
|
CallbackContextMessageProperty callbackContext;
|
|||
|
if (CallbackContextMessageProperty.TryGet(messageProperties, out callbackContext))
|
|||
|
{
|
|||
|
if (callbackContext.Context != null)
|
|||
|
{
|
|||
|
string instanceId = null;
|
|||
|
if (callbackContext.Context.TryGetValue(InstanceIdKey, out instanceId))
|
|||
|
{
|
|||
|
IDictionary<string, string> keyData = new Dictionary<string, string>(1)
|
|||
|
{
|
|||
|
{ WSContextInstanceIdName, instanceId }
|
|||
|
};
|
|||
|
|
|||
|
callbackContextCorrelationInstanceKey = new CorrelationKey(keyData, hostSettings.ScopeName, null);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
return callbackContextCorrelationInstanceKey != null;
|
|||
|
}
|
|||
|
|
|||
|
bool TryGetContextCorrelationInstanceKey(HostSettings hostSettings, MessageProperties messageProperties, out InstanceKey correlationContextInstanceKey)
|
|||
|
{
|
|||
|
correlationContextInstanceKey = null;
|
|||
|
|
|||
|
ContextMessageProperty contextProperties = null;
|
|||
|
if (ContextMessageProperty.TryGet(messageProperties, out contextProperties))
|
|||
|
{
|
|||
|
if (contextProperties.Context != null)
|
|||
|
{
|
|||
|
string instanceId = null;
|
|||
|
if (contextProperties.Context.TryGetValue(InstanceIdKey, out instanceId))
|
|||
|
{
|
|||
|
IDictionary<string, string> keyData = new Dictionary<string, string>(1)
|
|||
|
{
|
|||
|
{ WSContextInstanceIdName, instanceId }
|
|||
|
};
|
|||
|
|
|||
|
correlationContextInstanceKey = new CorrelationKey(keyData, hostSettings.ScopeName, null);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
return correlationContextInstanceKey != null;
|
|||
|
}
|
|||
|
|
|||
|
bool TryGetRequestReplyCorrelationInstanceKey(MessageProperties messageProperties, out InstanceKey instanceKey)
|
|||
|
{
|
|||
|
instanceKey = null;
|
|||
|
CorrelationMessageProperty correlationMessageProperty;
|
|||
|
if (messageProperties.TryGetValue<CorrelationMessageProperty>(CorrelationMessageProperty.Name, out correlationMessageProperty))
|
|||
|
{
|
|||
|
foreach (InstanceKey key in correlationMessageProperty.TransientCorrelations)
|
|||
|
{
|
|||
|
InstanceValue value;
|
|||
|
if (key.Metadata.TryGetValue(WorkflowServiceNamespace.RequestReplyCorrelation, out value))
|
|||
|
{
|
|||
|
instanceKey = key;
|
|||
|
break;
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
return instanceKey != null;
|
|||
|
}
|
|||
|
|
|||
|
bool TryGetCorrelatesWithHandle(NativeActivityContext context, out CorrelationHandle correlationHandle)
|
|||
|
{
|
|||
|
correlationHandle = null;
|
|||
|
if (this.CorrelatesWith != null)
|
|||
|
{
|
|||
|
correlationHandle = this.CorrelatesWith.Get(context);
|
|||
|
}
|
|||
|
|
|||
|
return correlationHandle != null;
|
|||
|
}
|
|||
|
|
|||
|
[DataContract]
|
|||
|
internal class VolatileReceiveMessageInstance
|
|||
|
{
|
|||
|
public VolatileReceiveMessageInstance()
|
|||
|
{
|
|||
|
}
|
|||
|
|
|||
|
// Note that we do not mark this DataMember since we don<6F>t want it to be serialized
|
|||
|
public ReceiveMessageInstanceData Instance { get; set; }
|
|||
|
}
|
|||
|
|
|||
|
// This class defines the instance data that is saved in a variable. This will be initialized with null, and only be
|
|||
|
// used to pass data around during the execution. It is not intended to be persisted, thus it is not marked with
|
|||
|
// DataContract and DataMemeber.
|
|||
|
internal class ReceiveMessageInstanceData
|
|||
|
{
|
|||
|
bool triedAmbientCorrelation;
|
|||
|
CorrelationHandle ambientCorrelation;
|
|||
|
|
|||
|
public ReceiveMessageInstanceData(CorrelationRequestContext requestContext)
|
|||
|
{
|
|||
|
Fx.Assert(requestContext != null, "requestContext is a required parameter");
|
|||
|
this.CorrelationRequestContext = requestContext;
|
|||
|
}
|
|||
|
|
|||
|
public ReceiveMessageInstanceData(CorrelationResponseContext responseContext)
|
|||
|
{
|
|||
|
Fx.Assert(responseContext != null, "responseContext is a required parameter");
|
|||
|
this.CorrelationResponseContext = responseContext;
|
|||
|
this.CorrelationCallbackContext =
|
|||
|
MessagingActivityHelper.CreateCorrelationCallbackContext(responseContext.WorkflowOperationContext.OperationContext.IncomingMessageProperties);
|
|||
|
}
|
|||
|
|
|||
|
// For the client-receive case. Saves the context retrieved from the handle
|
|||
|
public CorrelationRequestContext CorrelationRequestContext
|
|||
|
{
|
|||
|
get;
|
|||
|
private set;
|
|||
|
}
|
|||
|
|
|||
|
// For the server-receive case. The context that will be used to by the following send.
|
|||
|
public CorrelationResponseContext CorrelationResponseContext
|
|||
|
{
|
|||
|
get;
|
|||
|
private set;
|
|||
|
}
|
|||
|
|
|||
|
public CorrelationCallbackContext CorrelationCallbackContext
|
|||
|
{
|
|||
|
get;
|
|||
|
private set;
|
|||
|
}
|
|||
|
|
|||
|
public CorrelationContext CorrelationContext
|
|||
|
{
|
|||
|
get;
|
|||
|
private set;
|
|||
|
}
|
|||
|
|
|||
|
public Guid AmbientActivityId
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
public CorrelationHandle GetAmbientCorrelation(NativeActivityContext context)
|
|||
|
{
|
|||
|
if (this.triedAmbientCorrelation)
|
|||
|
{
|
|||
|
return this.ambientCorrelation;
|
|||
|
}
|
|||
|
|
|||
|
this.triedAmbientCorrelation = true;
|
|||
|
this.ambientCorrelation = context.Properties.Find(CorrelationHandle.StaticExecutionPropertyName) as CorrelationHandle;
|
|||
|
return this.ambientCorrelation;
|
|||
|
}
|
|||
|
|
|||
|
public void SetAmbientCorrelation(CorrelationHandle ambientCorrelation)
|
|||
|
{
|
|||
|
Fx.Assert(!this.triedAmbientCorrelation, "can only set ambient correlation once");
|
|||
|
this.ambientCorrelation = ambientCorrelation;
|
|||
|
this.triedAmbientCorrelation = true;
|
|||
|
}
|
|||
|
|
|||
|
internal OperationContext GetOperationContext()
|
|||
|
{
|
|||
|
if (this.CorrelationRequestContext != null)
|
|||
|
{
|
|||
|
return this.CorrelationRequestContext.OperationContext;
|
|||
|
}
|
|||
|
else if (this.CorrelationResponseContext != null)
|
|||
|
{
|
|||
|
return this.CorrelationResponseContext.WorkflowOperationContext.OperationContext;
|
|||
|
}
|
|||
|
|
|||
|
return null;
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
public void InitializeContextAndCallbackContext()
|
|||
|
{
|
|||
|
Fx.Assert(this.CorrelationRequestContext.Reply != null, "Reply message cannot be null for context and callback!");
|
|||
|
|
|||
|
this.CorrelationCallbackContext =
|
|||
|
MessagingActivityHelper.CreateCorrelationCallbackContext(this.CorrelationRequestContext.Reply.Properties);
|
|||
|
// this is the context that the server must have send back in the initial hand-shake
|
|||
|
this.CorrelationContext =
|
|||
|
MessagingActivityHelper.CreateCorrelationContext(this.CorrelationRequestContext.Reply.Properties);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
class ReceiveMessageState
|
|||
|
{
|
|||
|
public Transaction CurrentTransaction
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
public ReceiveMessageInstanceData Instance
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
class WaitForReply : AsyncCodeActivity
|
|||
|
{
|
|||
|
public WaitForReply()
|
|||
|
{
|
|||
|
}
|
|||
|
|
|||
|
public InArgument<VolatileReceiveMessageInstance> Instance
|
|||
|
{
|
|||
|
get;
|
|||
|
set;
|
|||
|
}
|
|||
|
|
|||
|
protected override void CacheMetadata(CodeActivityMetadata metadata)
|
|||
|
{
|
|||
|
RuntimeArgument instanceArgument = new RuntimeArgument("Instance", typeof(VolatileReceiveMessageInstance), ArgumentDirection.In);
|
|||
|
if (this.Instance == null)
|
|||
|
{
|
|||
|
this.Instance = new InArgument<VolatileReceiveMessageInstance>();
|
|||
|
}
|
|||
|
metadata.Bind(this.Instance, instanceArgument);
|
|||
|
|
|||
|
metadata.SetArgumentsCollection(
|
|||
|
new Collection<RuntimeArgument>
|
|||
|
{
|
|||
|
instanceArgument
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
|
|||
|
{
|
|||
|
VolatileReceiveMessageInstance volatileInstance = this.Instance.Get(context);
|
|||
|
|
|||
|
return new WaitForReplyAsyncResult(volatileInstance.Instance, callback, state);
|
|||
|
}
|
|||
|
|
|||
|
protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
|
|||
|
{
|
|||
|
WaitForReplyAsyncResult.End(result);
|
|||
|
}
|
|||
|
|
|||
|
protected override void Cancel(AsyncCodeActivityContext context)
|
|||
|
{
|
|||
|
VolatileReceiveMessageInstance volatileInstance = this.Instance.Get(context);
|
|||
|
volatileInstance.Instance.CorrelationRequestContext.Cancel();
|
|||
|
|
|||
|
base.Cancel(context);
|
|||
|
}
|
|||
|
|
|||
|
class WaitForReplyAsyncResult : AsyncResult
|
|||
|
{
|
|||
|
static Action<object, TimeoutException> onReceiveReply;
|
|||
|
|
|||
|
public WaitForReplyAsyncResult(ReceiveMessageInstanceData instance, AsyncCallback callback, object state)
|
|||
|
: base(callback, state)
|
|||
|
{
|
|||
|
if (onReceiveReply == null)
|
|||
|
{
|
|||
|
onReceiveReply = new Action<object, TimeoutException>(OnReceiveReply);
|
|||
|
}
|
|||
|
|
|||
|
if (instance.CorrelationRequestContext.WaitForReplyAsync(onReceiveReply, this))
|
|||
|
{
|
|||
|
Complete(true);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
public static void End(IAsyncResult result)
|
|||
|
{
|
|||
|
AsyncResult.End<WaitForReplyAsyncResult>(result);
|
|||
|
}
|
|||
|
|
|||
|
static void OnReceiveReply(object state, TimeoutException timeoutException)
|
|||
|
{
|
|||
|
WaitForReplyAsyncResult thisPtr = (WaitForReplyAsyncResult)state;
|
|||
|
thisPtr.Complete(false);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|