You've already forked linux-packaging-mono
							
							
		
			
				
	
	
		
			349 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			349 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| //-----------------------------------------------------------------------------
 | |
| // Copyright (c) Microsoft Corporation.  All rights reserved.
 | |
| //-----------------------------------------------------------------------------
 | |
| namespace System.ServiceModel.Channels
 | |
| {
 | |
|     using System.Runtime;
 | |
|     using System.Threading;
 | |
|     using System.ServiceModel.Diagnostics.Application;
 | |
| 
 | |
|     sealed class Msmq4PoisonHandler : IPoisonHandlingStrategy
 | |
|     {
 | |
|         MsmqQueue mainQueue;
 | |
|         MsmqQueue mainQueueForMove;
 | |
|         MsmqQueue retryQueueForPeek;
 | |
|         MsmqQueue retryQueueForMove;
 | |
|         MsmqQueue poisonQueue;
 | |
|         MsmqQueue lockQueueForReceive;
 | |
| 
 | |
|         IOThreadTimer timer;
 | |
|         MsmqReceiveHelper receiver;
 | |
| 
 | |
|         bool disposed;
 | |
| 
 | |
|         string poisonQueueName;
 | |
|         string retryQueueName;
 | |
|         string mainQueueName;
 | |
| 
 | |
|         MsmqRetryQueueMessage retryQueueMessage;
 | |
|         static Action<object> onStartPeek = new Action<object>(StartPeek);
 | |
|         static AsyncCallback onPeekCompleted = Fx.ThunkCallback(OnPeekCompleted);
 | |
| 
 | |
|         public Msmq4PoisonHandler(MsmqReceiveHelper receiver)
 | |
|         {
 | |
|             this.receiver = receiver;
 | |
|             this.timer = new IOThreadTimer(new Action<object>(OnTimer), null, false);
 | |
|             this.disposed = false;
 | |
|             this.mainQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(this.ListenUri);
 | |
|             this.poisonQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";poison"));
 | |
|             this.retryQueueName = this.ReceiveParameters.AddressTranslator.UriToFormatName(new Uri(this.ListenUri.AbsoluteUri + ";retry"));
 | |
|         }
 | |
| 
 | |
|         MsmqReceiveParameters ReceiveParameters
 | |
|         {
 | |
|             get { return this.receiver.MsmqReceiveParameters; }
 | |
|         }
 | |
| 
 | |
|         Uri ListenUri
 | |
|         {
 | |
|             get { return this.receiver.ListenUri; }
 | |
|         }
 | |
| 
 | |
|         public void Open()
 | |
