// Copyright Epic Games, Inc. All Rights Reserved.

using EpicGames.Core;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace UnrealBuildTool
{
	/// <summary>
	/// This executor uses async Tasks to process the action graph
	/// </summary>
	class ParallelExecutor : ActionExecutor
	{
		/// <summary>
		/// Maximum processor count for local execution.
		/// </summary>
		[XmlConfigFile]
		[Obsolete("ParallelExecutor.MaxProcessorCount is deprecated. Please update xml to use BuildConfiguration.MaxParallelActions")]
#pragma warning disable 0169
		private static int MaxProcessorCount;
#pragma warning restore 0169

		/// <summary>
		/// Processor count multiplier for local execution. Can be below 1 to reserve CPU for other tasks.
		/// When using the local executor (not XGE), run a single action on each CPU core. Note that you can set this to a larger value
		/// to get slightly faster build times in many cases, but your computer's responsiveness during compiling may be much worse.
		/// This value is ignored if the CPU does not support hyper-threading.
		/// </summary>
		[XmlConfigFile]
		private static double ProcessorCountMultiplier = 1.0;

		/// <summary>
		/// Free memory per action in bytes, used to limit the number of parallel actions if the machine is memory starved.
		/// Set to 0 to disable free memory checking.
		/// </summary>
		[XmlConfigFile]
		private static double MemoryPerActionBytes = 1.5 * 1024 * 1024 * 1024;

		/// <summary>
		/// The priority to set for spawned processes.
		/// Valid Settings: Idle, BelowNormal, Normal, AboveNormal, High
		/// Default: BelowNormal or Normal for an Asymmetrical processor as BelowNormal can cause scheduling issues.
		/// </summary>
		[XmlConfigFile]
		private static ProcessPriorityClass ProcessPriority = Utils.IsAsymmetricalProcessor() ? ProcessPriorityClass.Normal : ProcessPriorityClass.BelowNormal;

		/// <summary>
		/// When enabled, will stop compiling targets after a compile error occurs.
		/// </summary>
		[XmlConfigFile]
		private static bool bStopCompilationAfterErrors = false;

		/// <summary>
		/// Whether to show compilation times along with worst offenders or not.
		/// </summary>
		[XmlConfigFile]
		private static bool bShowCompilationTimes = false;

		/// <summary>
		/// Whether to show compilation times for each executed action
		/// </summary>
		[XmlConfigFile]
		private static bool bShowPerActionCompilationTimes = false;

		/// <summary>
		/// Whether to log command lines for actions being executed
		/// </summary>
		[XmlConfigFile]
		private static bool bLogActionCommandLines = false;

		/// <summary>
		/// Add target names for each action executed
		/// </summary>
		[XmlConfigFile]
		private static bool bPrintActionTargetNames = false;

		/// <summary>
		/// Whether to take into account the Action's weight when determining to do more work or not.
		/// </summary>
		[XmlConfigFile]
		private static bool bUseActionWeights = false;

		/// <summary>
		/// Whether to show CPU utilization after the work is complete.
		/// </summary>
		[XmlConfigFile]
		private static bool bShowCPUUtilization = false;

		/// <summary>
		/// Collapse non-error output lines
		/// </summary>
		private bool bCompactOutput = false;

		/// <summary>
		/// How many processes that will be executed in parallel
		/// </summary>
		public int NumParallelProcesses { get; private set; }

		/// <summary>
		/// Separators used to split captured process output into individual lines.
		/// </summary>
		private static readonly char[] LineEndingSplit = new char[] { '\n', '\r' };

		/// <summary>
		/// Computes how many actions should run in parallel, using the larger of the configured and
		/// the override memory-per-action values as the memory budget for each action.
		/// </summary>
		/// <param name="MaxLocalActions">How many actions to execute in parallel</param>
		/// <param name="bAllCores">Consider logical cores when determining how many total cpu cores are available</param>
		/// <param name="Logger">Logger for output</param>
		/// <returns>The value returned by Utils.GetMaxActionsToExecuteInParallel for these settings</returns>
		public static int GetDefaultNumParallelProcesses(int MaxLocalActions, bool bAllCores, ILogger Logger)
		{
			double MemoryPerActionBytesComputed = Math.Max(MemoryPerActionBytes, MemoryPerActionBytesOverride);
			if (MemoryPerActionBytesComputed > MemoryPerActionBytes)
			{
				// The logged value is converted from bytes to gigabytes, so the message must say GB.
				Logger.LogInformation("Overriding MemoryPerAction with target-defined value of {Memory} GB", MemoryPerActionBytesComputed / 1024 / 1024 / 1024);
			}

			return Utils.GetMaxActionsToExecuteInParallel(MaxLocalActions, ProcessorCountMultiplier, bAllCores, Convert.ToInt64(MemoryPerActionBytesComputed));
		}

		/// <summary>
		/// Constructor
		/// </summary>
		/// <param name="MaxLocalActions">How many actions to execute in parallel</param>
		/// <param name="bAllCores">Consider logical cores when determining how many total cpu cores are available</param>
		/// <param name="bCompactOutput">Should output be written in a compact fashion</param>
		/// <param name="Logger">Logger for output</param>
		public ParallelExecutor(int MaxLocalActions, bool bAllCores, bool bCompactOutput, ILogger Logger)
		{
			XmlConfig.ApplyTo(this);

			// Figure out how many processors to use
			NumParallelProcesses = GetDefaultNumParallelProcesses(MaxLocalActions, bAllCores, Logger);

			this.bCompactOutput = bCompactOutput;
		}

		/// <summary>
		/// Returns the name of this executor
		/// </summary>
		public override string Name
		{
			get { return "Parallel"; }
		}

		/// <summary>
		/// Checks whether the task executor can be used
		/// </summary>
		/// <returns>True if the task executor can be used</returns>
		public static bool IsAvailable()
		{
			return true;
		}

		/// <summary>
		/// Outcome of running a single action: its output lines, exit code and timings.
		/// </summary>
		protected class ExecuteResults
		{
			/// <summary>Output lines captured from the action</summary>
			public List<string> LogLines { get; private set; }

			/// <summary>Exit code of the action; zero is treated as success by the executor</summary>
			public int ExitCode { get; private set; }

			/// <summary>Elapsed time between process start and exit</summary>
			public TimeSpan ExecutionTime { get; private set; }

			/// <summary>Processor time reported for the process</summary>
			public TimeSpan ProcessorTime { get; private set; }

			/// <summary>Optional text that is prepended to the action description when it is logged</summary>
			public string? AdditionalDescription { get; protected set; } = null;

			/// <summary>
			/// Creates a result with full timing information.
			/// </summary>
			public ExecuteResults(List<string> LogLines, int ExitCode, TimeSpan ExecutionTime, TimeSpan ProcessorTime)
			{
				this.LogLines = LogLines;
				this.ExitCode = ExitCode;
				this.ProcessorTime = ProcessorTime;
				this.ExecutionTime = ExecutionTime;
			}

			/// <summary>
			/// Creates a result without timing information; both time values keep their default (zero).
			/// </summary>
			public ExecuteResults(List<string> LogLines, int ExitCode)
			{
				this.LogLines = LogLines;
				this.ExitCode = ExitCode;
			}
		}

		/// <summary>
		/// Scheduling state of an action inside ExecuteActions.
		/// </summary>
		enum ActionStatus : byte
		{
			Queued,
			Running,
			Finished,
			Error,
		}

		/// <summary>
		/// Executes the specified actions locally.
		/// </summary>
		/// <param name="InputActions">Actions to execute; their order defines the scheduling priority</param>
		/// <param name="Logger">Logger for output</param>
		/// <returns>True if all the tasks successfully executed, or false if any of them failed.</returns>
		public override bool ExecuteActions(IEnumerable<LinkedAction> InputActions, ILogger Logger)
		{
			// Materialize once: the sequence is needed for counting, indexing and iteration,
			// and enumerating a lazy input several times would repeat its work.
			LinkedAction[] Actions = InputActions.ToArray();
			int TotalActions = Actions.Length;
			if (TotalActions == 0)
			{
				return true;
			}

			int NumCompletedActions = 0;
			int ActualNumParallelProcesses = Math.Min(TotalActions, NumParallelProcesses);

			using ManagedProcessGroup ProcessGroup = new ManagedProcessGroup();
			CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
			CancellationToken CancellationToken = CancellationTokenSource.Token;
			using ProgressWriter ProgressWriter = new ProgressWriter("Compiling C++ source code...", false, Logger);
			Dictionary<LinkedAction, Task<ExecuteResults>> ActionToTaskLookup = new();

			Logger.LogInformation("------ Building {TotalActions} action(s) started ------", TotalActions);

			// SortIndex lets prerequisites be mapped back to their slot in ActionsStatus.
			ActionStatus[] ActionsStatus = new ActionStatus[TotalActions];
			for (int Index = 0; Index < TotalActions; ++Index)
			{
				ActionsStatus[Index] = ActionStatus.Queued;
				Actions[Index].SortIndex = Index;
			}

			Task<ExecuteResults>[] ActiveTasks = new Task<ExecuteResults>[ActualNumParallelProcesses];
			int[] ActiveActionIndices = new int[ActualNumParallelProcesses];
			float MaxParallelProcessWeight = (float)ActualNumParallelProcesses;
			float ActiveTaskWeight = 0.0f;
			int ActiveTaskCount = 0;
			bool Success = true;

			List<float> LoggedCpuUtilization = new List<float>();
			Stopwatch CpuQueryUtilization = new Stopwatch();
			CpuQueryUtilization.Start();

			int FirstQueuedAction = 0;
			Task LastLoggingTask = Task.CompletedTask;

			// Loop until all actions has been processed and finished
			while (!CancellationTokenSource.IsCancellationRequested)
			{
				// Always fill up to allowed number of processes
				while (bUseActionWeights
					? (ActiveTaskWeight < MaxParallelProcessWeight && ActiveTaskCount < ActualNumParallelProcesses)
					: ActiveTaskCount < ActualNumParallelProcesses)
				{
					// Always traverse from first not started/finished action. We always want to respect the sorting.
					// Note that there might be multiple actions we need to search past to find the one we can run next
					int ActionToRunIndex = -1;
					bool FoundFirstQueued = false;
					for (int i = FirstQueuedAction; i != Actions.Length; ++i)
					{
						if (ActionsStatus[i] != ActionStatus.Queued)
						{
							continue;
						}

						if (!FoundFirstQueued) // We found the first queued action, let's move our search start index
						{
							FirstQueuedAction = i;
							FoundFirstQueued = true;
						}

						// Let's see if all prerequisite actions are finished
						bool ReadyToRun = true;
						bool HasError = false;
						foreach (LinkedAction Prereq in Actions[i].PrerequisiteActions)
						{
							ActionStatus PrereqStatus = ActionsStatus[Prereq.SortIndex];
							ReadyToRun = ReadyToRun && PrereqStatus == ActionStatus.Finished;
							HasError = HasError || PrereqStatus == ActionStatus.Error;
						}

						// Not ready, look for next queued action
						if (!ReadyToRun)
						{
							// If Prereq has error, this action inherits the error
							if (HasError)
							{
								ActionsStatus[i] = ActionStatus.Error;
							}
							continue;
						}

						ActionToRunIndex = i;
						break;
					}

					// TotalSeconds, not Seconds: Seconds is only the 0-59 component and wraps every minute.
					if (bShowCPUUtilization && CpuQueryUtilization.Elapsed.TotalSeconds > 1)
					{
						if (Utils.GetTotalCpuUtilization(out float CpuUtilization))
						{
							LoggedCpuUtilization.Add(CpuUtilization);
						}
						CpuQueryUtilization.Restart();
					}

					// Didn't find an action we can run. Either there are none left or all depends on currently running actions
					if (ActionToRunIndex == -1)
					{
						break;
					}

					// Start new action and add it to the ActiveTasks
					LinkedAction ActionToRun = Actions[ActionToRunIndex];
					Task<ExecuteResults> NewTask = Task.Run(() => RunAction(ActionToRun, ProcessGroup, CancellationToken));
					ActionToTaskLookup.Add(ActionToRun, NewTask);
					ActiveTasks[ActiveTaskCount] = NewTask;
					ActionsStatus[ActionToRunIndex] = ActionStatus.Running;
					ActiveActionIndices[ActiveTaskCount] = ActionToRunIndex;
					ActiveTaskWeight += ActionToRun.Weight;
					++ActiveTaskCount;
				}

				// We tried starting new actions and none was added and none is running, that means that we are done!
				if (ActiveTaskCount == 0)
				{
					break;
				}

				// Wait for all tasks and continue if any is done.
				Task<ExecuteResults>[] TasksToWaitOn = ActiveTasks;
				if (ActiveTaskCount < ActiveTasks.Length)
				{
					TasksToWaitOn = new Task<ExecuteResults>[ActiveTaskCount];
					Array.Copy(ActiveTasks, TasksToWaitOn, ActiveTaskCount);
				}

				int ActiveTaskIndex;
				try
				{
					ActiveTaskIndex = Task.WaitAny(TasksToWaitOn, Timeout.Infinite, CancellationToken);
				}
				catch (OperationCanceledException)
				{
					// Cancellation is requested by LogCompletedAction after a failed action when
					// bStopCompilationAfterErrors is set. Leave the loop normally so pending log
					// output is flushed and the summary is still produced.
					Success = false;
					break;
				}

				// Timeout? This should never happen.. We have a check here just in case
				if (ActiveTaskIndex == -1)
				{
					Logger.LogError("Task timeout? This should never happen. Report to Epic if you see this");
					Success = false;
					break;
				}

				int FinishedActionIndex = ActiveActionIndices[ActiveTaskIndex];
				LinkedAction FinishedAction = Actions[FinishedActionIndex];
				Task<ExecuteResults> FinishedTask = ActiveTasks[ActiveTaskIndex];

				// A cancelled task has no result to read (Result would throw); count it as failed.
				// LogCompletedAction reports such tasks as "cancelled".
				bool ActionSuccess = !FinishedTask.IsCanceled && FinishedTask.Result.ExitCode == 0;
				ActionsStatus[FinishedActionIndex] = ActionSuccess ? ActionStatus.Finished : ActionStatus.Error;

				// Log (chain log entries together so we don't get annoying overlapping logging)
				LastLoggingTask = LastLoggingTask.ContinueWith((foo) =>
				{
					LogCompletedAction(FinishedAction, FinishedTask, CancellationTokenSource, ProgressWriter, TotalActions, ref NumCompletedActions, Logger);
				});

				// Swap finished task with last and pop last.
				--ActiveTaskCount;
				ActiveTasks[ActiveTaskIndex] = ActiveTasks[ActiveTaskCount];
				ActiveActionIndices[ActiveTaskIndex] = ActiveActionIndices[ActiveTaskCount];
				ActiveTaskWeight -= FinishedAction.Weight;

				// Update Success status
				Success = Success && ActionSuccess;
			}

			// Wait for last logging entry before showing summary
			LastLoggingTask.Wait();

			// Summary
			TraceSummary(ActionToTaskLookup, ProcessGroup, Logger, LoggedCpuUtilization);

			return Success;
		}

		/// <summary>
		/// Runs a single action as a child process in the given process group and collects the output
		/// copied from it, its exit code and its timings.
		/// </summary>
		/// <param name="Action">The action whose command is executed</param>
		/// <param name="ProcessGroup">Process group the spawned process is attached to</param>
		/// <param name="CancellationToken">Token checked before the process is started and after its output has been read</param>
		/// <returns>The results of the finished process</returns>
		/// <exception cref="OperationCanceledException">Thrown when cancellation has been requested</exception>
		protected static async Task<ExecuteResults> RunAction(LinkedAction Action, ManagedProcessGroup ProcessGroup, CancellationToken CancellationToken)
		{
			CancellationToken.ThrowIfCancellationRequested();

			using ManagedProcess Process = new ManagedProcess(ProcessGroup, Action.CommandPath.FullName, Action.CommandArguments, Action.WorkingDirectory.FullName, null, null, ProcessPriority);

			using MemoryStream StdOutStream = new MemoryStream();
			await Process.CopyToAsync(StdOutStream, CancellationToken);

			CancellationToken.ThrowIfCancellationRequested();

			Process.WaitForExit();

			// Decode only the written part of the buffer; GetBuffer() exposes the whole backing array.
			string Output = Console.OutputEncoding.GetString(StdOutStream.GetBuffer(), 0, Convert.ToInt32(StdOutStream.Length));
			List<string> LogLines = Output.Split(LineEndingSplit, StringSplitOptions.RemoveEmptyEntries).ToList();

			int ExitCode = Process.ExitCode;
			TimeSpan ProcessorTime = Process.TotalProcessorTime;
			TimeSpan ExecutionTime = Process.ExitTime - Process.StartTime;
			return new ExecuteResults(LogLines, ExitCode, ExecutionTime, ProcessorTime);
		}

		/// <summary>
		/// Length of the last status line written, used by compact output to blank that line before
		/// writing the next one. A value of -1 means the previous line must not be overwritten.
		/// </summary>
		private static int s_previousLineLength = -1;

		/// <summary>
		/// Writes the progress, status line and captured output of a completed action to the log, and
		/// requests cancellation of the remaining work when the action failed and
		/// bStopCompilationAfterErrors is set.
		/// </summary>
		/// <param name="Action">The action that completed</param>
		/// <param name="ExecuteTask">The task that ran the action; a task that did not run to completion is reported as cancelled</param>
		/// <param name="CancellationTokenSource">Source used to cancel the remaining actions after an error</param>
		/// <param name="ProgressWriter">Progress writer to update; also used as the lock object for the log output</param>
		/// <param name="TotalActions">Total number of actions being executed</param>
		/// <param name="NumCompletedActions">Counter of completed actions; incremented by this call</param>
		/// <param name="Logger">Logger for output</param>
		protected void LogCompletedAction(LinkedAction Action, Task<ExecuteResults> ExecuteTask, CancellationTokenSource CancellationTokenSource, ProgressWriter ProgressWriter, int TotalActions, ref int NumCompletedActions, ILogger Logger)
		{
			List<string> LogLines = new List<string>();
			int ExitCode = int.MaxValue; // sentinel: stays at MaxValue when the task produced no result
			TimeSpan ExecutionTime = TimeSpan.Zero;
			TimeSpan ProcessorTime = TimeSpan.Zero;
			string? AdditionalDescription = null;

			if (ExecuteTask.Status == TaskStatus.RanToCompletion)
			{
				ExecuteResults ExecuteTaskResult = ExecuteTask.Result;
				LogLines = ExecuteTaskResult.LogLines;
				ExitCode = ExecuteTaskResult.ExitCode;
				ExecutionTime = ExecuteTaskResult.ExecutionTime;
				ProcessorTime = ExecuteTaskResult.ProcessorTime;
				AdditionalDescription = ExecuteTaskResult.AdditionalDescription;
			}

			// Write it to the log
			string Description;
			if (Action.bShouldOutputStatusDescription || LogLines.Count == 0)
			{
				Description = $"{(Action.CommandDescription ?? Action.CommandPath.GetFileNameWithoutExtension())} {Action.StatusDescription}".Trim();
			}
			else
			{
				// There is at least one output line here; the first one doubles as the status text.
				Description = $"{(Action.CommandDescription ?? Action.CommandPath.GetFileNameWithoutExtension())} {LogLines[0]}".Trim();
			}

			if (!string.IsNullOrEmpty(AdditionalDescription))
			{
				Description = $"{AdditionalDescription} {Description}";
			}

			lock (ProgressWriter)
			{
				int CompletedActions = Interlocked.Increment(ref NumCompletedActions);
				ProgressWriter.Write(CompletedActions, TotalActions);

				// Cancelled
				if (ExitCode == int.MaxValue)
				{
					Logger.LogInformation("[{CompletedActions}/{TotalActions}] {Description} cancelled", CompletedActions, TotalActions, Description);
					return;
				}

				string TargetDetails = "";
				TargetDescriptor? Target = Action.Target;
				if (bPrintActionTargetNames && Target != null)
				{
					TargetDetails = $"[{Target.Name} {Target.Platform} {Target.Configuration}]";
				}

				if (bLogActionCommandLines)
				{
					Logger.LogDebug("[{CompletedActions}/{TotalActions}]{TargetDetails} Command: {CommandPath} {CommandArguments}", CompletedActions, TotalActions, TargetDetails, Action.CommandPath, Action.CommandArguments);
				}

				string CompilationTimes = "";
				if (bShowPerActionCompilationTimes)
				{
					if (ProcessorTime.Ticks > 0)
					{
						CompilationTimes = $" (Wall: {ExecutionTime.TotalSeconds:0.00}s CPU: {ProcessorTime.TotalSeconds:0.00}s)";
					}
					else
					{
						CompilationTimes = $" (Wall: {ExecutionTime.TotalSeconds:0.00}s)";
					}
				}

				string message = ($"[{CompletedActions}/{TotalActions}]{TargetDetails}{CompilationTimes} {Description}");

				if (bCompactOutput)
				{
					if (s_previousLineLength > 0)
					{
						// move the cursor to the far left position, one line back
						Console.CursorLeft = 0;
						Console.CursorTop -= 1;
						// clear the line
						Console.Write("".PadRight(s_previousLineLength));
						// move the cursor back to the left, so output is written to the desired location
						Console.CursorLeft = 0;
					}
				}

				s_previousLineLength = message.Length;

				// Need to send this through registered event parser; using old logger
				Log.TraceInformation("{0}", message);

				// When the first output line was used as the description above, skip it here.
				foreach (string Line in LogLines.Skip(Action.bShouldOutputStatusDescription ? 0 : 1))
				{
					// suppress library creation messages when writing compact output
					// NOTE(review): confirm the exact leading whitespace of this literal against the tool output.
					if (bCompactOutput && Line.StartsWith(" Creating library ", StringComparison.Ordinal) && Line.EndsWith(".exp", StringComparison.Ordinal))
					{
						continue;
					}

					Log.TraceInformation("{0}", Line);

					// Prevent overwriting of logged lines
					s_previousLineLength = -1;
				}

				if (ExitCode != 0)
				{
					// BEGIN TEMPORARY TO CATCH PVS-STUDIO ISSUES
					if (LogLines.Count == 0)
					{
						Logger.LogError("{TargetDetails} {Description}: Exited with error code {ExitCode}", TargetDetails, Description, ExitCode);
						Logger.LogInformation("{TargetDetails} {Description}: WorkingDirectory {WorkingDirectory}", TargetDetails, Description, Action.WorkingDirectory);
						Logger.LogInformation("{TargetDetails} {Description}: {CommandPath} {CommandArguments}", TargetDetails, Description, Action.CommandPath, Action.CommandArguments);
					}
					// END TEMPORARY

					// prevent overwriting of error text
					s_previousLineLength = -1;

					// Cancel all other pending tasks
					if (bStopCompilationAfterErrors)
					{
						CancellationTokenSource.Cancel();
					}
				}
			}
		}

		/// <summary>
		/// Logs the optional end-of-build summary: average CPU utilization (bShowCPUUtilization) and
		/// total CPU time plus the slowest completed actions (bShowCompilationTimes).
		/// </summary>
		/// <param name="Tasks">Every started action and the task that ran it</param>
		/// <param name="ProcessGroup">Process group whose total processor time is reported</param>
		/// <param name="Logger">Logger for output</param>
		/// <param name="LoggedCpuUtilization">CPU utilization samples collected during the build, if any</param>
		protected static void TraceSummary(Dictionary<LinkedAction, Task<ExecuteResults>> Tasks, ManagedProcessGroup ProcessGroup, ILogger Logger, IList<float>? LoggedCpuUtilization)
		{
			if (bShowCPUUtilization && LoggedCpuUtilization != null && LoggedCpuUtilization.Count > 0)
			{
				Logger.LogInformation("");
				Logger.LogInformation("Average CPU Utilization: {CPUPercentage}%", (int)(LoggedCpuUtilization.Average()));
			}

			if (!bShowCompilationTimes)
			{
				return;
			}

			Logger.LogInformation("");
			if (ProcessGroup.TotalProcessorTime.Ticks > 0)
			{
				Logger.LogInformation("Total CPU Time: {TotalSeconds} s", ProcessGroup.TotalProcessorTime.TotalSeconds);
				Logger.LogInformation("");
			}

			// Materialize once; the deferred query would otherwise be filtered and sorted again on every use.
			List<KeyValuePair<LinkedAction, Task<ExecuteResults>>> CompletedTasks = Tasks
				.Where(x => x.Value.Status == TaskStatus.RanToCompletion)
				.OrderByDescending(x => x.Value.Result.ExecutionTime)
				.Take(20)
				.ToList();

			if (CompletedTasks.Count > 0)
			{
				Logger.LogInformation("Compilation Time Top {CompletedTaskCount}", CompletedTasks.Count);
				Logger.LogInformation("");
				foreach (KeyValuePair<LinkedAction, Task<ExecuteResults>> Pair in CompletedTasks)
				{
					ExecuteResults Result = Pair.Value.Result;
					string Description = $"{(Pair.Key.Inner.CommandDescription ?? Pair.Key.Inner.CommandPath.GetFileNameWithoutExtension())} {Pair.Key.Inner.StatusDescription}".Trim();
					if (Result.ProcessorTime.Ticks > 0)
					{
						Logger.LogInformation("{Description} [ Wall Time {ExecutionTime:0.00} s / CPU Time {ProcessorTime:0.00} s ]", Description, Result.ExecutionTime.TotalSeconds, Result.ProcessorTime.TotalSeconds);
					}
					else
					{
						Logger.LogInformation("{Description} [ Time {ExecutionTime:0.00} s ]", Description, Result.ExecutionTime.TotalSeconds);
					}
				}
				Logger.LogInformation("");
			}
		}
	}

	/// <summary>
	/// Publicly visible static class that allows external access to the parallel executor config
	/// </summary>
	public static class ParallelExecutorConfiguration
	{
		/// <summary>
		/// Maximum number of processes that should be used for execution, computed with
		/// MaxLocalActions = 0 and bAllCores = false.
		/// </summary>
		/// <param name="Logger">Logger for output</param>
		public static int GetMaxParallelProcesses(ILogger Logger) => ParallelExecutor.GetDefaultNumParallelProcesses(0, false, Logger);

		/// <summary>
		/// Maximum number of processes that should be used for execution
		/// </summary>
		/// <param name="MaxLocalActions">How many actions to execute in parallel</param>
		/// <param name="bAllCores">Consider logical cores when determining how many total cpu cores are available</param>
		/// <param name="Logger">Logger for output</param>
		public static int GetMaxParallelProcesses(int MaxLocalActions, bool bAllCores, ILogger Logger) => ParallelExecutor.GetDefaultNumParallelProcesses(MaxLocalActions, bAllCores, Logger);
	}
}