You've already forked linux-packaging-mono
							
							
		
			
				
	
	
		
			168 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			168 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| //
 | |
| // Mono.Messaging.RabbitMQ
 | |
| //
 | |
| // Authors:
 | |
| //	  Michael Barker (mike@middlesoft.co.uk)
 | |
| //
 | |
| // (C) 2008 Michael Barker
 | |
| //
 | |
| 
 | |
| //
 | |
| // Permission is hereby granted, free of charge, to any person obtaining
 | |
| // a copy of this software and associated documentation files (the
 | |
| // "Software"), to deal in the Software without restriction, including
 | |
| // without limitation the rights to use, copy, modify, merge, publish,
 | |
| // distribute, sublicense, and/or sell copies of the Software, and to
 | |
| // permit persons to whom the Software is furnished to do so, subject to
 | |
| // the following conditions:
 | |
| // 
 | |
| // The above copyright notice and this permission notice shall be
 | |
| // included in all copies or substantial portions of the Software.
 | |
| // 
 | |
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 | |
| // EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 | |
| // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 | |
| // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 | |
| // LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 | |
| // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 | |
| // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 | |
| //
 | |
| 
 | |
| using System;
 | |
| using System.Collections;
 | |
| using System.Net;
 | |
| using System.Net.Sockets;
 | |
| using System.Threading;
 | |
| 
 | |
| using Mono.Messaging;
 | |
| 
 | |
| using RabbitMQ.Client;
 | |
| 
 | |
| namespace Mono.Messaging.RabbitMQ {
 | |
| 
 | |
| 	public class RabbitMQMessagingProvider : IMessagingProvider {
 | |
| 		
 | |
| 		private int txCounter = 0;
 | |
| 		private readonly uint localIp;
 | |
| 		private readonly MessagingContextPool contextPool;
 | |
| 		
 | |
| 		public RabbitMQMessagingProvider ()
 | |
| 		{
 | |
| 			localIp = GetLocalIP ();
 | |
| 			contextPool = new MessagingContextPool (new MessageFactory (this),
 | |
| 													CreateConnection);
 | |
| 		}
 | |
| 		
 | |
| 		private static uint GetLocalIP ()
 | |
| 		{
 | |
| 			String strHostName = Dns.GetHostName ();
 | |
| 			IPHostEntry ipEntry = Dns.GetHostByName (strHostName);
 | |
| 			foreach (IPAddress ip in ipEntry.AddressList) {
 | |
| 				if (AddressFamily.InterNetwork == ip.AddressFamily) {
 | |
| 					byte[] addr = ip.GetAddressBytes ();
 | |
| 					uint localIP = 0;
 | |
| 					for (int i = 0; i < 4 && i < addr.Length; i++) {
 | |
| 						localIP += (uint) (addr[i] << 8 * (3 - i));
 | |
| 					}
 | |
| 					return localIP;
 | |
| 				}
 | |
| 			}
 | |
| 			return 0;
 | |
| 		}
 | |
| 		
 | |
| 		public IMessage CreateMessage ()
 | |
| 		{
 | |
| 			return new MessageBase ();
 | |
| 		}
 | |
| 		
 | |
| 		public IMessageQueueTransaction CreateMessageQueueTransaction ()
 | |
| 		{
 | |
| 			Interlocked.Increment (ref txCounter);
 | |
| 			string txId = localIp.ToString () + txCounter.ToString ();
 | |
| 			
 | |
| 			return new RabbitMQMessageQueueTransaction (txId, contextPool);
 | |
| 		}
 | |
| 		
 | |
| 		public IMessagingContext CreateContext (string host)
 | |
| 		{
 | |
| 			return contextPool.GetContext (host);
 | |
| 		}
 | |
| 		
 | |
| 		private IConnection CreateConnection (string host)
 | |
| 		{
 | |
| 			ConnectionFactory cf = new ConnectionFactory ();
 | |
| 			cf.Address = host;
 | |
| 			return cf.CreateConnection ();
 | |
| 		}
 | |
| 		
 | |
| 		public void DeleteQueue (QueueReference qRef)
 | |
| 		{
 | |
| 			RabbitMQMessageQueue.Delete (qRef);
 | |
| 		}
 | |
| 		
 | |
| 		private readonly IDictionary queues = new Hashtable ();
 | |
| 		private readonly ReaderWriterLock qLock = new ReaderWriterLock ();
 | |
| 		private const int TIMEOUT = 15000;
 | |
| 		
 | |
| 		public IMessageQueue[] GetPublicQueues ()
 | |
| 		{
 | |
| 			IMessageQueue[] qs;
 | |
| 			qLock.AcquireReaderLock (TIMEOUT);
 | |
| 			try {
 | |
| 				ICollection qCollection = queues.Values;
 | |
| 				qs = new IMessageQueue[qCollection.Count];
 | |
| 				qCollection.CopyTo (qs, 0);
 | |
| 				return qs;
 | |
| 			} finally {
 | |
| 				qLock.ReleaseReaderLock ();
 | |
| 			}
 | |
| 		}
 | |
| 		
 | |
| 		public bool Exists (QueueReference qRef)
 | |
| 		{
 | |
| 			qLock.AcquireReaderLock (TIMEOUT);
 | |
| 			try {
 | |
| 				return queues.Contains (qRef);
 | |
| 			} finally {
 | |
| 				qLock.ReleaseReaderLock ();
 | |
| 			}
 | |
| 		}
 | |
| 		
 | |
| 		public IMessageQueue CreateMessageQueue (QueueReference qRef,
 | |
| 		                                         bool transactional)
 | |
| 		{
 | |
| 			qLock.AcquireWriterLock (TIMEOUT);
 | |
| 			try {
 | |
| 				IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, transactional);
 | |
| 				queues[qRef] = mq;
 | |
| 				return mq;
 | |
| 			} finally {
 | |
| 				qLock.ReleaseWriterLock ();
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		public IMessageQueue GetMessageQueue (QueueReference qRef)
 | |
| 		{
 | |
| 			qLock.AcquireReaderLock (TIMEOUT);
 | |
| 			try {
 | |
| 				if (queues.Contains (qRef))
 | |
| 					return (IMessageQueue) queues[qRef];
 | |
| 				else {
 | |
| 					LockCookie lc = qLock.UpgradeToWriterLock (TIMEOUT);
 | |
| 					try {
 | |
| 						IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, false);
 | |
| 						queues[qRef] = mq;
 | |
| 						return mq;
 | |
| 					} finally {
 | |
| 						qLock.DowngradeFromWriterLock (ref lc);
 | |
| 					}
 | |
| 				}
 | |
| 			} finally {
 | |
| 				qLock.ReleaseReaderLock ();
 | |
| 			}
 | |
| 		}
 | |
| 		
 | |
| 
 | |
| 	}
 | |
| }
 |