Files
linux-packaging-mono/mcs/class/Mono.Messaging.RabbitMQ/Mono.Messaging.RabbitMQ/RabbitMQMessagingProvider.cs
Jo Shields a575963da9 Imported Upstream version 3.6.0
Former-commit-id: da6be194a6b1221998fc28233f2503bd61dd9d14
2014-08-13 10:39:27 +01:00

168 lines
4.4 KiB
C#

//
// Mono.Messaging.RabbitMQ
//
// Authors:
// Michael Barker (mike@middlesoft.co.uk)
//
// (C) 2008 Michael Barker
//
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
using System;
using System.Collections;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Mono.Messaging;
using RabbitMQ.Client;
namespace Mono.Messaging.RabbitMQ {
public class RabbitMQMessagingProvider : IMessagingProvider {
private int txCounter = 0;
private readonly uint localIp;
private readonly MessagingContextPool contextPool;
public RabbitMQMessagingProvider ()
{
localIp = GetLocalIP ();
contextPool = new MessagingContextPool (new MessageFactory (this),
CreateConnection);
}
private static uint GetLocalIP ()
{
String strHostName = Dns.GetHostName ();
IPHostEntry ipEntry = Dns.GetHostByName (strHostName);
foreach (IPAddress ip in ipEntry.AddressList) {
if (AddressFamily.InterNetwork == ip.AddressFamily) {
byte[] addr = ip.GetAddressBytes ();
uint localIP = 0;
for (int i = 0; i < 4 && i < addr.Length; i++) {
localIP += (uint) (addr[i] << 8 * (3 - i));
}
return localIP;
}
}
return 0;
}
public IMessage CreateMessage ()
{
return new MessageBase ();
}
public IMessageQueueTransaction CreateMessageQueueTransaction ()
{
Interlocked.Increment (ref txCounter);
string txId = localIp.ToString () + txCounter.ToString ();
return new RabbitMQMessageQueueTransaction (txId, contextPool);
}
public IMessagingContext CreateContext (string host)
{
return contextPool.GetContext (host);
}
private IConnection CreateConnection (string host)
{
ConnectionFactory cf = new ConnectionFactory ();
cf.Address = host;
return cf.CreateConnection ();
}
public void DeleteQueue (QueueReference qRef)
{
RabbitMQMessageQueue.Delete (qRef);
}
private readonly IDictionary queues = new Hashtable ();
private readonly ReaderWriterLock qLock = new ReaderWriterLock ();
private const int TIMEOUT = 15000;
public IMessageQueue[] GetPublicQueues ()
{
IMessageQueue[] qs;
qLock.AcquireReaderLock (TIMEOUT);
try {
ICollection qCollection = queues.Values;
qs = new IMessageQueue[qCollection.Count];
qCollection.CopyTo (qs, 0);
return qs;
} finally {
qLock.ReleaseReaderLock ();
}
}
public bool Exists (QueueReference qRef)
{
qLock.AcquireReaderLock (TIMEOUT);
try {
return queues.Contains (qRef);
} finally {
qLock.ReleaseReaderLock ();
}
}
public IMessageQueue CreateMessageQueue (QueueReference qRef,
bool transactional)
{
qLock.AcquireWriterLock (TIMEOUT);
try {
IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, transactional);
queues[qRef] = mq;
return mq;
} finally {
qLock.ReleaseWriterLock ();
}
}
public IMessageQueue GetMessageQueue (QueueReference qRef)
{
qLock.AcquireReaderLock (TIMEOUT);
try {
if (queues.Contains (qRef))
return (IMessageQueue) queues[qRef];
else {
LockCookie lc = qLock.UpgradeToWriterLock (TIMEOUT);
try {
IMessageQueue mq = new RabbitMQMessageQueue (this, qRef, false);
queues[qRef] = mq;
return mq;
} finally {
qLock.DowngradeFromWriterLock (ref lc);
}
}
} finally {
qLock.ReleaseReaderLock ();
}
}
}
}