e79aa3c0ed
Former-commit-id: a2155e9bd80020e49e72e86c44da02a8ac0e57a4
662 lines
30 KiB
C#
662 lines
30 KiB
C#
//------------------------------------------------------------
|
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
|
//------------------------------------------------------------
|
|
|
|
namespace System.Runtime
|
|
{
|
|
using System.Threading;
|
|
using System.Security;
|
|
|
|
// IOThreadScheduler takes no locks due to contention problems on multiproc.
|
|
[Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.NonBlocking)]
|
|
class IOThreadScheduler
|
|
{
|
|
// Do not increase the maximum capacity above 32k! It must be a power of two, 0x8000 or less, in order to
|
|
// work with the strategy for 'headTail'.
|
|
const int MaximumCapacity = 0x8000;
|
|
|
|
[Fx.Tag.SecurityNote(Miscellaneous = "can be called outside user context")]
|
|
static class Bits
{
    // Bit-layout helpers for the 32-bit values used by the scheduler ('headTail' words and slot gates).
    // The int is treated as two 16-bit words; within each word the low 15 bits are a counter and the
    // top bit is a flag.

    // Width of one word, i.e. how far the high word is shifted.
    public const int HiShift = 32 / 2;

    // Adding this value increments the high word by one.
    public const int HiOne = 1 << HiShift;

    // Flag (top) bit of the low word: 0x00008000.
    public const int LoHiBit = HiOne >> 1;

    // Flag (top) bit of the high word: 0x80000000.
    public const int HiHiBit = LoHiBit << HiShift;

    // Counter bits of the low word: 0x00007fff.
    public const int LoCountMask = LoHiBit - 1;

    // Counter bits of the high word: 0x7fff0000.
    public const int HiCountMask = LoCountMask << HiShift;

    // The whole low word (counter plus flag): 0x0000ffff.
    public const int LoMask = LoCountMask | LoHiBit;

    // The whole high word (counter plus flag): 0xffff0000.
    public const int HiMask = HiCountMask | HiHiBit;

    // Both flag bits together: 0x80008000.
    public const int HiBits = LoHiBit | HiHiBit;

    // Number of items described by a head/tail pair, where -1 stands for 'idle' and 0 for 'empty'.
    // The difference is taken modulo 2^16, so wrapped indices still give the right answer.
    public static int Count(int slot)
    {
        int head = slot >> HiShift;
        int biased = (head - slot + 2) & LoMask;
        return biased - 1;
    }

    // Same as Count, but for a head/tail pair that has no 'idle' state: the result is never negative
    // and 0 means 'empty'.
    public static int CountNoIdle(int slot)
    {
        int head = slot >> HiShift;
        return (head - slot + 1) & LoMask;
    }

    // Advances the low word by one (wrapping inside 16 bits) and leaves the high word untouched.
    public static int IncrementLo(int slot)
    {
        int advancedLow = (slot + 1) & LoMask;
        int unchangedHigh = slot & HiMask;
        return advancedLow | unchangedHigh;
    }

    // True when the high word of the gate equals its low word moved up into the high position.
    // This method is only valid if you already know that (gate & HiBits) != 0.
    public static bool IsComplete(int gate)
    {
        int highWord = gate & HiMask;
        int lowWordShiftedUp = gate << HiShift;
        return highWord == lowWordShiftedUp;
    }
}
|
|
|
|
static IOThreadScheduler current = new IOThreadScheduler(32, 32);
|
|
readonly ScheduledOverlapped overlapped;
|
|
|
|
[Fx.Tag.Queue(typeof(Slot), Scope = Fx.Tag.Strings.AppDomain)]
|
|
[Fx.Tag.SecurityNote(Critical = "holds callbacks which get called outside of the app security context")]
|
|
[SecurityCritical]
|
|
readonly Slot[] slots;
|
|
|
|
[Fx.Tag.Queue(typeof(Slot), Scope = Fx.Tag.Strings.AppDomain)]
|
|
[Fx.Tag.SecurityNote(Critical = "holds callbacks which get called outside of the app security context")]
|
|
[SecurityCritical]
|
|
readonly Slot[] slotsLowPri;
|
|
|
|
// This field holds both the head (HiWord) and tail (LoWord) indices into the slot array. This limits each
|
|
// value to 64k. In order to be able to distinguish wrapping the slot array (allowed) from wrapping the
|
|
// indices relative to each other (not allowed), the size of the slot array is limited by an additional bit
|
|
// to 32k.
|
|
//
|
|
// The HiWord (head) holds the index of the last slot to have been scheduled into. The LoWord (tail) holds
|
|
// the index of the next slot to be dispatched from. When the queue is empty, the LoWord will be exactly
|
|
// one slot ahead of the HiWord. When the two are equal, the queue holds one item.
|
|
//
|
|
// When the tail is *two* slots ahead of the head (equivalent to a count of -1), that means the IOTS is
|
|
// idle. Hence, we start out headTail with a -2 (equivalent) in the head and zero in the tail.
|
|
[Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.InterlockedNoSpin)]
|
|
int headTail = -2 << Bits.HiShift;
|
|
|
|
// This field is the same except that it governs the low-priority work items. It doesn't have a concept
|
|
// of idle (-2) so starts empty (-1).
|
|
[Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.InterlockedNoSpin)]
|
|
int headTailLowPri = -1 << Bits.HiShift;
|
|
|
|
// Builds a scheduler with a normal-priority ring of 'capacity' slots and a low-priority ring of
// 'capacityLowPri' slots, plus the overlapped used to request an IO thread.
// Both capacities are asserted to be positive, at most 0x8000, and powers of two.
[Fx.Tag.SecurityNote(Critical = "creates a ScheduledOverlapped, touches slots, can be called outside of user context",
    Safe = "The scheduled overlapped is only used internally, and flows security.")]
