// Listing metadata: 738 lines, 31 KiB, C#
|
//------------------------------------------------------------
|
||
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||
|
//------------------------------------------------------------
|
||
|
using System;
|
||
|
using System.Collections;
|
||
|
using System.Collections.Generic;
|
||
|
using System.Text;
|
||
|
using System.Diagnostics;
|
||
|
using System.Diagnostics.CodeAnalysis;
|
||
|
using System.Globalization;
|
||
|
using System.Workflow.ComponentModel;
|
||
|
using System.Runtime.Serialization;
|
||
|
using System.Messaging;
|
||
|
|
||
|
namespace System.Workflow.Runtime
|
||
|
{
|
||
|
[Obsolete("The System.Workflow.* types are deprecated. Instead, please use the new types from System.Activities.*")]
|
||
|
public class WorkflowQueuingService
|
||
|
{
|
||
|
// Lock object handed out through the SyncRoot property.
private Object syncRoot = new Object();

// Workflow runtime whose RootActivity carries the queue-state dependency properties.
private IWorkflowCoreRuntime rootWorkflowExecutor;

// Names of the queues flagged Dirty by MarkQueueDirtyIfTransactional; created lazily.
private List<IComparable> dirtyQueues;

// State whose Messages queue is published through PendingMessagesProperty.
private EventQueueState pendingQueueState = new EventQueueState();

// Queue name -> queue state table owned by this service instance.
private Dictionary<IComparable, EventQueueState> persistedQueueStates;

// event handler used by atomic execution context's Q service for message delivery
private List<WorkflowQueuingService> messageArrivalEventHandlers;

// set for inner queuing service
private WorkflowQueuingService rootQueuingService;

// Runtime information visible to host, stored on the root activity
[SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes",
    Justification = "Design has been approved. This is a false positive. DependencyProperty is an immutable type.")]
public static readonly DependencyProperty PendingMessagesProperty = DependencyProperty.RegisterAttached("PendingMessages", typeof(Queue), typeof(WorkflowQueuingService), new PropertyMetadata(DependencyPropertyOptions.NonSerialized));

// Persisted state properties
internal static DependencyProperty RootPersistedQueueStatesProperty = DependencyProperty.RegisterAttached("RootPersistedQueueStates", typeof(Dictionary<IComparable, EventQueueState>), typeof(WorkflowQueuingService));
internal static DependencyProperty LocalPersistedQueueStatesProperty = DependencyProperty.RegisterAttached("LocalPersistedQueueStates", typeof(Dictionary<IComparable, EventQueueState>), typeof(WorkflowQueuingService));

// Name of the internal queue that holds delayed (asynchronous) notifications.
private const string pendingNotification = "*PendingNotifications";

// Snapshots created during pre-persist and dumped during post-persist.
// If persistence fails, changes made to the queuing service during pre-persist
// must be undone in post-persist.
// Created for ref. 20575.
private Dictionary<IComparable, EventQueueState> persistedQueueStatesSnapshot = null;
private EventQueueState pendingQueueStateSnapshot = null;
|
||
|
|
||
|
// Root queuing service constructor.
// Publishes the pending-message queue on the root activity, adopts the queue table
// already stored on the root activity (creating and storing an empty one when none is
// present) and makes sure the internal pending-notification queue exists.
internal WorkflowQueuingService(IWorkflowCoreRuntime rootWorkflowExecutor)
{
    this.rootWorkflowExecutor = rootWorkflowExecutor;
    this.rootWorkflowExecutor.RootActivity.SetValue(PendingMessagesProperty, this.pendingQueueState.Messages);

    object storedStates = this.rootWorkflowExecutor.RootActivity.GetValue(RootPersistedQueueStatesProperty);
    this.persistedQueueStates = (Dictionary<IComparable, EventQueueState>)storedStates;

    if (this.persistedQueueStates == null)
    {
        // Nothing stored on the root activity yet: start with an empty table and store it.
        this.persistedQueueStates = new Dictionary<IComparable, EventQueueState>();
        this.rootWorkflowExecutor.RootActivity.SetValue(RootPersistedQueueStatesProperty, this.persistedQueueStates);
    }

    if (!this.Exists(pendingNotification))
    {
        this.CreateWorkflowQueue(pendingNotification, false);
    }
}
|
||
|
|
||
|
// Inner queuing service constructor.
// The new service shares the executor of copyFromQueuingService, starts with an empty
// local queue table (stored under LocalPersistedQueueStatesProperty) and subscribes
// itself to the root service's message delivery.
internal WorkflowQueuingService(WorkflowQueuingService copyFromQueuingService)
{
    this.rootQueuingService = copyFromQueuingService;
    this.rootWorkflowExecutor = copyFromQueuingService.rootWorkflowExecutor;

    this.rootWorkflowExecutor.RootActivity.SetValue(PendingMessagesProperty, this.pendingQueueState.Messages);

    this.persistedQueueStates = new Dictionary<IComparable, EventQueueState>();
    this.rootWorkflowExecutor.RootActivity.SetValue(LocalPersistedQueueStatesProperty, this.persistedQueueStates);

    SubscribeForRootMessageDelivery();
}
|
||
|
|
||
|
// Creates a queue named queueName and returns a WorkflowQueue handle for it.
// A non-transactional queue requested on an inner service is created on the root
// service so that it is visible outside the current transaction.
// Throws ArgumentNullException when queueName is null; NewQueue throws a
// QueueException (QueueExists) when the name is already taken.
public WorkflowQueue CreateWorkflowQueue(IComparable queueName, bool transactional)
{
    if (queueName == null)
    {
        throw new ArgumentNullException("queueName");
    }

    lock (SyncRoot)
    {
        bool createHere = transactional || this.rootQueuingService == null;
        if (createHere)
        {
            NewQueue(queueName, true, transactional);
            return new WorkflowQueue(this, queueName);
        }

        return this.rootQueuingService.CreateWorkflowQueue(queueName, false);
    }
}
|
||
|
|
||
|
// Deletes the queue named queueName. Messages still waiting in the queue are moved
// to the pending-message queue before the queue's entry is removed.
// On an inner service, a queue that is not transactional is deleted on the root service.
// Throws ArgumentNullException when queueName is null; GetEventQueueState throws a
// QueueException when the queue is missing or dirty.
public void DeleteWorkflowQueue(IComparable queueName)
{
    if (queueName == null)
    {
        throw new ArgumentNullException("queueName");
    }

    lock (SyncRoot)
    {
        // when we are deleting the queue from activity
        // message delivery should not happen.
        if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
        {
            this.rootQueuingService.DeleteWorkflowQueue(queueName);
            return;
        }

        EventQueueState state = GetEventQueueState(queueName);
        Queue undelivered = state.Messages;
        Queue pending = this.pendingQueueState.Messages;

        while (undelivered.Count > 0)
        {
            object message = undelivered.Dequeue();
            pending.Enqueue(message);
        }

        WorkflowTrace.Runtime.TraceInformation("Queuing Service: Deleting Queue with ID {0} for {1}", queueName.GetHashCode(), queueName);
        this.persistedQueueStates.Remove(queueName);
    }
}
|
||
|
|
||
|
// Returns true when a queue named queueName is known to this service, or to the root
// service when called on an inner service for a queue that is not transactional.
// Throws ArgumentNullException when queueName is null.
public bool Exists(IComparable queueName)
{
    if (queueName == null)
    {
        throw new ArgumentNullException("queueName");
    }

    lock (SyncRoot)
    {
        bool answerLocally = this.rootQueuingService == null || IsTransactionalQueue(queueName);

        return answerLocally
            ? this.persistedQueueStates.ContainsKey(queueName)
            : this.rootQueuingService.Exists(queueName);
    }
}
|
||
|
|
||
|
// Returns a WorkflowQueue handle for the existing queue named queueName.
// On an inner service, a queue that is not transactional is looked up on the root service.
// Throws ArgumentNullException when queueName is null; GetEventQueueState throws a
// QueueException when the queue is missing or dirty.
public WorkflowQueue GetWorkflowQueue(IComparable queueName)
{
    if (queueName == null)
    {
        throw new ArgumentNullException("queueName");
    }

    lock (SyncRoot)
    {
        bool answerLocally = this.rootQueuingService == null || IsTransactionalQueue(queueName);
        if (!answerLocally)
        {
            return this.rootQueuingService.GetWorkflowQueue(queueName);
        }

        // Called only for its validation: throws if the queue is missing or dirty.
        GetEventQueueState(queueName);

        return new WorkflowQueue(this, queueName);
    }
}
|
||
|
|
||
|
#region internal functions
|
||
|
|
||
|
// Lock object used by this service to serialize access to its queue state.
internal Object SyncRoot
{
    get
    {
        return this.syncRoot;
    }
}
|
||
|
|
||
|
// Enqueues item on the queue named queueName, forwards it to every subscribed inner
// queuing service and then notifies the queue's synchronous and asynchronous listeners.
// On an inner service, a queue that is not transactional is handled by the root service.
// Throws ArgumentNullException when queueName is null and a QueueException
// (QueueNotAvailable) when the queue is disabled; GetQueue throws when it is missing.
internal void EnqueueEvent(IComparable queueName, Object item)
{
    if (queueName == null)
    {
        throw new ArgumentNullException("queueName");
    }

    lock (SyncRoot)
    {
        if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
        {
            this.rootQueuingService.EnqueueEvent(queueName, item);
            return;
        }

        EventQueueState targetState = GetQueue(queueName);
        if (!targetState.Enabled)
        {
            throw new QueueException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueNotEnabled, queueName), MessageQueueErrorCode.QueueNotAvailable);
        }

        // note enqueue allowed irrespective of dirty flag since it is delivered through
        targetState.Messages.Enqueue(item);

        WorkflowTrace.Runtime.TraceInformation("Queuing Service: Enqueue item Queue ID {0} for {1}", queueName.GetHashCode(), queueName);

        // notify message arrived subscribers; the list and its count are re-read on
        // every pass, exactly as an index-based for loop would do.
        int index = 0;
        while (messageArrivalEventHandlers != null && index < messageArrivalEventHandlers.Count)
        {
            this.messageArrivalEventHandlers[index].OnItemEnqueued(queueName, item);
            ++index;
        }

        NotifyExternalSubscribers(queueName, targetState, item);
    }
}
|
||
|
// Enqueues item on the queue named queueName, forwards it to every subscribed inner
// queuing service and notifies synchronous listeners immediately. Asynchronous
// notification is deferred through QueueAsynchronousEvent, whose result is returned.
// On an inner service, a queue that is not transactional is handled by the root service.
// Throws ArgumentNullException when queueName is null and a QueueException
// (QueueNotAvailable) when the queue is disabled; GetQueue throws when it is missing.
internal bool SafeEnqueueEvent(IComparable queueName, Object item)
{
    if (queueName == null)
    {
        throw new ArgumentNullException("queueName");
    }

    lock (SyncRoot)
    {
        if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
        {
            return this.rootQueuingService.SafeEnqueueEvent(queueName, item);
        }

        EventQueueState targetState = GetQueue(queueName);
        if (!targetState.Enabled)
        {
            throw new QueueException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueNotEnabled, queueName), MessageQueueErrorCode.QueueNotAvailable);
        }

        // note enqueue allowed irrespective of dirty flag since it is delivered through
        targetState.Messages.Enqueue(item);

        WorkflowTrace.Runtime.TraceInformation("Queuing Service: Enqueue item Queue ID {0} for {1}", queueName.GetHashCode(), queueName);

        // notify message arrived subscribers; the list and its count are re-read on
        // every pass, exactly as an index-based for loop would do.
        int index = 0;
        while (messageArrivalEventHandlers != null && index < messageArrivalEventHandlers.Count)
        {
            this.messageArrivalEventHandlers[index].OnItemSafeEnqueued(queueName, item);
            ++index;
        }

        NotifySynchronousSubscribers(queueName, targetState, item);

        bool queuedFirstNotification = QueueAsynchronousEvent(queueName, targetState);
        return queuedFirstNotification;
    }
}
|
||
|
|
||
|
|
||
|
// Returns the message at the head of the queue named queueName without removing it.
// On an inner service, a queue that is not transactional is read on the root service.
// Throws ArgumentNullException when queueName is null and a QueueException
// (MessageNotFound) when the queue is empty; GetEventQueueState throws when the
// queue is missing or dirty.
internal object Peek(IComparable queueName)
{
    if (queueName == null)
    {
        throw new ArgumentNullException("queueName");
    }

