Files
UnrealEngineUWP/Engine/Source/Programs/UnrealBuildTool/Executors/ParallelExecutor.cs
bryan sefcik 1ad98523c4 Added support to give actions in UBT "weight". "Weight" is an average measurement of how many cores and memory an action uses. Using one core with a normal amount of memory for an action would be a value of 1.
Why do this:
Currently when compiling a cpp file with MSVC, it compiles across multiple cores while clang does not. This means that while we support limiting the number of cores (using ProcessorCountMultiplier), MSVC will use more cores than we specify. It also means that MSVC will always be faster when compiling because clang does not support compiling a cpp over multiple cores. To get similar results when compiling with clang, we set the weight of MSVC to 1.5 and the weight of clang to 1.0. We then set the ProcessorCountMultiplier to 1.5. This results in MSVC and clang taking roughly the same amount of CPU utilization, and makes clang compiles much faster.

	                     Old Timing(secs)	Old CPU Utilization	New Timing(secs)	New CPU Utilization
PlatformA AncientGame	590.94	51	                               431.47	73
MSVC AncientGameEditor	1016.96	94	                             1026.08	95
Clang AncientGameEditor	1543.72	63	                               1270.4	84
PlatformB AncientGame	494	52	                               396.95	74

Old = without weight path
New = with weight path

#jira
#rb christopher.waters, joe.kirchoff
#preflight 6409026c8832f48a4dc72025

[CL 24567859 by bryan sefcik in ue5-main branch]
2023-03-08 17:08:22 -05:00

573 lines
20 KiB
C#