[SecuritySafeCritical]
IOThreadScheduler(int capacity, int capacityLowPri)
{
    // Range checks for the normal-priority ring.
    Fx.Assert(capacity > 0, "Capacity must be positive.");
    Fx.Assert(capacity <= 0x8000, "Capacity cannot exceed 32k.");

    // Range checks for the low-priority ring.
    Fx.Assert(capacityLowPri > 0, "Low-priority capacity must be positive.");
    Fx.Assert(capacityLowPri <= 0x8000, "Low-priority capacity cannot exceed 32k.");

    // SlotMask is (Length - 1), so 'Length & SlotMask' is zero exactly when Length is a power of two.
    Slot[] ring = new Slot[capacity];
    this.slots = ring;
    Fx.Assert((ring.Length & SlotMask) == 0, "Capacity must be a power of two.");

    Slot[] ringLowPri = new Slot[capacityLowPri];
    this.slotsLowPri = ringLowPri;
    Fx.Assert((ringLowPri.Length & SlotMaskLowPri) == 0, "Low-priority capacity must be a power of two.");

    this.overlapped = new ScheduledOverlapped();
}
|
|
|
|
// Queues 'callback' (invoked later with 'state') on the current scheduler's normal-priority ring.
// Throws the ArgumentNull exception produced by Fx.Exception when 'callback' is null.
// Keeps retrying against whatever scheduler is current until one attempt reports success.
[Fx.Tag.SecurityNote(Critical = "Calls into critical class CriticalHelper, doesn't flow context")]
[SecurityCritical]
public static void ScheduleCallbackNoFlow(Action<object> callback, object state)
{
    if (callback == null)
    {
        throw Fx.Exception.ArgumentNull("callback");
    }

    bool accepted = false;
    do
    {
        try
        {
        }
        finally
        {
            // The attempt lives in a finally block because it has to run uninterrupted to keep the
            // scheduler's state consistent.
            accepted = IOThreadScheduler.current.ScheduleCallbackHelper(callback, state);
        }
    }
    while (!accepted);
}
|
|
|
|
// Queues 'callback' (invoked later with 'state') on the current scheduler's low-priority ring.
// Throws the ArgumentNull exception produced by Fx.Exception when 'callback' is null.
// Keeps retrying against whatever scheduler is current until one attempt reports success.
[Fx.Tag.SecurityNote(Critical = "Calls into critical class CriticalHelper, doesn't flow context")]
[SecurityCritical]
public static void ScheduleCallbackLowPriNoFlow(Action<object> callback, object state)
{
    if (callback == null)
    {
        throw Fx.Exception.ArgumentNull("callback");
    }

    bool accepted = false;
    do
    {
        try
        {
        }
        finally
        {
            // The attempt lives in a finally block because it has to run uninterrupted to keep the
            // scheduler's state consistent.
            accepted = IOThreadScheduler.current.ScheduleCallbackLowPriHelper(callback, state);
        }
    }
    while (!accepted);
}
|
|
|
|
// Makes one attempt to store a work item in the normal-priority ring.
// Returns true if the item was stored, false if the caller has to try again.
[Fx.Tag.SecurityNote(Critical = "calls into ScheduledOverlapped to post it, touches slots, can be called outside user context.")]
[SecurityCritical]
bool ScheduleCallbackHelper(Action<object> callback, object state)
{
    // Reserve the next head position. A carry out of the high word is simply lost, which is fine.
    int reserved = Interlocked.Add(ref this.headTail, Bits.HiOne);

    // Arriving at 'empty' means the scheduler was 'idle' before the add. Note that and reserve once
    // more. No loop is needed: the scheduler cannot return to idle until we have called Post().
    bool wasIdle = Bits.Count(reserved) == 0;
    if (wasIdle)
    {
        reserved = Interlocked.Add(ref this.headTail, Bits.HiOne);
        Fx.Assert(Bits.Count(reserved) != 0, "IOTS went idle when it shouldn't have.");
    }

    // A count of -1 here means the head wrapped all the way *around* to idle. With capacity capped at
    // 32k that requires wrapping the array at least twice, after which headTail can no longer say how
    // many items are queued (it looks like zero). It can only happen if 32k threads pass through here
    // while one is swapped out.
    if (Bits.Count(reserved) == -1)
    {
        throw Fx.AssertAndThrowFatal("Head/Tail overflow!");
    }

    // The head index lives in the high word; reduce it to a position inside the ring.
    int index = (reserved >> Bits.HiShift) & SlotMask;

    bool wrapped;
    bool queued = this.slots[index].TryEnqueueWorkItem(callback, state, out wrapped);

    if (wrapped)
    {
        // The circular buffer wrapped onto a slot still in use. Offer a scheduler with a bigger
        // normal-priority ring as the new 'current'; the swap only happens if 'current' is still us.
        int grownCapacity = Math.Min(this.slots.Length * 2, MaximumCapacity);
        IOThreadScheduler replacement = new IOThreadScheduler(grownCapacity, this.slotsLowPri.Length);
        Interlocked.CompareExchange<IOThreadScheduler>(ref IOThreadScheduler.current, replacement, this);
    }

    if (wasIdle)
    {
        // We took the scheduler out of idle, so kicking off the overlapped is our job.
        this.overlapped.Post(this);
    }

    return queued;
}
|
|
|
|
// Makes one attempt to store a work item in the low-priority ring.
// Returns true if the item was stored, false if the caller has to try again.
[Fx.Tag.SecurityNote(Critical = "calls into ScheduledOverlapped to post it, touches slots, can be called outside user context.")]
[SecurityCritical]
bool ScheduleCallbackLowPriHelper(Action<object> callback, object state)
{
    // Reserve the next low-priority head position. A carry out of the high word is simply lost.
    int reserved = Interlocked.Add(ref this.headTailLowPri, Bits.HiOne);
    int pending = Bits.CountNoIdle(reserved);

    // When this is the first low-priority item, make sure the scheduler is not sitting idle.
    bool wasIdle = false;
    if (pending == 1)
    {
        // Interlocked calls act as a full barrier, so this read observes headTail as of the
        // Interlocked.Add above, or later. The invariant: the IOTS is un-idle at some point after
        // that Add.
        int highPri = this.headTail;

        if (Bits.Count(highPri) == -1)
        {
            // Keep the CompareExchange result in its own local: this works around a codegen bug in
            // the 32-bit JIT (TFS 749182).
            int interlockedResult = Interlocked.CompareExchange(ref this.headTail, highPri + Bits.HiOne, highPri);
            if (highPri == interlockedResult)
            {
                wasIdle = true;
            }
        }
    }

    // A count of zero here means the head wrapped all the way *around* to empty. With capacity capped
    // at 32k that requires wrapping the array at least twice, after which the head/tail word can no
    // longer say how many items are queued. It can only happen if 32k threads pass through here while
    // one is swapped out.
    if (pending == 0)
    {
        throw Fx.AssertAndThrowFatal("Low-priority Head/Tail overflow!");
    }

    // The head index lives in the high word; reduce it to a position inside the low-priority ring.
    int index = (reserved >> Bits.HiShift) & SlotMaskLowPri;

    bool wrapped;
    bool queued = this.slotsLowPri[index].TryEnqueueWorkItem(callback, state, out wrapped);

    if (wrapped)
    {
        // Offer a scheduler with a bigger low-priority ring as the new 'current'; the swap only
        // happens if 'current' is still this instance.
        int grownCapacity = Math.Min(this.slotsLowPri.Length * 2, MaximumCapacity);
        IOThreadScheduler replacement = new IOThreadScheduler(this.slots.Length, grownCapacity);
        Interlocked.CompareExchange<IOThreadScheduler>(ref IOThreadScheduler.current, replacement, this);
    }

    if (wasIdle)
    {
        // We took the scheduler out of idle, so kicking off the overlapped is our job.
        this.overlapped.Post(this);
    }

    return queued;
}
|
|
|
|
// Chooses the work item for the thread that has just received the posted overlapped (this method is
// called from ScheduledOverlapped.IOCallback).
//
// On return, 'callback' and 'state' are either the contents of a dequeued slot, or both null when this
// call ended by setting the scheduler to idle without taking an item. (Slot.DequeueWorkItem itself can
// also hand back a null callback.)
//
// Normal-priority items are taken first; the low-priority ring is only examined when the normal ring is
// empty. Whenever an item is taken, the overlapped is posted again *before* the slot is dequeued.
[Fx.Tag.SecurityNote(Critical = "calls into ScheduledOverlapped to post it, touches slots, may be called outside of user context")]
|
|
[SecurityCritical]
|
|
void CompletionCallback(out Action<object> callback, out object state)
|
|
{
|
|
// Snapshot of headTail; refreshed from the CompareExchange results below whenever an exchange loses.
int slot = this.headTail;
|
|
int slotLowPri;
|
|
while (true)
|
|
{
|
|
Fx.Assert(Bits.Count(slot) != -1, "CompletionCallback called on idle IOTS!");
|
|
|
|
// Count == 0 is 'empty': advancing the tail from here moves the scheduler to 'idle' (Count == -1).
bool wasEmpty = Bits.Count(slot) == 0;
|
|
if (wasEmpty)
|
|
{
|
|
// We're about to set this to idle. First check the low-priority queue. This alone doesn't
|
|
// guarantee we service all the low-pri items - there hasn't even been an Interlocked yet. But
|
|
// we take care of that later.
|
|
slotLowPri = this.headTailLowPri;
|
|
while (Bits.CountNoIdle(slotLowPri) != 0)
|
|
{
|
|
// The left operand is evaluated before the assignment on the right, so this compares the value we
// expected with the value CompareExchange actually found. On a lost exchange slotLowPri already
// holds the fresh value for the next pass of the loop.
if (slotLowPri == (slotLowPri = Interlocked.CompareExchange(ref this.headTailLowPri,
|
|
Bits.IncrementLo(slotLowPri), slotLowPri)))
|
|
{
|
|
// Tail advanced: the low word of the old value is the index of the slot we now own.
this.overlapped.Post(this);
|
|
this.slotsLowPri[slotLowPri & SlotMaskLowPri].DequeueWorkItem(out callback, out state);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Same compare-and-refresh idiom as above, this time on the normal-priority head/tail word. If the
// exchange loses, 'slot' holds the value that was found and the outer loop starts over with it.
if (slot == (slot = Interlocked.CompareExchange(ref this.headTail, Bits.IncrementLo(slot), slot)))
|
|
{
|
|
if (!wasEmpty)
|
|
{
|
|
// There was at least one normal-priority item: re-post, then take the slot at the old tail index.
this.overlapped.Post(this);
|
|
this.slots[slot & SlotMask].DequeueWorkItem(out callback, out state);
|
|
return;
|
|
}
|
|
|
|
// We just set the IOThreadScheduler to idle. Check if a low-priority item got added in the
|
|
// interim.
|
|
// Interlocked calls create a thread barrier, so this read will give us the value of
|
|
// headTailLowPri at the time of the interlocked that set us to idle, or later. The invariant
|
|
// here is that either the low-priority queue was empty at some point after we set the IOTS to
|
|
// idle (so that the next enqueue will notice, and issue a Post), or that the IOTS was unidle at
|
|
// some point after we set it to idle (so that the next attempt to go idle will verify that the
|
|
// low-priority queue is empty).
|
|
slotLowPri = this.headTailLowPri;
|
|
|
|
|
if (Bits.CountNoIdle(slotLowPri) != 0)
|
|
{
|
|
// Whoops, go back from being idle (unless someone else already did). If we go back, start
|
|
// over. (We still owe a Post.)
|
|
// 'slot' still holds the pre-exchange value, so IncrementLo(slot) reproduces the idle value we stored.
slot = Bits.IncrementLo(slot);
|
|
// Adding HiOne to the idle value gives the 'empty' state (Count == 0).
if (slot == Interlocked.CompareExchange(ref this.headTail, slot + Bits.HiOne, slot))
|
|
{
|
|
// Mirror the stored value locally and loop: the next pass sees 'empty' and checks the low-pri ring.
slot += Bits.HiOne;
|
|
continue;
|
|
}
|
|
|
|
// We know that there's a low-priority work item. But we also know that the IOThreadScheduler
|
|
// wasn't idle. It's best to let it take care of itself, since according to this method, we
|
|
// just set the IOThreadScheduler to idle so shouldn't take on any tasks.
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Reached only through the 'break' above: the scheduler was left idle and no item was taken.
callback = null;
|
|
state = null;
|
|
return;
|
|
}
|
|
|
|
// Tries to pull one more work item for a thread that is already running callbacks, so that no new
// post is needed. Normal-priority items win over low-priority ones. Unlike CompletionCallback, this
// never advances the normal-priority tail past 'empty', so it never puts the scheduler into idle.
// Returns true with 'callback'/'state' taken from a slot (the slot may yield a null callback), or
// false with both set to null when neither ring has anything queued.
[Fx.Tag.SecurityNote(Critical = "touches slots, may be called outside of user context")]
[SecurityCritical]
bool TryCoalesce(out Action<object> callback, out object state)
{
    int expected = this.headTail;
    for (;;)
    {
        if (Bits.Count(expected) > 0)
        {
            // Claim the slot at the current tail by advancing the tail with a compare-exchange.
            int observed = Interlocked.CompareExchange(ref this.headTail, Bits.IncrementLo(expected), expected);
            if (observed == expected)
            {
                this.slots[observed & SlotMask].DequeueWorkItem(out callback, out state);
                return true;
            }

            // Lost the exchange; retry with the value that was actually there.
            expected = observed;
            continue;
        }

        // Nothing at normal priority - look at the low-priority ring.
        int expectedLowPri = this.headTailLowPri;
        if (Bits.CountNoIdle(expectedLowPri) <= 0)
        {
            callback = null;
            state = null;
            return false;
        }

        int observedLowPri = Interlocked.CompareExchange(ref this.headTailLowPri,
            Bits.IncrementLo(expectedLowPri), expectedLowPri);
        if (observedLowPri == expectedLowPri)
        {
            this.slotsLowPri[observedLowPri & SlotMaskLowPri].DequeueWorkItem(out callback, out state);
            return true;
        }

        // Lost the low-priority exchange; start over from a fresh look at the normal-priority ring.
        expected = this.headTail;
    }
}
|
|
|
|
// Mask that reduces an index to a position inside the normal-priority ring: (ring length - 1).
// The constructor asserts the length is a power of two, which is what makes this usable as a mask.
int SlotMask
{
    [Fx.Tag.SecurityNote(Critical = "touches slots, may be called outside of user context")]
    [SecurityCritical]
    get
    {
        int ringLength = this.slots.Length;
        return ringLength - 1;
    }
}
|
|
|
|
// Mask that reduces an index to a position inside the low-priority ring: (ring length - 1).
// The constructor asserts the length is a power of two, which is what makes this usable as a mask.
int SlotMaskLowPri
{
    [Fx.Tag.SecurityNote(Critical = "touches slots, may be called outside of user context")]
    [SecurityCritical]
    get
    {
        int ringLength = this.slotsLowPri.Length;
        return ringLength - 1;
    }
}
|
|
|
|
//
|
|
|
|
|
|
// Finalizer: releases the overlapped (and, in DEBUG builds, first checks that nothing is still queued).
~IOThreadScheduler()
{
    // During process shutdown or AppDomain unload there may still be pending operations; the
    // AppDomain shutdown cleans everything up, so do nothing here.
    if (Environment.HasShutdownStarted || AppDomain.CurrentDomain.IsFinalizingForUnload())
    {
        return;
    }

#if DEBUG
    DebugVerifyHeadTail();
#endif
    Cleanup();
}
|
|
|
|
// Releases the scheduler's overlapped, if one was created. Called from the finalizer.
[SecuritySafeCritical]
void Cleanup()
{
    ScheduledOverlapped owned = this.overlapped;
    if (owned == null)
    {
        // Nothing was allocated, so there is nothing to release.
        return;
    }

    owned.Cleanup();
}
|
|
|
|
#if DEBUG
|
|
|
|
// DEBUG-only sanity check run from the finalizer: asserts that the scheduler is idle, that the
// low-priority ring is empty, and that every slot in both rings is back in its empty state.
[SecuritySafeCritical]
private void DebugVerifyHeadTail()
{
    Slot[] ring = this.slots;
    if (ring != null)
    {
        // headTail could technically be zero if the constructor was aborted early; the slot array
        // existing tells us the constructor got at least that far.
        Fx.Assert(Bits.Count(this.headTail) == -1, "IOTS finalized while not idle.");

        for (int index = 0; index < ring.Length; index++)
        {
            ring[index].DebugVerifyEmpty();
        }
    }

    Slot[] ringLowPri = this.slotsLowPri;
    if (ringLowPri != null)
    {
        Fx.Assert(Bits.CountNoIdle(this.headTailLowPri) == 0, "IOTS finalized with low-priority items queued.");

        for (int index = 0; index < ringLowPri.Length; index++)
        {
            ringLowPri[index].DebugVerifyEmpty();
        }
    }
}
|
|
|
|
#endif
|
|
|
|
// TryEnqueueWorkItem and DequeueWorkItem use the slot's 'gate' field for synchronization. Because the
|
|
// slot array is circular and there are no locks, we must assume that multiple threads can be entering each
|
|
// method simultaneously. If the first DequeueWorkItem occurs before the first TryEnqueueWorkItem, the
|
|
// sequencing (and the enqueue) fails.
|
|
//
|
|
// The gate is a 32-bit int divided into four fields. The bottom 15 bits (0x00007fff) are the count of
|
|
// threads that have entered TryEnqueueWorkItem. The first thread to enter is the one responsible for
|
|
// filling the slot with work. The 16th bit (0x00008000) is a flag indicating that the slot has been
|
|
// successfully filled. Only the first thread to enter TryEnqueueWorkItem can set this flag. The
|
|
// high-word (0x7fff0000) is the count of threads entering DequeueWorkItem. The first thread to enter
|
|
// is the one responsible for accepting (and eventually dispatching) the work in the slot. The
|
|
// high-bit (0x80000000) is a flag indicating that the slot has been successfully emptied.
|
|
//
|
|
// When the low-word and high-word counters are equal, and both bit flags have been set, the gate is considered
|
|
// 'complete' and can be reset back to zero. Any operation on the gate might bring it to this state.
|
|
// It's the responsibility of the thread that brings the gate to a completed state to reset it to zero.
|
|
// (It's possible that the gate will fall out of the completed state before it can be reset - that's ok,
|
|
// the next time it becomes completed it can be reset.)
|
|
//
|
|
// It's unlikely either count will ever go higher than 2 or 3.
|
|
//
|
|
// The value of 'callback' has these properties:
|
|
// - When the gate is zero, callback is null.
|
|
// - When the low-word count is non-zero, but the 0x8000 bit is unset, callback is writable by the thread
|
|
// that incremented the low word to 1. Its value is undefined for other threads. The thread that
|
|
// sets callback is responsible for setting the 0x8000 bit when it's done.
|
|
// - When the 0x8000 bit is set and the high-word count is zero, callback is valid. (It may be null.)
|
|
// - When the 0x8000 bit is set, the high-word count is non-zero, and the high bit is unset, callback is
|
|
// writable by the thread that incremented the high word to 1 *or* the thread that set the 0x8000 bit,
|
|
// whichever happened last. That thread can read the value and set callback to null. Its value is
|
|
// undefined for other threads. The thread that clears the callback is responsible for setting the
|
|
// high bit.
|
|
// - When the high bit is set, callback is null.
|
|
// - It's illegal for the gate to be in a state that would satisfy more than one of these conditions.
|
|
// - The state field follows the same rules as callback.
|
|
struct Slot
|
|
{
|
|
int gate;
|
|
Action<object> callback;
|
|
object state;
|
|
|
|
// Attempts to store (callback, state) in this slot.
//
// Returns true only when the work item was stored and no dequeuing thread had registered on the slot by
// the time the 'filled' flag was set. Returns false when the slot could not be used; the callers in this
// file then try again with a newly reserved slot.
//
// 'wrapped' is set to true when this thread was not the first enqueuer to register on the gate since it
// was last reset, i.e. the slot was already claimed by another enqueue.
[Fx.Tag.SecurityNote(Miscellaneous = "called by critical code, can be called outside user context")]
|
|
public bool TryEnqueueWorkItem(Action<object> callback, object state, out bool wrapped)
|
|
{
|
|
// Register our arrival and check the state of this slot. If the slot was already full, we wrapped.
|
|
int gateSnapshot = Interlocked.Increment(ref this.gate);
|
|
// The low 15 bits count enqueuers; anything other than 1 means someone else got here first.
wrapped = (gateSnapshot & Bits.LoCountMask) != 1;
|
|
if (wrapped)
|
|
{
|
|
// Our increment may have been the operation that brought the gate to its 'complete' state. If so, we
// are responsible for resetting it; the reset is skipped if the gate has changed since the snapshot.
if ((gateSnapshot & Bits.LoHiBit) != 0 && Bits.IsComplete(gateSnapshot))
|
|
{
|
|
Interlocked.CompareExchange(ref this.gate, 0, gateSnapshot);
|
|
}
|
|
return false;
|
|
}
|
|
|
|
Fx.Assert(this.callback == null, "Slot already has a work item.");
|
|
Fx.Assert((gateSnapshot & Bits.HiBits) == 0, "Slot already marked.");
|
|
|
|
// We are the first enqueuer, so the fields are ours to write until the 'filled' flag is published.
this.state = state;
|
|
this.callback = callback;
|
|
|
|
// Set the special bit to show that the slot is filled.
|
|
gateSnapshot = Interlocked.Add(ref this.gate, Bits.LoHiBit);
|
|
Fx.Assert((gateSnapshot & Bits.HiBits) == Bits.LoHiBit, "Slot already empty.");
|
|
|
|
// The high-word counter counts dequeuers that have registered on this slot.
if ((gateSnapshot & Bits.HiCountMask) == 0)
|
|
{
|
|
// Good - no one has shown up looking for this work yet.
|
|
return true;
|
|
}
|
|
|
|
// Oops - someone already came looking for this work. We have to abort and reschedule.
|
|
this.state = null;
|
|
this.callback = null;
|
|
|
|
// Indicate that the slot is clear. We might be able to bypass setting the high bit.
|
|
// Shortcut: when the dequeuer count (high word; its flag bit is clear per the assert above) equals the
// enqueuer count, try to reset the gate straight to zero. If the counts differ, or the gate changed
// under us, fall back to setting the 'emptied' flag and resetting only if that completes the gate.
if (gateSnapshot >> Bits.HiShift != (gateSnapshot & Bits.LoCountMask) ||
|
|
Interlocked.CompareExchange(ref this.gate, 0, gateSnapshot) != gateSnapshot)
|
|
{
|
|
gateSnapshot = Interlocked.Add(ref this.gate, Bits.HiHiBit);
|
|
if (Bits.IsComplete(gateSnapshot))
|
|
{
|
|
Interlocked.CompareExchange(ref this.gate, 0, gateSnapshot);
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
// Registers a dequeuer on this slot and, if this thread is the one entitled to the work, hands the
// stored work item back through 'callback' and 'state' and clears the slot's fields.
//
// Both outputs are null when the slot has not been filled yet, or when another dequeuer registered on
// this slot before us. Callers treat a null callback as a work item with nothing to run.
[Fx.Tag.SecurityNote(Miscellaneous = "called by critical code, can be called outside user context")]
|
|
public void DequeueWorkItem(out Action<object> callback, out object state)
|
|
{
|
|
// Stake our claim on the item.
|
|
int gateSnapshot = Interlocked.Add(ref this.gate, Bits.HiOne);
|
|
|
|
// LoHiBit is the 'filled' flag set by the enqueuing thread once callback/state are in place.
if ((gateSnapshot & Bits.LoHiBit) == 0)
|
|
{
|
|
// Whoops, a race. The work item hasn't made it in yet. In this context, returning a null callback
|
|
// is treated like a degenerate work item (rather than an empty queue). The enqueuing thread will
|
|
// notice this race and reschedule the real work in a new slot. Do not reset the slot to zero,
|
|
// since it's still going to get enqueued into. (The enqueueing thread will reset it.)
|
|
callback = null;
|
|
state = null;
|
|
return;
|
|
}
|
|
|
|
// If we're the first, we get to do the work.
|
|
if ((gateSnapshot & Bits.HiCountMask) == Bits.HiOne)
|
|
{
|
|
callback = this.callback;
|
|
state = this.state;
|
|
this.state = null;
|
|
this.callback = null;
|
|
|
|
// Indicate that the slot is clear.
|
|
// We should be able to bypass setting the high-bit in the common case.
|
|
// Common case: exactly one enqueuer registered and the gate has not changed since our snapshot, so it can
// be reset straight to zero. Otherwise set the 'emptied' flag, and reset only if that completes the gate.
if ((gateSnapshot & Bits.LoCountMask) != 1 ||
|
|
Interlocked.CompareExchange(ref this.gate, 0, gateSnapshot) != gateSnapshot)
|
|
{
|
|
gateSnapshot = Interlocked.Add(ref this.gate, Bits.HiHiBit);
|
|
if (Bits.IsComplete(gateSnapshot))
|
|
{
|
|
Interlocked.CompareExchange(ref this.gate, 0, gateSnapshot);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Another dequeuer registered on this slot first; there is nothing for us to run.
callback = null;
|
|
state = null;
|
|
|
|
// If we're the last, we get to reset the slot.
|
|
if (Bits.IsComplete(gateSnapshot))
|
|
{
|
|
Interlocked.CompareExchange(ref this.gate, 0, gateSnapshot);
|
|
}
|
|
}
|
|
}
|
|
|
|
#if DEBUG
|
|
// DEBUG-only check that this slot is fully reset: gate back at zero and no callback or state retained.
public void DebugVerifyEmpty()
{
    Fx.Assert(gate == 0, "Finalized with unfinished slot.");
    Fx.Assert(callback == null, "Finalized with leaked callback.");
    Fx.Assert(state == null, "Finalized with leaked state.");
}
|
|
#endif
|
|
}
|
|
|
|
// A note about the IOThreadScheduler and the ScheduledOverlapped references:
|
|
// Although for each scheduler we have a single instance of overlapped, we cannot point to the scheduler from the
|
|
// overlapped, through the entire lifetime of the overlapped. This is because the ScheduledOverlapped is pinned
|
|
// and if it has a reference to the IOTS, it would be rooted and the finalizer will never get called.
|
|
// Therefore, we are passing the reference, when we post a pending callback and reset it, once the callback was
|
|
// invoked; during that time the scheduler is rooted but in that time we don't want that it would be collected
|
|
// by the GC anyway.
|
|
[Fx.Tag.SecurityNote(Critical = "manages NativeOverlapped instance, can be called outside user context")]
|
|
[SecurityCritical]
|
|
unsafe class ScheduledOverlapped
|
|
{
|
|
readonly NativeOverlapped* nativeOverlapped;
|
|
IOThreadScheduler scheduler;
|
|
|
|
// Packs a fresh Overlapped into a NativeOverlapped whose completion callback is IOCallback (wrapped
// by Fx.ThunkCallback). No user data is attached to the pack call.
public ScheduledOverlapped()
{
    Overlapped managedOverlapped = new Overlapped();
    this.nativeOverlapped = managedOverlapped.UnsafePack(
        Fx.ThunkCallback(new IOCompletionCallback(IOCallback)),
        null);
}
|
|
|
|
// Completion routine for the posted overlapped. Takes a work item from the scheduler that posted us,
// runs it, and then keeps running further items for as long as TryCoalesce supplies them.
// The three parameters are not used.
[Fx.Tag.SecurityNote(Miscellaneous = "note that in some hosts this runs without any user context on the stack")]
void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* nativeOverlapped)
{
    // Drop our reference to the IOThreadScheduler as early as possible so that it cannot leak.
    IOThreadScheduler owner = this.scheduler;
    this.scheduler = null;
    Fx.Assert(owner != null, "Overlapped completed without a scheduler.");

    Action<object> workItem;
    object workState;
    try
    {
    }
    finally
    {
        // Runs inside a finally block because it has to run uninterrupted to stay consistent.
        owner.CompletionCallback(out workItem, out workState);
    }

    bool more;
    do
    {
        // A null work item is possible when a synchronization miss left a slot unusable. Just move
        // on to the next slot in that case, until there are no more.
        if (workItem != null)
        {
            workItem(workState);
        }

        try
        {
        }
        finally
        {
            // Runs inside a finally block because it has to run uninterrupted to stay consistent.
            more = owner.TryCoalesce(out workItem, out workState);
        }
    }
    while (more);
}
|
|
|
|
// Queues the native overlapped to the thread pool on behalf of 'iots'. The scheduler reference is
// stored first so that IOCallback can find it when the overlapped completes.
public void Post(IOThreadScheduler iots)
{
    Fx.Assert(scheduler == null, "Post called on an overlapped that is already posted.");
    Fx.Assert(iots != null, "Post called with a null scheduler.");

    scheduler = iots;
    ThreadPool.UnsafeQueueNativeOverlapped(nativeOverlapped);
}
|
|
|
|
// Frees the native overlapped. It is a fatal error to call this while a post is still outstanding,
// which is detected by the scheduler reference still being set.
[Fx.Tag.SecurityNote(Miscellaneous = "note that this runs on the finalizer thread")]
public void Cleanup()
{
    bool inFlight = scheduler != null;
    if (inFlight)
    {
        throw Fx.AssertAndThrowFatal("Cleanup called on an overlapped that is in-flight.");
    }

    Overlapped.Free(nativeOverlapped);
}
|
|
}
|
|
}
|
|
}
|