|         {
 | |
|             if (this.ReceiveParameters.ReceiveContextSettings.Enabled)
 | |
|             {
 | |
|                 Fx.Assert(this.receiver.Queue is MsmqSubqueueLockingQueue, "Queue must be MsmqSubqueueLockingQueue");
 | |
|                 this.lockQueueForReceive = ((MsmqSubqueueLockingQueue)this.receiver.Queue).LockQueueForReceive;
 | |
|             }
 | |
| 
 | |
|             this.mainQueue = this.receiver.Queue;
 | |
|             this.mainQueueForMove = new MsmqQueue(this.mainQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
 | |
|             // Open up the poison queue (for handling poison messages).
 | |
|             this.poisonQueue = new MsmqQueue(this.poisonQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
 | |
|             this.retryQueueForMove = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_MOVE_ACCESS);
 | |
|             this.retryQueueForPeek = new MsmqQueue(this.retryQueueName, UnsafeNativeMethods.MQ_RECEIVE_ACCESS);
 | |
|             this.retryQueueMessage = new MsmqRetryQueueMessage();
 | |
| 
 | |
|             if (Thread.CurrentThread.IsThreadPoolThread)
 | |
|             {
 | |
|                 StartPeek(this);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 ActionItem.Schedule(Msmq4PoisonHandler.onStartPeek, this);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         static void StartPeek(object state)
 | |
|         {
 | |
|             Msmq4PoisonHandler handler = state as Msmq4PoisonHandler;
 | |
|             lock (handler)
 | |
|             {
 | |
|                 if (!handler.disposed)
 | |
|                 {
 | |
|                     handler.retryQueueForPeek.BeginPeek(handler.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, handler);
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         public bool CheckAndHandlePoisonMessage(MsmqMessageProperty messageProperty)
 | |
|         {
 | |
|             if (this.ReceiveParameters.ReceiveContextSettings.Enabled)
 | |
|             {
 | |
|                 return ReceiveContextPoisonHandling(messageProperty);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 return NonReceiveContextPoisonHandling(messageProperty);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         public bool ReceiveContextPoisonHandling(MsmqMessageProperty messageProperty)
 | |
|         {
 | |
|             // The basic idea is to use the message move count to get the number of retry attempts the message has been through
 | |
|             // The computation of the retry count and retry cycle count is slightly involved due to fact that the message being processed
 | |
|             // could have been recycled message. (Recycled message is the message that moves from lock queue to retry queue to main queue
 | |
|             // and back to lock queue
 | |
|             //
 | |
| 
 | |
|             // Count to tally message recycling (lock queue to retry queue to main queue adds move count of 2 to the message)
 | |
|             const int retryMoveCount = 2;
 | |
| 
 | |
|             // Actual number of times message is received before recycling to retry queue
 | |
|             int actualReceiveRetryCount = this.ReceiveParameters.ReceiveRetryCount + 1;
 | |
| 
 | |
|             // The message is recycled these many number of times
 | |
|             int maxRetryCycles = this.ReceiveParameters.MaxRetryCycles;
 | |
| 
 | |
|             // Max change in message move count between recycling
 | |
|             int maxMovePerCycle = (2 * actualReceiveRetryCount) + 1;
 | |
| 
 | |
|             // Number of recycles the message has been through
 | |
|             int messageCyclesCompleted = messageProperty.MoveCount / (maxMovePerCycle + retryMoveCount);
 | |
| 
 | |
|             // Total number of moves on the message at the end of the last recycle
 | |
|             int messageMoveCountForCyclesCompleted = messageCyclesCompleted * (maxMovePerCycle + retryMoveCount);
 | |
| 
 | |
|             // The differential move count for the current cycle
 | |
|             int messageMoveCountForCurrentCycle = messageProperty.MoveCount - messageMoveCountForCyclesCompleted;
 | |
| 
 | |
|             lock (this)
 | |
|             {
 | |
|                 if (this.disposed)
 | |
|                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString()));
 | |
| 
 | |
|                 // Check if the message has already completed its max recycle count (MaxRetryCycles)
 | |
|                 // and the disposed the message first. Such a message was previously disposed using the ReceiveErrorHandling method
 | |
|                 // and the channel/listener would immediately fault 
 | |
|                 //
 | |
|                 if (messageCyclesCompleted > maxRetryCycles)
 | |
|                 {
 | |
|                     FinalDisposition(messageProperty);
 | |
|                     return true;
 | |
|                 }
 | |
| 
 | |
|                 // Check if the message is eligible for recycling/disposition
 | |
|                 if (messageMoveCountForCurrentCycle >= maxMovePerCycle)
 | |
|                 {
 | |
|                     if (TD.ReceiveRetryCountReachedIsEnabled())
 | |
|                     {
 | |
|                         TD.ReceiveRetryCountReached(messageProperty.MessageId);
 | |
|                     }
 | |
|                     if (messageCyclesCompleted < maxRetryCycles)
 | |
|                     {
 | |
|                         // The message is eligible for recycling, move the message the message to retry queue
 | |
|                         MsmqReceiveHelper.MoveReceivedMessage(this.lockQueueForReceive, this.retryQueueForMove, messageProperty.LookupId);
 | |
|                         MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId);
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         if (TD.MaxRetryCyclesExceededMsmqIsEnabled())
 | |
|                         {
 | |
|                             TD.MaxRetryCyclesExceededMsmq(messageProperty.MessageId);
 | |
|                         }
 | |
|                         // Dispose the message using ReceiveErrorHandling
 | |
|                         FinalDisposition(messageProperty);
 | |
|                     }
 | |
| 
 | |
|                     return true;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     return false;
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         public bool NonReceiveContextPoisonHandling(MsmqMessageProperty messageProperty)
 | |
|         {
 | |
|             if (messageProperty.AbortCount <= this.ReceiveParameters.ReceiveRetryCount)
 | |
|                 return false;
 | |
|             int retryCycle = messageProperty.MoveCount / 2;
 | |
| 
 | |
|             lock (this)
 | |
|             {
 | |
|                 if (this.disposed)
 | |
|                     throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ObjectDisposedException(this.GetType().ToString()));
 | |
| 
 | |
|                 if (retryCycle >= this.ReceiveParameters.MaxRetryCycles)
 | |
|                 {
 | |
|                     if (TD.MaxRetryCyclesExceededMsmqIsEnabled())
 | |
|                     {
 | |
|                         TD.MaxRetryCyclesExceededMsmq(messageProperty.MessageId);
 | |
|                     }
 | |
|                     FinalDisposition(messageProperty);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     MsmqReceiveHelper.MoveReceivedMessage(this.mainQueue, this.retryQueueForMove, messageProperty.LookupId);
 | |
|                     MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, false, this.receiver.InstanceId);
 | |
|                 }
 | |
|             }
 | |
|             return true;
 | |
|         }
 | |
| 
 | |
|         public void FinalDisposition(MsmqMessageProperty messageProperty)
 | |
|         {
 | |
|             if (this.ReceiveParameters.ReceiveContextSettings.Enabled)
 | |
|             {
 | |
|                 this.InternalFinalDisposition(this.lockQueueForReceive, messageProperty);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 this.InternalFinalDisposition(this.mainQueue, messageProperty);
 | |
|             }
 | |
|         }
 | |
| 
 | |
| 
 | |
|         private void InternalFinalDisposition(MsmqQueue disposeFromQueue, MsmqMessageProperty messageProperty)
 | |
|         {
 | |
|             switch (this.ReceiveParameters.ReceiveErrorHandling)
 | |
|             {
 | |
|                 case ReceiveErrorHandling.Drop:
 | |
|                     this.receiver.DropOrRejectReceivedMessage(disposeFromQueue, messageProperty, false);
 | |
|                     break;
 | |
| 
 | |
|                 case ReceiveErrorHandling.Fault:
 | |
|                     MsmqReceiveHelper.TryAbortTransactionCurrent();
 | |
|                     if (null != this.receiver.ChannelListener)
 | |
|                         this.receiver.ChannelListener.FaultListener();
 | |
|                     if (null != this.receiver.Channel)
 | |
|                         this.receiver.Channel.FaultChannel();
 | |
|                     break;
 | |
| 
 | |
|                 case ReceiveErrorHandling.Reject:
 | |
|                     this.receiver.DropOrRejectReceivedMessage(disposeFromQueue, messageProperty, true);
 | |
|                     MsmqDiagnostics.PoisonMessageRejected(messageProperty.MessageId, this.receiver.InstanceId);
 | |
|                     break;
 | |
| 
 | |
|                 case ReceiveErrorHandling.Move:
 | |
|                     MsmqReceiveHelper.MoveReceivedMessage(disposeFromQueue, this.poisonQueue, messageProperty.LookupId);
 | |
|                     MsmqDiagnostics.PoisonMessageMoved(messageProperty.MessageId, true, this.receiver.InstanceId);
 | |
|                     break;
 | |
| 
 | |
|                 default:
 | |
|                     Fx.Assert("System.ServiceModel.Channels.Msmq4PoisonHandler.FinalDisposition(): (unexpected ReceiveErrorHandling)");
 | |
|                     break;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         public void Dispose()
 | |
|         {
 | |
|             lock (this)
 | |
|             {
 | |
|                 if (!this.disposed)
 | |
|                 {
 | |
|                     this.disposed = true;
 | |
|                     this.timer.Cancel();
 | |
| 
 | |
|                     if (null != this.retryQueueForPeek)
 | |
|                         this.retryQueueForPeek.Dispose();
 | |
|                     if (null != this.retryQueueForMove)
 | |
|                         this.retryQueueForMove.Dispose();
 | |
|                     if (null != this.poisonQueue)
 | |
|                         this.poisonQueue.Dispose();
 | |
|                     if (null != this.mainQueueForMove)
 | |
|                         this.mainQueueForMove.Dispose();
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         static void OnPeekCompleted(IAsyncResult result)
 | |
|         {
 | |
|             Msmq4PoisonHandler handler = result.AsyncState as Msmq4PoisonHandler;
 | |
|             MsmqQueue.ReceiveResult receiveResult = MsmqQueue.ReceiveResult.Unknown;
 | |
|             try
 | |
|             {
 | |
|                 receiveResult = handler.retryQueueForPeek.EndPeek(result);
 | |
|             }
 | |
|             catch (MsmqException ex)
 | |
|             {
 | |
|                 MsmqDiagnostics.ExpectedException(ex);
 | |
|             }
 | |
| 
 | |
|             if (MsmqQueue.ReceiveResult.MessageReceived == receiveResult)
 | |
|             {
 | |
|                 lock (handler)
 | |
|                 {
 | |
|                     if (!handler.disposed)
 | |
|                     {
 | |
|                         // Check the time - move it, and begin peeking again
 | |
|                         // if necessary, or wait for the timeout.
 | |
| 
 | |
|                         DateTime lastMoveTime = MsmqDateTime.ToDateTime(handler.retryQueueMessage.LastMoveTime.Value);
 | |
| 
 | |
|                         TimeSpan waitTime = lastMoveTime + handler.ReceiveParameters.RetryCycleDelay - DateTime.UtcNow;
 | |
|                         if (waitTime < TimeSpan.Zero)
 | |
|                             handler.OnTimer(handler);
 | |
|                         else
 | |
|                             handler.timer.Set(waitTime);
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         void OnTimer(object state)
 | |
|         {
 | |
|             lock (this)
 | |
|             {
 | |
|                 if (!this.disposed)
 | |
|                 {
 | |
|                     try
 | |
|                     {
 | |
|                         this.retryQueueForPeek.TryMoveMessage(this.retryQueueMessage.LookupId.Value, this.mainQueueForMove, MsmqTransactionMode.Single);
 | |
|                     }
 | |
|                     catch (MsmqException ex)
 | |
|                     {
 | |
|                         MsmqDiagnostics.ExpectedException(ex);
 | |
|                     }
 | |
|                     this.retryQueueForPeek.BeginPeek(this.retryQueueMessage, TimeSpan.MaxValue, onPeekCompleted, this);
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         class MsmqRetryQueueMessage : NativeMsmqMessage
 | |
|         {
 | |
|             LongProperty lookupId;
 | |
|             IntProperty lastMoveTime;
 | |
| 
 | |
|             public MsmqRetryQueueMessage()
 | |
|                 : base(2)
 | |
|             {
 | |
|                 this.lookupId = new LongProperty(this, UnsafeNativeMethods.PROPID_M_LOOKUPID);
 | |
|                 this.lastMoveTime = new IntProperty(this, UnsafeNativeMethods.PROPID_M_LAST_MOVE_TIME);
 | |
|             }
 | |
| 
 | |
|             public LongProperty LookupId
 | |
|             {
 | |
|                 get { return this.lookupId; }
 | |
|             }
 | |
| 
 | |
|             public IntProperty LastMoveTime
 | |
|             {
 | |
|                 get { return this.lastMoveTime; }
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 |