2016-08-03 10:59:49 +00:00
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// Barrier.cs
//
2017-08-21 15:34:15 +00:00
// <OWNER>Microsoft</OWNER>
2016-08-03 10:59:49 +00:00
//
// A barrier allows multiple tasks to cooperatively work on some algorithm in parallel.
// A group of tasks cooperate by moving through a series of phases, where each in the group signals it has arrived at
// the barrier in a given phase and implicitly waits for all others to arrive.
// The same barrier can be used for multiple phases.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System ;
using System.Collections.Generic ;
using System.Diagnostics ;
using System.Security.Permissions ;
using System.Runtime.InteropServices ;
using System.Threading ;
using System.Runtime.Serialization ;
using System.Security ;
namespace System.Threading
{
/// <summary>
/// The exception that is thrown when the post-phase action of a <see cref="Barrier"/> fails.
/// </summary>
#if ! SILVERLIGHT
[Serializable]
#endif
public class BarrierPostPhaseException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class.
/// </summary>
public BarrierPostPhaseException ( ) : this ( ( string ) null )
{
}
/// <summary>
/// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with the specified inner exception.
/// </summary>
/// <param name="innerException">The exception that is the cause of the current exception.</param>
public BarrierPostPhaseException ( Exception innerException ) : this ( null , innerException )
{
}
/// <summary>
/// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with a specified error message.
/// </summary>
/// <param name="message">A string that describes the exception.</param>
public BarrierPostPhaseException ( string message ) : this ( message , null )
{
}
/// <summary>
/// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with a specified error message and inner exception.
/// </summary>
/// <param name="message">A string that describes the exception.</param>
/// <param name="innerException">The exception that is the cause of the current exception.</param>
public BarrierPostPhaseException ( string message , Exception innerException )
: base ( message = = null ? SR . GetString ( SR . BarrierPostPhaseException ) : message , innerException )
{
}
#if ! SILVERLIGHT
/// <summary>
/// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with serialized data.
/// </summary>
/// <param name="info">The object that holds the serialized object data.</param>
/// <param name="context">An object that describes the source or destination of the serialized data.</param>
[SecurityCritical]
protected BarrierPostPhaseException ( SerializationInfo info , StreamingContext context )
: base ( info , context )
{
}
#endif
}
/// <summary>
/// Enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.
/// </summary>
/// <remarks>
/// <para>
/// A group of tasks cooperate by moving through a series of phases, where each in the group signals it
/// has arrived at the <see cref="Barrier"/> in a given phase and implicitly waits for all others to
/// arrive. The same <see cref="Barrier"/> can be used for multiple phases.
/// </para>
/// <para>
/// All public and protected members of <see cref="Barrier"/> are thread-safe and may be used
/// concurrently from multiple threads, with the exception of Dispose, which
/// must only be used when all other operations on the <see cref="Barrier"/> have
/// completed.
/// </para>
/// </remarks>
[ComVisible(false)]
#if ! FEATURE_NETCORE
#pragma warning disable 0618
[HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
#pragma warning restore 0618
#endif
[DebuggerDisplay("Participant Count={ParticipantCount},Participants Remaining={ParticipantsRemaining}")]
public class Barrier : IDisposable
{
//This variable holds the basic barrier variables:
// 1- The current particiants count
// 2- The total participants count
// 3- The sense flag (true if the cuurrent phase is even, false otherwise)
// The first 15 bits are for the total count which means the maximum participants for the barrier is about 32K
// The 16th bit is dummy
// The next 15th bit for the current
// And the last highest bit is for the sense
volatile int m_currentTotalCount ;
// Bitmask to extract the current count
const int CURRENT_MASK = 0x7FFF0000 ;
// Bitmask to extract the total count
const int TOTAL_MASK = 0x00007FFF ;
// Bitmask to extratc the sense flag
const int SENSE_MASK = unchecked ( ( int ) 0x80000000 ) ;
// The maximum participants the barrier can operate = 32767 ( 2 power 15 - 1 )
const int MAX_PARTICIPANTS = TOTAL_MASK ;
// The current barrier phase
// We don't need to worry about overflow, the max value is 2^63-1; If it starts from 0 at a
// rate of 4 billion increments per second, it will takes about 64 years to overflow.
long m_currentPhase ;
// dispose flag
bool m_disposed ;
// Odd phases event
ManualResetEventSlim m_oddEvent ;
// Even phases event
ManualResetEventSlim m_evenEvent ;
// The execution context of the creator thread
ExecutionContext m_ownerThreadContext ;
// The EC callback that invokes the psot phase action
[SecurityCritical]
private static ContextCallback s_invokePostPhaseAction ;
// Post phase action after each phase
Action < Barrier > m_postPhaseAction ;
// In case the post phase action throws an exception, wraps it in BarrierPostPhaseException
Exception m_exception ;
// This is the ManagedThreadID of the postPhaseAction caller thread, this is used to determine if the SignalAndWait, Dispose or Add/RemoveParticipant caller thread is
// the same thread as the postPhaseAction thread which means this method was called from the postPhaseAction which is illegal.
// This value is captured before calling the action and reset back to zero after it.
int m_actionCallerID ;
#region Properties
/// <summary>
/// Gets the number of participants in the barrier that haven’ t yet signaled
/// in the current phase.
/// </summary>
/// <remarks>
/// This could be 0 during a post-phase action delegate execution or if the
/// ParticipantCount is 0.
/// </remarks>
public int ParticipantsRemaining
{
get
{
int currentTotal = m_currentTotalCount ;
int total = ( int ) ( currentTotal & TOTAL_MASK ) ;
int current = ( int ) ( ( currentTotal & CURRENT_MASK ) > > 16 ) ;
return total - current ;
}
}
/// <summary>
/// Gets the total number of participants in the barrier.
/// </summary>
public int ParticipantCount
{
get { return ( int ) ( m_currentTotalCount & TOTAL_MASK ) ; }
}
/// <summary>
/// Gets the number of the barrier's current phase.
/// </summary>
public long CurrentPhaseNumber
{
// use the new Volatile.Read/Write method because it is cheaper than Interlocked.Read on AMD64 architecture
get { return Volatile . Read ( ref m_currentPhase ) ; }
internal set { Volatile . Write ( ref m_currentPhase , value ) ; }
}
#endregion
/// <summary>
/// Initializes a new instance of the <see cref="Barrier"/> class.
/// </summary>
/// <param name="participantCount">The number of participating threads.</param>
/// <exception cref="ArgumentOutOfRangeException"> <paramref name="participantCount"/> is less than 0
/// or greater than <see cref="T:System.Int16.MaxValue"/>.</exception>
public Barrier ( int participantCount )
: this ( participantCount , null )
{
}
/// <summary>
/// Initializes a new instance of the <see cref="Barrier"/> class.
/// </summary>
/// <param name="participantCount">The number of participating threads.</param>
/// <param name="postPhaseAction">The <see cref="T:System.Action`1"/> to be executed after each
/// phase.</param>
/// <exception cref="T:System.ArgumentOutOfRangeException"> <paramref name="participantCount"/> is less than 0
/// or greater than <see cref="T:System.Int32.MaxValue"/>.</exception>
/// <remarks>
/// The <paramref name="postPhaseAction"/> delegate will be executed after
/// all participants have arrived at the barrier in one phase. The participants
/// will not be released to the next phase until the postPhaseAction delegate
/// has completed execution.
/// </remarks>
public Barrier ( int participantCount , Action < Barrier > postPhaseAction )
{
// the count must be non negative value
if ( participantCount < 0 | | participantCount > MAX_PARTICIPANTS )
{
throw new ArgumentOutOfRangeException ( "participantCount" , participantCount , SR . GetString ( SR . Barrier_ctor_ArgumentOutOfRange ) ) ;
}
m_currentTotalCount = ( int ) participantCount ;
m_postPhaseAction = postPhaseAction ;
//Lazily initialize the events
m_oddEvent = new ManualResetEventSlim ( true ) ;
m_evenEvent = new ManualResetEventSlim ( false ) ;
// Capture the context if the post phase action is not null
if ( postPhaseAction ! = null & & ! ExecutionContext . IsFlowSuppressed ( ) )
{
m_ownerThreadContext = ExecutionContext . Capture ( ) ;
}
m_actionCallerID = 0 ;
}
/// <summary>
/// Extract the three variables current, total and sense from a given big variable
/// </summary>
/// <param name="currentTotal">The integer variable that contains the other three variables</param>
/// <param name="current">The current cparticipant count</param>
/// <param name="total">The total participants count</param>
/// <param name="sense">The sense flag</param>
private void GetCurrentTotal ( int currentTotal , out int current , out int total , out bool sense )
{
total = ( int ) ( currentTotal & TOTAL_MASK ) ;
current = ( int ) ( ( currentTotal & CURRENT_MASK ) > > 16 ) ;
sense = ( currentTotal & SENSE_MASK ) = = 0 ? true : false ;
}
/// <summary>
/// Write the three variables current. total and the sense to the m_currentTotal
/// </summary>
/// <param name="currentTotal">The old current total to compare</param>
/// <param name="current">The current cparticipant count</param>
/// <param name="total">The total participants count</param>
/// <param name="sense">The sense flag</param>
/// <returns>True if the CAS succeeded, false otherwise</returns>
private bool SetCurrentTotal ( int currentTotal , int current , int total , bool sense )
{
int newCurrentTotal = ( current < < 16 ) | total ;
if ( ! sense )
{
newCurrentTotal | = SENSE_MASK ;
}
#pragma warning disable 0420
return Interlocked . CompareExchange ( ref m_currentTotalCount , newCurrentTotal , currentTotal ) = = currentTotal ;
#pragma warning restore 0420
}
/// <summary>
/// Notifies the <see cref="Barrier"/> that there will be an additional participant.
/// </summary>
/// <returns>The phase number of the barrier in which the new participants will first
/// participate.</returns>
/// <exception cref="T:System.InvalidOperationException">
/// Adding a participant would cause the barrier's participant count to
/// exceed <see cref="T:System.Int16.MaxValue"/>.
/// </exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public long AddParticipant ( )
{
try
{
return AddParticipants ( 1 ) ;
}
catch ( ArgumentOutOfRangeException )
{
throw new InvalidOperationException ( SR . GetString ( SR . Barrier_AddParticipants_Overflow_ArgumentOutOfRange ) ) ;
}
}
/// <summary>
/// Notifies the <see cref="Barrier"/> that there will be additional participants.
/// </summary>
/// <param name="participantCount">The number of additional participants to add to the
/// barrier.</param>
/// <returns>The phase number of the barrier in which the new participants will first
/// participate.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="participantCount"/> is less than
/// 0.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">Adding <paramref name="participantCount"/> participants would cause the
/// barrier's participant count to exceed <see cref="T:System.Int16.MaxValue"/>.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public long AddParticipants ( int participantCount )
{
// check dispose
ThrowIfDisposed ( ) ;
if ( participantCount < 1 )
{
throw new ArgumentOutOfRangeException ( "participantCount" , participantCount ,
SR . GetString ( SR . Barrier_AddParticipants_NonPositive_ArgumentOutOfRange ) ) ;
}
else if ( participantCount > MAX_PARTICIPANTS ) //overflow
{
throw new ArgumentOutOfRangeException ( "participantCount" ,
SR . GetString ( SR . Barrier_AddParticipants_Overflow_ArgumentOutOfRange ) ) ;
}
// in case of this is called from the PHA
if ( m_actionCallerID ! = 0 & & Thread . CurrentThread . ManagedThreadId = = m_actionCallerID )
{
throw new InvalidOperationException ( SR . GetString ( SR . Barrier_InvalidOperation_CalledFromPHA ) ) ;
}
SpinWait spinner = new SpinWait ( ) ;
long newPhase = 0 ;
while ( true )
{
int currentTotal = m_currentTotalCount ;
int total ;
int current ;
bool sense ;
GetCurrentTotal ( currentTotal , out current , out total , out sense ) ;
if ( participantCount + total > MAX_PARTICIPANTS ) //overflow
{
throw new ArgumentOutOfRangeException ( "participantCount" ,
SR . GetString ( SR . Barrier_AddParticipants_Overflow_ArgumentOutOfRange ) ) ;
}
if ( SetCurrentTotal ( currentTotal , current , total + participantCount , sense ) )
{
// Calculating the first phase for that participant, if the current phase already finished return the nextphase else return the current phase
// To know that the current phase is the sense doesn't match the
// phase odd even, so that means it didn't yet change the phase count, so currentPhase +1 is returned, otherwise currentPhase is returned
long currPhase = CurrentPhaseNumber ;
newPhase = ( sense ! = ( currPhase % 2 = = 0 ) ) ? currPhase + 1 : currPhase ;
// If this participant is going to join the next phase, which means the postPhaseAction is being running, this participants must wait until this done
// and its event is reset.
// Without that, if the postPhaseAction takes long time, this means the event ehich the current participant is goint to wait on is still set
// (FinishPPhase didn't reset it yet) so it should wait until it reset
if ( newPhase ! = currPhase )
{
// Wait on the opposite event
if ( sense )
{
m_oddEvent . Wait ( ) ;
}
else
{
m_evenEvent . Wait ( ) ;
}
}
//This else to fix the racing where the current phase has been finished, m_currentPhase has been updated but the events have not been set/reset yet
// otherwise when this participant calls SignalAndWait it will wait on a set event however all other participants have not arrived yet.
else
{
if ( sense & & m_evenEvent . IsSet )
m_evenEvent . Reset ( ) ;
else if ( ! sense & & m_oddEvent . IsSet )
m_oddEvent . Reset ( ) ;
}
break ;
}
spinner . SpinOnce ( ) ;
}
return newPhase ;
}
/// <summary>
/// Notifies the <see cref="Barrier"/> that there will be one less participant.
/// </summary>
/// <exception cref="T:System.InvalidOperationException">The barrier already has 0
/// participants.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public void RemoveParticipant ( )
{
RemoveParticipants ( 1 ) ;
}
/// <summary>
/// Notifies the <see cref="Barrier"/> that there will be fewer participants.
/// </summary>
/// <param name="participantCount">The number of additional participants to remove from the barrier.</param>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="participantCount"/> is less than
/// 0.</exception>
/// <exception cref="T:System.InvalidOperationException">The barrier already has 0 participants.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public void RemoveParticipants ( int participantCount )
{
// check dispose
ThrowIfDisposed ( ) ;
// Validate input
if ( participantCount < 1 )
{
throw new ArgumentOutOfRangeException ( "participantCount" , participantCount ,
SR . GetString ( SR . Barrier_RemoveParticipants_NonPositive_ArgumentOutOfRange ) ) ;
}
// in case of this is called from the PHA
if ( m_actionCallerID ! = 0 & & Thread . CurrentThread . ManagedThreadId = = m_actionCallerID )
{
throw new InvalidOperationException ( SR . GetString ( SR . Barrier_InvalidOperation_CalledFromPHA ) ) ;
}
SpinWait spinner = new SpinWait ( ) ;
while ( true )
{
int currentTotal = m_currentTotalCount ;
int total ;
int current ;
bool sense ;
GetCurrentTotal ( currentTotal , out current , out total , out sense ) ;
if ( total < participantCount )
{
throw new ArgumentOutOfRangeException ( "participantCount" ,
SR . GetString ( SR . Barrier_RemoveParticipants_ArgumentOutOfRange ) ) ;
}
if ( total - participantCount < current )
{
throw new InvalidOperationException ( SR . GetString ( SR . Barrier_RemoveParticipants_InvalidOperation ) ) ;
}
// If the remaining participats = current participants, then finish the current phase
int remaingParticipants = total - participantCount ;
if ( remaingParticipants > 0 & & current = = remaingParticipants )
{
if ( SetCurrentTotal ( currentTotal , 0 , total - participantCount , ! sense ) )
{
FinishPhase ( sense ) ;
break ;
}
}
else
{
if ( SetCurrentTotal ( currentTotal , current , total - participantCount , sense ) )
{
break ;
}
}
spinner . SpinOnce ( ) ;
}
}
/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier as well.
/// </summary>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public void SignalAndWait ( )
{
SignalAndWait ( new CancellationToken ( ) ) ;
}
/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier, while observing a <see
/// cref="T:System.Threading.CancellationToken"/>.
/// </summary>
/// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
/// observe.</param>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been
/// canceled.</exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public void SignalAndWait ( CancellationToken cancellationToken )
{
#if DEBUG
bool result =
#endif
SignalAndWait ( Timeout . Infinite , cancellationToken ) ;
#if DEBUG
Debug . Assert ( result ) ;
#endif
}
/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier as well, using a
/// <see cref="T:System.TimeSpan"/> to measure the time interval.
/// </summary>
/// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of
/// milliseconds to wait, or a <see cref="T:System.TimeSpan"/> that represents -1 milliseconds to
/// wait indefinitely.</param>
/// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/>is a negative number
/// other than -1 milliseconds, which represents an infinite time-out, or it is greater than
/// <see cref="T:System.Int32.MaxValue"/>.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public Boolean SignalAndWait ( TimeSpan timeout )
{
return SignalAndWait ( timeout , new CancellationToken ( ) ) ;
}
/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier as well, using a
/// <see cref="T:System.TimeSpan"/> to measure the time interval, while observing a <see
/// cref="T:System.Threading.CancellationToken"/>.
/// </summary>
/// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of
/// milliseconds to wait, or a <see cref="T:System.TimeSpan"/> that represents -1 milliseconds to
/// wait indefinitely.</param>
/// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
/// observe.</param>
/// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/>is a negative number
/// other than -1 milliseconds, which represents an infinite time-out.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been
/// canceled.</exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public Boolean SignalAndWait ( TimeSpan timeout , CancellationToken cancellationToken )
{
Int64 totalMilliseconds = ( Int64 ) timeout . TotalMilliseconds ;
if ( totalMilliseconds < - 1 | | totalMilliseconds > int . MaxValue )
{
throw new System . ArgumentOutOfRangeException ( "timeout" , timeout ,
SR . GetString ( SR . Barrier_SignalAndWait_ArgumentOutOfRange ) ) ;
}
return SignalAndWait ( ( int ) timeout . TotalMilliseconds , cancellationToken ) ;
}
/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier as well, using a
/// 32-bit signed integer to measure the time interval.
/// </summary>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
/// cref="Timeout.Infinite"/>(-1) to wait indefinitely.</param>
/// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
/// negative number other than -1, which represents an infinite time-out.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public bool SignalAndWait ( int millisecondsTimeout )
{
return SignalAndWait ( millisecondsTimeout , new CancellationToken ( ) ) ;
}
/// <summary>
/// Signals that a participant has reached the barrier and waits for all other participants to reach
/// the barrier as well, using a
/// 32-bit signed integer to measure the time interval, while observing a <see
/// cref="T:System.Threading.CancellationToken"/>.
/// </summary>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
/// cref="Timeout.Infinite"/>(-1) to wait indefinitely.</param>
/// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
/// observe.</param>
/// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
/// negative number other than -1, which represents an infinite time-out.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been
/// canceled.</exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public bool SignalAndWait ( int millisecondsTimeout , CancellationToken cancellationToken )
{
ThrowIfDisposed ( ) ;
cancellationToken . ThrowIfCancellationRequested ( ) ;
if ( millisecondsTimeout < - 1 )
{
throw new System . ArgumentOutOfRangeException ( "millisecondsTimeout" , millisecondsTimeout ,
SR . GetString ( SR . Barrier_SignalAndWait_ArgumentOutOfRange ) ) ;
}
// in case of this is called from the PHA
if ( m_actionCallerID ! = 0 & & Thread . CurrentThread . ManagedThreadId = = m_actionCallerID )
{
throw new InvalidOperationException ( SR . GetString ( SR . Barrier_InvalidOperation_CalledFromPHA ) ) ;
}
// local variables to extract the basic barrier variable and update them
// The are declared here instead of inside the loop body because the will be used outside the loop
bool sense ; // The sense of the barrier *before* the phase associated with this SignalAndWait call completes
int total ;
int current ;
int currentTotal ;
long phase ;
SpinWait spinner = new SpinWait ( ) ;
while ( true )
{
currentTotal = m_currentTotalCount ;
GetCurrentTotal ( currentTotal , out current , out total , out sense ) ;
phase = CurrentPhaseNumber ;
// throw if zero participants
if ( total = = 0 )
{
throw new InvalidOperationException ( SR . GetString ( SR . Barrier_SignalAndWait_InvalidOperation_ZeroTotal ) ) ;
}
// Try to detect if the number of threads for this phase exceeded the total number of participants or not
// This can be detected if the current is zero which means all participants for that phase has arrived and the phase number is not changed yet
if ( current = = 0 & & sense ! = ( CurrentPhaseNumber % 2 = = 0 ) )
{
throw new InvalidOperationException ( SR . GetString ( SR . Barrier_SignalAndWait_InvalidOperation_ThreadsExceeded ) ) ;
}
//This is the last thread, finish the phase
if ( current + 1 = = total )
{
if ( SetCurrentTotal ( currentTotal , 0 , total , ! sense ) )
{
#if ! FEATURE_PAL & & ! SILVERLIGHT // PAL doesn't support eventing
if ( CdsSyncEtwBCLProvider . Log . IsEnabled ( ) )
{
CdsSyncEtwBCLProvider . Log . Barrier_PhaseFinished ( sense , CurrentPhaseNumber ) ;
}
#endif
FinishPhase ( sense ) ;
return true ;
}
}
else if ( SetCurrentTotal ( currentTotal , current + 1 , total , sense ) )
{
break ;
}
spinner . SpinOnce ( ) ;
}
// ** Perform the real wait **
// select the correct event to wait on, based on the current sense.
ManualResetEventSlim eventToWaitOn = ( sense ) ? m_evenEvent : m_oddEvent ;
bool waitWasCanceled = false ;
bool waitResult = false ;
try
{
waitResult = DiscontinuousWait ( eventToWaitOn , millisecondsTimeout , cancellationToken , phase ) ;
}
catch ( OperationCanceledException )
{
waitWasCanceled = true ;
}
catch ( ObjectDisposedException ) // in case a ---- happen where one of the thread returned from SignalAndWait and the current thread calls Wait on a disposed event
{
// make sure the current phase for this thread is already finished, otherwise propagate the exception
if ( phase < CurrentPhaseNumber )
waitResult = true ;
else
throw ;
}
if ( ! waitResult )
{
//reset the spinLock to prepare it for the next loop
spinner . Reset ( ) ;
//If the wait timeout expired and all other thread didn't reach the barrier yet, update the current count back
while ( true )
{
bool newSense ;
currentTotal = m_currentTotalCount ;
GetCurrentTotal ( currentTotal , out current , out total , out newSense ) ;
// If the timeout expired and the phase has just finished, return true and this is considered as succeeded SignalAndWait
//otherwise the timeout expired and the current phase has not been finished yet, return false
//The phase is finished if the phase member variable is changed (incremented) or the sense has been changed
// we have to use the statements in the comparison below for two cases:
// 1- The sense is changed but the last thread didn't update the phase yet
// 2- The phase is already incremented but the sense flipped twice due to the termination of the next phase
if ( phase < CurrentPhaseNumber | | sense ! = newSense )
{
// The current phase has been finished, but we shouldn't return before the events are set/reset otherwise this thread could start
// next phase and the appropriate event has not reset yet which could make it return immediately from the next phase SignalAndWait
// before waiting other threads
WaitCurrentPhase ( eventToWaitOn , phase ) ;
Debug . Assert ( phase < CurrentPhaseNumber ) ;
break ;
}
//The phase has not been finished yet, try to update the current count.
if ( SetCurrentTotal ( currentTotal , current - 1 , total , sense ) )
{
//if here, then the attempt to backout was successful.
//throw (a fresh) oce if cancellation woke the wait
//or return false if it was the timeout that woke the wait.
//
if ( waitWasCanceled )
throw new OperationCanceledException ( SR . GetString ( SR . Common_OperationCanceled ) , cancellationToken ) ;
else
return false ;
}
spinner . SpinOnce ( ) ;
}
}
if ( m_exception ! = null )
throw new BarrierPostPhaseException ( m_exception ) ;
return true ;
}
/// <summary>
/// Finish the phase by invoking the post phase action, and setting the event, this must be called by the
/// last arrival thread
/// </summary>
/// <param name="observedSense">The current phase sense</param>
[SecuritySafeCritical]
private void FinishPhase ( bool observedSense )
{
// Execute the PHA in try/finally block to reset the variables back in case of it threw an exception
if ( m_postPhaseAction ! = null )
{
try
{
// Capture the caller thread ID to check if the Add/RemoveParticipant(s) is called from the PHA
m_actionCallerID = Thread . CurrentThread . ManagedThreadId ;
if ( m_ownerThreadContext ! = null )
{
var currentContext = m_ownerThreadContext ;
m_ownerThreadContext = m_ownerThreadContext . CreateCopy ( ) ; // create a copy for the next run
ContextCallback handler = s_invokePostPhaseAction ;
if ( handler = = null )
{
s_invokePostPhaseAction = handler = InvokePostPhaseAction ;
}
ExecutionContext . Run ( currentContext , handler , this ) ;
#if ! PFX_LEGACY_3_5
// Dispose the context directly after using it,
// the copy will either be used and siposed in the next phase or in the Dispose
currentContext . Dispose ( ) ;
#endif
}
else
{
m_postPhaseAction ( this ) ;
}
m_exception = null ; // reset the exception if it was set previously
}
catch ( Exception ex )
{
m_exception = ex ;
}
finally
{
m_actionCallerID = 0 ;
SetResetEvents ( observedSense ) ;
if ( m_exception ! = null )
throw new BarrierPostPhaseException ( m_exception ) ;
}
}
else
{
SetResetEvents ( observedSense ) ;
}
}
/// <summary>
/// Helper method to call the post phase action
/// </summary>
/// <param name="obj"></param>
[SecurityCritical]
private static void InvokePostPhaseAction ( object obj )
{
var thisBarrier = ( Barrier ) obj ;
thisBarrier . m_postPhaseAction ( thisBarrier ) ;
}
/// <summary>
/// Sets the current phase event and reset the next phase event
/// </summary>
/// <param name="observedSense">The current phase sense</param>
private void SetResetEvents ( bool observedSense )
{
// Increment the phase count using Volatile class because m_currentPhase is 64 bit long type, that could cause torn write on 32 bit machines
CurrentPhaseNumber = CurrentPhaseNumber + 1 ;
if ( observedSense )
{
m_oddEvent . Reset ( ) ;
m_evenEvent . Set ( ) ;
}
else
{
m_evenEvent . Reset ( ) ;
m_oddEvent . Set ( ) ;
}
}
/// <summary>
/// Wait until the current phase finishes completely by spinning until either the event is set,
/// or the phase count is incremented more than one time
/// </summary>
/// <param name="currentPhaseEvent">The current phase event</param>
/// <param name="observedPhase">The current phase for that thread</param>
private void WaitCurrentPhase ( ManualResetEventSlim currentPhaseEvent , long observedPhase )
{
//spin until either of these two conditions succeeds
//1- The event is set
//2- the phase count is incremented more than one time, this means the next phase is finished as well,
//but the event will be reset again, so we check the phase count instead
SpinWait spinner = new SpinWait ( ) ;
while ( ! currentPhaseEvent . IsSet & & CurrentPhaseNumber - observedPhase < = 1 )
{
spinner . SpinOnce ( ) ;
}
}
/// <summary>
/// The reason of discontinuous waiting instead of direct waiting on the event is to avoid the ---- where the sense is
/// changed twice because the next phase is finished (due to either RemoveParticipant is called or another thread joined
/// the next phase instead of the current thread) so the current thread will be stuck on the event because it is reset back
/// The maxwait and the shift numbers are arbitrarily choosen, there were no references picking them
/// </summary>
/// <param name="currentPhaseEvent">The current phase event</param>
/// <param name="totalTimeout">wait timeout in milliseconds</param>
/// <param name="token">cancellation token passed to SignalAndWait</param>
/// <param name="observedPhase">The current phase number for this thread</param>
/// <returns>True if the event is set or the phasenumber changed, false if the timeout expired</returns>
private bool DiscontinuousWait ( ManualResetEventSlim currentPhaseEvent , int totalTimeout , CancellationToken token , long observedPhase )
{
int maxWait = 100 ; // 100 ms
int waitTimeCeiling = 10000 ; // 10 seconds
while ( observedPhase = = CurrentPhaseNumber )
{
// the next wait time, the min of the maxWait and the totalTimeout
int waitTime = totalTimeout = = Timeout . Infinite ? maxWait : Math . Min ( maxWait , totalTimeout ) ;
if ( currentPhaseEvent . Wait ( waitTime , token ) ) return true ;
//update the total wait time
if ( totalTimeout ! = Timeout . Infinite )
{
totalTimeout - = waitTime ;
if ( totalTimeout < = 0 ) return false ;
}
//if the maxwait exceeded 10 seconds then we will stop increasing the maxWait time and keep it 10 seconds, otherwise keep doubling it
maxWait = maxWait > = waitTimeCeiling ? waitTimeCeiling : Math . Min ( maxWait < < 1 , waitTimeCeiling ) ;
}
//if we exited the loop because the observed phase doesn't match the current phase, then we have to spin to mske sure
//the event is set or the next phase is finished
WaitCurrentPhase ( currentPhaseEvent , observedPhase ) ;
return true ;
}
/// <summary>
/// Releases all resources used by the current instance of <see cref="Barrier"/>.
/// </summary>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action.
/// </exception>
/// <remarks>
/// Unlike most of the members of <see cref="Barrier"/>, Dispose is not thread-safe and may not be
/// used concurrently with other members of this instance.
/// </remarks>
public void Dispose ( )
{
// in case of this is called from the PHA
if ( m_actionCallerID ! = 0 & & Thread . CurrentThread . ManagedThreadId = = m_actionCallerID )
{
throw new InvalidOperationException ( SR . GetString ( SR . Barrier_InvalidOperation_CalledFromPHA ) ) ;
}
Dispose ( true ) ;
GC . SuppressFinalize ( this ) ;
}
/// <summary>
/// When overridden in a derived class, releases the unmanaged resources used by the
/// <see cref="Barrier"/>, and optionally releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release
/// only unmanaged resources.</param>
/// <remarks>
/// Unlike most of the members of <see cref="Barrier"/>, Dispose is not thread-safe and may not be
/// used concurrently with other members of this instance.
/// </remarks>
protected virtual void Dispose ( bool disposing )
{
if ( ! m_disposed )
{
if ( disposing )
{
m_oddEvent . Dispose ( ) ;
m_evenEvent . Dispose ( ) ;
#if ! PFX_LEGACY_3_5
if ( m_ownerThreadContext ! = null )
{
m_ownerThreadContext . Dispose ( ) ;
m_ownerThreadContext = null ;
}
#endif
}
m_disposed = true ;
}
}
/// <summary>
/// Throw ObjectDisposedException if the barrier is disposed
/// </summary>
private void ThrowIfDisposed ( )
{
if ( m_disposed )
{
throw new ObjectDisposedException ( "Barrier" , SR . GetString ( SR . Barrier_Dispose ) ) ;
}
}
}
}