// Copyright Epic Games, Inc. All Rights Reserved. using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Security.Cryptography; using System.Threading; using System.Threading.Tasks; using EpicGames.Core; using EpicGames.Horde.Compute; using EpicGames.Horde.Compute.Clients; using EpicGames.Horde.Compute.Transports; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using Horde.Server.Agents; using Horde.Server.Agents.Leases; using Horde.Server.Server; using Horde.Server.Tasks; using HordeCommon; using HordeCommon.Rpc.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Horde.Server.Compute { /// /// Information about a compute /// public class ComputeResource { /// /// IP address of the agent /// public IPAddress Ip { get; } /// /// Port to connect on /// public int Port { get; } /// /// Information about the compute task /// public ComputeTask Task { get; } /// /// Properties of the assigned agent /// public IReadOnlyList Properties { get; } /// /// Constructor /// public ComputeResource(IPAddress ip, int port, ComputeTask task, IReadOnlyList properties) { Ip = ip; Port = port; Task = task; Properties = properties; } } /// /// Dispatches requests for compute resources /// public class ComputeTaskSource : TaskSourceBase { class Waiter { public IAgent Agent { get; } public IPAddress Ip { get; } public int Port { get; } public TaskCompletionSource Lease { get; } = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); public Waiter(IAgent agent, IPAddress ip, int port) { Agent = agent; Ip = ip; Port = port; } } /// public override string Type => "Compute"; /// public override TaskSourceFlags Flags => TaskSourceFlags.None; readonly IOptionsMonitor _globalConfig; readonly ILogger _logger; readonly object _lockObject = new object(); readonly Dictionary> _waiters = new Dictionary>(); /// /// Constructor /// public ComputeTaskSource(IOptionsMonitor globalConfig, ILogger logger) { _globalConfig = globalConfig; _logger = logger; } /// public override Task> AssignLeaseAsync(IAgent agent, CancellationToken cancellationToken) { return Task.FromResult(WaitInternalAsync(agent, cancellationToken)); } async Task WaitInternalAsync(IAgent agent, CancellationToken cancellationToken) { string? ipStr = agent.GetPropertyValues("ComputeIp").FirstOrDefault(); if (ipStr == null || !IPAddress.TryParse(ipStr, out IPAddress? ip)) { return null; } string? portStr = agent.GetPropertyValues("ComputePort").FirstOrDefault(); if (portStr == null || !Int32.TryParse(portStr, out int port)) { return null; } // Add it to the wait queue List<(LinkedList, LinkedListNode)> nodes = new(); try { GlobalConfig globalConfig = _globalConfig.CurrentValue; Waiter? waiter = null; lock (_lockObject) { foreach (ComputeClusterConfig clusterConfig in globalConfig.Compute) { if (clusterConfig.Condition == null || agent.SatisfiesCondition(clusterConfig.Condition)) { LinkedList? list; if (!_waiters.TryGetValue(clusterConfig.Id, out list)) { list = new LinkedList(); _waiters.Add(clusterConfig.Id, list); } waiter ??= new Waiter(agent, ip, port); list.AddFirst(waiter); } } } if (waiter != null) { using (IDisposable disposable = cancellationToken.Register(() => waiter.Lease.TrySetResult(null))) { AgentLease? lease = await waiter.Lease.Task; if (lease != null) { _logger.LogInformation("Created compute lease for agent {AgentId}", agent.Id); return lease; } } } } finally { lock (_lockObject) { foreach ((LinkedList list, LinkedListNode node) in nodes) { list.Remove(node); } } } return null; } } }