using System; using System.Collections; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Collections.Specialized; using System.Text; using System.IO; using System.Runtime.Serialization; using System.Runtime.Serialization.Formatters.Binary; using System.Data.Common; using System.Data; using System.Data.SqlClient; using System.Timers; using System.Diagnostics; using System.Reflection; using System.Workflow.Runtime; using System.Workflow.ComponentModel; using System.Workflow.Runtime.Hosting; using System.Text.RegularExpressions; using System.Threading; using System.Transactions; using System.Globalization; using System.Workflow.ComponentModel.Serialization; using System.ComponentModel.Design.Serialization; using System.Xml; using System.Configuration; namespace System.Workflow.Runtime.Tracking { [Obsolete("The System.Workflow.* types are deprecated. Instead, please use the new types from System.Activities.*")] public sealed class SqlTrackingService : TrackingService, IProfileNotification { #region Private/Protected Members private bool _isTrans = true; private bool _partition = false; private bool _defaultProfile = true; private bool _enableRetries = false; private bool _ignoreCommonEnableRetries = false; private DateTime _lastProfileCheck; private System.Timers.Timer _timer = new System.Timers.Timer(); private double _interval = 60000; //private static int _deadlock = 1205; private TypeKeyedCollection _types = new TypeKeyedCollection(); private object _typeCacheLock = new object(); private WorkflowCommitWorkBatchService _transactionService; private DbResourceAllocator _dbResourceAllocator; private static Version UnknownProfileVersionId = new Version(0, 0); // Saved from constructor input to be used in service start initialization private NameValueCollection _parameters; string _unvalidatedConnectionString; private delegate void ExecuteRetriedDelegate(object param); #endregion #region Configuration Properties public string ConnectionString { get { return _unvalidatedConnectionString; } } /// /// Determines if tracking data should be held and transactionally written to the database at persistence points. /// /// public bool IsTransactional { get { return _isTrans; } set { _isTrans = value; } } /// /// Indicates that records should be moved from the active instance tables to the appropriate parition tables when the instance completes. /// public bool PartitionOnCompletion { get { return _partition; } set { _partition = value; } } /// /// Determines if the default profile should be used for workflow types that do not have a profile specified for them. /// /// public bool UseDefaultProfile { get { return _defaultProfile; } set { _defaultProfile = value; } } /// /// The time interval, in milliseconds, at which to check the database for changes to profiles. /// Default is 60000. /// /// /// Setting the interval results in the next check to occur the specified number of millisecond /// from the time at which the property is set. /// public double ProfileChangeCheckInterval { get { return _interval; } set { if (value <= 0) throw new ArgumentException(ExecutionStringManager.InvalidProfileCheckValue); _interval = value; // // Set the timer's interval. // This will reset the timer _timer.Interval = _interval; } } public bool EnableRetries { get { return _enableRetries; } set { _enableRetries = value; _ignoreCommonEnableRetries = true; } } internal DbResourceAllocator DbResourceAllocator { get { return this._dbResourceAllocator; } } #endregion #region Construction public SqlTrackingService(string connectionString) { if (String.IsNullOrEmpty(connectionString)) throw new ArgumentNullException("connectionString", ExecutionStringManager.MissingConnectionString); _unvalidatedConnectionString = connectionString; } public SqlTrackingService(NameValueCollection parameters) { if (parameters == null) throw new ArgumentNullException("parameters", ExecutionStringManager.MissingParameters); if (parameters.Count > 0) { foreach (string key in parameters.Keys) { if (0 == string.Compare("IsTransactional", key, StringComparison.OrdinalIgnoreCase)) _isTrans = bool.Parse(parameters[key]); else if (0 == string.Compare("UseDefaultProfile", key, StringComparison.OrdinalIgnoreCase)) _defaultProfile = bool.Parse(parameters[key]); else if (0 == string.Compare("PartitionOnCompletion", key, StringComparison.OrdinalIgnoreCase)) _partition = bool.Parse(parameters[key]); else if (0 == string.Compare("ProfileChangeCheckInterval", key, StringComparison.OrdinalIgnoreCase)) { _interval = double.Parse(parameters[key], NumberFormatInfo.InvariantInfo); if (_interval <= 0) throw new ArgumentException(ExecutionStringManager.InvalidProfileCheckValue); } else if (0 == string.Compare("ConnectionString", key, StringComparison.OrdinalIgnoreCase)) _unvalidatedConnectionString = parameters[key]; else if (0 == string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase)) { _enableRetries = bool.Parse(parameters[key]); _ignoreCommonEnableRetries = true; } } } _parameters = parameters; } #endregion #region WorkflowRuntimeService override protected internal void Start() { _lastProfileCheck = DateTime.UtcNow; _dbResourceAllocator = new DbResourceAllocator(this.Runtime, _parameters, _unvalidatedConnectionString); // Check connection string mismatch if using SharedConnectionWorkflowTransactionService _transactionService = this.Runtime.GetService(); _dbResourceAllocator.DetectSharedConnectionConflict(_transactionService); // // If we didn't find a local value for enable retries // check in the common section if ((!_ignoreCommonEnableRetries) && (null != base.Runtime)) { NameValueConfigurationCollection commonConfigurationParameters = base.Runtime.CommonParameters; if (commonConfigurationParameters != null) { // Then scan for connection string in the common configuration parameters section foreach (string key in commonConfigurationParameters.AllKeys) { if (string.Compare("EnableRetries", key, StringComparison.OrdinalIgnoreCase) == 0) { _enableRetries = bool.Parse(commonConfigurationParameters[key].Value); break; } } } } _timer.Interval = _interval; _timer.AutoReset = false; // ensure that only one timer thread is checking for profile changes at a time _timer.Elapsed += new ElapsedEventHandler(CheckProfileChanges); _timer.Start(); base.Start(); } #endregion WorkflowRuntimeService #region IProfileNotification Implementation protected internal override TrackingChannel GetTrackingChannel(TrackingParameters parameters) { if (null == parameters) throw new ArgumentNullException("parameters"); // // Return a new channel for this instance // Give it the parameters and this to store return new SqlTrackingChannel(parameters, this); } public event EventHandler ProfileUpdated; public event EventHandler ProfileRemoved; protected internal override TrackingProfile GetProfile(Type workflowType, Version profileVersion) { if (null == workflowType) throw new ArgumentNullException("workflowType"); // parameter wantToCreateDefault = false: // looking for a specific version that has already been running with this instance; don't use a default here return GetProfileByScheduleType(workflowType, profileVersion, false); } protected internal override bool TryGetProfile(Type workflowType, out TrackingProfile profile) { if (null == workflowType) throw new ArgumentNullException("workflowType"); profile = GetProfileByScheduleType(workflowType, SqlTrackingService.UnknownProfileVersionId, _defaultProfile); if (null == profile) return false; else return true; } protected internal override TrackingProfile GetProfile(Guid scheduleInstanceId) { TrackingProfile profile = null; GetProfile(scheduleInstanceId, out profile); return profile; } private bool GetProfile(Guid scheduleInstanceId, out TrackingProfile profile) { profile = null; DbCommand cmd = this._dbResourceAllocator.NewCommand(); cmd.CommandType = CommandType.StoredProcedure; cmd.CommandText = "[dbo].[GetInstanceTrackingProfile]"; cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@InstanceId", scheduleInstanceId)); DbDataReader reader = null; try { reader = ExecuteReaderRetried(cmd, CommandBehavior.CloseConnection); // // Should only reach here in non exception state if (!reader.HasRows) { // // Didn't find a specific profile for this instance reader.Close(); profile = null; return false; } else { if (!reader.Read()) { reader.Close(); profile = null; return false; } if (reader.IsDBNull(0)) profile = null; else { string tmp = reader.GetString(0); TrackingProfileSerializer serializer = new TrackingProfileSerializer(); StringReader pReader = null; try { pReader = new StringReader(tmp); profile = serializer.Deserialize(pReader); } finally { if (null != pReader) pReader.Close(); } } return true; } } finally { if ((null != reader) && (!reader.IsClosed)) reader.Close(); if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State)) cmd.Connection.Close(); } } protected internal override bool TryReloadProfile(Type workflowType, Guid scheduleInstanceId, out TrackingProfile profile) { if (null == workflowType) throw new ArgumentNullException("workflowType"); bool found = GetProfile(scheduleInstanceId, out profile); if (found) return true; else { profile = null; return false; } } #endregion #region Profile Management Methods private void CheckProfileChanges(object sender, ElapsedEventArgs e) { DbCommand cmd = null; DbDataReader reader = null; try { if ((null == ProfileUpdated) && (null == ProfileRemoved)) return; // no one to notify Debug.WriteLine("Checking for updated profiles..."); cmd = this._dbResourceAllocator.NewCommand(); cmd.CommandText = "GetUpdatedTrackingProfiles"; cmd.CommandType = CommandType.StoredProcedure; cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@LastCheckDateTime", _lastProfileCheck)); DbParameter param = this._dbResourceAllocator.NewDbParameter(); param.ParameterName = "@MaxCheckDateTime"; param.DbType = DbType.DateTime; param.Direction = System.Data.ParameterDirection.Output; cmd.Parameters.Add(param); reader = ExecuteReaderRetried(cmd, CommandBehavior.CloseConnection); // // No changes if (!reader.HasRows) return; while (reader.Read()) { Type t = null; string tmp = null; TrackingProfile profile = null; t = Assembly.Load(reader[1] as string).GetType(reader[0] as string); if (null == t) continue; tmp = reader[2] as string; if (null == tmp) { if (null != ProfileRemoved) ProfileRemoved(this, new ProfileRemovedEventArgs(t)); } else { TrackingProfileSerializer serializer = new TrackingProfileSerializer(); StringReader pReader = null; try { pReader = new StringReader(tmp); profile = serializer.Deserialize(pReader); } finally { if (null != pReader) pReader.Close(); } if (null != ProfileUpdated) ProfileUpdated(this, new ProfileUpdatedEventArgs(t, profile)); } Debug.WriteLine(ExecutionStringManager.UpdatedProfile + t.FullName); } } finally { if ((null != reader) && (!reader.IsClosed)) reader.Close(); // // This should never be null/empty unless the proc failed which should throw if (null != cmd) { // // If the value is null we error'd so keep the same last time for the next check if (null != cmd.Parameters[1].Value) _lastProfileCheck = (DateTime)cmd.Parameters[1].Value; } if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State)) cmd.Connection.Close(); // // Start the timer again (autoreset is false to avoid multiple threads checking for profile changes) _timer.Start(); } } #endregion #region Private Methods private void ExecuteRetried(ExecuteRetriedDelegate executeRetried, object param) { short count = 0; DbRetry dbRetry = new DbRetry(_enableRetries); while (true) { try { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); executeRetried(param); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); break; } catch (Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteRetried caught exception: " + e.ToString()); if (dbRetry.TryDoRetry(ref count)) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteRetried " + executeRetried.Method.Name + " retrying."); continue; } throw; } } } private DbDataReader ExecuteReaderRetried(DbCommand command, CommandBehavior behavior) { DbDataReader reader = null; short count = 0; DbRetry dbRetry = new DbRetry(_enableRetries); while (true) { try { ResetConnectionForCommand(command); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); reader = command.ExecuteReader(behavior); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried ExecuteReader end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); break; } catch (Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteReaderRetried caught exception from ExecuteReader: " + e.ToString()); if (dbRetry.TryDoRetry(ref count)) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteReaderRetried retrying."); continue; } throw; } } return reader; } private void ExecuteNonQueryRetried(DbCommand command) { short count = 0; DbRetry dbRetry = new DbRetry(_enableRetries); while (true) { try { ResetConnectionForCommand(command); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); command.ExecuteNonQuery(); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); break; } catch (Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryRetried caught exception from ExecuteNonQuery: " + e.ToString()); if (dbRetry.TryDoRetry(ref count)) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryRetried retrying."); continue; } throw; } } } private void ExecuteNonQueryWithTxRetried(DbCommand command) { try { short count = 0; DbRetry dbRetry = new DbRetry(_enableRetries); while (true) { try { ResetConnectionForCommand(command); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery start: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); command.Transaction = command.Connection.BeginTransaction(); command.ExecuteNonQuery(); command.Transaction.Commit(); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried ExecuteNonQuery end: " + DateTime.UtcNow.ToString("G", System.Globalization.CultureInfo.InvariantCulture)); break; } catch (Exception e) { WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried caught exception from ExecuteNonQuery: " + e.ToString()); try { if (null != command.Transaction) command.Transaction.Rollback(); } catch { // // Rollback() can throw, nothing to do but ---- if this happens // so that we don't lose the original exception } if (dbRetry.TryDoRetry(ref count)) { WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, "SqlTrackingService.ExecuteNonQueryWithTxRetried retrying."); continue; } throw; } } } finally { if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State)) command.Connection.Close(); } } private void ResetConnectionForCommand(DbCommand command) { if (null == command) return; if (null != command.Connection) { if (ConnectionState.Open != command.Connection.State) { if (ConnectionState.Closed != command.Connection.State) command.Connection.Close(); command.Connection.Dispose(); command.Connection = _dbResourceAllocator.OpenNewConnectionNoEnlist(); } } } internal static XmlWriter CreateXmlWriter(TextWriter output) { XmlWriterSettings settings = new XmlWriterSettings(); settings.Indent = true; settings.IndentChars = ("\t"); settings.OmitXmlDeclaration = true; settings.CloseOutput = true; return XmlWriter.Create(output as TextWriter, settings); } private TrackingProfile GetProfileByScheduleType(Type workflowType, Version profileVersionId, bool wantToCreateDefault) { DbCommand cmd = this._dbResourceAllocator.NewCommand(); DbDataReader reader = null; TrackingProfile profile = null; cmd.CommandType = CommandType.StoredProcedure; cmd.CommandText = "dbo.GetTrackingProfile"; cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@TypeFullName", workflowType.FullName)); cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@AssemblyFullName", workflowType.Assembly.FullName)); if (profileVersionId != SqlTrackingService.UnknownProfileVersionId) cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@Version", profileVersionId.ToString())); cmd.Parameters.Add(this._dbResourceAllocator.NewDbParameter("@CreateDefault", wantToCreateDefault)); try { reader = ExecuteReaderRetried(cmd, CommandBehavior.CloseConnection); if (reader.Read()) { string tmp = reader[0] as string; if (null != tmp) { TrackingProfileSerializer serializer = new TrackingProfileSerializer(); StringReader pReader = null; try { pReader = new StringReader(tmp); profile = serializer.Deserialize(pReader); } finally { if (null != pReader) pReader.Close(); } } } } finally { if ((null != reader) && (!reader.IsClosed)) reader.Close(); if ((null != cmd) && (null != cmd.Connection) && (ConnectionState.Closed != cmd.Connection.State)) cmd.Connection.Close(); } return profile; } #endregion #region Private Classes private class TypeKeyedCollection : KeyedCollection { protected override string GetKeyForItem(Type item) { return item.AssemblyQualifiedName; } } private class SerializedDataItem : TrackingDataItem { public Type Type; public string StringData; public byte[] SerializedData; public bool NonSerializable; } private class SerializedEventArgs : EventArgs { public Type Type; public byte[] SerializedArgs; } private struct AddedActivity { public string ActivityTypeFullName; public string ActivityTypeAssemblyFullName; public string QualifiedName; public string ParentQualifiedName; public string AddedActivityActionXoml; public int Order; } private struct RemovedActivity { public string QualifiedName; public string ParentQualifiedName; public string RemovedActivityActionXoml; public int Order; } private class SerializedWorkflowChangedEventArgs : SerializedEventArgs { public IList AddedActivities = new List(); public IList RemovedActivities = new List(); } #endregion Private Classes internal class SqlTrackingChannel : TrackingChannel, IPendingWork { #region Private Members private SqlTrackingService _service = null; private string _callPathKey = null, _parentCallPathKey = null; private bool _isTrans = false; private long _internalId = -1; private long _tmpInternalId = -1; private Dictionary _activityInstanceId = new Dictionary(32); private Dictionary _tmpActivityInstanceId = new Dictionary(10); private TrackingParameters _parameters = null; private bool _pendingArchive = false; private bool _completedTerminated = false; private static int _activityEventBatchSize = 5; private static int _dataItemBatchSize = 5; private static int _dataItemAnnotationBatchSize = 5; private static int _eventAnnotationBatchSize = 5; #endregion #region Construction protected SqlTrackingChannel() { } public SqlTrackingChannel(TrackingParameters parameters, SqlTrackingService service) { if (null == service) return; _service = service; _parameters = parameters; _isTrans = service.IsTransactional; GetCallPathKeys(parameters.CallPath); if (!_isTrans) { // // Look up instance id or insert if new instance // If we're transactional we'll do this in the first IPendingWork.Commit() _service.ExecuteRetried(ExecuteInsertWorkflowInstance, null); } } #endregion #region Public Properties private DbResourceAllocator DbResourceAllocator { get { return _service.DbResourceAllocator; } } private WorkflowCommitWorkBatchService WorkflowCommitWorkBatchService { get { return _service._transactionService; } } #endregion #region TrackingChannel protected internal override void InstanceCompletedOrTerminated() { if (_isTrans) { // // Indicate that at the next batch commit we should stamp the enddate _completedTerminated = true; // // Indicate that when the next batch commit completes successfully we should partition this instance if (_service.PartitionOnCompletion) _pendingArchive = true; } else { _service.ExecuteRetried(ExecuteSetEndDate, null); if (_service.PartitionOnCompletion) _service.ExecuteRetried(PartitionInstance, null); } } private void PartitionInstance(object param) { DbCommand command = null; try { // // Allow enlisting if there is an ambient tx // This can only happen on a host initiated terminate in V1. DbConnection connection = DbResourceAllocator.OpenNewConnection(false); command = DbResourceAllocator.NewCommand(connection); command.CommandText = "[dbo].[PartitionWorkflowInstance]"; command.CommandType = CommandType.StoredProcedure; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", _internalId)); command.ExecuteNonQuery(); } finally { if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State)) command.Connection.Close(); } } private void ExecuteSetEndDate(object param) { DbCommand command = null; try { // // Allow enlisting if there is an ambient tx // This can only happen on a host initiated terminate in V1. DbConnection connection = DbResourceAllocator.OpenNewConnection(false); command = DbResourceAllocator.NewCommand(connection); ExecuteSetEndDate(_internalId, command); } finally { if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State)) command.Connection.Close(); } } private void ExecuteSetEndDate(long internalId, DbCommand command) { if (null == command) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandText = "[dbo].[SetWorkflowInstanceEndDateTime]"; command.CommandType = CommandType.StoredProcedure; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EndDateTime", DateTime.UtcNow)); command.ExecuteNonQuery(); } protected internal override void Send(TrackingRecord record) { if ((Guid.Empty == _parameters.InstanceId) || (null == record)) throw new ArgumentException(ExecutionStringManager.MissingParametersTrack); if (record is ActivityTrackingRecord) { ActivityTrackingRecord act = record as ActivityTrackingRecord; if (_isTrans) WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(act)); else _service.ExecuteRetried(ExecuteInsertActivityStatusInstance, SerializeRecord(act)); } else if (record is WorkflowTrackingRecord) { // // Instance events cannot be batched - many occur when there isn't a batch WorkflowTrackingRecord inst = (WorkflowTrackingRecord)record; if (_isTrans) { WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(inst)); } else { if (TrackingWorkflowEvent.Changed == inst.TrackingWorkflowEvent) { // // Dynamic updates are inserted in the WorkflowInstanceEvent table // and then the arg (workflowchanges) is normalized into xoml // and the added/removed activities tables _service.ExecuteRetried(ExecuteInsertWorkflowChange, SerializeRecord(inst)); } else { _service.ExecuteRetried(ExecuteInsertWorkflowInstanceEvent, SerializeRecord(inst)); } } } else if (record is UserTrackingRecord) { UserTrackingRecord user = (UserTrackingRecord)record; if (_isTrans) WorkflowEnvironment.WorkBatch.Add(this, SerializeRecord(user)); else _service.ExecuteRetried(ExecuteInsertUserEvent, SerializeRecord(user)); } } #endregion #region IPendingWork Members public bool MustCommit(ICollection items) { // // Never force a persist - this is a balancing act but the V1 // decision is to err on the side of persisting only when the workflow // requires it based on its model. If the workflow uses persistence points // wisely this is great. If it goes a long time between persists with lots // of events the persists will take a long time as the batch can be huge. return false; } public void Commit(System.Transactions.Transaction transaction, ICollection items) { if ((null == items) || (0 == items.Count)) return; DbCommand command = null; DbConnection connection = null; bool needToCloseConnection = false; DbTransaction localTransaction = null; bool commitTx = false; try { // // Get the connection and transaction // The connection might be shared or local // The tx is shared and may be either a DTC or a local sql tx connection = DbResourceAllocator.GetEnlistedConnection( this.WorkflowCommitWorkBatchService, transaction, out needToCloseConnection); localTransaction = DbResourceAllocator.GetLocalTransaction( this.WorkflowCommitWorkBatchService, transaction); if (null == localTransaction) { localTransaction = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted); commitTx = true; } command = DbResourceAllocator.NewCommand(connection); command.Transaction = localTransaction; // // If we don't have the internal id for the instance this is the first batch // for this channel instance. If this is a new instance the following will insert // a new instance record in the db and set _tmpInternalId. If this is a reload of // an existing instance it will just do a lookup and set _tmpInternalId // In Completed we will assign _tmpInternalId to _internalId if the batch is successful. long internalId = -1; if (_internalId <= 0) { ExecuteInsertWorkflowInstance(command); internalId = _tmpInternalId; } else internalId = _internalId; IList activities = new List(5); WorkflowTrackingRecord workflow = null; // // Build the batch statement foreach (object o in items) { if (!(o is TrackingRecord)) continue; if (o is ActivityTrackingRecord) { // // If we have a cached workflow tracking record send it if (null != workflow) { ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command); workflow = null; } ActivityTrackingRecord activity = (ActivityTrackingRecord)o; // // Add this event to the list and send to the db if we've hit our limit activities.Add(activity); if (_activityEventBatchSize == activities.Count) { ExecuteInsertActivityStatusInstance(internalId, activities, command); activities = new List(5); } } else if (o is UserTrackingRecord) { // // If we have cached activity or workflow tracking records send them if (activities.Count > 0) { ExecuteInsertActivityStatusInstance(internalId, activities, command); activities.Clear(); } if (null != workflow) { ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command); workflow = null; } ExecuteInsertUserEvent(internalId, (UserTrackingRecord)o, command); } else if (o is WorkflowTrackingRecord) { // // If we have cached activity tracking records send them if (activities.Count > 0) { ExecuteInsertActivityStatusInstance(internalId, activities, command); activities.Clear(); } WorkflowTrackingRecord record = (WorkflowTrackingRecord)o; if (TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent) { // // If we're already holding a workflow tracking record send both to the db // else cache it and wait for the next workflow tracking record if (null != workflow) { ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command); workflow = null; } ExecuteInsertWorkflowChange(internalId, record, command); } else { // // If we're already holding a workflow tracking record send both to the db // else cache it and wait for the next workflow tracking record if (null != workflow) { ExecuteInsertWorkflowInstanceEvent(internalId, workflow, record, command); workflow = null; } else { workflow = record; } } } } // // If we ended up with any activities event send them. if (activities.Count > 0) ExecuteInsertActivityStatusInstance(internalId, activities, command); if (null != workflow) { ExecuteInsertWorkflowInstanceEvent(internalId, workflow, null, command); workflow = null; } if (_completedTerminated) ExecuteSetEndDate(internalId, command); if (commitTx) localTransaction.Commit(); } catch (DbException e) { if (commitTx) localTransaction.Rollback(); WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, "Error writing tracking data to database: " + e); throw; } finally { if (needToCloseConnection) { connection.Dispose(); } } return; } public void Complete(bool succeeded, ICollection items) { // // If we didn't succeed on commit reset all flags if (!succeeded) { _completedTerminated = false; _pendingArchive = false; _tmpInternalId = -1; _tmpActivityInstanceId.Clear(); return; } // // Commit succeeded - move the tmp internalId to the real internalId member if (-1 == _internalId && _tmpInternalId > 0) _internalId = _tmpInternalId; // // Move the tmp activity instance ids to the real activity instance id member if (null != _tmpActivityInstanceId && _tmpActivityInstanceId.Count > 0) { foreach (string key in _tmpActivityInstanceId.Keys) { if (!_activityInstanceId.ContainsKey(key)) _activityInstanceId.Add(key, _tmpActivityInstanceId[key]); } _tmpActivityInstanceId.Clear(); } if (_pendingArchive) { try { _service.ExecuteRetried(PartitionInstance, null); } catch (Exception e) { // // ---- exceptions here, do not fail the instance. // Partition logic can be re-run to clean up on failure WorkflowTrace.Host.TraceEvent(TraceEventType.Error, 0, string.Format(System.Globalization.CultureInfo.InvariantCulture, "Error partitioning instance {0}: {1}", _parameters.InstanceId, e.ToString())); } } } #endregion #region Sql Commands - InsertWorkflowInstance private void ExecuteInsertWorkflowInstance(object param) { DbConnection conn = DbResourceAllocator.OpenNewConnection(); DbCommand command = DbResourceAllocator.NewCommand(conn); DbTransaction tx = null; try { tx = conn.BeginTransaction(System.Data.IsolationLevel.ReadCommitted); command.Connection = conn; command.Transaction = tx; _internalId = ExecuteInsertWorkflowInstance(command); tx.Commit(); } catch (Exception) { try { if (null != tx) tx.Rollback(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception } // // Re-throw original exception throw; } finally { if ((null != conn) && (ConnectionState.Closed != conn.State)) conn.Close(); } return; } private long ExecuteInsertWorkflowInstance(DbCommand command) { if (null == command) throw new ArgumentNullException("command"); if ((null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentException(ExecutionStringManager.InvalidCommandBadConnection, "command"); // // Write the type and the workflow definition string xaml = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string; if (null != xaml && xaml.Length > 0) InsertWorkflow(command, _parameters.InstanceId, null, _parameters.RootActivity); else InsertWorkflow(command, _parameters.InstanceId, _parameters.WorkflowType, _parameters.RootActivity); // // Write the instance record BuildInsertWorkflowInstanceParameters(command); DbDataReader reader = null; try { reader = command.ExecuteReader(); if (reader.Read()) _tmpInternalId = reader.GetInt64(0); return _tmpInternalId; } finally { if (null != reader) reader.Close(); } } private void BuildInsertWorkflowInstanceParameters(DbCommand command) { Debug.Assert((command != null), "Null command"); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertWorkflowInstance]"; command.Parameters.Clear(); bool xamlInst = false; string xaml = _parameters.RootActivity.GetValue(Activity.WorkflowXamlMarkupProperty) as string; if (null != xaml && xaml.Length > 0) xamlInst = true; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", (xamlInst ? _parameters.InstanceId.ToString() : _parameters.WorkflowType.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", (xamlInst ? _parameters.InstanceId.ToString() : _parameters.WorkflowType.Assembly.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid", _parameters.ContextGuid)); if (Guid.Empty != _parameters.CallerInstanceId) { command.Parameters.Add(DbResourceAllocator.NewDbParameter("@CallerInstanceId", _parameters.CallerInstanceId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@CallPath", _callPathKey)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@CallerContextGuid", _parameters.CallerContextGuid)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@CallerParentContextGuid", _parameters.CallerParentContextGuid)); } command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime", this.GetSqlDateTimeString(DateTime.UtcNow))); } private void InsertWorkflow(DbCommand command, Guid workflowInstanceId, Type workflowType, Activity rootActivity) { string xoml = null; // // If we've already seen this type just return if (null != workflowType) { lock (_service._typeCacheLock) { if (_service._types.Contains(workflowType.AssemblyQualifiedName)) return; else xoml = GetXomlDocument(rootActivity); } } else { // Don't forget to deal with XOML-only workflows lock (_service._typeCacheLock) { xoml = GetXomlDocument(rootActivity); } } // // It is possible to ---- here but the pk specifies ignore duplicate key // This is better than taking a lock around all of the logic in this method. command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertWorkflow]"; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", (null == workflowType ? workflowInstanceId.ToString() : workflowType.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", (null == workflowType ? workflowInstanceId.ToString() : workflowType.Assembly.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@IsInstanceType", (null == workflowType ? true : false))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowDefinition", xoml)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowId", DbType.Int32, System.Data.ParameterDirection.Output)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Exists", DbType.Boolean, System.Data.ParameterDirection.Output)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Activities", GetActivitiesXml((CompositeActivity)rootActivity))); command.ExecuteNonQuery(); // // Add this to the list of types we've already seen so we don't go // through the serialization overhead again and hit the db only to learn we've already stored it // Use a lock here to avoid ---- on _types dictionary if (null != workflowType) { lock (_service._typeCacheLock) { if (!_service._types.Contains(workflowType.AssemblyQualifiedName)) { _service._types.Add(workflowType); } } } return; } #endregion #region Sql Commands - InsertWorkflowInstanceEvent private void ExecuteInsertWorkflowInstanceEvent(object param) { WorkflowTrackingRecord record = param as WorkflowTrackingRecord; if (null == record) throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param"); DbConnection conn = DbResourceAllocator.OpenNewConnection(); DbCommand command = DbResourceAllocator.NewCommand(conn); DbTransaction tx = null; try { tx = conn.BeginTransaction(System.Data.IsolationLevel.ReadCommitted); command.Connection = conn; command.Transaction = tx; ExecuteInsertWorkflowInstanceEvent(_internalId, record, null, command); tx.Commit(); } catch (Exception) { try { if (null != tx) tx.Rollback(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception } // // Re-throw original exception throw; } finally { if ((null != conn) && (ConnectionState.Closed != conn.State)) conn.Close(); } return; } private void ExecuteInsertWorkflowInstanceEvent(long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command) { if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentException(); BuildInsertWorkflowInstanceEventParameters(internalId, record1, record2, command); command.ExecuteNonQuery(); long eventId1 = (long)command.Parameters["@WorkflowInstanceEventId1"].Value; Debug.Assert(eventId1 > 0, "Invalid eventId1"); long eventId2 = -1; if (null != record2) { eventId2 = (long)command.Parameters["@WorkflowInstanceEventId2"].Value; Debug.Assert(eventId2 > 0, "Invalid eventId2"); } List> annotations = new List>(record1.Annotations.Count + (null == record2 ? 0 : record2.Annotations.Count)); foreach (string s in record1.Annotations) annotations.Add(new KeyValuePair(eventId1, s)); if (null != record2) { foreach (string s in record2.Annotations) annotations.Add(new KeyValuePair(eventId2, s)); } BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command); } private void BuildInsertWorkflowInstanceEventParameters(long internalId, WorkflowTrackingRecord record1, WorkflowTrackingRecord record2, DbCommand command) { if (null == record1) throw new ArgumentNullException("record"); if (null == command) throw new ArgumentNullException("command"); Debug.Assert(internalId != -1, "Invalid internalId"); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertWorkflowInstanceEvent]"; command.Parameters.Clear(); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingWorkflowEventId1", (int)record1.TrackingWorkflowEvent)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime1", record1.EventDateTime)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder1", record1.EventOrder)); if (null != record1.EventArgs) { Type t = record1.EventArgs.GetType(); Byte[] data = null; if (!(record1.EventArgs is SerializedEventArgs)) record1 = SerializeRecord(record1); SerializedEventArgs sargs = record1.EventArgs as SerializedEventArgs; data = sargs.SerializedArgs; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgTypeFullName1", t.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgAssemblyFullName1", t.Assembly.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArg1", data)); } command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId1", DbType.Int64, ParameterDirection.Output)); if (null != record2) { command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingWorkflowEventId2", (int)record2.TrackingWorkflowEvent)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime2", record2.EventDateTime)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder2", record2.EventOrder)); if (null != record2.EventArgs) { Type t = record2.EventArgs.GetType(); Byte[] data = null; if (!(record2.EventArgs is SerializedEventArgs)) record2 = SerializeRecord(record2); SerializedEventArgs sargs = record2.EventArgs as SerializedEventArgs; data = sargs.SerializedArgs; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgTypeFullName2", t.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArgAssemblyFullName2", t.Assembly.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventArg2", data)); } command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId2", DbType.Int64, ParameterDirection.Output)); } } #endregion #region Sql Commands - InsertActivityStatusInstance private void ExecuteInsertActivityStatusInstance(object param) { ActivityTrackingRecord record = param as ActivityTrackingRecord; if (null == record) throw new ArgumentException(ExecutionStringManager.InvalidActivityTrackingRecordParameter, "param"); DbConnection conn = DbResourceAllocator.OpenNewConnection(); DbTransaction tx = null; try { tx = conn.BeginTransaction(System.Data.IsolationLevel.ReadCommitted); DbCommand command = conn.CreateCommand(); command.Transaction = tx; IList activity = new List(1); activity.Add(record); ExecuteInsertActivityStatusInstance(_internalId, activity, command); tx.Commit(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception try { if (null != tx) tx.Rollback(); } catch (Exception) { } // // Re-throw original exception throw; } finally { if ((null != conn) && (ConnectionState.Closed != conn.State)) conn.Close(); } return; } private void ExecuteInsertActivityStatusInstance(long internalId, IList activities, DbCommand command) { if (null == activities || activities.Count <= 0) return; if (activities.Count > _activityEventBatchSize) throw new ArgumentOutOfRangeException("activities"); if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentException(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertActivityExecutionStatusEventMultiple]"; // // Add the common parameters command.Parameters.Clear(); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceId", _parameters.InstanceId)); // // If we have the workflow's internal id use it to avoid the look up in the db DbParameter param = DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", DbType.Int64, System.Data.ParameterDirection.InputOutput); command.Parameters.Add(param); if (internalId > 0) param.Value = internalId; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceContextGuid", _parameters.ContextGuid)); // // Hashed ids of QName, context and pcontext used as key for storing activity record ids // Save these for each record in the list so we don't have to recompute them below when adding to the cache string[] ids = new string[] { null, null, null, null, null }; for (int i = 0; i < activities.Count; i++) { ActivityTrackingRecord record = activities[i]; long aid = -1; ids[i] = BuildQualifiedNameVarName(record.QualifiedName, record.ContextGuid, record.ParentContextGuid); TryGetActivityInstanceId(ids[i], out aid); BuildInsertActivityStatusEventParameters(internalId, aid, i + 1, record, command); } command.ExecuteNonQuery(); // // Get all the output ids long[] eventIds = new long[] { -1, -1, -1, -1, -1 }; for (int i = 0; i < activities.Count; i++) { string index = (i + 1).ToString(CultureInfo.InvariantCulture); // // ActivityInstanceId long aId = (long)command.Parameters["@ActivityInstanceId" + index].Value; Debug.Assert(aId > 0, "Invalid @ActivityInstanceId output parameter value"); // // For all status changes that aren't "Closed" add the id to the instance cache // Set... method checks and only adds if it does already exist. // To keep the cache size under control remove entries for activities that have closed. // The activity might fault and need to do a lookup in the db but this isn't the common // path and the db lookup isn't very expensive. if (ActivityExecutionStatus.Closed != activities[i].ExecutionStatus) SetActivityInstanceId(ids[i], aId); else RemoveActivityInstanceId(ids[i]); // // ActivityExecutionStatusEventId long aeseId = (long)command.Parameters["@ActivityExecutionStatusEventId" + index].Value; Debug.Assert(aeseId > 0, "Invalid @ActivityExecutionStatusEventId output parameter value"); eventIds[i] = aeseId; } List> annotations = new List>(10); List> items = new List>(10); for (int i = 0; i < activities.Count; i++) { ActivityTrackingRecord record = activities[i]; // // Get the ActivityExecutionStatusEventId long eventId = eventIds[i]; if (eventId <= 0) throw new InvalidOperationException(); foreach (string s in record.Annotations) annotations.Add(new KeyValuePair(eventId, s)); foreach (TrackingDataItem item in record.Body) items.Add(new KeyValuePair(eventId, item)); } BatchExecuteInsertEventAnnotation(internalId, 'a', annotations, command); BatchExecuteInsertTrackingDataItems(internalId, 'a', items, command); } private void BuildInsertActivityStatusEventParameters(long internalId, long activityInstanceId, int parameterId, ActivityTrackingRecord record, DbCommand command) { string paramIdString = parameterId.ToString(CultureInfo.InvariantCulture); // // If we have the activity's instance id use it to avoid the look up in the db DbParameter param = DbResourceAllocator.NewDbParameter("@ActivityInstanceId" + paramIdString, DbType.Int64, System.Data.ParameterDirection.InputOutput); command.Parameters.Add(param); if (activityInstanceId > 0) param.Value = activityInstanceId; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName" + paramIdString, record.QualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid" + paramIdString, record.ContextGuid)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentContextGuid" + paramIdString, record.ParentContextGuid)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ExecutionStatusId" + paramIdString, (int)record.ExecutionStatus)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime" + paramIdString, record.EventDateTime)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder" + paramIdString, record.EventOrder)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ActivityExecutionStatusEventId" + paramIdString, DbType.Int64, ParameterDirection.Output)); } #endregion #region Sql Commands - InsertUserEvent private void ExecuteInsertUserEvent(object param) { UserTrackingRecord record = param as UserTrackingRecord; if (null == record) throw new ArgumentException(ExecutionStringManager.InvalidUserTrackingRecordParameter, "param"); DbConnection conn = DbResourceAllocator.OpenNewConnection(); DbTransaction tx = null; try { tx = conn.BeginTransaction(System.Data.IsolationLevel.ReadCommitted); DbCommand command = conn.CreateCommand(); command.Transaction = tx; ExecuteInsertUserEvent(_internalId, record, command); tx.Commit(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception try { if (null != tx) tx.Rollback(); } catch (Exception) { } // // Re-throw original exception throw; } finally { if ((null != conn) && (ConnectionState.Closed != conn.State)) conn.Close(); } return; } private void ExecuteInsertUserEvent(long internalId, UserTrackingRecord record, DbCommand command) { if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentException(); long aid = -1; bool cached = false; // // Check if we have the activityInstanceId in the cache - we cache to avoid repeatedly searching this table. string id = BuildQualifiedNameVarName(record.QualifiedName, record.ContextGuid, record.ParentContextGuid); if (TryGetActivityInstanceId(id, out aid)) cached = true; BuildInsertUserEventParameters(internalId, aid, record, command); command.ExecuteNonQuery(); // // If we didn't already have the activityInstanceId get it from the IN/OUT param and put it in the cache if (!cached) SetActivityInstanceId(id, (long)command.Parameters["@ActivityInstanceId"].Value); long eventId = (long)command.Parameters["@UserEventId"].Value; List> annotations = new List>(10); List> items = new List>(10); foreach (string s in record.Annotations) annotations.Add(new KeyValuePair(eventId, s)); foreach (TrackingDataItem item in record.Body) items.Add(new KeyValuePair(eventId, item)); BatchExecuteInsertEventAnnotation(internalId, 'u', annotations, command); BatchExecuteInsertTrackingDataItems(internalId, 'u', items, command); } private void BuildInsertUserEventParameters(long internalId, long activityInstanceId, UserTrackingRecord record, DbCommand command) { Debug.Assert(internalId != -1, "Invalid internalId"); Debug.Assert((command != null), "Null command passed to BuildInsertActivityStatusEventParameters"); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertUserEvent]"; command.Parameters.Clear(); DbParameter param = DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", DbType.Int64); command.Parameters.Add(param); param.Value = internalId; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventOrder", record.EventOrder)); // // If we have the activity's instance id use it to avoid the look up in the db param = DbResourceAllocator.NewDbParameter("@ActivityInstanceId", DbType.Int64, System.Data.ParameterDirection.InputOutput); command.Parameters.Add(param); if (activityInstanceId > 0) { param.Value = activityInstanceId; } else { // // Keep the network traffic down - only include the fields needed // to insert an ActivityInstance record if we don't have the activityInstanceId command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", record.QualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ContextGuid", record.ContextGuid)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentContextGuid", record.ParentContextGuid)); } command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventDateTime", record.EventDateTime)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataKey", record.UserDataKey)); if (null != record.UserData) { Type t = record.UserData.GetType(); Byte[] data = null; bool nonSerializable = false; string userDataString = null; if (!(record.UserData is SerializedDataItem)) SerializeDataItem(record.UserData, out data, out nonSerializable); SerializedDataItem sItem = record.UserData as SerializedDataItem; data = sItem.SerializedData; nonSerializable = sItem.NonSerializable; userDataString = sItem.StringData; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataTypeFullName", t.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataAssemblyFullName", t.Assembly.FullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Str", userDataString)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserData_Blob", data)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserDataNonSerializable", nonSerializable)); } command.Parameters.Add(DbResourceAllocator.NewDbParameter("@UserEventId", DbType.Int64, ParameterDirection.Output)); } #endregion #region Sql Commands - InsertTrackingDataItem private void BatchExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList> items, DbCommand command) { if (null == items || items.Count <= 0) return; // // If the list is smaller than the batch size just push the whole thing if (items.Count <= _dataItemBatchSize) { ExecuteInsertTrackingDataItems(internalId, eventTypeId, items, command); return; } // // Need to split the list into max batch size chunks List> batch = new List>(_dataItemBatchSize); foreach (KeyValuePair kvp in items) { batch.Add(kvp); if (batch.Count == _dataItemBatchSize) { ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command); batch.Clear(); } } // // Send anything that hasn't been sent if (batch.Count > 0) ExecuteInsertTrackingDataItems(internalId, eventTypeId, batch, command); } private void ExecuteInsertTrackingDataItems(long internalId, char eventTypeId, IList> items, DbCommand command) { Debug.Assert(internalId != -1, "Invalid internalId"); if (null == items || items.Count <= 0) return; if (items.Count > _dataItemAnnotationBatchSize) throw new ArgumentOutOfRangeException("items"); if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentException(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertTrackingDataItemMultiple]"; command.Parameters.Clear(); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventTypeId", eventTypeId)); int i = 1; // base 1 to match parameter names foreach (KeyValuePair kvp in items) { string index = (i++).ToString(CultureInfo.InvariantCulture); SerializedDataItem sItem = kvp.Value as SerializedDataItem; if (null == sItem) sItem = SerializeDataItem(kvp.Value); Type t = sItem.Type; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId" + index, kvp.Key)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@FieldName" + index, sItem.FieldName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName" + index, ((null == t) ? null : t.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName" + index, ((null == t) ? null : t.Assembly.FullName))); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Str" + index, sItem.StringData)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Data_Blob" + index, sItem.SerializedData)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@DataNonSerializable" + index, sItem.NonSerializable)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, DbType.Int64, System.Data.ParameterDirection.Output)); } command.ExecuteNonQuery(); // // Get all the out parameters holding the data item record ids // This keeps us from repeatedly going into the parameters collection // below if a data item has more than one annotation List ids = new List(_dataItemAnnotationBatchSize); for (i = 0; i < items.Count; i++) { string index = (i + 1).ToString(CultureInfo.InvariantCulture); ids.Insert(i, (long)command.Parameters["@TrackingDataItemId" + index].Value); } // // Go through all the data items and send all the annotations in batches List> annotations = new List>(_dataItemAnnotationBatchSize); i = 0; foreach (KeyValuePair kvp in items) { TrackingDataItem item = kvp.Value; long dataItemId = ids[i++]; foreach (string s in item.Annotations) { annotations.Add(new KeyValuePair(dataItemId, s)); if (annotations.Count == _dataItemAnnotationBatchSize) { ExecuteInsertAnnotation(internalId, annotations, command); annotations.Clear(); } } } // // If we have anything left send them. if (annotations.Count > 0) ExecuteInsertAnnotation(internalId, annotations, command); } private void ExecuteInsertAnnotation(long internalId, IList> annotations, DbCommand command) { if (null == annotations || annotations.Count <= 0) return; if (annotations.Count > _dataItemAnnotationBatchSize) throw new ArgumentOutOfRangeException("annotations"); if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertTrackingDataItemAnnotationMultiple]"; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); int i = 1; // base 1 to match parameter names foreach (KeyValuePair kvp in annotations) { string index = (i++).ToString(CultureInfo.InvariantCulture); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TrackingDataItemId" + index, kvp.Key)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation" + index, kvp.Value)); } command.ExecuteNonQuery(); return; } private void BatchExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList> annotations, DbCommand command) { if (null == annotations || annotations.Count <= 0) return; // // If the list is smaller than the max batch size just send it directly if (annotations.Count <= _eventAnnotationBatchSize) { ExecuteInsertEventAnnotation(internalId, eventTypeId, annotations, command); return; } // // Need to split the list into max batch size chunks List> batch = new List>(_eventAnnotationBatchSize); foreach (KeyValuePair kvp in annotations) { batch.Add(kvp); if (batch.Count == _eventAnnotationBatchSize) { ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command); batch.Clear(); } } // // Send anything that hasn't been sent if (batch.Count > 0) ExecuteInsertEventAnnotation(internalId, eventTypeId, batch, command); } private void ExecuteInsertEventAnnotation(long internalId, char eventTypeId, IList> annotations, DbCommand command) { Debug.Assert(internalId != -1, "Invalid internalId"); if (null == annotations || annotations.Count <= 0) return; if (annotations.Count > _eventAnnotationBatchSize) throw new ArgumentOutOfRangeException("annotations"); if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertEventAnnotationMultiple]"; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventTypeId", eventTypeId)); int i = 1; //base 1 to match parameter names foreach (KeyValuePair kvp in annotations) { string index = (i++).ToString(CultureInfo.InvariantCulture); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@HasData" + index, true)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@EventId" + index, kvp.Key)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Annotation" + index, kvp.Value)); } command.ExecuteNonQuery(); return; } #endregion #region Workflow Change private void ExecuteInsertWorkflowChange(object param) { WorkflowTrackingRecord record = param as WorkflowTrackingRecord; if (null == record) throw new ArgumentException(ExecutionStringManager.InvalidWorkflowTrackingRecordParameter, "param"); DbCommand command = DbResourceAllocator.NewCommand(); try { if (ConnectionState.Open != command.Connection.State) command.Connection.Open(); command.Transaction = command.Connection.BeginTransaction(); ExecuteInsertWorkflowChange(_internalId, record, command); command.Transaction.Commit(); } catch (Exception) { // // Rollback can throw - ignore these exceptions // so we don't lose the original exception try { if ((null != command) && (null != command.Transaction)) command.Transaction.Rollback(); } catch (Exception) { } // // Re-throw original exception throw; } finally { if ((null != command) && (null != command.Connection) && (ConnectionState.Closed != command.Connection.State)) command.Connection.Close(); } return; } private void ExecuteInsertWorkflowChange(long internalId, WorkflowTrackingRecord record, DbCommand command) { if (null == record) throw new ArgumentNullException("record"); if (null == record.EventArgs) throw new InvalidOperationException(ExecutionStringManager.InvalidWorkflowChangeArgs); if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentNullException("command"); // // If we haven't already serialized do so now. // This is work we have to do to write to store in the db anyway. if (!(record.EventArgs is SerializedWorkflowChangedEventArgs)) record = SerializeRecord(record); // // Insert the workflow instance event BuildInsertWorkflowInstanceEventParameters(internalId, record, null, command); command.ExecuteNonQuery(); // // Get the event id for added/removed activities and annotations long eventId = (long)command.Parameters["@WorkflowInstanceEventId1"].Value; SerializedWorkflowChangedEventArgs sargs = (SerializedWorkflowChangedEventArgs)record.EventArgs; // // Normalize the activities that have been added/removed if we're tracking definitions if ((null != sargs.AddedActivities) && (sargs.AddedActivities.Count > 0)) { foreach (AddedActivity added in sargs.AddedActivities) ExecuteInsertAddedActivity(internalId, added.QualifiedName, added.ParentQualifiedName, added.ActivityTypeFullName, added.ActivityTypeAssemblyFullName, added.AddedActivityActionXoml, eventId, added.Order, command); } if ((null != sargs.RemovedActivities) && (sargs.RemovedActivities.Count > 0)) { foreach (RemovedActivity removed in sargs.RemovedActivities) ExecuteInsertRemovedActivity(internalId, removed.QualifiedName, removed.ParentQualifiedName, removed.RemovedActivityActionXoml, eventId, removed.Order, command); } List> annotations = new List>(record.Annotations.Count); foreach (string s in record.Annotations) annotations.Add(new KeyValuePair(eventId, s)); BatchExecuteInsertEventAnnotation(internalId, 'w', annotations, command); } private void ExecuteInsertAddedActivity(long internalId, string qualifiedName, string parentQualifiedName, string typeFullName, string assemblyFullName, string addedActivityActionXoml, long eventId, int order, DbCommand command) { if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertAddedActivity]"; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@TypeFullName", typeFullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AssemblyFullName", assemblyFullName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@AddedActivityAction", addedActivityActionXoml)); if (-1 == order) command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value)); else command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", order)); command.ExecuteNonQuery(); } private void ExecuteInsertRemovedActivity(long internalId, string qualifiedName, string parentQualifiedName, string removedActivityActionXoml, long eventId, int order, DbCommand command) { if ((null == command) || (null == command.Connection) || (ConnectionState.Open != command.Connection.State)) throw new ArgumentNullException("command"); command.Parameters.Clear(); command.CommandType = CommandType.StoredProcedure; command.CommandText = "[dbo].[InsertRemovedActivity]"; command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceInternalId", internalId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@WorkflowInstanceEventId", eventId)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@QualifiedName", qualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@ParentQualifiedName", parentQualifiedName)); command.Parameters.Add(DbResourceAllocator.NewDbParameter("@RemovedActivityAction", removedActivityActionXoml)); if (-1 == order) command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", DBNull.Value)); else command.Parameters.Add(DbResourceAllocator.NewDbParameter("@Order", order)); command.ExecuteNonQuery(); } #endregion #region Utility private bool TryGetActivityInstanceId(string key, out long id) { // // Check the cache of committed ids if (_activityInstanceId.TryGetValue(key, out id)) return true; // // If we're batched check the cache of temp ids generated during this batch commit if (_isTrans) return _tmpActivityInstanceId.TryGetValue(key, out id); else return false; // not batched so we didn't find the id } private void SetActivityInstanceId(string key, long id) { // // If we're batched put the ids in the temp member // If the commit is successful we'll move these to the real member // in IPendingWork.Complete if (_isTrans) { if (!_tmpActivityInstanceId.ContainsKey(key)) _tmpActivityInstanceId.Add(key, id); } else { if (!_activityInstanceId.ContainsKey(key)) _activityInstanceId.Add(key, id); } } private void RemoveActivityInstanceId(string key) { // // Remove from both the temp and real caches if (_isTrans) { if (_tmpActivityInstanceId.ContainsKey(key)) _tmpActivityInstanceId.Remove(key); } if (_activityInstanceId.ContainsKey(key)) _activityInstanceId.Remove(key); } private string GetSqlDateTimeString(DateTime dateTime) { return dateTime.Year.ToString(System.Globalization.CultureInfo.InvariantCulture) + PadToDblDigit(dateTime.Month) + PadToDblDigit(dateTime.Day) + " " + dateTime.Hour.ToString(System.Globalization.CultureInfo.InvariantCulture) + ":" + dateTime.Minute.ToString(System.Globalization.CultureInfo.InvariantCulture) + ":" + dateTime.Second.ToString(System.Globalization.CultureInfo.InvariantCulture) + ":" + dateTime.Millisecond.ToString(System.Globalization.CultureInfo.InvariantCulture); } private string PadToDblDigit(int num) { string s = num.ToString(System.Globalization.CultureInfo.InvariantCulture); if (s.Length == 1) return "0" + s; else return s; } /// /// Build a string to uniquely identify each activity that should be recorded as a seperate instance. /// A separate instance is defined by the combination of QualifiedName, Context and ParentContext /// /// /// private string BuildQualifiedNameVarName(string qId, Guid context, Guid parentContext) { Guid hashed = HashHelper.HashServiceType(qId); return hashed.ToString().Replace('-', '_') + "_" + context.ToString().Replace('-', '_') + "_" + parentContext.ToString().Replace('-', '_'); } private ActivityTrackingRecord SerializeRecord(ActivityTrackingRecord record) { if ((null == record.Body) || (0 == record.Body.Count)) return record; for (int i = 0; i < record.Body.Count; i++) record.Body[i] = SerializeDataItem(record.Body[i]); return record; } private UserTrackingRecord SerializeRecord(UserTrackingRecord record) { if (((null == record.Body) || (0 == record.Body.Count)) && (null == record.EventArgs) && (null == record.UserData)) return record; if (null != record.UserData) { SerializedDataItem item = new SerializedDataItem(); byte[] data = null; bool nonSerializable; SerializeDataItem(record.UserData, out data, out nonSerializable); item.Type = record.UserData.GetType(); item.StringData = record.UserData.ToString(); item.SerializedData = data; item.NonSerializable = nonSerializable; record.UserData = item; } for (int i = 0; i < record.Body.Count; i++) record.Body[i] = SerializeDataItem(record.Body[i]); return record; } private WorkflowTrackingRecord SerializeRecord(WorkflowTrackingRecord record) { if (null == record.EventArgs) return record; SerializedEventArgs args; if (TrackingWorkflowEvent.Changed == record.TrackingWorkflowEvent) { // // Convert the WorkflowChanged items SerializedWorkflowChangedEventArgs sargs = new SerializedWorkflowChangedEventArgs(); TrackingWorkflowChangedEventArgs wargs = (TrackingWorkflowChangedEventArgs)record.EventArgs; if (null != wargs) { for (int i = 0; i < wargs.Changes.Count; i++) { WorkflowChangeAction action = wargs.Changes[i]; if (action is RemovedActivityAction) AddRemovedActivity((RemovedActivityAction)action, i, sargs.RemovedActivities); else if (action is AddedActivityAction) AddAddedActivity((AddedActivityAction)action, i, sargs.AddedActivities); } } args = sargs; } else { args = new SerializedEventArgs(); byte[] data = null; bool nonSerializable; SerializeDataItem(record.EventArgs, out data, out nonSerializable); args.SerializedArgs = data; // // nonSerializable will only be null for SerializationExceptions, all others bubble if (nonSerializable) { // // Something didn't serialize. // If this is an exception or terminated event it is most likely the Exception member // Save the exception message - better than losing all record of the exception Exception e; switch (record.TrackingWorkflowEvent) { case TrackingWorkflowEvent.Terminated: e = ((TrackingWorkflowTerminatedEventArgs)record.EventArgs).Exception; if (null != e) { SerializeDataItem(e.ToString(), out data, out nonSerializable); args.SerializedArgs = data; } break; case TrackingWorkflowEvent.Exception: e = ((TrackingWorkflowExceptionEventArgs)record.EventArgs).Exception; if (null != e) { SerializeDataItem(e.ToString(), out data, out nonSerializable); args.SerializedArgs = data; } break; } } } // // Set the type of the EventArgs and then // put the serialized item in the args member, // we don't need the original Args object any longer args.Type = record.EventArgs.GetType(); record.EventArgs = args; return record; } private void AddRemovedActivity(RemovedActivityAction removedAction, int order, IList activities) { Activity removed = removedAction.OriginalRemovedActivity; RemovedActivity removedActivity = new RemovedActivity(); removedActivity.Order = order; removedActivity.QualifiedName = removed.QualifiedName; if (null != removed.Parent) removedActivity.ParentQualifiedName = removed.Parent.QualifiedName; // // Save the defintion of this change removedActivity.RemovedActivityActionXoml = GetXomlDocument(removedAction); activities.Add(removedActivity); // // Recursively add all contained activities to the removed list if (removed is CompositeActivity) { foreach (Activity activity in ((CompositeActivity)removed).Activities) { AddRemovedActivity(activity, activities); } } } private void AddRemovedActivity(Activity removed, IList activities) { RemovedActivity removedActivity = new RemovedActivity(); removedActivity.Order = -1; removedActivity.QualifiedName = removed.QualifiedName; if (null != removed.Parent) removedActivity.ParentQualifiedName = removed.Parent.QualifiedName; activities.Add(removedActivity); // // Recursively add all contained activities to the removed list if (removed is CompositeActivity) { foreach (Activity activity in ((CompositeActivity)removed).Activities) { AddRemovedActivity(activity, activities); } } } private void AddAddedActivity(AddedActivityAction addedAction, int order, IList activities) { Activity added = addedAction.AddedActivity; AddedActivity addedActivity = new AddedActivity(); addedActivity.Order = order; Type type = added.GetType(); addedActivity.ActivityTypeFullName = type.FullName; addedActivity.ActivityTypeAssemblyFullName = type.Assembly.FullName; addedActivity.QualifiedName = added.QualifiedName; if (null != added.Parent) addedActivity.ParentQualifiedName = added.Parent.QualifiedName; addedActivity.AddedActivityActionXoml = GetXomlDocument(addedAction); activities.Add(addedActivity); // // Recursively add all contained activities to the added list if (added is CompositeActivity) { foreach (Activity activity in ((CompositeActivity)added).Activities) { AddAddedActivity(activity, activities); } } } private void AddAddedActivity(Activity added, IList activities) { AddedActivity addedActivity = new AddedActivity(); addedActivity.Order = -1; Type type = added.GetType(); addedActivity.ActivityTypeFullName = type.FullName; addedActivity.ActivityTypeAssemblyFullName = type.Assembly.FullName; addedActivity.QualifiedName = added.QualifiedName; if (null != added.Parent) addedActivity.ParentQualifiedName = added.Parent.QualifiedName; activities.Add(addedActivity); // // Recursively add all contained activities to the added list if (added is CompositeActivity) { foreach (Activity activity in ((CompositeActivity)added).Activities) { AddAddedActivity(activity, activities); } } } private SerializedDataItem SerializeDataItem(TrackingDataItem item) { if (null == item) return null; SerializedDataItem s = new SerializedDataItem(); s.Data = item.Data; s.Annotations.AddRange(item.Annotations); s.FieldName = item.FieldName; if (null != item.Data) { byte[] state = null; bool nonSerializable; SerializeDataItem(item.Data, out state, out nonSerializable); s.SerializedData = state; s.StringData = item.Data.ToString(); s.Type = item.Data.GetType(); s.NonSerializable = nonSerializable; } return s; } /// /// Binary serialize an object. Used to persist trackingDataItems. /// /// /// private void SerializeDataItem(object data, out byte[] state, out bool nonSerializable) { nonSerializable = false; state = null; if (null == data) return; MemoryStream stream = new MemoryStream(1024); BinaryFormatter bf = new BinaryFormatter(); try { bf.Serialize(stream, data); state = new byte[stream.Length]; stream.Position = 0; if (stream.Length > Int32.MaxValue) return; else { int read = 0, totalRead = 0, cbToRead = 0; do { totalRead += read; cbToRead = (int)stream.Length - totalRead; read = stream.Read(state, totalRead, cbToRead); } while (read > 0); } } catch (SerializationException) { nonSerializable = true; return; } finally { stream.Close(); } } /// /// Make string sql safe /// /// /// private string SqlEscape(string val) { if (null == val) return null; return val.Replace("'", "''"); } /* static char[] hexDigits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; /// /// Convert a byte array to a string of hex chars for sql image type /// /// /// private static string ToHexString( byte[] bytes ) { if ( null == bytes ) return null; if ( 0 == bytes.Length ) return null; char[] chars = new char[bytes.Length * 2]; for ( int i = 0; i < bytes.Length; i++ ) { int b = bytes[i]; chars[i * 2] = hexDigits[b >> 4]; chars[i * 2 + 1] = hexDigits[b & 0xF]; } return "0x" + new string( chars ); } */ private void GetCallPathKeys(IList callPath) { if ((null == callPath) || (callPath.Count <= 0)) return; for (int i = 0; i < callPath.Count; i++) { _callPathKey = _callPathKey + "." + callPath[i]; if (i < callPath.Count - 1) _parentCallPathKey = _parentCallPathKey + "." + callPath[i]; } if (null != _callPathKey) _callPathKey = SqlEscape(_callPathKey.Substring(1)); if (null != _parentCallPathKey) _parentCallPathKey = SqlEscape(_parentCallPathKey.Substring(1)); } private string GetActivitiesXml(CompositeActivity root) { if (null == root) return null; StringBuilder sb = new StringBuilder(); XmlWriter writer = XmlWriter.Create(sb); try { writer.WriteStartDocument(); writer.WriteStartElement("Activities"); WriteActivity(root, writer); writer.WriteEndElement(); writer.WriteEndDocument(); } finally { writer.Flush(); writer.Close(); } return sb.ToString(); } private void WriteActivity(Activity activity, XmlWriter writer) { if (null == activity) return; if (null == writer) throw new ArgumentNullException("writer"); Type t = activity.GetType(); writer.WriteStartElement("Activity"); writer.WriteElementString("TypeFullName", t.FullName); writer.WriteElementString("AssemblyFullName", t.Assembly.FullName); writer.WriteElementString("QualifiedName", activity.QualifiedName); // // Don't write the element if the value is null, sql will see a missing element as a null value if (null != activity.Parent) writer.WriteElementString("ParentQualifiedName", activity.Parent.QualifiedName); writer.WriteEndElement(); if (activity is CompositeActivity) foreach (Activity a in GetAllEnabledActivities((CompositeActivity)activity)) WriteActivity(a, writer); } // This function returns all the executable activities including secondary flow activities. private IList GetAllEnabledActivities(CompositeActivity compositeActivity) { if (compositeActivity == null) throw new ArgumentNullException("compositeActivity"); List allActivities = new List(compositeActivity.EnabledActivities); foreach (Activity secondaryFlowActivity in ((ISupportAlternateFlow)compositeActivity).AlternateFlowActivities) { if (!allActivities.Contains(secondaryFlowActivity)) allActivities.Add(secondaryFlowActivity); } return allActivities; } internal string GetXomlDocument(object obj) { string xomlText = null; using (StringWriter stringWriter = new StringWriter(System.Globalization.CultureInfo.InvariantCulture)) { using (XmlWriter xmlWriter = CreateXmlWriter(stringWriter)) { WorkflowMarkupSerializer serializer = new WorkflowMarkupSerializer(); serializer.Serialize(xmlWriter, obj); xomlText = stringWriter.ToString(); } } return xomlText; } #endregion } } }