1108 lines
34 KiB
C#
1108 lines
34 KiB
C#
|
//------------------------------------------------------------
|
||
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||
|
//------------------------------------------------------------
|
||
|
|
||
|
namespace System.Runtime
|
||
|
{
|
||
|
using System;
|
||
|
using System.Collections.Generic;
|
||
|
using System.Threading;
|
||
|
|
||
|
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.PrivatePrimitive, SupportsAsync = true, ReleaseMethod = "Dispatch")]
|
||
|
sealed class InputQueue<T> : IDisposable where T : class
|
||
|
{
|
||
|
static Action<object> completeOutstandingReadersCallback;
|
||
|
static Action<object> completeWaitersFalseCallback;
|
||
|
static Action<object> completeWaitersTrueCallback;
|
||
|
static Action<object> onDispatchCallback;
|
||
|
static Action<object> onInvokeDequeuedCallback;
|
||
|
|
||
|
QueueState queueState;
|
||
|
|
||
|
[Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.LockStatement)]
|
||
|
ItemQueue itemQueue;
|
||
|
|
||
|
[Fx.Tag.SynchronizationObject]
|
||
|
Queue<IQueueReader> readerQueue;
|
||
|
|
||
|
[Fx.Tag.SynchronizationObject]
|
||
|
List<IQueueWaiter> waiterList;
|
||
|
|
||
|
public InputQueue()
|
||
|
{
|
||
|
this.itemQueue = new ItemQueue();
|
||
|
this.readerQueue = new Queue<IQueueReader>();
|
||
|
this.waiterList = new List<IQueueWaiter>();
|
||
|
this.queueState = QueueState.Open;
|
||
|
}
|
||
|
|
||
|
public InputQueue(Func<Action<AsyncCallback, IAsyncResult>> asyncCallbackGenerator)
|
||
|
: this()
|
||
|
{
|
||
|
Fx.Assert(asyncCallbackGenerator != null, "use default ctor if you don't have a generator");
|
||
|
AsyncCallbackGenerator = asyncCallbackGenerator;
|
||
|
}
|
||
|
|
||
|
public int PendingCount
|
||
|
{
|
||
|
get
|
||
|
{
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
return this.itemQueue.ItemCount;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Users like ServiceModel can hook this abort ICommunicationObject or handle other non-IDisposable objects
|
||
|
public Action<T> DisposeItemCallback
|
||
|
{
|
||
|
get;
|
||
|
set;
|
||
|
}
|
||
|
|
||
|
// Users like ServiceModel can hook this to wrap the AsyncQueueReader callback functionality for tracing, etc
|
||
|
Func<Action<AsyncCallback, IAsyncResult>> AsyncCallbackGenerator
|
||
|
{
|
||
|
get;
|
||
|
set;
|
||
|
}
|
||
|
|
||
|
object ThisLock
|
||
|
{
|
||
|
get { return this.itemQueue; }
|
||
|
}
|
||
|
|
||
|
public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
|
||
|
{
|
||
|
Item item = default(Item);
|
||
|
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
if (queueState == QueueState.Open)
|
||
|
{
|
||
|
if (itemQueue.HasAvailableItem)
|
||
|
{
|
||
|
item = itemQueue.DequeueAvailableItem();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
|
||
|
readerQueue.Enqueue(reader);
|
||
|
return reader;
|
||
|
}
|
||
|
}
|
||
|
else if (queueState == QueueState.Shutdown)
|
||
|
{
|
||
|
if (itemQueue.HasAvailableItem)
|
||
|
{
|
||
|
item = itemQueue.DequeueAvailableItem();
|
||
|
}
|
||
|
else if (itemQueue.HasAnyItem)
|
||
|
{
|
||
|
AsyncQueueReader reader = new AsyncQueueReader(this, timeout, callback, state);
|
||
|
readerQueue.Enqueue(reader);
|
||
|
return reader;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
InvokeDequeuedCallback(item.DequeuedCallback);
|
||
|
return new CompletedAsyncResult<T>(item.GetValue(), callback, state);
|
||
|
}
|
||
|
|
||
|
public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
|
||
|
{
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
if (queueState == QueueState.Open)
|
||
|
{
|
||
|
if (!itemQueue.HasAvailableItem)
|
||
|
{
|
||
|
AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
|
||
|
waiterList.Add(waiter);
|
||
|
return waiter;
|
||
|
}
|
||
|
}
|
||
|
else if (queueState == QueueState.Shutdown)
|
||
|
{
|
||
|
if (!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
|
||
|
{
|
||
|
AsyncQueueWaiter waiter = new AsyncQueueWaiter(timeout, callback, state);
|
||
|
waiterList.Add(waiter);
|
||
|
return waiter;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return new CompletedAsyncResult<bool>(true, callback, state);
|
||
|
}
|
||
|
|
||
|
public void Close()
|
||
|
{
|
||
|
Dispose();
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(CancelMethod = "Close")]
|
||
|
public T Dequeue(TimeSpan timeout)
|
||
|
{
|
||
|
T value;
|
||
|
|
||
|
if (!this.Dequeue(timeout, out value))
|
||
|
{
|
||
|
throw Fx.Exception.AsError(new TimeoutException(InternalSR.TimeoutInputQueueDequeue(timeout)));
|
||
|
}
|
||
|
|
||
|
return value;
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(CancelMethod = "Close")]
|
||
|
public bool Dequeue(TimeSpan timeout, out T value)
|
||
|
{
|
||
|
WaitQueueReader reader = null;
|
||
|
Item item = new Item();
|
||
|
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
if (queueState == QueueState.Open)
|
||
|
{
|
||
|
if (itemQueue.HasAvailableItem)
|
||
|
{
|
||
|
item = itemQueue.DequeueAvailableItem();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
reader = new WaitQueueReader(this);
|
||
|
readerQueue.Enqueue(reader);
|
||
|
}
|
||
|
}
|
||
|
else if (queueState == QueueState.Shutdown)
|
||
|
{
|
||
|
if (itemQueue.HasAvailableItem)
|
||
|
{
|
||
|
item = itemQueue.DequeueAvailableItem();
|
||
|
}
|
||
|
else if (itemQueue.HasAnyItem)
|
||
|
{
|
||
|
reader = new WaitQueueReader(this);
|
||
|
readerQueue.Enqueue(reader);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
value = default(T);
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
else // queueState == QueueState.Closed
|
||
|
{
|
||
|
value = default(T);
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (reader != null)
|
||
|
{
|
||
|
return reader.Wait(timeout, out value);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
InvokeDequeuedCallback(item.DequeuedCallback);
|
||
|
value = item.GetValue();
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public void Dispatch()
|
||
|
{
|
||
|
IQueueReader reader = null;
|
||
|
Item item = new Item();
|
||
|
IQueueReader[] outstandingReaders = null;
|
||
|
IQueueWaiter[] waiters = null;
|
||
|
bool itemAvailable = true;
|
||
|
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
|
||
|
this.GetWaiters(out waiters);
|
||
|
|
||
|
if (queueState != QueueState.Closed)
|
||
|
{
|
||
|
itemQueue.MakePendingItemAvailable();
|
||
|
|
||
|
if (readerQueue.Count > 0)
|
||
|
{
|
||
|
item = itemQueue.DequeueAvailableItem();
|
||
|
reader = readerQueue.Dequeue();
|
||
|
|
||
|
if (queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
|
||
|
{
|
||
|
outstandingReaders = new IQueueReader[readerQueue.Count];
|
||
|
readerQueue.CopyTo(outstandingReaders, 0);
|
||
|
readerQueue.Clear();
|
||
|
|
||
|
itemAvailable = false;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (outstandingReaders != null)
|
||
|
{
|
||
|
if (completeOutstandingReadersCallback == null)
|
||
|
{
|
||
|
completeOutstandingReadersCallback = new Action<object>(CompleteOutstandingReadersCallback);
|
||
|
}
|
||
|
|
||
|
ActionItem.Schedule(completeOutstandingReadersCallback, outstandingReaders);
|
||
|
}
|
||
|
|
||
|
if (waiters != null)
|
||
|
{
|
||
|
CompleteWaitersLater(itemAvailable, waiters);
|
||
|
}
|
||
|
|
||
|
if (reader != null)
|
||
|
{
|
||
|
InvokeDequeuedCallback(item.DequeuedCallback);
|
||
|
reader.Set(item);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
|
||
|
public bool EndDequeue(IAsyncResult result, out T value)
|
||
|
{
|
||
|
CompletedAsyncResult<T> typedResult = result as CompletedAsyncResult<T>;
|
||
|
|
||
|
if (typedResult != null)
|
||
|
{
|
||
|
value = CompletedAsyncResult<T>.End(result);
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
return AsyncQueueReader.End(result, out value);
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(CancelMethod = "Close", Conditional = "!result.IsCompleted")]
|
||
|
public T EndDequeue(IAsyncResult result)
|
||
|
{
|
||
|
T value;
|
||
|
|
||
|
if (!this.EndDequeue(result, out value))
|
||
|
{
|
||
|
throw Fx.Exception.AsError(new TimeoutException());
|
||
|
}
|
||
|
|
||
|
return value;
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(CancelMethod = "Dispatch", Conditional = "!result.IsCompleted")]
|
||
|
public bool EndWaitForItem(IAsyncResult result)
|
||
|
{
|
||
|
CompletedAsyncResult<bool> typedResult = result as CompletedAsyncResult<bool>;
|
||
|
if (typedResult != null)
|
||
|
{
|
||
|
return CompletedAsyncResult<bool>.End(result);
|
||
|
}
|
||
|
|
||
|
return AsyncQueueWaiter.End(result);
|
||
|
}
|
||
|
|
||
|
public void EnqueueAndDispatch(T item)
|
||
|
{
|
||
|
EnqueueAndDispatch(item, null);
|
||
|
}
|
||
|
|
||
|
// dequeuedCallback is called as an item is dequeued from the InputQueue. The
|
||
|
// InputQueue lock is not held during the callback. However, the user code will
|
||
|
// not be notified of the item being available until the callback returns. If you
|
||
|
// are not sure if the callback will block for a long time, then first call
|
||
|
// IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
|
||
|
public void EnqueueAndDispatch(T item, Action dequeuedCallback)
|
||
|
{
|
||
|
EnqueueAndDispatch(item, dequeuedCallback, true);
|
||
|
}
|
||
|
|
||
|
public void EnqueueAndDispatch(Exception exception, Action dequeuedCallback, bool canDispatchOnThisThread)
|
||
|
{
|
||
|
Fx.Assert(exception != null, "EnqueueAndDispatch: exception parameter should not be null");
|
||
|
EnqueueAndDispatch(new Item(exception, dequeuedCallback), canDispatchOnThisThread);
|
||
|
}
|
||
|
|
||
|
public void EnqueueAndDispatch(T item, Action dequeuedCallback, bool canDispatchOnThisThread)
|
||
|
{
|
||
|
Fx.Assert(item != null, "EnqueueAndDispatch: item parameter should not be null");
|
||
|
EnqueueAndDispatch(new Item(item, dequeuedCallback), canDispatchOnThisThread);
|
||
|
}
|
||
|
|
||
|
public bool EnqueueWithoutDispatch(T item, Action dequeuedCallback)
|
||
|
{
|
||
|
Fx.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
|
||
|
return EnqueueWithoutDispatch(new Item(item, dequeuedCallback));
|
||
|
}
|
||
|
|
||
|
public bool EnqueueWithoutDispatch(Exception exception, Action dequeuedCallback)
|
||
|
{
|
||
|
Fx.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
|
||
|
return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
|
||
|
}
|
||
|
|
||
|
|
||
|
public void Shutdown()
|
||
|
{
|
||
|
this.Shutdown(null);
|
||
|
}
|
||
|
|
||
|
// Don't let any more items in. Differs from Close in that we keep around
|
||
|
// existing items in our itemQueue for possible future calls to Dequeue
|
||
|
public void Shutdown(Func<Exception> pendingExceptionGenerator)
|
||
|
{
|
||
|
IQueueReader[] outstandingReaders = null;
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
if (queueState == QueueState.Shutdown)
|
||
|
{
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (queueState == QueueState.Closed)
|
||
|
{
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
this.queueState = QueueState.Shutdown;
|
||
|
|
||
|
if (readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
|
||
|
{
|
||
|
outstandingReaders = new IQueueReader[readerQueue.Count];
|
||
|
readerQueue.CopyTo(outstandingReaders, 0);
|
||
|
readerQueue.Clear();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (outstandingReaders != null)
|
||
|
{
|
||
|
for (int i = 0; i < outstandingReaders.Length; i++)
|
||
|
{
|
||
|
Exception exception = (pendingExceptionGenerator != null) ? pendingExceptionGenerator() : null;
|
||
|
outstandingReaders[i].Set(new Item(exception, null));
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(CancelMethod = "Dispatch")]
|
||
|
public bool WaitForItem(TimeSpan timeout)
|
||
|
{
|
||
|
WaitQueueWaiter waiter = null;
|
||
|
bool itemAvailable = false;
|
||
|
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
if (queueState == QueueState.Open)
|
||
|
{
|
||
|
if (itemQueue.HasAvailableItem)
|
||
|
{
|
||
|
itemAvailable = true;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
waiter = new WaitQueueWaiter();
|
||
|
waiterList.Add(waiter);
|
||
|
}
|
||
|
}
|
||
|
else if (queueState == QueueState.Shutdown)
|
||
|
{
|
||
|
if (itemQueue.HasAvailableItem)
|
||
|
{
|
||
|
itemAvailable = true;
|
||
|
}
|
||
|
else if (itemQueue.HasAnyItem)
|
||
|
{
|
||
|
waiter = new WaitQueueWaiter();
|
||
|
waiterList.Add(waiter);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
else // queueState == QueueState.Closed
|
||
|
{
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (waiter != null)
|
||
|
{
|
||
|
return waiter.Wait(timeout);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
return itemAvailable;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public void Dispose()
|
||
|
{
|
||
|
bool dispose = false;
|
||
|
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
if (queueState != QueueState.Closed)
|
||
|
{
|
||
|
queueState = QueueState.Closed;
|
||
|
dispose = true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (dispose)
|
||
|
{
|
||
|
while (readerQueue.Count > 0)
|
||
|
{
|
||
|
IQueueReader reader = readerQueue.Dequeue();
|
||
|
reader.Set(default(Item));
|
||
|
}
|
||
|
|
||
|
while (itemQueue.HasAnyItem)
|
||
|
{
|
||
|
Item item = itemQueue.DequeueAnyItem();
|
||
|
DisposeItem(item);
|
||
|
InvokeDequeuedCallback(item.DequeuedCallback);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void DisposeItem(Item item)
|
||
|
{
|
||
|
T value = item.Value;
|
||
|
if (value != null)
|
||
|
{
|
||
|
if (value is IDisposable)
|
||
|
{
|
||
|
((IDisposable)value).Dispose();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
Action<T> disposeItemCallback = this.DisposeItemCallback;
|
||
|
if (disposeItemCallback != null)
|
||
|
{
|
||
|
disposeItemCallback(value);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void CompleteOutstandingReadersCallback(object state)
|
||
|
{
|
||
|
IQueueReader[] outstandingReaders = (IQueueReader[])state;
|
||
|
|
||
|
for (int i = 0; i < outstandingReaders.Length; i++)
|
||
|
{
|
||
|
outstandingReaders[i].Set(default(Item));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
|
||
|
{
|
||
|
for (int i = 0; i < waiters.Length; i++)
|
||
|
{
|
||
|
waiters[i].Set(itemAvailable);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void CompleteWaitersFalseCallback(object state)
|
||
|
{
|
||
|
CompleteWaiters(false, (IQueueWaiter[])state);
|
||
|
}
|
||
|
|
||
|
static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
|
||
|
{
|
||
|
if (itemAvailable)
|
||
|
{
|
||
|
if (completeWaitersTrueCallback == null)
|
||
|
{
|
||
|
completeWaitersTrueCallback = new Action<object>(CompleteWaitersTrueCallback);
|
||
|
}
|
||
|
|
||
|
ActionItem.Schedule(completeWaitersTrueCallback, waiters);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
if (completeWaitersFalseCallback == null)
|
||
|
{
|
||
|
completeWaitersFalseCallback = new Action<object>(CompleteWaitersFalseCallback);
|
||
|
}
|
||
|
|
||
|
ActionItem.Schedule(completeWaitersFalseCallback, waiters);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void CompleteWaitersTrueCallback(object state)
|
||
|
{
|
||
|
CompleteWaiters(true, (IQueueWaiter[])state);
|
||
|
}
|
||
|
|
||
|
static void InvokeDequeuedCallback(Action dequeuedCallback)
|
||
|
{
|
||
|
if (dequeuedCallback != null)
|
||
|
{
|
||
|
dequeuedCallback();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void InvokeDequeuedCallbackLater(Action dequeuedCallback)
|
||
|
{
|
||
|
if (dequeuedCallback != null)
|
||
|
{
|
||
|
if (onInvokeDequeuedCallback == null)
|
||
|
{
|
||
|
onInvokeDequeuedCallback = new Action<object>(OnInvokeDequeuedCallback);
|
||
|
}
|
||
|
|
||
|
ActionItem.Schedule(onInvokeDequeuedCallback, dequeuedCallback);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void OnDispatchCallback(object state)
|
||
|
{
|
||
|
((InputQueue<T>)state).Dispatch();
|
||
|
}
|
||
|
|
||
|
static void OnInvokeDequeuedCallback(object state)
|
||
|
{
|
||
|
Fx.Assert(state != null, "InputQueue.OnInvokeDequeuedCallback: (state != null)");
|
||
|
|
||
|
Action dequeuedCallback = (Action)state;
|
||
|
dequeuedCallback();
|
||
|
}
|
||
|
|
||
|
void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
|
||
|
{
|
||
|
bool disposeItem = false;
|
||
|
IQueueReader reader = null;
|
||
|
bool dispatchLater = false;
|
||
|
IQueueWaiter[] waiters = null;
|
||
|
bool itemAvailable = true;
|
||
|
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
|
||
|
this.GetWaiters(out waiters);
|
||
|
|
||
|
if (queueState == QueueState.Open)
|
||
|
{
|
||
|
if (canDispatchOnThisThread)
|
||
|
{
|
||
|
if (readerQueue.Count == 0)
|
||
|
{
|
||
|
itemQueue.EnqueueAvailableItem(item);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
reader = readerQueue.Dequeue();
|
||
|
}
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
if (readerQueue.Count == 0)
|
||
|
{
|
||
|
itemQueue.EnqueueAvailableItem(item);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
itemQueue.EnqueuePendingItem(item);
|
||
|
dispatchLater = true;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
else // queueState == QueueState.Closed || queueState == QueueState.Shutdown
|
||
|
{
|
||
|
disposeItem = true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (waiters != null)
|
||
|
{
|
||
|
if (canDispatchOnThisThread)
|
||
|
{
|
||
|
CompleteWaiters(itemAvailable, waiters);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
CompleteWaitersLater(itemAvailable, waiters);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (reader != null)
|
||
|
{
|
||
|
InvokeDequeuedCallback(item.DequeuedCallback);
|
||
|
reader.Set(item);
|
||
|
}
|
||
|
|
||
|
if (dispatchLater)
|
||
|
{
|
||
|
if (onDispatchCallback == null)
|
||
|
{
|
||
|
onDispatchCallback = new Action<object>(OnDispatchCallback);
|
||
|
}
|
||
|
|
||
|
ActionItem.Schedule(onDispatchCallback, this);
|
||
|
}
|
||
|
else if (disposeItem)
|
||
|
{
|
||
|
InvokeDequeuedCallback(item.DequeuedCallback);
|
||
|
DisposeItem(item);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// This will not block, however, Dispatch() must be called later if this function
|
||
|
// returns true.
|
||
|
bool EnqueueWithoutDispatch(Item item)
|
||
|
{
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
// Open
|
||
|
if (queueState != QueueState.Closed && queueState != QueueState.Shutdown)
|
||
|
{
|
||
|
if (readerQueue.Count == 0 && waiterList.Count == 0)
|
||
|
{
|
||
|
itemQueue.EnqueueAvailableItem(item);
|
||
|
return false;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
itemQueue.EnqueuePendingItem(item);
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
DisposeItem(item);
|
||
|
InvokeDequeuedCallbackLater(item.DequeuedCallback);
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
void GetWaiters(out IQueueWaiter[] waiters)
|
||
|
{
|
||
|
if (waiterList.Count > 0)
|
||
|
{
|
||
|
waiters = waiterList.ToArray();
|
||
|
waiterList.Clear();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
waiters = null;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Used for timeouts. The InputQueue must remove readers from its reader queue to prevent
|
||
|
// dispatching items to timed out readers.
|
||
|
bool RemoveReader(IQueueReader reader)
|
||
|
{
|
||
|
Fx.Assert(reader != null, "InputQueue.RemoveReader: (reader != null)");
|
||
|
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
if (queueState == QueueState.Open || queueState == QueueState.Shutdown)
|
||
|
{
|
||
|
bool removed = false;
|
||
|
|
||
|
for (int i = readerQueue.Count; i > 0; i--)
|
||
|
{
|
||
|
IQueueReader temp = readerQueue.Dequeue();
|
||
|
if (object.ReferenceEquals(temp, reader))
|
||
|
{
|
||
|
removed = true;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
readerQueue.Enqueue(temp);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return removed;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
enum QueueState
|
||
|
{
|
||
|
Open,
|
||
|
Shutdown,
|
||
|
Closed
|
||
|
}
|
||
|
|
||
|
interface IQueueReader
|
||
|
{
|
||
|
void Set(Item item);
|
||
|
}
|
||
|
|
||
|
interface IQueueWaiter
|
||
|
{
|
||
|
void Set(bool itemAvailable);
|
||
|
}
|
||
|
|
||
|
struct Item
|
||
|
{
|
||
|
Action dequeuedCallback;
|
||
|
Exception exception;
|
||
|
T value;
|
||
|
|
||
|
public Item(T value, Action dequeuedCallback)
|
||
|
: this(value, null, dequeuedCallback)
|
||
|
{
|
||
|
}
|
||
|
|
||
|
public Item(Exception exception, Action dequeuedCallback)
|
||
|
: this(null, exception, dequeuedCallback)
|
||
|
{
|
||
|
}
|
||
|
|
||
|
Item(T value, Exception exception, Action dequeuedCallback)
|
||
|
{
|
||
|
this.value = value;
|
||
|
this.exception = exception;
|
||
|
this.dequeuedCallback = dequeuedCallback;
|
||
|
}
|
||
|
|
||
|
public Action DequeuedCallback
|
||
|
{
|
||
|
get { return this.dequeuedCallback; }
|
||
|
}
|
||
|
|
||
|
public Exception Exception
|
||
|
{
|
||
|
get { return this.exception; }
|
||
|
}
|
||
|
|
||
|
public T Value
|
||
|
{
|
||
|
get { return this.value; }
|
||
|
}
|
||
|
|
||
|
public T GetValue()
|
||
|
{
|
||
|
if (this.exception != null)
|
||
|
{
|
||
|
throw Fx.Exception.AsError(this.exception);
|
||
|
}
|
||
|
|
||
|
return this.value;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
|
||
|
class AsyncQueueReader : AsyncResult, IQueueReader
|
||
|
{
|
||
|
static Action<object> timerCallback = new Action<object>(AsyncQueueReader.TimerCallback);
|
||
|
|
||
|
bool expired;
|
||
|
InputQueue<T> inputQueue;
|
||
|
T item;
|
||
|
IOThreadTimer timer;
|
||
|
|
||
|
public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
|
||
|
: base(callback, state)
|
||
|
{
|
||
|
if (inputQueue.AsyncCallbackGenerator != null)
|
||
|
{
|
||
|
base.VirtualCallback = inputQueue.AsyncCallbackGenerator();
|
||
|
}
|
||
|
this.inputQueue = inputQueue;
|
||
|
if (timeout != TimeSpan.MaxValue)
|
||
|
{
|
||
|
this.timer = new IOThreadTimer(timerCallback, this, false);
|
||
|
this.timer.Set(timeout);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
|
||
|
public static bool End(IAsyncResult result, out T value)
|
||
|
{
|
||
|
AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);
|
||
|
|
||
|
if (readerResult.expired)
|
||
|
{
|
||
|
value = default(T);
|
||
|
return false;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
value = readerResult.item;
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public void Set(Item item)
|
||
|
{
|
||
|
this.item = item.Value;
|
||
|
if (this.timer != null)
|
||
|
{
|
||
|
this.timer.Cancel();
|
||
|
}
|
||
|
Complete(false, item.Exception);
|
||
|
}
|
||
|
|
||
|
static void TimerCallback(object state)
|
||
|
{
|
||
|
AsyncQueueReader thisPtr = (AsyncQueueReader)state;
|
||
|
if (thisPtr.inputQueue.RemoveReader(thisPtr))
|
||
|
{
|
||
|
thisPtr.expired = true;
|
||
|
thisPtr.Complete(false);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.AsyncResult, SupportsAsync = true, ReleaseMethod = "Set")]
|
||
|
class AsyncQueueWaiter : AsyncResult, IQueueWaiter
|
||
|
{
|
||
|
static Action<object> timerCallback = new Action<object>(AsyncQueueWaiter.TimerCallback);
|
||
|
bool itemAvailable;
|
||
|
|
||
|
[Fx.Tag.SynchronizationObject(Blocking = false)]
|
||
|
object thisLock = new object();
|
||
|
|
||
|
IOThreadTimer timer;
|
||
|
|
||
|
public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state)
|
||
|
{
|
||
|
if (timeout != TimeSpan.MaxValue)
|
||
|
{
|
||
|
this.timer = new IOThreadTimer(timerCallback, this, false);
|
||
|
this.timer.Set(timeout);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
object ThisLock
|
||
|
{
|
||
|
get
|
||
|
{
|
||
|
return this.thisLock;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(Conditional = "!result.IsCompleted", CancelMethod = "Set")]
|
||
|
public static bool End(IAsyncResult result)
|
||
|
{
|
||
|
AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
|
||
|
return waiterResult.itemAvailable;
|
||
|
}
|
||
|
|
||
|
public void Set(bool itemAvailable)
|
||
|
{
|
||
|
bool timely;
|
||
|
|
||
|
lock (ThisLock)
|
||
|
{
|
||
|
timely = (this.timer == null) || this.timer.Cancel();
|
||
|
this.itemAvailable = itemAvailable;
|
||
|
}
|
||
|
|
||
|
if (timely)
|
||
|
{
|
||
|
Complete(false);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void TimerCallback(object state)
|
||
|
{
|
||
|
AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
|
||
|
thisPtr.Complete(false);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
class ItemQueue
|
||
|
{
|
||
|
int head;
|
||
|
Item[] items;
|
||
|
int pendingCount;
|
||
|
int totalCount;
|
||
|
|
||
|
public ItemQueue()
|
||
|
{
|
||
|
this.items = new Item[1];
|
||
|
}
|
||
|
|
||
|
public bool HasAnyItem
|
||
|
{
|
||
|
get { return this.totalCount > 0; }
|
||
|
}
|
||
|
|
||
|
public bool HasAvailableItem
|
||
|
{
|
||
|
get { return this.totalCount > this.pendingCount; }
|
||
|
}
|
||
|
|
||
|
public int ItemCount
|
||
|
{
|
||
|
get { return this.totalCount; }
|
||
|
}
|
||
|
|
||
|
public Item DequeueAnyItem()
|
||
|
{
|
||
|
if (this.pendingCount == this.totalCount)
|
||
|
{
|
||
|
this.pendingCount--;
|
||
|
}
|
||
|
return DequeueItemCore();
|
||
|
}
|
||
|
|
||
|
public Item DequeueAvailableItem()
|
||
|
{
|
||
|
Fx.AssertAndThrow(this.totalCount != this.pendingCount, "ItemQueue does not contain any available items");
|
||
|
return DequeueItemCore();
|
||
|
}
|
||
|
|
||
|
public void EnqueueAvailableItem(Item item)
|
||
|
{
|
||
|
EnqueueItemCore(item);
|
||
|
}
|
||
|
|
||
|
public void EnqueuePendingItem(Item item)
|
||
|
{
|
||
|
EnqueueItemCore(item);
|
||
|
this.pendingCount++;
|
||
|
}
|
||
|
|
||
|
public void MakePendingItemAvailable()
|
||
|
{
|
||
|
Fx.AssertAndThrow(this.pendingCount != 0, "ItemQueue does not contain any pending items");
|
||
|
this.pendingCount--;
|
||
|
}
|
||
|
|
||
|
Item DequeueItemCore()
|
||
|
{
|
||
|
Fx.AssertAndThrow(totalCount != 0, "ItemQueue does not contain any items");
|
||
|
Item item = this.items[this.head];
|
||
|
this.items[this.head] = new Item();
|
||
|
this.totalCount--;
|
||
|
this.head = (this.head + 1) % this.items.Length;
|
||
|
return item;
|
||
|
}
|
||
|
|
||
|
void EnqueueItemCore(Item item)
|
||
|
{
|
||
|
if (this.totalCount == this.items.Length)
|
||
|
{
|
||
|
Item[] newItems = new Item[this.items.Length * 2];
|
||
|
for (int i = 0; i < this.totalCount; i++)
|
||
|
{
|
||
|
newItems[i] = this.items[(head + i) % this.items.Length];
|
||
|
}
|
||
|
this.head = 0;
|
||
|
this.items = newItems;
|
||
|
}
|
||
|
int tail = (this.head + this.totalCount) % this.items.Length;
|
||
|
this.items[tail] = item;
|
||
|
this.totalCount++;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.SynchronizationObject(Blocking = false)]
|
||
|
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
|
||
|
class WaitQueueReader : IQueueReader
|
||
|
{
|
||
|
Exception exception;
|
||
|
InputQueue<T> inputQueue;
|
||
|
T item;
|
||
|
|
||
|
[Fx.Tag.SynchronizationObject]
|
||
|
ManualResetEvent waitEvent;
|
||
|
|
||
|
public WaitQueueReader(InputQueue<T> inputQueue)
|
||
|
{
|
||
|
this.inputQueue = inputQueue;
|
||
|
waitEvent = new ManualResetEvent(false);
|
||
|
}
|
||
|
|
||
|
public void Set(Item item)
|
||
|
{
|
||
|
lock (this)
|
||
|
{
|
||
|
Fx.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
|
||
|
Fx.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
|
||
|
|
||
|
this.exception = item.Exception;
|
||
|
this.item = item.Value;
|
||
|
waitEvent.Set();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(CancelMethod = "Set")]
|
||
|
public bool Wait(TimeSpan timeout, out T value)
|
||
|
{
|
||
|
bool isSafeToClose = false;
|
||
|
try
|
||
|
{
|
||
|
if (!TimeoutHelper.WaitOne(waitEvent, timeout))
|
||
|
{
|
||
|
if (this.inputQueue.RemoveReader(this))
|
||
|
{
|
||
|
value = default(T);
|
||
|
isSafeToClose = true;
|
||
|
return false;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
waitEvent.WaitOne();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
isSafeToClose = true;
|
||
|
}
|
||
|
finally
|
||
|
{
|
||
|
if (isSafeToClose)
|
||
|
{
|
||
|
waitEvent.Close();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (this.exception != null)
|
||
|
{
|
||
|
throw Fx.Exception.AsError(this.exception);
|
||
|
}
|
||
|
|
||
|
value = item;
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.ManualResetEvent, ReleaseMethod = "Set")]
|
||
|
class WaitQueueWaiter : IQueueWaiter
|
||
|
{
|
||
|
bool itemAvailable;
|
||
|
|
||
|
[Fx.Tag.SynchronizationObject]
|
||
|
ManualResetEvent waitEvent;
|
||
|
|
||
|
public WaitQueueWaiter()
|
||
|
{
|
||
|
waitEvent = new ManualResetEvent(false);
|
||
|
}
|
||
|
|
||
|
public void Set(bool itemAvailable)
|
||
|
{
|
||
|
lock (this)
|
||
|
{
|
||
|
this.itemAvailable = itemAvailable;
|
||
|
waitEvent.Set();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
[Fx.Tag.Blocking(CancelMethod = "Set")]
|
||
|
public bool Wait(TimeSpan timeout)
|
||
|
{
|
||
|
if (!TimeoutHelper.WaitOne(waitEvent, timeout))
|
||
|
{
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
return this.itemAvailable;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|