//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Workflow.ComponentModel;
using System.Runtime.Serialization;
using System.Messaging;

namespace System.Workflow.Runtime
{
    /// <summary>
    /// Manages the named message queues of a workflow instance.
    /// </summary>
    /// <remarks>
    /// Two flavours of this service exist, distinguished by <c>rootQueuingService</c>:
    /// a root service (field is null), constructed from an <c>IWorkflowCoreRuntime</c>, and an
    /// inner service (field is non-null), constructed from another queuing service. An inner
    /// service forwards every operation on a queue it does not consider transactional to its
    /// root and keeps private copies of the transactional queues it touches; those copies are
    /// merged back into the root by <see cref="Complete"/>.
    /// </remarks>
    [Obsolete("The System.Workflow.* types are deprecated. Instead, please use the new types from System.Activities.*")]
    public class WorkflowQueuingService
    {
        Object syncRoot = new Object();
        IWorkflowCoreRuntime rootWorkflowExecutor;

        // Names of root queues marked Dirty by MarkQueueDirtyIfTransactional.
        List<IComparable> dirtyQueues;
        EventQueueState pendingQueueState = new EventQueueState();
        Dictionary<IComparable, EventQueueState> persistedQueueStates;

        // event handler used by atomic execution context's Q service for message delivery
        List<WorkflowQueuingService> messageArrivalEventHandlers;

        // set for inner queuing service
        WorkflowQueuingService rootQueuingService;

        // Runtime information visible to host, stored on the root activity
        [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes",
            Justification = "Design has been approved. This is a false positive. DependencyProperty is an immutable type.")]
        public readonly static DependencyProperty PendingMessagesProperty = DependencyProperty.RegisterAttached("PendingMessages", typeof(Queue), typeof(WorkflowQueuingService), new PropertyMetadata(DependencyPropertyOptions.NonSerialized));

        // Persisted state properties
        internal static DependencyProperty RootPersistedQueueStatesProperty = DependencyProperty.RegisterAttached("RootPersistedQueueStates", typeof(Dictionary<IComparable, EventQueueState>), typeof(WorkflowQueuingService));
        internal static DependencyProperty LocalPersistedQueueStatesProperty = DependencyProperty.RegisterAttached("LocalPersistedQueueStates", typeof(Dictionary<IComparable, EventQueueState>), typeof(WorkflowQueuingService));

        // Name of the internal queue that holds delayed (asynchronous) notifications.
        // It is created by the root constructor and hidden from QueueNames.
        private const string pendingNotification = "*PendingNotifications";

        // Snapshots created during pre-persist and dumped during post-persist
        // If persistence fails, changes made to queuing service during pre-persist must be undone
        // in post-persist.
        // Created for ref. 20575.
        private Dictionary<IComparable, EventQueueState> persistedQueueStatesSnapshot = null;
        private EventQueueState pendingQueueStateSnapshot = null;