    lock (SyncRoot)
    {
        if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
        {
            return this.rootQueuingService.Peek(queueName);
        }

        EventQueueState state = GetEventQueueState(queueName);
        if (state.Messages.Count == 0)
        {
            object[] formatArgs = new object[] { System.Messaging.MessageQueueErrorCode.MessageNotFound, queueName };
            string text = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, formatArgs);

            throw new QueueException(text, MessageQueueErrorCode.MessageNotFound);
        }

        return state.Messages.Peek();
    }
}
|
||
|
|
||
|
// Removes and returns the message at the head of the queue named queueName.
// On an inner service, a queue that is not transactional is read on the root service.
// Throws ArgumentNullException when queueName is null and a QueueException
// (MessageNotFound) when the queue is empty; GetEventQueueState throws when the
// queue is missing or dirty.
internal Object DequeueEvent(IComparable queueName)
{
    if (queueName == null)
    {
        throw new ArgumentNullException("queueName");
    }

    lock (SyncRoot)
    {
        if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
        {
            return this.rootQueuingService.DequeueEvent(queueName);
        }

        EventQueueState state = GetEventQueueState(queueName);
        if (state.Messages.Count == 0)
        {
            object[] formatArgs = new object[] { System.Messaging.MessageQueueErrorCode.MessageNotFound, queueName };
            string text = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, formatArgs);

            throw new QueueException(text, MessageQueueErrorCode.MessageNotFound);
        }

        return state.Messages.Dequeue();
    }
}
|
||
|
|
||
|
// Looks up the state of the queue identified by eventType while holding SyncRoot.
// GetQueue throws a QueueException (QueueNotFound) when no such queue exists.
internal EventQueueState GetQueueState(IComparable eventType)
{
    lock (SyncRoot)
    {
        EventQueueState state = GetQueue(eventType);
        return state;
    }
}
|
||
|
|
||
|
Activity caller;
|
||
|
|
||
|
// Activity recorded as the current caller.
// An inner service reads the value from its root service; a write is forwarded to the
// root service (when there is one) and is also stored locally.
internal Activity CallingActivity
{
    get
    {
        return this.rootQueuingService != null
            ? this.rootQueuingService.CallingActivity
            : this.caller;
    }
    set
    {
        if (this.rootQueuingService != null)
        {
            this.rootQueuingService.CallingActivity = value;
        }

        this.caller = value;
    }
}
|
||
|
|
||
|
// Records a delayed notification for queueName on the pending-notification queue when
// the queue has asynchronous listeners, either on qState or in a subscribed inner service.
// Returns true only when the recorded entry is the sole entry on the
// pending-notification queue; returns false when nothing was recorded.
private bool QueueAsynchronousEvent(IComparable queueName, EventQueueState qState)
{
    if (qState.AsynchronousListeners.Count == 0 && !IsNestedListenersExist(queueName))
    {
        return false;
    }

    Queue delayed = GetQueue(pendingNotification).Messages;
    delayed.Enqueue(new KeyValuePair<IComparable, EventQueueState>(queueName, qState));
    WorkflowTrace.Runtime.TraceInformation("Queuing Service: Queued delayed message notification for '{0}'", queueName.ToString());

    return delayed.Count == 1;
}
|
||
|
|
||
|
// Returns true when any subscribed inner queuing service holds a state for queueName
// that has at least one asynchronous listener.
bool IsNestedListenersExist(IComparable queueName)
{
    int index = 0;
    while (messageArrivalEventHandlers != null && index < messageArrivalEventHandlers.Count)
    {
        WorkflowQueuingService inner = messageArrivalEventHandlers[index];
        EventQueueState innerState = null;

        bool known = inner.persistedQueueStates.TryGetValue(queueName, out innerState);
        if (known && innerState.AsynchronousListeners.Count != 0)
        {
            return true;
        }

        ++index;
    }

    return false;
}
|
||
|
// Drains the pending-notification queue. For every recorded (queue name, state) pair,
// the asynchronous listeners of each subscribed inner service that knows the queue and
// has it enabled are notified first, then the listeners of the recorded state itself.
internal void ProcessesQueuedAsynchronousEvents()
{
    Queue delayed = GetQueue(pendingNotification).Messages;

    while (q_HasItems(delayed))
    {
        KeyValuePair<IComparable, EventQueueState> entry = (KeyValuePair<IComparable, EventQueueState>)delayed.Dequeue();

        // notify message arrived subscribers
        WorkflowTrace.Runtime.TraceInformation("Queuing Service: Processing delayed message notification '{0}'", entry.Key.ToString());

        for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
        {
            WorkflowQueuingService inner = this.messageArrivalEventHandlers[i];
            if (!inner.persistedQueueStates.ContainsKey(entry.Key))
            {
                continue;
            }

            EventQueueState innerState = inner.GetQueue(entry.Key);
            if (innerState.Enabled)
            {
                inner.NotifyAsynchronousSubscribers(entry.Key, innerState, 1);
            }
        }

        NotifyAsynchronousSubscribers(entry.Key, entry.Value, 1);
    }
}