// Copyright Epic Games, Inc. All Rights Reserved.
using EpicGames.Core;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace UnrealBuildTool
{
/// <summary>
/// This executor uses async Tasks to process the action graph
/// </summary>
class ParallelExecutor : ActionExecutor
{
/// <summary>
/// Maximum processor count for local execution.
/// Deprecated: the attached Obsolete message directs users to BuildConfiguration.MaxParallelActions instead.
/// </summary>
[XmlConfigFile]
[Obsolete("ParallelExecutor.MaxProcessorCount is deprecated. Please update xml to use BuildConfiguration.MaxParallelActions")]
// CS0169 ("field is never used") is suppressed because nothing in this class reads the obsolete field
#pragma warning disable 0169
private static int MaxProcessorCount;
#pragma warning restore 0169
/// <summary>
/// Processor count multiplier for local execution. Can be below 1 to reserve CPU for other tasks.
/// When using the local executor (not XGE), run a single action on each CPU core. Note that you can set this to a larger value
/// to get slightly faster build times in many cases, but your computer's responsiveness during compiling may be much worse.
/// This value is ignored if the CPU does not support hyper-threading.
/// </summary>
[XmlConfigFile]
private static double ProcessorCountMultiplier = 1.0;
/// <summary>
/// Free memory per action in bytes, used to limit the number of parallel actions if the machine is memory starved.
/// Set to 0 to disable free memory checking.
/// Default: 1.5 GiB (1.5 * 1024 * 1024 * 1024 bytes).
/// </summary>
[XmlConfigFile]
private static double MemoryPerActionBytes = 1.5 * 1024 * 1024 * 1024;
/// <summary>
/// The priority to set for spawned processes.
/// Valid Settings: Idle, BelowNormal, Normal, AboveNormal, High
/// Default: BelowNormal or Normal for an Asymmetrical processor as BelowNormal can cause scheduling issues.
/// </summary>
[XmlConfigFile]
private static ProcessPriorityClass ProcessPriority = Utils.IsAsymmetricalProcessor() ? ProcessPriorityClass.Normal : ProcessPriorityClass.BelowNormal;
/// <summary>
/// When enabled, will stop compiling targets after a compile error occurs.
/// The shared cancellation token is cancelled as soon as a failed action has been logged.
/// </summary>
[XmlConfigFile]
private static bool bStopCompilationAfterErrors = false;
/// <summary>
/// Whether to show compilation times along with worst offenders or not.
/// When enabled, the summary lists total CPU time and up to 20 of the longest running completed actions.
/// </summary>
[XmlConfigFile]
private static bool bShowCompilationTimes = false;
/// <summary>
/// Whether to show compilation times for each executed action
/// (wall time, plus CPU time when it is non-zero, appended to the per-action progress line).
/// </summary>
[XmlConfigFile]
private static bool bShowPerActionCompilationTimes = false;
/// <summary>
/// Whether to log command lines for actions being executed (written at Debug log level)
/// </summary>
[XmlConfigFile]
private static bool bLogActionCommandLines = false;
/// <summary>
/// Add target names for each action executed (name, platform and configuration of the action's target)
/// </summary>
[XmlConfigFile]
private static bool bPrintActionTargetNames = false;
/// <summary>
/// Whether to take into account the Action's weight when determining to do more work or not.
/// When enabled, no further actions are started while the summed weight of the running actions has reached the parallel process count.
/// </summary>
[XmlConfigFile]
private static bool bUseActionWeights = false;
/// <summary>
/// Whether to show CPU utilization after the work is complete.
/// When enabled, utilization is sampled while actions are being scheduled and the average is logged in the summary.
/// </summary>
[XmlConfigFile]
private static bool bShowCPUUtilization = false;
/// <summary>
/// Collapse non-error output lines.
/// When set, the previous progress line on the console is overwritten and library creation messages are suppressed.
/// Not an XML setting; supplied through the constructor.
/// </summary>
private bool bCompactOutput = false;
/// <summary>
/// How many processes that will be executed in parallel.
/// Computed in the constructor; ExecuteActions additionally caps it at the number of actions to run.
/// </summary>
public int NumParallelProcesses { get; private set; }
/// <summary>
/// Separator characters used to split the captured output of a process into individual lines
/// </summary>
private static readonly char[] LineEndingSplit = new char[] { '\n', '\r' };
/// <summary>
/// Computes how many actions should be executed in parallel on this machine, based on the configured
/// processor count multiplier and the amount of memory reserved per action.
/// </summary>
/// <param name="MaxLocalActions">Limit on local actions; forwarded unchanged to Utils.GetMaxActionsToExecuteInParallel</param>
/// <param name="bAllCores">Consider logical cores when determining how many total cpu cores are available</param>
/// <param name="Logger">Logger for output</param>
/// <returns>The number of actions to execute in parallel</returns>
public static int GetDefaultNumParallelProcesses(int MaxLocalActions, bool bAllCores, ILogger Logger)
{
	// The larger of the XML-configured value and the override (declared outside this excerpt) wins
	double MemoryPerActionBytesComputed = Math.Max(MemoryPerActionBytes, MemoryPerActionBytesOverride);
	if (MemoryPerActionBytesComputed > MemoryPerActionBytes)
	{
		// The logged value is divided down from bytes three times, so it has to be labelled GiB rather than bytes
		Logger.LogInformation("Overriding MemoryPerAction with target-defined value of {Memory} GiB", MemoryPerActionBytesComputed / 1024 / 1024 / 1024);
	}

	return Utils.GetMaxActionsToExecuteInParallel(MaxLocalActions, ProcessorCountMultiplier, bAllCores, Convert.ToInt64(MemoryPerActionBytesComputed));
}
/// <summary>
/// Creates an executor that runs actions as local processes
/// </summary>
/// <param name="MaxLocalActions">How many actions to execute in parallel</param>
/// <param name="bAllCores">Consider logical cores when determining how many total cpu cores are available</param>
/// <param name="bCompactOutput">Should output be written in a compact fashion</param>
/// <param name="Logger">Logger for output</param>
public ParallelExecutor(int MaxLocalActions, bool bAllCores, bool bCompactOutput, ILogger Logger)
{
	// Apply the XML configuration first; the process count computed below reads [XmlConfigFile] fields
	XmlConfig.ApplyTo(this);

	this.bCompactOutput = bCompactOutput;

	// Decide on the degree of parallelism for this machine
	this.NumParallelProcesses = GetDefaultNumParallelProcesses(MaxLocalActions, bAllCores, Logger);
}
/// <summary>
/// Display name identifying this executor
/// </summary>
public override string Name => "Parallel";
/// <summary>
/// Reports whether this executor can be used; it has no preconditions, so the answer is always yes
/// </summary>
/// <returns>Always true</returns>
public static bool IsAvailable() => true;
/// <summary>
/// Outcome of running a single action
/// </summary>
protected class ExecuteResults
{
	/// <summary>
	/// Output lines captured from the action
	/// </summary>
	public List<string> LogLines { get; private set; }

	/// <summary>
	/// Exit code reported for the action
	/// </summary>
	public int ExitCode { get; private set; }

	/// <summary>
	/// Wall clock time; TimeSpan.Zero when constructed without timings
	/// </summary>
	public TimeSpan ExecutionTime { get; private set; }

	/// <summary>
	/// Processor time; TimeSpan.Zero when constructed without timings
	/// </summary>
	public TimeSpan ProcessorTime { get; private set; }

	/// <summary>
	/// Optional text placed in front of the action description when it is logged; only derived types can set it
	/// </summary>
	public string? AdditionalDescription { get; protected set; } = null;

	/// <summary>
	/// Creates a result that includes timing information
	/// </summary>
	public ExecuteResults(List<string> LogLines, int ExitCode, TimeSpan ExecutionTime, TimeSpan ProcessorTime)
	{
		this.LogLines = LogLines;
		this.ExitCode = ExitCode;
		this.ExecutionTime = ExecutionTime;
		this.ProcessorTime = ProcessorTime;
	}

	/// <summary>
	/// Creates a result without timing information; both timings are left at zero
	/// </summary>
	public ExecuteResults(List<string> LogLines, int ExitCode)
		: this(LogLines, ExitCode, TimeSpan.Zero, TimeSpan.Zero)
	{
	}
}
/// <summary>
/// Scheduling state of an action while ExecuteActions is processing it
/// </summary>
enum ActionStatus : byte
{
/// <summary>Not started yet; the state every action is initialised to</summary>
Queued,
/// <summary>A task has been started for the action and it has not been reaped yet</summary>
Running,
/// <summary>The action's task completed with exit code 0</summary>
Finished,
/// <summary>The action finished with a non-zero exit code, or one of its prerequisites is in the Error state</summary>
Error,
}
/// <summary>
/// Executes the specified actions locally.
/// Actions are started in input order as soon as all of their prerequisites have finished, keeping up to
/// NumParallelProcesses of them running at once (optionally limited further by action weight).
/// </summary>
/// <param name="InputActions">Actions to execute. The SortIndex of every action is overwritten with its position in this sequence.</param>
/// <param name="Logger">Logger for output</param>
/// <returns>True if all the tasks successfully executed, or false if any of them failed or the run was cancelled after an error.</returns>
public override bool ExecuteActions(IEnumerable<LinkedAction> InputActions, ILogger Logger)
{
	// Snapshot the input so that a lazily evaluated sequence is only enumerated once
	LinkedAction[] Actions = InputActions.ToArray();
	int TotalActions = Actions.Length;
	if (TotalActions == 0)
	{
		return true;
	}

	int NumCompletedActions = 0;
	int ActualNumParallelProcesses = Math.Min(TotalActions, NumParallelProcesses);

	using ManagedProcessGroup ProcessGroup = new ManagedProcessGroup();
	CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
	CancellationToken CancellationToken = CancellationTokenSource.Token;
	using ProgressWriter ProgressWriter = new ProgressWriter("Compiling C++ source code...", false, Logger);
	Dictionary<LinkedAction, Task<ExecuteResults>> ActionToTaskLookup = new();

	Logger.LogInformation($"------ Building {TotalActions} action(s) started ------");

	// Every action starts out queued; SortIndex maps a prerequisite back to its slot in ActionsStatus
	ActionStatus[] ActionsStatus = new ActionStatus[TotalActions];
	for (int Index = 0; Index < TotalActions; ++Index)
	{
		ActionsStatus[Index] = ActionStatus.Queued;
		Actions[Index].SortIndex = Index;
	}

	// ActiveTasks/ActiveActionIndices are parallel arrays; only the first ActiveTaskCount entries are live
	Task<ExecuteResults>[] ActiveTasks = new Task<ExecuteResults>[ActualNumParallelProcesses];
	int[] ActiveActionIndices = new int[ActualNumParallelProcesses];
	float MaxParallelProcessWeight = (float)ActualNumParallelProcesses;
	float ActiveTaskWeight = 0.0f;
	int ActiveTaskCount = 0;
	bool Success = true;

	List<float> LoggedCpuUtilization = new List<float>();
	Stopwatch CpuQueryUtilization = new Stopwatch();
	CpuQueryUtilization.Start();

	int FirstQueuedAction = 0;
	Task LastLoggingTask = Task.CompletedTask;

	// Loop until all actions have been processed and finished
	while (!CancellationTokenSource.IsCancellationRequested)
	{
		// Always fill up to allowed number of processes (and, if enabled, the allowed total weight)
		while (ActiveTaskCount < ActualNumParallelProcesses && (!bUseActionWeights || ActiveTaskWeight < MaxParallelProcessWeight))
		{
			// Always traverse from first not started/finished action. We always want to respect the sorting.
			// Note that there might be multiple actions we need to search past to find the one we can run next
			int ActionToRunIndex = -1;
			bool FoundFirstQueued = false;
			for (int i = FirstQueuedAction; i != Actions.Length; ++i)
			{
				if (ActionsStatus[i] != ActionStatus.Queued)
				{
					continue;
				}

				if (!FoundFirstQueued) // We found the first queued action, let's move our search start index
				{
					FirstQueuedAction = i;
					FoundFirstQueued = true;
				}

				// Let's see if all prerequisite actions are finished
				bool ReadyToRun = true;
				bool HasError = false;
				foreach (LinkedAction Prereq in Actions[i].PrerequisiteActions)
				{
					ActionStatus PrereqStatus = ActionsStatus[Prereq.SortIndex];
					ReadyToRun = ReadyToRun && PrereqStatus == ActionStatus.Finished;
					HasError = HasError || PrereqStatus == ActionStatus.Error;
				}

				// Not ready, look for next queued action
				if (!ReadyToRun)
				{
					// If Prereq has error, this action inherits the error
					if (HasError)
					{
						ActionsStatus[i] = ActionStatus.Error;
					}
					continue;
				}

				ActionToRunIndex = i;
				break;
			}

			// TotalSeconds, not Seconds: Seconds is only the 0-59 component of the elapsed time
			if (bShowCPUUtilization && CpuQueryUtilization.Elapsed.TotalSeconds > 1)
			{
				if (Utils.GetTotalCpuUtilization(out float CpuUtilization))
				{
					LoggedCpuUtilization.Add(CpuUtilization);
				}
				CpuQueryUtilization.Restart();
			}

			// Didn't find an action we can run. Either there are none left or all depends on currently running actions
			if (ActionToRunIndex == -1)
			{
				break;
			}

			// Start new action and add it to the ActiveTasks
			LinkedAction ActionToRun = Actions[ActionToRunIndex];
			Task<ExecuteResults> NewTask = Task.Run(() => RunAction(ActionToRun, ProcessGroup, CancellationToken));
			ActionToTaskLookup.Add(ActionToRun, NewTask);
			ActiveTasks[ActiveTaskCount] = NewTask;
			ActionsStatus[ActionToRunIndex] = ActionStatus.Running;
			ActiveActionIndices[ActiveTaskCount] = ActionToRunIndex;
			ActiveTaskWeight += ActionToRun.Weight;
			++ActiveTaskCount;
		}

		// We tried starting new actions and none was added and none is running, that means that we are done!
		if (ActiveTaskCount == 0)
		{
			break;
		}

		// Wait for all tasks and continue if any is done.
		Task<ExecuteResults>[] TasksToWaitOn = ActiveTasks;
		if (ActiveTaskCount < ActiveTasks.Length)
		{
			TasksToWaitOn = new Task<ExecuteResults>[ActiveTaskCount];
			Array.Copy(ActiveTasks, TasksToWaitOn, ActiveTaskCount);
		}

		int ActiveTaskIndex;
		try
		{
			ActiveTaskIndex = Task.WaitAny(TasksToWaitOn, Timeout.Infinite, CancellationToken);
		}
		catch (OperationCanceledException)
		{
			// The logging continuation cancelled the run while we were blocked. Leave the loop exactly as the
			// loop condition would have, so the summary is still printed and the failure is reported through
			// the return value. Cancellation is only requested after a failed action was recorded.
			break;
		}

		// Timeout? This should never happen.. We have a check here just in case
		if (ActiveTaskIndex == -1)
		{
			Logger.LogError("Task timeout? This should never happen. Report to Epic if you see this");
			Success = false;
			break;
		}

		int FinishedActionIndex = ActiveActionIndices[ActiveTaskIndex];
		LinkedAction FinishedAction = Actions[FinishedActionIndex];
		Task<ExecuteResults> FinishedTask = ActiveTasks[ActiveTaskIndex];

		// A task that observed the cancellation has no result to read; treat it as unsuccessful and let
		// LogCompletedAction report it as cancelled. Faulted tasks still surface their exception via Result.
		bool ActionSuccess = !FinishedTask.IsCanceled && FinishedTask.Result.ExitCode == 0;
		ActionsStatus[FinishedActionIndex] = ActionSuccess ? ActionStatus.Finished : ActionStatus.Error;

		// Log (chain log entries together so we don't get annoying overlapping logging)
		LastLoggingTask = LastLoggingTask.ContinueWith(_ =>
		{
			LogCompletedAction(FinishedAction, FinishedTask, CancellationTokenSource, ProgressWriter, TotalActions, ref NumCompletedActions, Logger);
		});

		// Swap finished task with last and pop last.
		--ActiveTaskCount;
		ActiveTasks[ActiveTaskIndex] = ActiveTasks[ActiveTaskCount];
		ActiveActionIndices[ActiveTaskIndex] = ActiveActionIndices[ActiveTaskCount];
		ActiveTaskWeight -= FinishedAction.Weight;

		// Update Success status
		Success = Success && ActionSuccess;
	}

	// Wait for last logging entry before showing summary
	LastLoggingTask.Wait();

	// Summary
	TraceSummary(ActionToTaskLookup, ProcessGroup, Logger, LoggedCpuUtilization);
	return Success;
}
/// <summary>
/// Spawns the process for a single action, captures everything it writes and collects its exit code and timings
/// </summary>
/// <param name="Action">The action whose command line is executed</param>
/// <param name="ProcessGroup">Process group passed to the spawned process</param>
/// <param name="CancellationToken">Checked before the process is created and again once its output has been read; also passed to the output copy</param>
/// <returns>Captured output lines, exit code, wall time and processor time of the process</returns>
protected static async Task<ExecuteResults> RunAction(LinkedAction Action, ManagedProcessGroup ProcessGroup, CancellationToken CancellationToken)
{
	CancellationToken.ThrowIfCancellationRequested();

	using ManagedProcess ChildProcess = new ManagedProcess(ProcessGroup, Action.CommandPath.FullName, Action.CommandArguments, Action.WorkingDirectory.FullName, null, null, ProcessPriority);
	using MemoryStream CapturedOutput = new MemoryStream();

	// Drain the process output into memory
	await ChildProcess.CopyToAsync(CapturedOutput, CancellationToken);
	CancellationToken.ThrowIfCancellationRequested();
	ChildProcess.WaitForExit();

	// Decode only the written part of the buffer and break it into non-empty lines
	string OutputText = Console.OutputEncoding.GetString(CapturedOutput.GetBuffer(), 0, Convert.ToInt32(CapturedOutput.Length));
	string[] SplitLines = OutputText.Split(LineEndingSplit, StringSplitOptions.RemoveEmptyEntries);
	List<string> OutputLines = new List<string>(SplitLines);

	int ProcessExitCode = ChildProcess.ExitCode;
	TimeSpan CpuTime = ChildProcess.TotalProcessorTime;
	TimeSpan WallTime = ChildProcess.ExitTime - ChildProcess.StartTime;

	return new ExecuteResults(OutputLines, ProcessExitCode, WallTime, CpuTime);
}
/// <summary>
/// Length of the most recently written progress line, or -1 when the previous line must not be overwritten.
/// Only consulted when compact output is enabled.
/// </summary>
private static int s_previousLineLength = -1;

/// <summary>
/// Writes the progress line and captured output for an action whose task has ended, and requests
/// cancellation of the remaining work when the action failed and bStopCompilationAfterErrors is set.
/// </summary>
/// <param name="Action">The action that ended</param>
/// <param name="ExecuteTask">The task that ran the action; a task that did not run to completion is reported as cancelled</param>
/// <param name="CancellationTokenSource">Source that is cancelled when compilation should stop after an error</param>
/// <param name="ProgressWriter">Progress writer to update; also used as the lock that serialises the output</param>
/// <param name="TotalActions">Total number of actions in this run</param>
/// <param name="NumCompletedActions">Running count of logged actions; incremented by this call</param>
/// <param name="Logger">Logger for output</param>
protected void LogCompletedAction(LinkedAction Action, Task<ExecuteResults> ExecuteTask, CancellationTokenSource CancellationTokenSource, ProgressWriter ProgressWriter, int TotalActions, ref int NumCompletedActions, ILogger Logger)
{
	List<string> LogLines = new List<string>();
	int ExitCode = int.MaxValue; // Sentinel: stays at MaxValue when the task produced no result
	TimeSpan ExecutionTime = TimeSpan.Zero;
	TimeSpan ProcessorTime = TimeSpan.Zero;
	string? AdditionalDescription = null;

	if (ExecuteTask.Status == TaskStatus.RanToCompletion)
	{
		ExecuteResults ExecuteTaskResult = ExecuteTask.Result;
		LogLines = ExecuteTaskResult.LogLines;
		ExitCode = ExecuteTaskResult.ExitCode;
		ExecutionTime = ExecuteTaskResult.ExecutionTime;
		ProcessorTime = ExecuteTaskResult.ProcessorTime;
		AdditionalDescription = ExecuteTaskResult.AdditionalDescription;
	}

	// Write it to the log. The description is the command name followed by either the action's status
	// description or, if that is not wanted and there is output, the first line of output.
	string CommandName = Action.CommandDescription ?? Action.CommandPath.GetFileNameWithoutExtension();
	string Description;
	if (Action.bShouldOutputStatusDescription || LogLines.Count == 0)
	{
		Description = $"{CommandName} {Action.StatusDescription}".Trim();
	}
	else
	{
		Description = $"{CommandName} {LogLines[0]}".Trim();
	}

	if (!string.IsNullOrEmpty(AdditionalDescription))
	{
		Description = $"{AdditionalDescription} {Description}";
	}

	lock (ProgressWriter)
	{
		int CompletedActions = Interlocked.Increment(ref NumCompletedActions);
		ProgressWriter.Write(CompletedActions, TotalActions);

		// Cancelled
		if (ExitCode == int.MaxValue)
		{
			Logger.LogInformation("[{CompletedActions}/{TotalActions}] {Description} cancelled", CompletedActions, TotalActions, Description);
			return;
		}

		string TargetDetails = "";
		TargetDescriptor? Target = Action.Target;
		if (bPrintActionTargetNames && Target != null)
		{
			TargetDetails = $"[{Target.Name} {Target.Platform} {Target.Configuration}]";
		}

		if (bLogActionCommandLines)
		{
			Logger.LogDebug("[{CompletedActions}/{TotalActions}]{TargetDetails} Command: {CommandPath} {CommandArguments}", CompletedActions, TotalActions, TargetDetails, Action.CommandPath, Action.CommandArguments);
		}

		string CompilationTimes = "";
		if (bShowPerActionCompilationTimes)
		{
			if (ProcessorTime.Ticks > 0)
			{
				CompilationTimes = $" (Wall: {ExecutionTime.TotalSeconds:0.00}s CPU: {ProcessorTime.TotalSeconds:0.00}s)";
			}
			else
			{
				CompilationTimes = $" (Wall: {ExecutionTime.TotalSeconds:0.00}s)";
			}
		}

		string message = ($"[{CompletedActions}/{TotalActions}]{TargetDetails}{CompilationTimes} {Description}");

		if (bCompactOutput)
		{
			if (s_previousLineLength > 0)
			{
				// move the cursor to the far left position, one line back
				Console.CursorLeft = 0;
				Console.CursorTop -= 1;
				// clear the line
				Console.Write("".PadRight(s_previousLineLength));
				// move the cursor back to the left, so output is written to the desired location
				Console.CursorLeft = 0;
			}
		}

		s_previousLineLength = message.Length;

		Log.TraceInformation("{0}", message); // Need to send this through registered event parser; using old logger

		// The first output line has already been used as the description unless the status description was shown
		foreach (string Line in LogLines.Skip(Action.bShouldOutputStatusDescription ? 0 : 1))
		{
			// suppress library creation messages when writing compact output
			// (ordinal comparison: this is tool output, not text to be matched under the current culture)
			if (bCompactOutput && Line.StartsWith(" Creating library ", StringComparison.Ordinal) && Line.EndsWith(".exp", StringComparison.Ordinal))
			{
				continue;
			}

			Log.TraceInformation("{0}", Line);

			// Prevent overwriting of logged lines
			s_previousLineLength = -1;
		}

		if (ExitCode != 0)
		{
			// BEGIN TEMPORARY TO CATCH PVS-STUDIO ISSUES
			if (LogLines.Count == 0)
			{
				Logger.LogError("{TargetDetails} {Description}: Exited with error code {ExitCode}", TargetDetails, Description, ExitCode);
				Logger.LogInformation("{TargetDetails} {Description}: WorkingDirectory {WorkingDirectory}", TargetDetails, Description, Action.WorkingDirectory);
				Logger.LogInformation("{TargetDetails} {Description}: {CommandPath} {CommandArguments}", TargetDetails, Description, Action.CommandPath, Action.CommandArguments);
			}
			// END TEMPORARY

			// prevent overwriting of error text
			s_previousLineLength = -1;

			// Cancel all other pending tasks
			if (bStopCompilationAfterErrors)
			{
				CancellationTokenSource.Cancel();
			}
		}
	}
}
/// <summary>
/// Logs the end-of-run summary: the average CPU utilization (when bShowCPUUtilization is set and samples exist)
/// and, when bShowCompilationTimes is set, the total CPU time and up to 20 of the longest running completed actions.
/// </summary>
/// <param name="Tasks">Every action that was started, mapped to the task that ran it</param>
/// <param name="ProcessGroup">Process group whose total processor time is reported</param>
/// <param name="Logger">Logger for output</param>
/// <param name="LoggedCpuUtilization">CPU utilization samples collected during the run; may be null or empty</param>
protected static void TraceSummary(Dictionary<LinkedAction, Task<ExecuteResults>> Tasks, ManagedProcessGroup ProcessGroup, ILogger Logger, IList<float>? LoggedCpuUtilization)
{
	if (bShowCPUUtilization && LoggedCpuUtilization != null && LoggedCpuUtilization.Any())
	{
		Logger.LogInformation("");
		Logger.LogInformation("Average CPU Utilization: {CPUPercentage}%",(int)(LoggedCpuUtilization.Average()));
	}

	if (!bShowCompilationTimes)
	{
		return;
	}

	Logger.LogInformation("");
	if (ProcessGroup.TotalProcessorTime.Ticks > 0)
	{
		Logger.LogInformation("Total CPU Time: {TotalSeconds} s", ProcessGroup.TotalProcessorTime.TotalSeconds);
		Logger.LogInformation("");
	}

	// Evaluate the query exactly once: this avoids filtering and sorting repeatedly, and guarantees that the
	// count in the header matches the rows listed even if another task completes in the meantime
	var CompletedTasks = Tasks
		.Where(x => x.Value.Status == TaskStatus.RanToCompletion)
		.OrderByDescending(x => x.Value.Result.ExecutionTime)
		.Take(20)
		.ToList();

	if (CompletedTasks.Count > 0)
	{
		Logger.LogInformation("Compilation Time Top {CompletedTaskCount}", CompletedTasks.Count);
		Logger.LogInformation("");
		foreach (var Pair in CompletedTasks)
		{
			string Description = $"{(Pair.Key.Inner.CommandDescription ?? Pair.Key.Inner.CommandPath.GetFileNameWithoutExtension())} {Pair.Key.Inner.StatusDescription}".Trim();
			if (Pair.Value.Result.ProcessorTime.Ticks > 0)
			{
				Logger.LogInformation("{Description} [ Wall Time {ExecutionTime:0.00} s / CPU Time {ProcessorTime:0.00} s ]", Description, Pair.Value.Result.ExecutionTime.TotalSeconds, Pair.Value.Result.ProcessorTime.TotalSeconds);
			}
			else
			{
				Logger.LogInformation("{Description} [ Time {ExecutionTime:0.00} s ]", Description, Pair.Value.Result.ExecutionTime.TotalSeconds);
			}
		}
		Logger.LogInformation("");
	}
}
}
/// <summary>
/// Public facade that exposes the parallel executor's process count calculation to code outside this assembly's internals
/// </summary>
public static class ParallelExecutorConfiguration
{
	/// <summary>
	/// Number of processes the parallel executor would use when queried with MaxLocalActions = 0 and bAllCores = false
	/// </summary>
	/// <param name="Logger">Logger for output</param>
	/// <returns>Maximum number of processes that should be used for execution</returns>
	public static int GetMaxParallelProcesses(ILogger Logger)
	{
		return GetMaxParallelProcesses(0, false, Logger);
	}

	/// <summary>
	/// Number of processes the parallel executor would use for the given limits
	/// </summary>
	/// <param name="MaxLocalActions">Limit on local actions, forwarded to the executor's calculation</param>
	/// <param name="bAllCores">Consider logical cores when determining how many total cpu cores are available</param>
	/// <param name="Logger">Logger for output</param>
	/// <returns>Maximum number of processes that should be used for execution</returns>
	public static int GetMaxParallelProcesses(int MaxLocalActions, bool bAllCores, ILogger Logger)
	{
		return ParallelExecutor.GetDefaultNumParallelProcesses(MaxLocalActions, bAllCores, Logger);
	}
}
}