        /// <summary>
        /// Root queuing service constructor. Publishes the pending-message queue on the root
        /// activity, adopts the queue-state dictionary already stored on the root activity (or
        /// creates and stores a new one), and makes sure the internal pending-notification queue exists.
        /// </summary>
        /// <param name="rootWorkflowExecutor">Runtime whose <c>RootActivity</c> carries the queue state.</param>
        internal WorkflowQueuingService(IWorkflowCoreRuntime rootWorkflowExecutor)
        {
            this.rootWorkflowExecutor = rootWorkflowExecutor;
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, this.pendingQueueState.Messages);
            this.persistedQueueStates = (Dictionary<IComparable, EventQueueState>)this.rootWorkflowExecutor.RootActivity.GetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty);
            if (this.persistedQueueStates == null)
            {
                this.persistedQueueStates = new Dictionary<IComparable, EventQueueState>();
                this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty, this.persistedQueueStates);
            }

            if (!this.Exists(pendingNotification))
                this.CreateWorkflowQueue(pendingNotification, false);
        }

        /// <summary>
        /// Inner queuing service constructor. Shares the executor of
        /// <paramref name="copyFromQueuingService"/>, starts with an empty local queue-state
        /// dictionary (stored on the root activity under
        /// <see cref="LocalPersistedQueueStatesProperty"/>) and subscribes to the root's
        /// message-arrival notifications.
        /// </summary>
        /// <param name="copyFromQueuingService">The service that becomes this instance's root.</param>
        internal WorkflowQueuingService(WorkflowQueuingService copyFromQueuingService)
        {
            this.rootQueuingService = copyFromQueuingService;
            this.rootWorkflowExecutor = copyFromQueuingService.rootWorkflowExecutor;
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, this.pendingQueueState.Messages);
            this.persistedQueueStates = new Dictionary<IComparable, EventQueueState>();
            this.rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.LocalPersistedQueueStatesProperty, this.persistedQueueStates);
            SubscribeForRootMessageDelivery();
        }

        /// <summary>
        /// Creates a new, enabled queue.
        /// </summary>
        /// <param name="queueName">Name of the queue to create.</param>
        /// <param name="transactional">
        /// When false and this is an inner service, the queue is created on the root service instead.
        /// </param>
        /// <returns>A <see cref="WorkflowQueue"/> handle for the new queue.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="QueueException">A queue with this name already exists (<c>QueueExists</c>).</exception>
        public WorkflowQueue CreateWorkflowQueue(IComparable queueName, bool transactional)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            {
                // if not transactional create one at the root
                // so it is visible outside this transaction
                if (this.rootQueuingService != null && !transactional)
                {
                    return this.rootQueuingService.CreateWorkflowQueue(queueName, false);
                }

                NewQueue(queueName, true, transactional);

                return new WorkflowQueue(this, queueName);
            }
        }

        /// <summary>
        /// Deletes a queue. Any messages still in it are moved to the pending-message queue first.
        /// </summary>
        /// <param name="queueName">Name of the queue to delete.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="QueueException">
        /// The queue does not exist (<c>QueueNotFound</c>) or is marked dirty (<c>QueueNotAvailable</c>).
        /// </exception>
        public void DeleteWorkflowQueue(IComparable queueName)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            {
                // when we are deleting the queue from activity
                // message delivery should not happen.
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    this.rootQueuingService.DeleteWorkflowQueue(queueName);
                    return;
                }

                EventQueueState queueState = GetEventQueueState(queueName);

                // Drain undelivered messages into the pending queue so they are not lost.
                Queue queue = queueState.Messages;
                Queue pendingQueue = this.pendingQueueState.Messages;

                while (queue.Count != 0)
                {
                    pendingQueue.Enqueue(queue.Dequeue());
                }

                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Deleting Queue with ID {0} for {1}", queueName.GetHashCode(), queueName);
                this.persistedQueueStates.Remove(queueName);
            }
        }

        /// <summary>
        /// Reports whether a queue with the given name exists. An inner service answers for
        /// queues it treats as transactional and asks the root for all others.
        /// </summary>
        /// <param name="queueName">Name of the queue to look for.</param>
        /// <returns>True when the queue exists.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        public bool Exists(IComparable queueName)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.Exists(queueName);
                }

                return this.persistedQueueStates.ContainsKey(queueName);
            }
        }

        /// <summary>
        /// Returns a handle to an existing queue.
        /// </summary>
        /// <param name="queueName">Name of the queue.</param>
        /// <returns>A <see cref="WorkflowQueue"/> bound to the service that owns the queue.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="QueueException">
        /// The queue does not exist (<c>QueueNotFound</c>) or is marked dirty (<c>QueueNotAvailable</c>).
        /// </exception>
        public WorkflowQueue GetWorkflowQueue(IComparable queueName)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.GetWorkflowQueue(queueName);
                }

                // Called only for validation: throws when the queue is missing or dirty.
                GetEventQueueState(queueName);

                return new WorkflowQueue(this, queueName);
            }
        }

        #region internal functions

        /// <summary>Object used with <c>lock</c> to serialize access to this service's state.</summary>
        internal Object SyncRoot
        {
            get { return syncRoot; }
        }

        /// <summary>
        /// Adds an item to a queue, forwards it to every registered inner service and then
        /// notifies the queue's synchronous and asynchronous listeners.
        /// </summary>
        /// <param name="queueName">Name of the target queue.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="QueueException">
        /// The queue does not exist (<c>QueueNotFound</c>) or is disabled (<c>QueueNotAvailable</c>).
        /// </exception>
        internal void EnqueueEvent(IComparable queueName, Object item)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    this.rootQueuingService.EnqueueEvent(queueName, item);
                    return;
                }

                EventQueueState qState = GetQueue(queueName);
                if (!qState.Enabled)
                {
                    throw new QueueException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueNotEnabled, queueName), MessageQueueErrorCode.QueueNotAvailable);
                }

                // note: enqueue is allowed irrespective of the Dirty flag; GetQueue (unlike
                // GetEventQueueState) does not check it.
                qState.Messages.Enqueue(item);

                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Enqueue item Queue ID {0} for {1}", queueName.GetHashCode(), queueName);

                // notify message arrived subscribers
                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
                {
                    this.messageArrivalEventHandlers[i].OnItemEnqueued(queueName, item);
                }

                NotifyExternalSubscribers(queueName, qState, item);
            }
        }

        /// <summary>
        /// Adds an item to a queue like <see cref="EnqueueEvent"/>, but notifies only the
        /// synchronous listeners immediately. The asynchronous notification is recorded on
        /// the pending-notification queue for <see cref="ProcessesQueuedAsynchronousEvents"/>.
        /// </summary>
        /// <param name="queueName">Name of the target queue.</param>
        /// <param name="item">The item to enqueue.</param>
        /// <returns>
        /// True when a delayed notification was recorded and it is the only entry on the
        /// pending-notification queue; otherwise false.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="QueueException">
        /// The queue does not exist (<c>QueueNotFound</c>) or is disabled (<c>QueueNotAvailable</c>).
        /// </exception>
        internal bool SafeEnqueueEvent(IComparable queueName, Object item)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.SafeEnqueueEvent(queueName, item);
                }

                EventQueueState qState = GetQueue(queueName);
                if (!qState.Enabled)
                {
                    throw new QueueException(String.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueNotEnabled, queueName), MessageQueueErrorCode.QueueNotAvailable);
                }

                // note: enqueue is allowed irrespective of the Dirty flag; GetQueue (unlike
                // GetEventQueueState) does not check it.
                qState.Messages.Enqueue(item);

                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Enqueue item Queue ID {0} for {1}", queueName.GetHashCode(), queueName);

                // notify message arrived subscribers
                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
                {
                    this.messageArrivalEventHandlers[i].OnItemSafeEnqueued(queueName, item);
                }

                NotifySynchronousSubscribers(queueName, qState, item);

                return QueueAsynchronousEvent(queueName, qState);
            }
        }

        /// <summary>
        /// Returns the item at the head of a queue without removing it.
        /// </summary>
        /// <param name="queueName">Name of the queue.</param>
        /// <returns>The item at the head of the queue.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="QueueException">
        /// The queue is empty (<c>MessageNotFound</c>), missing (<c>QueueNotFound</c>) or dirty (<c>QueueNotAvailable</c>).
        /// </exception>
        internal object Peek(IComparable queueName)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.Peek(queueName);
                }

                EventQueueState queueState = GetEventQueueState(queueName);
                if (queueState.Messages.Count != 0)
                    return queueState.Messages.Peek();

                object[] args = new object[] { System.Messaging.MessageQueueErrorCode.MessageNotFound, queueName };
                string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);

                throw new QueueException(message, MessageQueueErrorCode.MessageNotFound);
            }
        }

        /// <summary>
        /// Removes and returns the item at the head of a queue.
        /// </summary>
        /// <param name="queueName">Name of the queue.</param>
        /// <returns>The dequeued item.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
        /// <exception cref="QueueException">
        /// The queue is empty (<c>MessageNotFound</c>), missing (<c>QueueNotFound</c>) or dirty (<c>QueueNotAvailable</c>).
        /// </exception>
        internal Object DequeueEvent(IComparable queueName)
        {
            if (queueName == null)
                throw new ArgumentNullException("queueName");

            lock (SyncRoot)
            {
                if (this.rootQueuingService != null && !IsTransactionalQueue(queueName))
                {
                    return this.rootQueuingService.DequeueEvent(queueName);
                }

                EventQueueState queueState = GetEventQueueState(queueName);
                if (queueState.Messages.Count != 0)
                    return queueState.Messages.Dequeue();

                object[] args = new object[] { System.Messaging.MessageQueueErrorCode.MessageNotFound, queueName };
                string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);

                throw new QueueException(message, MessageQueueErrorCode.MessageNotFound);
            }
        }

        /// <summary>
        /// Returns the state object of a queue held by this service, under the service lock.
        /// Does not forward to the root and does not check the dirty flag.
        /// </summary>
        /// <param name="eventType">Name of the queue.</param>
        /// <returns>The queue's state.</returns>
        /// <exception cref="QueueException">The queue does not exist (<c>QueueNotFound</c>).</exception>
        internal EventQueueState GetQueueState(IComparable eventType)
        {
            lock (SyncRoot)
            {
                return GetQueue(eventType);
            }
        }

        Activity caller;

        /// <summary>
        /// The activity stored as the current caller. On an inner service the getter reads
        /// the root's value; the setter writes both the root's and the local field.
        /// </summary>
        internal Activity CallingActivity
        {
            get
            {
                if (this.rootQueuingService != null)
                    return this.rootQueuingService.CallingActivity;

                return this.caller;
            }
            set
            {
                if (this.rootQueuingService != null)
                    this.rootQueuingService.CallingActivity = value;

                this.caller = value;
            }
        }

        /// <summary>
        /// Records a delayed notification for <paramref name="queueName"/> on the
        /// pending-notification queue when the queue, or the same-named queue of any
        /// registered inner service, has asynchronous listeners.
        /// </summary>
        /// <returns>
        /// True when a notification was recorded and it is the only entry on the
        /// pending-notification queue; otherwise false.
        /// </returns>
        private bool QueueAsynchronousEvent(IComparable queueName, EventQueueState qState)
        {
            if (qState.AsynchronousListeners.Count != 0 || IsNestedListenersExist(queueName))
            {
                Queue q = GetQueue(pendingNotification).Messages;
                q.Enqueue(new KeyValuePair<IComparable, EventQueueState>(queueName, qState));
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Queued delayed message notification for '{0}'", queueName.ToString());
                return q.Count == 1;
            }

            return false;
        }

        /// <summary>
        /// Reports whether any registered inner service holds a queue named
        /// <paramref name="queueName"/> that has at least one asynchronous listener.
        /// </summary>
        bool IsNestedListenersExist(IComparable queueName)
        {
            for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
            {
                WorkflowQueuingService qService = messageArrivalEventHandlers[i];
                EventQueueState queueState = null;
                if (qService.persistedQueueStates.TryGetValue(queueName, out queueState) &&
                    queueState.AsynchronousListeners.Count != 0)
                    return true;
            }

            return false;
        }

        /// <summary>
        /// Drains the pending-notification queue. For each entry, the asynchronous listeners
        /// of every registered inner service that holds an enabled queue of that name are
        /// notified first, then the listeners of the recorded queue state.
        /// </summary>
        internal void ProcessesQueuedAsynchronousEvents()
        {
            Queue q = GetQueue(pendingNotification).Messages;

            while (q.Count > 0)
            {
                KeyValuePair<IComparable, EventQueueState> pair = (KeyValuePair<IComparable, EventQueueState>)q.Dequeue();

                // notify message arrived subscribers
                WorkflowTrace.Runtime.TraceInformation("Queuing Service: Processing delayed message notification '{0}'", pair.Key.ToString());

                for (int i = 0; messageArrivalEventHandlers != null && i < messageArrivalEventHandlers.Count; ++i)
                {
                    WorkflowQueuingService service = this.messageArrivalEventHandlers[i];
                    if (service.persistedQueueStates.ContainsKey(pair.Key))
                    {
                        EventQueueState qState = service.GetQueue(pair.Key);
                        if (qState.Enabled)
                        {
                            service.NotifyAsynchronousSubscribers(pair.Key, qState, 1);
                        }
                    }
                }

                NotifyAsynchronousSubscribers(pair.Key, pair.Value, 1);
            }
        }

        /// <summary>
        /// Invokes every asynchronous listener of <paramref name="qState"/>,
        /// <paramref name="numberOfNotification"/> times, each round with a fresh
        /// <c>QueueEventArgs</c> for <paramref name="queueName"/>.
        /// </summary>
        /// <param name="queueName">Queue name carried in the event arguments.</param>
        /// <param name="qState">State object whose asynchronous listeners are invoked.</param>
        /// <param name="numberOfNotification">Number of notification rounds.</param>
        internal void NotifyAsynchronousSubscribers(IComparable queueName, EventQueueState qState, int numberOfNotification)
        {
            for (int i = 0; i < numberOfNotification; ++i)
            {
                QueueEventArgs args = new QueueEventArgs(queueName);
                lock (SyncRoot)
                {
                    foreach (ActivityExecutorDelegateInfo<QueueEventArgs> subscriber in qState.AsynchronousListeners)
                    {
                        Activity contextActivity = rootWorkflowExecutor.GetContextActivityForId(subscriber.ContextId);
                        Debug.Assert(contextActivity != null);
                        subscriber.InvokeDelegate(contextActivity, args, false);
                        WorkflowTrace.Runtime.TraceInformation("Queuing Service: Notifying async subscriber on queue:'{0}' activity:{1}", queueName.ToString(), subscriber.ActivityQualifiedName);
                    }
                }
            }
        }

        /// <summary>
        /// At termination/completion point, need to move messages from all queues to the pending queue
        /// </summary>
        internal void MoveAllMessagesToPendingQueue()
        {
            lock (SyncRoot)
            {
                Queue pendingQueue = this.pendingQueueState.Messages;
                foreach (EventQueueState queueState in this.persistedQueueStates.Values)
                {
                    Queue queue = queueState.Messages;
                    while (queue.Count != 0)
                    {
                        pendingQueue.Enqueue(queue.Dequeue());
                    }
                }
            }
        }

        #endregion

        #region private root q service helpers

        /// <summary>
        /// Looks up a queue's state and rejects it when it is marked dirty.
        /// </summary>
        /// <exception cref="QueueException">
        /// The queue does not exist (<c>QueueNotFound</c>) or is dirty (<c>QueueNotAvailable</c>).
        /// </exception>
        private EventQueueState GetEventQueueState(IComparable queueName)
        {
            EventQueueState queueState = GetQueue(queueName);
            if (queueState.Dirty)
            {
                string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueBusyException, new object[] { queueName });
                throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
            }

            return queueState;
        }

        /// <summary>
        /// Registers a new queue state under <paramref name="queueID"/> in this service's dictionary.
        /// </summary>
        /// <exception cref="QueueException">A queue with this name already exists (<c>QueueExists</c>).</exception>
        private void NewQueue(IComparable queueID, bool enabled, bool transactional)
        {
            WorkflowTrace.Runtime.TraceInformation("Queuing Service: Creating new Queue with ID {0} for {1}", queueID.GetHashCode(), queueID);

            if (this.persistedQueueStates.ContainsKey(queueID))
            {
                object[] args = new object[] { System.Messaging.MessageQueueErrorCode.QueueExists, queueID };
                string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);

                throw new QueueException(message, MessageQueueErrorCode.QueueExists);
            }

            EventQueueState queueState = new EventQueueState();
            queueState.Enabled = enabled;
            queueState.queueName = queueID;
            queueState.Transactional = transactional;
            this.persistedQueueStates.Add(queueID, queueState);
        }

        /// <summary>
        /// Looks up a queue's state in this service's dictionary and, as a side effect,
        /// stamps the lookup key onto the state's <c>queueName</c> field.
        /// </summary>
        /// <exception cref="QueueException">The queue does not exist (<c>QueueNotFound</c>).</exception>
        internal EventQueueState GetQueue(IComparable queueID)
        {
            EventQueueState queue;
            if (this.persistedQueueStates.TryGetValue(queueID, out queue))
            {
                queue.queueName = queueID;
                return queue;
            }

            object[] args = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueID };
            string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, args);

            throw new QueueException(message, MessageQueueErrorCode.QueueNotFound);
        }

        /// <summary>
        /// A copy of the names of all queues held by this service, excluding the internal
        /// pending-notification queue.
        /// </summary>
        internal IEnumerable<IComparable> QueueNames
        {
            get
            {
                List<IComparable> list = new List<IComparable>(this.persistedQueueStates.Keys);
                foreach (IComparable name in list)
                {
                    if (name is String && (String)name == pendingNotification)
                    {
                        // Safe despite removing during enumeration: we leave the loop
                        // immediately, so the enumerator is never advanced again.
                        list.Remove(name);
                        break;
                    }
                }

                return list;
            }
        }

        /// <summary>
        /// Merges the state of a completed inner service into this service. Every source
        /// queue flagged transactional replaces (or is added as) the same-named entry here,
        /// and the pending-message state is copied from the source.
        /// </summary>
        /// <exception cref="QueueException">
        /// A source queue already exists here and is not marked dirty (<c>QueueNotAvailable</c>).
        /// Nothing has been merged when this is thrown.
        /// </exception>
        private void ApplyChangesFrom(EventQueueState srcPendingQueueState, Dictionary<IComparable, EventQueueState> srcPersistedQueueStates)
        {
            lock (SyncRoot)
            {
                Dictionary<IComparable, EventQueueState> modifiedItems = new Dictionary<IComparable, EventQueueState>();

                foreach (KeyValuePair<IComparable, EventQueueState> mergeItem in srcPersistedQueueStates)
                {
                    Debug.Assert(mergeItem.Value.Transactional, "Queue inside a transactional context is not transactional!");

                    if (mergeItem.Value.Transactional)
                    {
                        EventQueueState oldvalue;
                        if (this.persistedQueueStates.TryGetValue(mergeItem.Key, out oldvalue) && !oldvalue.Dirty)
                        {
                            // we could get here when there
                            // are conflicting create Qs
                            string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.QueueBusyException, new object[] { mergeItem.Key });
                            throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
                        }

                        modifiedItems.Add(mergeItem.Key, mergeItem.Value);
                    }
                }

                // no conflicts detected now make the updates visible
                foreach (KeyValuePair<IComparable, EventQueueState> modifiedItem in modifiedItems)
                {
                    // shared queue in the root, swap out to new value
                    // or add new item
                    this.persistedQueueStates[modifiedItem.Key] = modifiedItem.Value;
                }

                this.pendingQueueState.CopyFrom(srcPendingQueueState);
            }
        }

        // message arrival async notification
        /// <summary>
        /// Notifies the synchronous listeners of a queue, then its asynchronous listeners once.
        /// </summary>
        private void NotifyExternalSubscribers(IComparable queueName, EventQueueState qState, Object eventInstance)
        {
            NotifySynchronousSubscribers(queueName, qState, eventInstance);
            NotifyAsynchronousSubscribers(queueName, qState, 1);
        }

        /// <summary>
        /// Invokes every synchronous listener of <paramref name="qState"/>, through its
        /// <c>HandlerDelegate</c> when one is set and through <c>EventListener.OnEvent</c>
        /// otherwise. <paramref name="eventInstance"/> is not used by this method.
        /// </summary>
        private void NotifySynchronousSubscribers(IComparable queueName, EventQueueState qState, Object eventInstance)
        {
            QueueEventArgs args = new QueueEventArgs(queueName);

            for (int i = 0; i < qState.SynchronousListeners.Count; ++i)
            {
                if (qState.SynchronousListeners[i].HandlerDelegate != null)
                    qState.SynchronousListeners[i].HandlerDelegate(new WorkflowQueue(this, queueName), args);
                else
                    qState.SynchronousListeners[i].EventListener.OnEvent(new WorkflowQueue(this, queueName), args);
            }
        }

        // returns a valid state only if transactional and entry exists
        /// <summary>
        /// Marks a transactional queue as dirty and remembers its name in <c>dirtyQueues</c>.
        /// </summary>
        /// <returns>
        /// The queue's state, or null when the queue does not exist here or is not transactional.
        /// </returns>
        private EventQueueState MarkQueueDirtyIfTransactional(IComparable queueName)
        {
            lock (SyncRoot)
            {
                Debug.Assert(this.rootQueuingService == null, "MarkQueueDirty should be done at root");

                if (!this.persistedQueueStates.ContainsKey(queueName))
                    return null;

                EventQueueState queueState = GetQueue(queueName);

                if (!queueState.Transactional)
                    return null;

                if (queueState.Dirty)
                    return queueState; // already marked

                queueState.Dirty = true;

                if (this.dirtyQueues == null)
                    this.dirtyQueues = new List<IComparable>();

                // add to the list of dirty queues
                this.dirtyQueues.Add(queueName);

                return queueState;
            }
        }

        /// <summary>Registers an inner service to be told about items enqueued on this service.</summary>
        private void AddMessageArrivedEventHandler(WorkflowQueuingService handler)
        {
            lock (SyncRoot)
            {
                if (this.messageArrivalEventHandlers == null)
                    this.messageArrivalEventHandlers = new List<WorkflowQueuingService>();

                this.messageArrivalEventHandlers.Add(handler);
            }
        }

        /// <summary>
        /// Unregisters an inner service and clears the dirty flag of every queue named in
        /// <c>dirtyQueues</c>. The list itself is left untouched.
        /// </summary>
        private void RemoveMessageArrivedEventHandler(WorkflowQueuingService handler)
        {
            lock (SyncRoot)
            {
                if (this.messageArrivalEventHandlers != null)
                    this.messageArrivalEventHandlers.Remove(handler);

                if (this.dirtyQueues != null)
                {
                    foreach (IComparable queueName in this.dirtyQueues)
                    {
                        EventQueueState qState = GetQueue(queueName);
                        qState.Dirty = false;
                    }
                }
            }
        }

        #endregion

        #region inner QueuingService functions

        /// <summary>
        /// Decides whether this inner service handles <paramref name="queueName"/> itself.
        /// When the queue is not yet held locally, the root is asked to mark it dirty; if
        /// the root reports it as transactional, a copy of its state is added to the local
        /// dictionary. Every visible caller checks that <c>rootQueuingService</c> is
        /// non-null before calling this.
        /// </summary>
        /// <returns>True when the queue is (now) held locally; otherwise false.</returns>
        private bool IsTransactionalQueue(IComparable queueName)
        {
            // check inner service for existense
            if (!this.persistedQueueStates.ContainsKey(queueName))
            {
                EventQueueState queueState = this.rootQueuingService.MarkQueueDirtyIfTransactional(queueName);

                if (queueState != null)
                {
                    // if transactional proceed to the inner queue service
                    // for this operation after adding the state
                    EventQueueState snapshotState = new EventQueueState();
                    snapshotState.CopyFrom(queueState);
                    this.persistedQueueStates.Add(queueName, snapshotState);
                    return true;
                }

                return false;
            }

            return true; // if entry exits, it must be transactional
        }

        /// <summary>Registers this service with its root for message delivery; no-op on a root service.</summary>
        private void SubscribeForRootMessageDelivery()
        {
            if (this.rootQueuingService == null)
                return;

            this.rootQueuingService.AddMessageArrivedEventHandler(this);
        }

        /// <summary>Unregisters this service from its root; no-op on a root service.</summary>
        private void UnSubscribeFromRootMessageDelivery()
        {
            if (this.rootQueuingService == null)
                return;

            this.rootQueuingService.RemoveMessageArrivedEventHandler(this);
        }

        // listen on its internal(parent) queuing service
        // messages and pull messages. There is one parent queuing service visible to the external
        // host environment. A queueing service snapshot exists per atomic scope and external messages
        // for existing queues need to be pushed through
        /// <summary>
        /// Called by the root from <see cref="EnqueueEvent"/>. When this service holds a
        /// queue of the same name, the item is added to it and its synchronous and
        /// asynchronous listeners are notified.
        /// </summary>
        /// <exception cref="QueueException">The local queue is disabled (<c>QueueNotAvailable</c>).</exception>
        private void OnItemEnqueued(IComparable queueName, object item)
        {
            if (this.persistedQueueStates.ContainsKey(queueName))
            {
                // make the message visible to inner queueing service
                EventQueueState qState = GetQueue(queueName);
                if (!qState.Enabled)
                {
                    object[] msgArgs = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueName };
                    string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, msgArgs);
                    throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
                }

                qState.Messages.Enqueue(item);
                NotifyExternalSubscribers(queueName, qState, item);
            }
        }

        /// <summary>
        /// Called by the root from <see cref="SafeEnqueueEvent"/>. Same as
        /// <see cref="OnItemEnqueued"/> except that only synchronous listeners are notified.
        /// </summary>
        /// <exception cref="QueueException">The local queue is disabled (<c>QueueNotAvailable</c>).</exception>
        private void OnItemSafeEnqueued(IComparable queueName, object item)
        {
            if (this.persistedQueueStates.ContainsKey(queueName))
            {
                // make the message visible to inner queueing service
                EventQueueState qState = GetQueue(queueName);
                if (!qState.Enabled)
                {
                    object[] msgArgs = new object[] { System.Messaging.MessageQueueErrorCode.QueueNotFound, queueName };
                    string message = string.Format(CultureInfo.CurrentCulture, ExecutionStringManager.EventQueueException, msgArgs);
                    throw new QueueException(message, MessageQueueErrorCode.QueueNotAvailable);
                }

                qState.Messages.Enqueue(item);
                NotifySynchronousSubscribers(queueName, qState, item);
            }
        }

        /// <summary>
        /// Finishes an inner service: on success its state is merged into the root, and in
        /// either case it is unsubscribed from root message delivery. The success path
        /// dereferences <c>rootQueuingService</c>, so it requires an inner service.
        /// </summary>
        /// <param name="commitSucceeded">True to merge local changes into the root.</param>
        internal void Complete(bool commitSucceeded)
        {
            if (commitSucceeded)
            {
                this.rootQueuingService.ApplyChangesFrom(this.pendingQueueState, this.persistedQueueStates);
            }

            UnSubscribeFromRootMessageDelivery();
        }

        #endregion

        #region Pre-persist and Post-persist helpers for queuing service states

        // Created for ref. 20575
        /// <summary>
        /// Finishes a persistence attempt. On failure the snapshots taken by
        /// <see cref="PrePersist"/> are restored and the local queuing service is
        /// re-subscribed; in every case the snapshots are then discarded.
        /// </summary>
        /// <param name="isPersistSuccessful">Whether persistence succeeded.</param>
        internal void PostPersist(bool isPersistSuccessful)
        {
            // If persist is unsuccessful, we'll undo the changes done
            // because of the call to .Complete() in PrePresist
            if (!isPersistSuccessful)
            {
                Debug.Assert(rootWorkflowExecutor.CurrentAtomicActivity != null);
                Debug.Assert(pendingQueueStateSnapshot != null);
                Debug.Assert(persistedQueueStatesSnapshot != null);

                TransactionalProperties transactionalProperties = rootWorkflowExecutor.CurrentAtomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty) as TransactionalProperties;
                Debug.Assert(transactionalProperties != null);

                // Restore queuing states and set root activity's dependency properties to the new values.
                pendingQueueState = pendingQueueStateSnapshot;
                persistedQueueStates = persistedQueueStatesSnapshot;
                rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.RootPersistedQueueStatesProperty, persistedQueueStatesSnapshot);
                rootWorkflowExecutor.RootActivity.SetValue(WorkflowQueuingService.PendingMessagesProperty, pendingQueueStateSnapshot.Messages);

                // Also call Subscribe...() because the .Complete() call called Unsubscribe
                transactionalProperties.LocalQueuingService.SubscribeForRootMessageDelivery();
            }

            // The backups are no longer necessary.
            // The next call to PrePresistQueuingServiceState() will do a re-backup.
            persistedQueueStatesSnapshot = null;
            pendingQueueStateSnapshot = null;
        }

        // Created for ref. 20575
        /// <summary>
        /// Prepares for persistence. When an atomic activity is current, a copy of every
        /// queue state and of the pending-message state is taken, then the atomic
        /// activity's local queuing service is completed so its changes are merged into
        /// this service. Does nothing when no atomic activity is current.
        /// </summary>
        internal void PrePersist()
        {
            if (rootWorkflowExecutor.CurrentAtomicActivity != null)
            {
                // Create transactionalProperties from currentAtomicActivity
                TransactionalProperties transactionalProperties = this.rootWorkflowExecutor.CurrentAtomicActivity.GetValue(WorkflowExecutor.TransactionalPropertiesProperty) as TransactionalProperties;

                // Create backup snapshot of root queuing service's persistedQueuesStates
                // qService.persistedQueueStates is changed when LocalQueuingService.Complete is called later.
                persistedQueueStatesSnapshot = new Dictionary<IComparable, EventQueueState>();
                foreach (KeyValuePair<IComparable, EventQueueState> kv in persistedQueueStates)
                {
                    EventQueueState individualPersistedQueueStateValue = new EventQueueState();
                    individualPersistedQueueStateValue.CopyFrom(kv.Value);
                    persistedQueueStatesSnapshot.Add(kv.Key, individualPersistedQueueStateValue);
                }

                // Create backup snapshot of root queuing service's pendingQueueState
                // qService.pendingQueueState is changed when LocalQueuingService.Complete is called later.
                pendingQueueStateSnapshot = new EventQueueState();
                pendingQueueStateSnapshot.CopyFrom(pendingQueueState);

                // Reconcile differences between root and local queuing services.
                transactionalProperties.LocalQueuingService.Complete(true);
            }
        }

        #endregion Pre-persist and post-persist helpers for queuing service states
    }
}