// True while the given queue still holds at least one item.
private static bool q_HasItems(Queue queue)
{
    return queue.Count > 0;
}
|
||
|
|
||
|
// Invokes every asynchronous listener registered on qState, repeating the whole round
// numberOfNotification times. Each round uses a fresh QueueEventArgs for queueName and
// runs while holding SyncRoot; the listener's context activity is resolved through the
// workflow executor from the listener's ContextId.
internal void NotifyAsynchronousSubscribers(IComparable queueName, EventQueueState qState, int numberOfNotification)
{
    int round = 0;
    while (round < numberOfNotification)
    {
        QueueEventArgs eventArgs = new QueueEventArgs(queueName);

        lock (SyncRoot)
        {
            foreach (ActivityExecutorDelegateInfo<QueueEventArgs> listener in qState.AsynchronousListeners)
            {
                Activity context = rootWorkflowExecutor.GetContextActivityForId(listener.ContextId);
                Debug.Assert(context != null);

                listener.InvokeDelegate(context, eventArgs, false);
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Notifying async subscriber on queue:'{0}' activity:{1}", queueName.ToString(), listener.ActivityQualifiedName);
            }
        }

        ++round;
    }
}
|
||
|
|
||
|
/// <summary>
/// At termination/completion point, moves the messages of every queue owned by this
/// service to the pending queue, leaving the individual queues empty.
/// </summary>
internal void MoveAllMessagesToPendingQueue()
{
    lock (SyncRoot)
    {
        Queue pending = this.pendingQueueState.Messages;

        foreach (KeyValuePair<IComparable, EventQueueState> entry in this.persistedQueueStates)
        {
            Queue source = entry.Value.Messages;
            while (source.Count > 0)
            {
                object message = source.Dequeue();
                pending.Enqueue(message);
            }
        }
    }
}
|
||
|
|
||
|
#endregion
|
||
|
|
||
|
#region private root q service helpers
|
||
|
|
||
|
// Returns the state of the queue named queueName, refusing queues flagged Dirty.
// Throws a QueueException (QueueNotAvailable) for a dirty queue; GetQueue throws a
// QueueException (QueueNotFound) when the queue does not exist.
private EventQueueState GetEventQueueState(IComparable queueName)
{
    EventQueueState state = GetQueue(queueName);
    if (!state.Dirty)
    {
        return state;
    }

    string text = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueBusyException, new object[] { queueName });
    throw new QueueException(text, MessageQueueErrorCode.QueueNotAvailable);
}
|
||
|
|
||
|
// Registers a new queue state under queueID with the given enabled/transactional flags.
// Throws a QueueException (QueueExists) when queueID is already registered.
private void NewQueue(IComparable queueID, bool enabled, bool transactional)
{
    WorkflowTrace.Runtime.TraceInformation("Queuing Service: Creating new Queue with ID {0} for {1}", queueID.GetHashCode(), queueID);

    bool alreadyRegistered = this.persistedQueueStates.ContainsKey(queueID);
    if (alreadyRegistered)
    {
        string text = string.Format(
            CultureInfo.CurrentCulture,
            ExecutionStringManager.EventQueueException,
            new object[] { System.Messaging.MessageQueueErrorCode.QueueExists, queueID });

        throw new QueueException(text, MessageQueueErrorCode.QueueExists);
    }

    EventQueueState created = new EventQueueState();
    created.Enabled = enabled;
    created.queueName = queueID;
    created.Transactional = transactional;

    this.persistedQueueStates.Add(queueID, created);
}
|
||
|
|
||
|
// Returns the state registered under queueID, stamping queueID onto the state's
// queueName before returning it.
// Throws a QueueException (QueueNotFound) when no state is registered under queueID.
internal EventQueueState GetQueue(IComparable queueID)
{
    EventQueueState found;
    if (!this.persistedQueueStates.TryGetValue(queueID, out found))
    {
        string text = string.Format(
            CultureInfo.CurrentCulture,
            ExecutionStringManager.EventQueueException,
            new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueID });

        throw new QueueException(text, MessageQueueErrorCode.QueueNotFound);
    }

    found.queueName = queueID;
    return found;
}
|
||
|
|
||
|
// Names of the queues owned by this service, returned as a fresh list that leaves out
// the internal pending-notification queue.
internal IEnumerable<IComparable> QueueNames
{
    get
    {
        List<IComparable> names = new List<IComparable>(this.persistedQueueStates.Keys);

        foreach (IComparable candidate in names)
        {
            // "as" yields null for non-string keys, which never equals the constant.
            if (String.Equals(candidate as String, pendingNotification))
            {
                // Leaving the loop right after the removal keeps the enumerator from
                // being advanced over the modified list.
                names.Remove(candidate);
                break;
            }
        }

        return names;
    }
}
|
||
|
|
||
|
// Merges the state of a completed inner queuing service into this (root) service.
// Pass 1 collects every transactional source entry and throws a QueueException
// (QueueNotAvailable) if this service already holds a state of the same name that is
// not flagged Dirty. Pass 2 runs only when no conflict was found and stores the
// collected entries, replacing or adding them. Finally the pending-message state is
// copied from srcPendingQueueState.
private void ApplyChangesFrom(EventQueueState srcPendingQueueState, Dictionary<IComparable, EventQueueState> srcPersistedQueueStates)
{
    lock (SyncRoot)
    {
        Dictionary<IComparable, EventQueueState> accepted = new Dictionary<IComparable, EventQueueState>();

        foreach (KeyValuePair<IComparable, EventQueueState> incoming in srcPersistedQueueStates)
        {
            Debug.Assert(incoming.Value.Transactional, "Queue inside a transactional context is not transactional!");

            if (!incoming.Value.Transactional)
            {
                continue;
            }

            EventQueueState existing;
            if (this.persistedQueueStates.TryGetValue(incoming.Key, out existing) && !existing.Dirty)
            {
                // we could get here when there
                // are conflicting create Qs
                string text = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueBusyException, new object[] { incoming.Key });
                throw new QueueException(text, MessageQueueErrorCode.QueueNotAvailable);
            }

            accepted.Add(incoming.Key, incoming.Value);
        }

        // no conflicts detected now make the updates visible
        foreach (KeyValuePair<IComparable, EventQueueState> entry in accepted)
        {
            // shared queue in the root, swap out to new value
            // or add new item
            this.persistedQueueStates[entry.Key] = entry.Value;
        }

        this.pendingQueueState.CopyFrom(srcPendingQueueState);
    }
}
|
||
|
|
||
|
// Message-arrival notification: runs the queue's synchronous listeners first and then
// one round of its asynchronous listeners.
private void NotifyExternalSubscribers(IComparable queueName, EventQueueState qState, Object eventInstance)
{
    const int singleRound = 1;

    NotifySynchronousSubscribers(queueName, qState, eventInstance);
    NotifyAsynchronousSubscribers(queueName, qState, singleRound);
}
|
||
|
|
||
|
// Calls every synchronous listener registered on qState, in list order, passing a new
// WorkflowQueue handle for queueName and a shared QueueEventArgs instance.
// A listener's HandlerDelegate is used when it is set; otherwise its EventListener is
// called. eventInstance is not used by this method.
private void NotifySynchronousSubscribers(IComparable queueName, EventQueueState qState, Object eventInstance)
{
    QueueEventArgs eventArgs = new QueueEventArgs(queueName);

    int index = 0;
    while (index < qState.SynchronousListeners.Count)
    {
        WorkflowQueue handle = new WorkflowQueue(this, queueName);

        if (qState.SynchronousListeners[index].HandlerDelegate == null)
        {
            qState.SynchronousListeners[index].EventListener.OnEvent(handle, eventArgs);
        }
        else
        {
            qState.SynchronousListeners[index].HandlerDelegate(handle, eventArgs);
        }

        ++index;
    }
}
|
||
|
|
||
|
// Flags the queue named queueName as Dirty and records its name in dirtyQueues.
// Returns the queue state only if the queue exists and is transactional; otherwise
// returns null. A queue that is already dirty is returned without being recorded again.
private EventQueueState MarkQueueDirtyIfTransactional(IComparable queueName)
{
    lock (SyncRoot)
    {
        Debug.Assert(this.rootQueuingService == null, "MarkQueueDirty should be done at root");

        EventQueueState state;
        if (!this.persistedQueueStates.TryGetValue(queueName, out state))
        {
            return null;
        }

        // Same stamping that GetQueue performs on every successful lookup.
        state.queueName = queueName;

        if (!state.Transactional)
        {
            return null;
        }

        if (!state.Dirty)
        {
            state.Dirty = true;

            if (this.dirtyQueues == null)
            {
                this.dirtyQueues = new List<IComparable>();
            }

            // add to the list of dirty queues
            this.dirtyQueues.Add(queueName);
        }

        return state;
    }
}
|
||
|
|
||
|
// Registers an inner queuing service to be told about messages enqueued on this
// service. The handler list is created on first use.
private void AddMessageArrivedEventHandler(WorkflowQueuingService handler)
{
    lock (SyncRoot)
    {
        List<WorkflowQueuingService> handlers = this.messageArrivalEventHandlers;
        if (handlers == null)
        {
            handlers = new List<WorkflowQueuingService>();
            this.messageArrivalEventHandlers = handlers;
        }

        handlers.Add(handler);
    }
}
|
||
|
|
||
|
// Unregisters an inner queuing service and clears the Dirty flag of every queue
// recorded in dirtyQueues.
//
// dirtyQueues is never pruned, so it can still hold the name of a queue that was
// marked dirty by an earlier atomic scope and has been deleted since. Looking such a
// name up through GetQueue throws a QueueException (QueueNotFound), which would abort
// the unsubscribe and leave the remaining queues dirty. The state is therefore looked
// up with TryGetValue and names that no longer resolve are skipped.
private void RemoveMessageArrivedEventHandler(WorkflowQueuingService handler)
{
    lock (SyncRoot)
    {
        if (this.messageArrivalEventHandlers != null)
        {
            this.messageArrivalEventHandlers.Remove(handler);
        }

        if (this.dirtyQueues == null)
        {
            return;
        }

        foreach (IComparable queueName in this.dirtyQueues)
        {
            EventQueueState qState;
            if (!this.persistedQueueStates.TryGetValue(queueName, out qState))
            {
                // Queue was deleted after it had been marked dirty: nothing to reset.
                continue;
            }

            // Keep the name stamping that GetQueue performed on every lookup.
            qState.queueName = queueName;
            qState.Dirty = false;
        }
    }
}
|
||
|
#endregion
|
||
|
|
||
|
#region inner QueuingService functions
|
||
|
// Tells whether an inner queuing service should handle queueName itself.
// A queue already present in the inner table is treated as transactional. Otherwise
// the root service is asked to mark the queue dirty; if the root reports it as
// transactional, a copy of the root's state is added to the inner table and true is
// returned. Returns false when the root has no transactional queue of that name.
// Must only be called on an inner service (rootQueuingService is dereferenced).
private bool IsTransactionalQueue(IComparable queueName)
{
    // check inner service for existence
    if (this.persistedQueueStates.ContainsKey(queueName))
    {
        return true; // if entry exists, it must be transactional
    }

    EventQueueState rootState = this.rootQueuingService.MarkQueueDirtyIfTransactional(queueName);
    if (rootState == null)
    {
        return false;
    }

    // if transactional proceed to the inner queue service
    // for this operation after adding the state
    EventQueueState localCopy = new EventQueueState();
    localCopy.CopyFrom(rootState);
    this.persistedQueueStates.Add(queueName, localCopy);

    return true;
}
|
||
|
|
||
|
// Registers this service with its root service for message-arrival callbacks.
// Does nothing when this service has no root service.
private void SubscribeForRootMessageDelivery()
{
    if (this.rootQueuingService != null)
    {
        this.rootQueuingService.AddMessageArrivedEventHandler(this);
    }
}
|
||
|
|
||
|
// Removes this service from its root service's message-arrival callbacks.
// Does nothing when this service has no root service.
private void UnSubscribeFromRootMessageDelivery()
{
    if (this.rootQueuingService != null)
    {
        this.rootQueuingService.RemoveMessageArrivedEventHandler(this);
    }
}
|
||
|
|
||
|
// Called by the root (parent) queuing service each time it enqueues a message.
// Only the root queuing service is visible to the external host environment, while a
// queuing service snapshot exists per atomic scope, so external messages for queues
// that already exist in the snapshot need to be pushed through to it.
|
||
|
// Callback from the root service's EnqueueEvent: when this inner service holds a
// state for queueName, the item is added to that queue as well and the inner queue's
// synchronous and asynchronous listeners are notified. Queues unknown to this service
// are ignored.
// Throws a QueueException (QueueNotAvailable) when the inner queue is disabled. The
// message is formatted with the same QueueNotAvailable code the exception carries;
// it previously named QueueNotFound although the queue had just been found.
private void OnItemEnqueued(IComparable queueName, object item)
{
    if (!this.persistedQueueStates.ContainsKey(queueName))
    {
        return;
    }

    // make the message visible to inner queueing service
    EventQueueState qState = GetQueue(queueName);
    if (!qState.Enabled)
    {
        object[] msgArgs = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotAvailable, queueName };
        string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, msgArgs);
        throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
    }

    qState.Messages.Enqueue(item);
    NotifyExternalSubscribers(queueName, qState, item);
}
|
||
|
|
||
|
// Callback from the root service's SafeEnqueueEvent: when this inner service holds a
// state for queueName, the item is added to that queue as well and only the inner
// queue's synchronous listeners are notified. Queues unknown to this service are
// ignored.
// Throws a QueueException (QueueNotAvailable) when the inner queue is disabled. The
// message is formatted with the same QueueNotAvailable code the exception carries;
// it previously named QueueNotFound although the queue had just been found.
private void OnItemSafeEnqueued(IComparable queueName, object item)
{
    if (!this.persistedQueueStates.ContainsKey(queueName))
    {
        return;
    }

    // make the message visible to inner queueing service
    EventQueueState qState = GetQueue(queueName);
    if (!qState.Enabled)
    {
        object[] msgArgs = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotAvailable, queueName };
        string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, msgArgs);
        throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
    }

    qState.Messages.Enqueue(item);
    NotifySynchronousSubscribers(queueName, qState, item);
}
|
||
|
|
||
|
// Finishes the work of an inner queuing service. When commitSucceeded is true, the
// inner pending messages and queue states are merged into the root service first.
// In both cases the inner service is then unsubscribed from root message delivery.
internal void Complete(bool commitSucceeded)
{
    bool mergeIntoRoot = commitSucceeded;
    if (mergeIntoRoot)
    {
        this.rootQueuingService.ApplyChangesFrom(this.pendingQueueState, this.persistedQueueStates);
    }

    UnSubscribeFromRootMessageDelivery();
}
|
||
|
#endregion
|
||
|
|
||
|
#region Pre-persist and Post-persist helpers for queuing service states
|
||
|
|
||
|
// Created for ref. 20575
// Called after a persistence attempt. When the attempt failed, the queue state
// captured by PrePersist is put back (on this service and on the root activity's
// dependency properties) and the local queuing service of the current atomic activity
// is subscribed again. The snapshots are released in either case.
internal void PostPersist(bool isPersistSuccessful)
{
    // If persist is unsuccessful, we'll undo the changes done
    // because of the call to .Complete() in PrePersist
    if (!isPersistSuccessful)
    {
        Debug.Assert(rootWorkflowExecutor.CurrentAtomicActivity != null);
        Debug.Assert(pendingQueueStateSnapshot != null);
        Debug.Assert(persistedQueueStatesSnapshot != null);

        TransactionalProperties txProperties = rootWorkflowExecutor.CurrentAtomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty) as TransactionalProperties;
        Debug.Assert(txProperties != null);

        // Restore queuing states and set root activity's dependency properties to the restored values.
        this.pendingQueueState = this.pendingQueueStateSnapshot;
        this.persistedQueueStates = this.persistedQueueStatesSnapshot;
        this.rootWorkflowExecutor.RootActivity.SetValue(RootPersistedQueueStatesProperty, this.persistedQueueStatesSnapshot);
        this.rootWorkflowExecutor.RootActivity.SetValue(PendingMessagesProperty, this.pendingQueueStateSnapshot.Messages);

        // Also call Subscribe...() because the .Complete() call called Unsubscribe
        txProperties.LocalQueuingService.SubscribeForRootMessageDelivery();
    }

    // The backups are no longer necessary.
    // The next call to PrePersist() will do a re-backup.
    this.persistedQueueStatesSnapshot = null;
    this.pendingQueueStateSnapshot = null;
}
|
||
|
|
||
|
// Created for ref. 20575
// Called before a persistence attempt. Does nothing unless an atomic activity is
// current. Otherwise it snapshots this service's queue states and pending-message
// state (so PostPersist can restore them if persistence fails) and then completes the
// atomic activity's local queuing service as committed.
internal void PrePersist()
{
    if (rootWorkflowExecutor.CurrentAtomicActivity == null)
    {
        return;
    }

    // Create transactionalProperties from currentAtomicActivity
    TransactionalProperties txProperties = this.rootWorkflowExecutor.CurrentAtomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty) as TransactionalProperties;

    // Create backup snapshot of root queuing service's persistedQueueStates.
    // persistedQueueStates is changed when LocalQueuingService.Complete is called later.
    this.persistedQueueStatesSnapshot = new Dictionary<IComparable, EventQueueState>();
    foreach (KeyValuePair<IComparable, EventQueueState> entry in this.persistedQueueStates)
    {
        EventQueueState stateCopy = new EventQueueState();
        stateCopy.CopyFrom(entry.Value);
        this.persistedQueueStatesSnapshot.Add(entry.Key, stateCopy);
    }

    // Create backup snapshot of root queuing service's pendingQueueState.
    // pendingQueueState is changed when LocalQueuingService.Complete is called later.
    this.pendingQueueStateSnapshot = new EventQueueState();
    this.pendingQueueStateSnapshot.CopyFrom(this.pendingQueueState);

    // Reconcile differences between root and local queuing services.
    txProperties.LocalQueuingService.Complete(true);
}
|
||
|
|
||
|
|
||
|
#endregion Pre-persist and post-persist helpers for queuing service states
|
||
|
}
|
||
|
}
|
||
|
|