// Copyright 1998-2019 Epic Games, Inc. All Rights Reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Tools.DotNETCommon
{
///
/// Allows queuing a large number of tasks to a thread pool and waiting for them all to complete.
///
public class ThreadPoolWorkQueue : IDisposable
{
///
/// Object used for controlling access to NumOutstandingJobs and updating EmptyEvent
///
object LockObject = new object();
///
/// Number of jobs remaining in the queue. This is updated in an atomic way.
///
int NumOutstandingJobs;
///
/// Event which indicates whether the queue is empty.
///
ManualResetEvent EmptyEvent = new ManualResetEvent(true);
///
/// Exceptions which occurred while executing tasks
///
List Exceptions = new List();
///
/// Default constructor
///
public ThreadPoolWorkQueue()
{
}
///
/// Waits for the queue to drain, and disposes of it
///
public void Dispose()
{
if(EmptyEvent != null)
{
Wait();
EmptyEvent.Dispose();
EmptyEvent = null;
}
}
///
/// Returns the number of items remaining in the queue
///
public int NumRemaining
{
get { return NumOutstandingJobs; }
}
///
/// Adds an item to the queue
///
/// The action to add
public void Enqueue(Action ActionToExecute)
{
lock(LockObject)
{
if(NumOutstandingJobs == 0)
{
EmptyEvent.Reset();
}
NumOutstandingJobs++;
}
#if SINGLE_THREAD
Execute(ActionToExecute);
#else
ThreadPool.QueueUserWorkItem(Execute, ActionToExecute);
#endif
}
///
/// Internal method to execute an action
///
/// The action to execute
void Execute(object ActionToExecute)
{
try
{
((Action)ActionToExecute)();
}
catch(Exception Ex)
{
lock(LockObject)
{
Exceptions.Add(Ex);
}
throw;
}
finally
{
lock(LockObject)
{
NumOutstandingJobs--;
if(NumOutstandingJobs == 0)
{
EmptyEvent.Set();
}
}
}
}
///
/// Waits for all queued tasks to finish
///
public void Wait()
{
EmptyEvent.WaitOne();
RethrowExceptions();
}
///
/// Waits for all queued tasks to finish, or the timeout to elapse
///
/// Maximum time to wait
/// True if the queue completed, false if the timeout elapsed
public bool Wait(int MillisecondsTimeout)
{
bool bResult = EmptyEvent.WaitOne(MillisecondsTimeout);
if(bResult)
{
RethrowExceptions();
}
return bResult;
}
///
/// Checks for any exceptions which ocurred in queued tasks, and re-throws them on the current thread
///
public void RethrowExceptions()
{
lock(LockObject)
{
if(Exceptions.Count > 0)
{
throw new AggregateException(Exceptions.ToArray());
}
}
}
}
}