Horde: Various remote execution improvements.

* Add a sample app which uploads an application, streams values to it, and prints the output it echoes back to us.
* Add an in-memory transport which uses a C# pipeline to buffer data.
* Add a client implementation that launches a local Horde agent and connects directly to it.
* Add an IPC message channel to allow attaching new shared memory buffers from remote applications.
* Simplify the API for establishing connections.

#preflight none

[CL 24779164 by Ben Marsh in ue5-main branch]
This commit is contained in:
Ben Marsh
2023-03-24 10:32:39 -04:00
parent 5ebf5846fe
commit ddae4304bd
35 changed files with 1670 additions and 844 deletions
@@ -2,6 +2,8 @@
using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Linq;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
@@ -9,6 +11,9 @@ using System.Threading;
using System.Threading.Tasks;
using EpicGames.Horde.Compute;
using EpicGames.Horde.Compute.Buffers;
using EpicGames.Horde.Compute.Transports;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace EpicGames.Horde.Tests
@@ -17,33 +22,38 @@ namespace EpicGames.Horde.Tests
public class BufferTests
{
[TestMethod]
public async Task TestHeapBuffer()
public async Task TestPooledBuffer()
{
using HeapBuffer buffer = new HeapBuffer(8000);
await TestProducerConsumerAsync(buffer.Writer, buffer.Reader);
await TestProducerConsumerAsync(length => new PooledBuffer(length));
}
[TestMethod]
public async Task TestIpcBuffer()
public async Task TestSharedMemoryBuffer()
{
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
using IpcBuffer buffer1 = IpcBuffer.CreateNew(16384);
string info = buffer1.DuplicateIntoProcess(Process.GetCurrentProcess().Handle);
using IpcBuffer buffer2 = IpcBuffer.OpenExisting(info);
await TestProducerConsumerAsync(buffer1.Writer, buffer2.Reader);
await TestProducerConsumerAsync(length => new SharedMemoryBuffer(length));
}
}
static async Task TestProducerConsumerAsync(IComputeBufferWriter writer, IComputeBufferReader reader)
static async Task TestProducerConsumerAsync(Func<int, IComputeBuffer> createBuffer)
{
const int Length = 8000;
Pipe sourceToTargetPipe = new Pipe();
Pipe targetToSourcePipe = new Pipe();
await using IComputeSocket sourceSocket = new ClientComputeSocket(new PipeTransport(targetToSourcePipe.Reader, sourceToTargetPipe.Writer), NullLogger.Instance);
await using IComputeSocket targetSocket = new ClientComputeSocket(new PipeTransport(sourceToTargetPipe.Reader, targetToSourcePipe.Writer), NullLogger.Instance);
using IComputeBufferWriter sourceWriter = sourceSocket.AttachSendBuffer(0, createBuffer(Length));
using IComputeBufferReader targetReader = targetSocket.AttachRecvBuffer(0, createBuffer(Length));
byte[] input = RandomNumberGenerator.GetBytes(Length);
Task producerTask = RunProducerAsync(writer, input);
Task producerTask = RunProducerAsync(sourceWriter, input);
byte[] output = new byte[Length];
await RunConsumerAsync(reader, output);
await RunConsumerAsync(targetReader, output);
await producerTask;
Assert.IsTrue(input.SequenceEqual(output));
@@ -72,7 +82,7 @@ namespace EpicGames.Horde.Tests
int length = Math.Min(memory.Length, 7);
memory.Slice(0, length).CopyTo(output.Slice(offset));
reader.Advance(length);
await reader.WaitAsync(memory.Length - length, CancellationToken.None);
await reader.WaitForDataAsync(memory.Length - length, CancellationToken.None);
offset += length;
}
}
@@ -74,7 +74,7 @@ namespace EpicGames.Horde.Tests
for (int offset = 0; offset < output.Length;)
{
offset += await aesTransport.ReadAsync(output.AsMemory(offset), CancellationToken.None);
offset += await aesTransport.ReadPartialAsync(output.AsMemory(offset), CancellationToken.None);
}
}
@@ -89,7 +89,7 @@ namespace EpicGames.Horde.Tests
{
for (int offset = 0; offset < buffer.Length;)
{
int size = await transport.ReadAsync(buffer.Slice(offset), CancellationToken.None);
int size = await transport.ReadPartialAsync(buffer.Slice(offset), CancellationToken.None);
Assert.IsTrue(size > 0);
offset += size;
}
@@ -1,286 +0,0 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using System.Buffers.Binary;
using System.Globalization;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using EpicGames.Core;
using Microsoft.Win32.SafeHandles;
using System.IO.MemoryMappedFiles;
using System.Reflection;
using System.IO;
namespace EpicGames.Horde.Compute.Buffers
{
/// <summary>
/// Implementation of <see cref="IComputeBuffer"/> suitable for cross-process communication
/// </summary>
public sealed class IpcBuffer : ComputeBuffer
{
[StructLayout(LayoutKind.Sequential)]
class SECURITY_ATTRIBUTES
{
public int nLength;
public IntPtr lpSecurityDescriptor;
public int bInheritHandle;
}
class NativeEvent : WaitHandle
{
public NativeEvent(HandleInheritability handleInheritability)
{
SECURITY_ATTRIBUTES securityAttributes = new SECURITY_ATTRIBUTES();
securityAttributes.nLength = Marshal.SizeOf(securityAttributes);
securityAttributes.bInheritHandle = (handleInheritability == HandleInheritability.Inheritable)? 1 : 0;
SafeWaitHandle = CreateEvent(securityAttributes, false, false, null);
}
public NativeEvent(IntPtr handle, bool ownsHandle)
{
SafeWaitHandle = new SafeWaitHandle(handle, ownsHandle);
}
public void Set() => SetEvent(SafeWaitHandle);
[DllImport("kernel32.dll")]
static extern SafeWaitHandle CreateEvent(SECURITY_ATTRIBUTES lpEventAttributes, bool bManualReset, bool bInitialState, string? lpName);
[DllImport("kernel32.dll")]
static extern bool SetEvent(SafeWaitHandle hEvent);
}
const int HeaderLength = 32;
long ReadPosition
{
get => BinaryPrimitives.ReadInt64LittleEndian(_header.Span.Slice(8));
set => BinaryPrimitives.WriteInt64LittleEndian(_header.Span.Slice(8), value);
}
long WritePosition
{
get => BinaryPrimitives.ReadInt64LittleEndian(_header.Span.Slice(16));
set => BinaryPrimitives.WriteInt64LittleEndian(_header.Span.Slice(16), value);
}
bool FinishedWriting
{
get => _header.Span[0] != 0;
set => _header.Span[0] = value? (byte)1 : (byte)0;
}
readonly MemoryMappedFile _memoryMappedFile;
readonly MemoryMappedViewAccessor _memoryMappedViewAccessor;
readonly MemoryMappedView _memoryMappedView;
readonly Memory<byte> _header;
readonly Memory<byte> _memory;
readonly NativeEvent _writtenEvent;
readonly NativeEvent _flushedEvent;
private IpcBuffer(MemoryMappedFile memoryMappedFile, NativeEvent writtenEvent, NativeEvent flushedEvent)
{
_memoryMappedFile = memoryMappedFile;
_memoryMappedViewAccessor = _memoryMappedFile.CreateViewAccessor();
_memoryMappedView = new MemoryMappedView(_memoryMappedViewAccessor);
Memory<byte> memory = _memoryMappedView.GetMemory(0, (int)_memoryMappedViewAccessor.Capacity);
_header = memory.Slice(0, HeaderLength);
_memory = memory.Slice(HeaderLength);
_writtenEvent = writtenEvent;
_flushedEvent = flushedEvent;
}
/// <summary>
/// Creates a new IPC buffer with the given capacity
/// </summary>
/// <param name="capacity">Capacity of the buffer</param>
public static IpcBuffer CreateNew(long capacity)
{
MemoryMappedFile memoryMappedFile = MemoryMappedFile.CreateNew(null, HeaderLength + capacity, MemoryMappedFileAccess.ReadWrite, MemoryMappedFileOptions.None, HandleInheritability.Inheritable);
NativeEvent writtenEvent = new NativeEvent(HandleInheritability.Inheritable);
NativeEvent flushedEvent = new NativeEvent(HandleInheritability.Inheritable);
return new IpcBuffer(memoryMappedFile, writtenEvent, flushedEvent);
}
/// <summary>
/// Opens an IpcBuffer from a string passed in from another process
/// </summary>
/// <param name="handle">Descriptor for the buffer to open</param>
public static IpcBuffer OpenExisting(string handle)
{
string[] values = handle.Split(',');
if (values.Length != 3)
{
throw new ArgumentException("Expected three handle values for IPC buffer", nameof(handle));
}
IntPtr memoryHandle = new IntPtr((long)UInt64.Parse(values[0], NumberStyles.None));
MemoryMappedFile memoryMappedFile = OpenMemoryMappedFile(memoryHandle);
IntPtr writtenEventHandle = new IntPtr((long)UInt64.Parse(values[1], NumberStyles.None));
NativeEvent writtenEvent = new NativeEvent(writtenEventHandle, true);
IntPtr flushedEventHandle = new IntPtr((long)UInt64.Parse(values[2], NumberStyles.None));
NativeEvent flushedEvent = new NativeEvent(flushedEventHandle, true);
return new IpcBuffer(memoryMappedFile, writtenEvent, flushedEvent);
}
static MemoryMappedFile OpenMemoryMappedFile(IntPtr handle)
{
MethodInfo? setHandleInfo = typeof(SafeMemoryMappedFileHandle).GetMethod("SetHandle", BindingFlags.Instance | BindingFlags.NonPublic, new[] { typeof(IntPtr) });
if (setHandleInfo == null)
{
throw new InvalidOperationException("Cannot find SetHandle method for SafeMemoryMappedFileHandle");
}
ConstructorInfo? constructorInfo = typeof(MemoryMappedFile).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic, null, new[] { typeof(SafeMemoryMappedFileHandle) }, null);
if (constructorInfo == null)
{
throw new InvalidOperationException("Cannot find private constructor for memory mapped file");
}
SafeMemoryMappedFileHandle safeHandle = new SafeMemoryMappedFileHandle();
setHandleInfo.Invoke(safeHandle, new object[] { handle });
return (MemoryMappedFile)constructorInfo.Invoke(new object[] { safeHandle });
}
/// <summary>
/// Gets the descriptor for the current buffer. This can be used for calls to <see cref="OpenExisting(String)"/> by newly spawned processes that inherit handles.
/// </summary>
/// <returns>Descriptor string for the buffer</returns>
public string GetHandle()
{
ulong targetMemoryHandle = (ulong)_memoryMappedFile.SafeMemoryMappedFileHandle.DangerousGetHandle().ToInt64();
ulong targetWrittenEventHandle = (ulong)_writtenEvent.SafeWaitHandle.DangerousGetHandle();
ulong targetFlushedEventHandle = (ulong)_flushedEvent.SafeWaitHandle.DangerousGetHandle();
return $"{targetMemoryHandle},{targetWrittenEventHandle},{targetFlushedEventHandle}";
}
/// <summary>
/// Duplicates handles for this buffer into the given process, and returns a string that the target process can use to open it.
/// </summary>
/// <param name="targetProcessHandle">Handle to the target process</param>
/// <returns>String that the target process can pass into a call to <see cref="OpenExisting(String)"/> to open the buffer.</returns>
public string DuplicateIntoProcess(IntPtr targetProcessHandle)
{
IntPtr sourceProcessHandle = GetCurrentProcess();
IntPtr targetMemoryHandle;
DuplicateHandle(sourceProcessHandle, _memoryMappedFile.SafeMemoryMappedFileHandle.DangerousGetHandle(), targetProcessHandle, out targetMemoryHandle, 0, false, DUPLICATE_SAME_ACCESS);
IntPtr targetWrittenEventHandle;
DuplicateHandle(sourceProcessHandle, _writtenEvent.SafeWaitHandle.DangerousGetHandle(), targetProcessHandle, out targetWrittenEventHandle, 0, false, DUPLICATE_SAME_ACCESS);
IntPtr targetFlushedEventHandle;
DuplicateHandle(sourceProcessHandle, _flushedEvent.SafeWaitHandle.DangerousGetHandle(), targetProcessHandle, out targetFlushedEventHandle, 0, false, DUPLICATE_SAME_ACCESS);
return $"{(ulong)targetMemoryHandle.ToInt64()},{(ulong)targetWrittenEventHandle.ToInt64()},{(ulong)targetFlushedEventHandle.ToInt64()}";
}
const uint DUPLICATE_SAME_ACCESS = 2;
[DllImport("kernel32.dll", SetLastError = true)]
static extern IntPtr GetCurrentProcess();
[DllImport("kernel32.dll", SetLastError = true)]
[return: MarshalAs(UnmanagedType.Bool)]
static extern bool DuplicateHandle(IntPtr hSourceProcessHandle, IntPtr hSourceHandle, IntPtr hTargetProcessHandle, out IntPtr lpTargetHandle, uint dwDesiredAccess, [MarshalAs(UnmanagedType.Bool)] bool bInheritHandle, uint dwOptions);
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
_flushedEvent.Dispose();
_writtenEvent.Dispose();
_memoryMappedView.Dispose();
_memoryMappedViewAccessor.Dispose();
_memoryMappedFile.Dispose();
}
}
#region Reader
/// <inheritdoc/>
protected override bool FinishedReading() => FinishedWriting && ReadPosition == WritePosition;
/// <inheritdoc/>
protected override void AdvanceReadPosition(int size)
{
ReadPosition += size;
if (ReadPosition == WritePosition)
{
_flushedEvent.Set();
}
}
/// <inheritdoc/>
protected override ReadOnlyMemory<byte> GetReadMemory() => _memory.Slice((int)ReadPosition, (int)(WritePosition - ReadPosition));
/// <inheritdoc/>
protected override async ValueTask WaitAsync(int currentLength, CancellationToken cancellationToken)
{
long initialWritePosition = ReadPosition + currentLength;
for (; ; )
{
if (FinishedWriting || WritePosition > initialWritePosition)
{
return;
}
else
{
await _writtenEvent.WaitOneAsync(cancellationToken);
}
}
}
#endregion
#region Writer
/// <inheritdoc/>
protected override void FinishWriting()
{
FinishedWriting = true;
_writtenEvent.Set();
}
/// <inheritdoc/>
protected override void AdvanceWritePosition(int size)
{
if (FinishedWriting)
{
throw new InvalidOperationException("Cannot update write position after marking as complete");
}
WritePosition += size;
_writtenEvent.Set();
}
/// <inheritdoc/>
protected override Memory<byte> GetWriteMemory() => _memory.Slice((int)WritePosition);
/// <inheritdoc/>
protected override async ValueTask FlushWritesAsync(CancellationToken cancellationToken)
{
while (ReadPosition < WritePosition)
{
await _flushedEvent.WaitOneAsync(cancellationToken);
}
}
#endregion
}
}
@@ -8,9 +8,9 @@ using System.Threading.Tasks;
namespace EpicGames.Horde.Compute.Buffers
{
/// <summary>
/// In-process buffer used for compute messages
/// In-process buffer used to store compute messages
/// </summary>
public class HeapBuffer : ComputeBuffer
class PooledBuffer : ComputeBufferBase
{
readonly IMemoryOwner<byte> _memoryOwner;
readonly Memory<byte> _memory;
@@ -21,11 +21,14 @@ namespace EpicGames.Horde.Compute.Buffers
TaskCompletionSource? _writtenTcs;
TaskCompletionSource? _flushedTcs;
/// <inheritdoc/>
public override long Length => _memory.Length;
/// <summary>
/// Creates a local buffer with the given capacity
/// </summary>
/// <param name="capacity">Capacity of the buffer</param>
public HeapBuffer(int capacity)
public PooledBuffer(int capacity)
{
_memoryOwner = MemoryPool<byte>.Shared.Rent(capacity);
_memory = _memoryOwner.Memory.Slice(0, capacity);
@@ -42,13 +45,23 @@ namespace EpicGames.Horde.Compute.Buffers
}
}
/// <inheritdoc/>
public override Memory<byte> GetMemory(long offset, int length)
{
if (offset > Int32.MaxValue)
{
throw new ArgumentException("Offset is out of range", nameof(offset));
}
return _memory.Slice((int)offset, length);
}
#region Reader
/// <inheritdoc/>
protected override bool FinishedReading() => _finishedWriting && _readPosition == _writePosition;
public override bool FinishedReading() => _finishedWriting && _readPosition == _writePosition;
/// <inheritdoc/>
protected override void AdvanceReadPosition(int size)
public override void AdvanceReadPosition(int size)
{
_readPosition += size;
@@ -59,10 +72,10 @@ namespace EpicGames.Horde.Compute.Buffers
}
/// <inheritdoc/>
protected override ReadOnlyMemory<byte> GetReadMemory() => _memory.Slice(_readPosition, _writePosition - _readPosition);
public override ReadOnlyMemory<byte> GetReadMemory() => _memory.Slice(_readPosition, _writePosition - _readPosition);
/// <inheritdoc/>
protected override async ValueTask WaitAsync(int currentLength, CancellationToken cancellationToken)
public override async ValueTask WaitForDataAsync(int currentLength, CancellationToken cancellationToken)
{
int initialWritePosition = _readPosition + currentLength;
@@ -102,14 +115,14 @@ namespace EpicGames.Horde.Compute.Buffers
#region Writer
/// <inheritdoc/>
protected override void FinishWriting()
public override void FinishWriting()
{
_finishedWriting = true;
_writtenTcs?.TrySetResult();
}
/// <inheritdoc/>
protected override void AdvanceWritePosition(int size)
public override void AdvanceWritePosition(int size)
{
if (_finishedWriting)
{
@@ -121,10 +134,10 @@ namespace EpicGames.Horde.Compute.Buffers
}
/// <inheritdoc/>
protected override Memory<byte> GetWriteMemory() => _memory.Slice(_writePosition);
public override Memory<byte> GetWriteMemory() => _memory.Slice(_writePosition);
/// <inheritdoc/>
protected override async ValueTask FlushWritesAsync(CancellationToken cancellationToken)
public override async ValueTask FlushWritesAsync(CancellationToken cancellationToken)
{
if (_readPosition < _writePosition)
{
@@ -0,0 +1,289 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using EpicGames.Core;
using Microsoft.Win32.SafeHandles;
using System.IO.MemoryMappedFiles;
using System.Reflection;
using System.IO;
using System.Buffers.Binary;
using System.Threading.Tasks;
using System.Threading;
using System.Linq;
using System.Globalization;
using System.ComponentModel;
namespace EpicGames.Horde.Compute.Buffers
{
/// <summary>
/// Implementation of <see cref="IComputeBuffer"/> suitable for cross-process communication
/// </summary>
public sealed class SharedMemoryBuffer : ComputeBufferBase
{
const int HeaderLength = 32;
long ReadPosition
{
get => BinaryPrimitives.ReadInt64LittleEndian(_header.Span.Slice(8));
set => BinaryPrimitives.WriteInt64LittleEndian(_header.Span.Slice(8), value);
}
long WritePosition
{
get => BinaryPrimitives.ReadInt64LittleEndian(_header.Span.Slice(16));
set => BinaryPrimitives.WriteInt64LittleEndian(_header.Span.Slice(16), value);
}
bool FinishedWriting
{
get => _header.Span[24] != 0;
set => _header.Span[24] = value ? (byte)1 : (byte)0;
}
const int ChunkOffsetPow2 = 10;
readonly MemoryMappedFile _memoryMappedFile;
readonly MemoryMappedViewAccessor _memoryMappedViewAccessor;
readonly MemoryMappedView _memoryMappedView;
readonly long _length;
readonly Memory<byte> _header;
readonly Memory<byte> _memory; // TODO: DEPRECATE
readonly Memory<byte>[] _chunks;
readonly Native.EventHandle _writtenEvent;
readonly Native.EventHandle _flushedEvent;
/// <inheritdoc/>
public override long Length => _length;
/// <summary>
/// Creates a new shared memory buffer with the given capacity
/// </summary>
/// <param name="capacity">Capacity of the buffer</param>
public SharedMemoryBuffer(long capacity)
: this(MemoryMappedFile.CreateNew(null, capacity, MemoryMappedFileAccess.ReadWrite, MemoryMappedFileOptions.None, HandleInheritability.Inheritable))
{
}
/// <summary>
///
/// </summary>
/// <param name="buffer"></param>
public SharedMemoryBuffer(MemoryMappedFile buffer)
: this(buffer, new Native.EventHandle(HandleInheritability.Inheritable), new Native.EventHandle(HandleInheritability.Inheritable))
{
}
/// <summary>
/// Creates a shared memory buffer from a memory mapped file
/// </summary>
internal SharedMemoryBuffer(MemoryMappedFile memoryMappedFile, Native.EventHandle writtenEvent, Native.EventHandle flushedEvent)
{
_memoryMappedFile = memoryMappedFile;
_memoryMappedViewAccessor = memoryMappedFile.CreateViewAccessor();
_memoryMappedView = new MemoryMappedView(_memoryMappedViewAccessor);
_writtenEvent = writtenEvent;
_flushedEvent = flushedEvent;
_header = _memoryMappedView.GetMemory(0, HeaderLength);
_length = _memoryMappedViewAccessor.Capacity;
_memory = _memoryMappedView.GetMemory(HeaderLength, (int)(_length - HeaderLength)); // TODO: REMOVE
int chunkCount = (int)((_length + ((1 << ChunkOffsetPow2) - 1)) >> ChunkOffsetPow2);
_chunks = new Memory<byte>[chunkCount];
for (int chunkIdx = 0; chunkIdx < chunkCount; chunkIdx++)
{
long offset = (chunkIdx << ChunkOffsetPow2) + HeaderLength;
int length = (int)Math.Min(Length - offset, Int32.MaxValue);
_chunks[chunkIdx] = _memoryMappedView.GetMemory(offset, length);
}
}
/// <summary>
/// Opens an IpcBuffer from a string passed in from another process
/// </summary>
/// <param name="handle">Descriptor for the buffer to open</param>
public static MemoryMappedFile OpenMemoryMappedFileFromHandle(IntPtr handle)
{
MethodInfo? setHandleInfo = typeof(SafeMemoryMappedFileHandle).GetMethod("SetHandle", BindingFlags.Instance | BindingFlags.NonPublic, new[] { typeof(IntPtr) });
if (setHandleInfo == null)
{
throw new InvalidOperationException("Cannot find SetHandle method for SafeMemoryMappedFileHandle");
}
ConstructorInfo? constructorInfo = typeof(MemoryMappedFile).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic, null, new[] { typeof(SafeMemoryMappedFileHandle) }, null);
if (constructorInfo == null)
{
throw new InvalidOperationException("Cannot find private constructor for memory mapped file");
}
SafeMemoryMappedFileHandle safeHandle = new SafeMemoryMappedFileHandle();
setHandleInfo.Invoke(safeHandle, new object[] { handle });
return (MemoryMappedFile)constructorInfo.Invoke(new object[] { safeHandle });
}
/// <summary>
/// Opens a <see cref="SharedMemoryBuffer"/> from handles returned by <see cref="GetIpcHandle()"/>
/// </summary>
public static SharedMemoryBuffer OpenIpcHandle(string handle)
{
IntPtr[] handles = handle.Split('.').Select(x => new IntPtr((long)UInt64.Parse(x, NumberStyles.None, null))).ToArray();
if (handles.Length != 3)
{
throw new ArgumentException($"Malformed ipc handle string: {handle}", nameof(handle));
}
MemoryMappedFile memoryMappedFile = OpenMemoryMappedFileFromHandle(handles[0]);
Native.EventHandle writtenEvent = new Native.EventHandle(handles[1], true);
Native.EventHandle flushedEvent = new Native.EventHandle(handles[2], true);
return new SharedMemoryBuffer(memoryMappedFile, writtenEvent, flushedEvent);
}
/// <summary>
/// Gets a string that can be used to open the same buffer in another process
/// </summary>
public string GetIpcHandle()
{
IntPtr memoryHandle = _memoryMappedFile.SafeMemoryMappedFileHandle.DangerousGetHandle();
IntPtr writtenEventHandle = _writtenEvent.SafeWaitHandle.DangerousGetHandle();
IntPtr flushedEventHandle = _flushedEvent.SafeWaitHandle.DangerousGetHandle();
return GetIpcHandle(memoryHandle, writtenEventHandle, flushedEventHandle);
}
/// <summary>
/// Duplicates handles for this buffer into the given process, and returns a string that the target process can use to open it.
/// </summary>
/// <param name="targetProcessHandle">Handle to the target process</param>
/// <returns>String that the target process can pass into a call to <see cref="OpenIpcHandle(String)"/> to open the buffer.</returns>
public string GetIpcHandle(IntPtr targetProcessHandle)
{
IntPtr sourceProcessHandle = Native.GetCurrentProcess();
IntPtr targetMemoryHandle;
if (!Native.DuplicateHandle(sourceProcessHandle, _memoryMappedFile.SafeMemoryMappedFileHandle.DangerousGetHandle(), targetProcessHandle, out targetMemoryHandle, 0, false, Native.DUPLICATE_SAME_ACCESS))
{
throw new Win32Exception();
}
IntPtr targetWrittenEventHandle;
if (!Native.DuplicateHandle(sourceProcessHandle, _writtenEvent.SafeWaitHandle.DangerousGetHandle(), targetProcessHandle, out targetWrittenEventHandle, 0, false, Native.DUPLICATE_SAME_ACCESS))
{
throw new Win32Exception();
}
IntPtr targetFlushedEventHandle;
if(!Native.DuplicateHandle(sourceProcessHandle, _flushedEvent.SafeWaitHandle.DangerousGetHandle(), targetProcessHandle, out targetFlushedEventHandle, 0, false, Native.DUPLICATE_SAME_ACCESS))
{
throw new Win32Exception();
}
return GetIpcHandle(targetMemoryHandle, targetWrittenEventHandle, targetFlushedEventHandle);
}
static string GetIpcHandle(IntPtr memoryHandle, IntPtr writtenEventHandle, IntPtr flushedEventHandle)
{
return $"{(ulong)memoryHandle.ToInt64()}.{(ulong)writtenEventHandle.ToInt64()}.{(ulong)flushedEventHandle.ToInt64()}";
}
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
_flushedEvent.Dispose();
_writtenEvent.Dispose();
_memoryMappedView.Dispose();
_memoryMappedViewAccessor.Dispose();
_memoryMappedFile.Dispose();
}
}
/// <inheritdoc/>
public override Memory<byte> GetMemory(long offset, int length)
{
int chunkIdx = (int)(offset >> ChunkOffsetPow2);
long baseOffset = offset - (chunkIdx << ChunkOffsetPow2);
return _chunks[chunkIdx].Slice((int)(offset - baseOffset), length);
}
#region Reader
/// <inheritdoc/>
public override bool FinishedReading() => FinishedWriting && ReadPosition == WritePosition;
/// <inheritdoc/>
public override void AdvanceReadPosition(int size)
{
ReadPosition += size;
if (ReadPosition == WritePosition)
{
_flushedEvent.Set();
}
}
/// <inheritdoc/>
public override ReadOnlyMemory<byte> GetReadMemory() => _memory.Slice((int)ReadPosition, (int)(WritePosition - ReadPosition));
/// <inheritdoc/>
public override async ValueTask WaitForDataAsync(int currentLength, CancellationToken cancellationToken)
{
long initialWritePosition = ReadPosition + currentLength;
for (; ; )
{
if (FinishedWriting || WritePosition > initialWritePosition)
{
return;
}
else
{
await _writtenEvent.WaitOneAsync(cancellationToken);
}
}
}
#endregion
#region Writer
/// <inheritdoc/>
public override void FinishWriting()
{
FinishedWriting = true;
_writtenEvent.Set();
}
/// <inheritdoc/>
public override void AdvanceWritePosition(int size)
{
if (FinishedWriting)
{
throw new InvalidOperationException("Cannot update write position after marking as complete");
}
WritePosition += size;
_writtenEvent.Set();
}
/// <inheritdoc/>
public override Memory<byte> GetWriteMemory() => _memory.Slice((int)WritePosition);
/// <inheritdoc/>
public override async ValueTask FlushWritesAsync(CancellationToken cancellationToken)
{
while (ReadPosition < WritePosition)
{
await _flushedEvent.WaitOneAsync(cancellationToken);
}
}
#endregion
}
}
@@ -0,0 +1,113 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using EpicGames.Core;
using EpicGames.Horde.Compute.Transports;
using Microsoft.Extensions.Logging;
namespace EpicGames.Horde.Compute.Clients
{
/// <summary>
/// Runs a local Horde Agent process to process compute requests without communicating with a server
/// </summary>
public sealed class AgentComputeClient : IComputeClient
{
readonly string _hordeAgentAssembly;
readonly int _port;
readonly ILogger _logger;
/// <summary>
/// Constructor
/// </summary>
/// <param name="hordeAgentFile">Path to the Horde Agent assembly</param>
/// <param name="port">Loopback port to connect on</param>
/// <param name="logger">Factory for logger instances</param>
public AgentComputeClient(string hordeAgentFile, int port, ILogger logger)
{
_hordeAgentAssembly = hordeAgentFile;
_port = port;
_logger = logger;
}
/// <inheritdoc/>
public ValueTask DisposeAsync() => new ValueTask();
/// <inheritdoc/>
public async Task<TResult> ExecuteAsync<TResult>(ClusterId clusterId, Requirements? requirements, Func<IComputeLease, CancellationToken, Task<TResult>> handler, CancellationToken cancellationToken)
{
_logger.LogInformation("** CLIENT **");
_logger.LogInformation("Launching {Path} to handle remote", _hordeAgentAssembly);
TResult result;
using (Socket listener = new Socket(SocketType.Stream, ProtocolType.IP))
{
listener.Bind(new IPEndPoint(IPAddress.Loopback, _port));
listener.Listen();
using CancellationTokenSource cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task<TResult> listenTask = ListenAsync<TResult>(listener, handler, cancellationSource.Token);
using (ManagedProcessGroup group = new ManagedProcessGroup())
{
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
List<string> arguments = new List<string>();
arguments.Add(_hordeAgentAssembly);
arguments.Add("computeworker");
arguments.Add($"-port={_port}");
using ManagedProcess process = new ManagedProcess(group, "dotnet", CommandLineArguments.Join(arguments), null, null, ProcessPriorityClass.Normal);
string? line;
while ((line = await process.ReadLineAsync(cancellationToken)) != null)
{
_logger.LogInformation("Output: {Line}", line);
}
await process.WaitForExitAsync(cancellationToken);
}
else
{
using Process process = new Process();
process.StartInfo.FileName = "dotnet";
process.StartInfo.ArgumentList.Add(_hordeAgentAssembly);
process.StartInfo.ArgumentList.Add("computeworker");
process.StartInfo.ArgumentList.Add($"-port={_port}");
process.StartInfo.UseShellExecute = true;
process.Start();
await process.WaitForExitAsync(cancellationToken);
}
}
cancellationSource.Cancel();
result = await listenTask;
}
_logger.LogInformation("Client terminated.");
return result;
}
async Task<TResult> ListenAsync<TResult>(Socket listener, Func<IComputeLease, CancellationToken, Task<TResult>> handler, CancellationToken cancellationToken)
{
TResult result;
using (Socket tcpSocket = await listener.AcceptAsync(cancellationToken))
{
await using (ClientComputeSocket socket = new ClientComputeSocket(new TcpTransport(tcpSocket), _logger))
{
await using ComputeLease lease = new ComputeLease(new List<string>(), new Dictionary<string, int>(), socket);
result = await handler(lease, cancellationToken);
await socket.CloseAsync(cancellationToken);
}
}
return result;
}
}
}
@@ -13,30 +13,30 @@ using Microsoft.Extensions.Logging;
namespace EpicGames.Horde.Compute.Clients
{
/// <summary>
/// Implementation of <see cref="IComputeChannel"/> which marshals data over a loopback connection to a method running on a background task in the same process.
/// Implementation of <see cref="IComputeClient"/> which marshals data over a loopback connection to a method running on a background task in the same process.
/// </summary>
public sealed class LocalComputeClient : IComputeClient
{
readonly BackgroundTask _listenerTask;
readonly Socket _listener;
readonly Socket _socket;
readonly ILoggerFactory _loggerFactory;
readonly ILogger _logger;
/// <summary>
/// Constructor
/// </summary>
/// <param name="serverFunc">Callback to process the server side of the connection</param>
/// <param name="port">Port to connect on</param>
/// <param name="loggerFactory">Logger for diagnostic output</param>
public LocalComputeClient(Func<IComputeSocket, CancellationToken, Task> serverFunc, int port, ILoggerFactory loggerFactory)
/// <param name="logger">Logger for diagnostic output</param>
public LocalComputeClient(Func<IComputeSocket, CancellationToken, Task> serverFunc, int port, ILogger logger)
{
_loggerFactory = loggerFactory;
_logger = logger;
_listener = new Socket(SocketType.Stream, ProtocolType.IP);
_listener.Bind(new IPEndPoint(IPAddress.Loopback, port));
_listener.Listen();
_listenerTask = BackgroundTask.StartNew(ctx => RunListenerAsync(_listener, serverFunc, loggerFactory, ctx));
_listenerTask = BackgroundTask.StartNew(ctx => RunListenerAsync(_listener, serverFunc, logger, ctx));
_socket = new Socket(SocketType.Stream, ProtocolType.IP);
_socket.Connect(IPAddress.Loopback, port);
@@ -53,11 +53,11 @@ namespace EpicGames.Horde.Compute.Clients
/// <summary>
/// Sets up the loopback listener and calls the server method
/// </summary>
static async Task RunListenerAsync(Socket listener, Func<IComputeSocket, CancellationToken, Task> serverFunc, ILoggerFactory loggerFactory, CancellationToken cancellationToken)
static async Task RunListenerAsync(Socket listener, Func<IComputeSocket, CancellationToken, Task> serverFunc, ILogger logger, CancellationToken cancellationToken)
{
using Socket tcpSocket = await listener.AcceptAsync(cancellationToken);
await using (ComputeSocket socket = new ComputeSocket(new TcpTransport(tcpSocket), loggerFactory))
await using (ClientComputeSocket socket = new ClientComputeSocket(new TcpTransport(tcpSocket), logger))
{
await serverFunc(socket, cancellationToken);
await socket.CloseAsync(cancellationToken);
@@ -67,7 +67,7 @@ namespace EpicGames.Horde.Compute.Clients
/// <inheritdoc/>
public async Task<TResult> ExecuteAsync<TResult>(ClusterId clusterId, Requirements? requirements, Func<IComputeLease, CancellationToken, Task<TResult>> handler, CancellationToken cancellationToken)
{
await using ComputeSocket socket = new ComputeSocket(new TcpTransport(_socket), _loggerFactory);
await using ClientComputeSocket socket = new ClientComputeSocket(new TcpTransport(_socket), _logger);
await using ComputeLease lease = new ComputeLease(new List<string>(), new Dictionary<string, int>(), socket);
TResult result = await handler(lease, cancellationToken);
await socket.CloseAsync(cancellationToken);
@@ -7,6 +7,7 @@ using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.Http.Json;
using System.Net.Sockets;
using System.Threading;
@@ -42,15 +43,15 @@ namespace EpicGames.Horde.Compute.Clients
readonly HttpClient? _defaultHttpClient;
readonly Func<HttpClient> _createHttpClient;
readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource();
readonly ILoggerFactory _loggerFactory;
readonly ILogger _logger;
/// <summary>
/// Constructor
/// </summary>
/// <param name="serverUri">Uri of the server to connect to</param>
/// <param name="loggerFactory">Logger for diagnostic messages</param>
public ServerComputeClient(Uri serverUri, ILoggerFactory loggerFactory)
/// <param name="authHeader">Authentication header</param>
/// <param name="logger">Logger for diagnostic messages</param>
public ServerComputeClient(Uri serverUri, AuthenticationHeaderValue? authHeader, ILogger logger)
{
#pragma warning disable CA2000 // Dispose objects before losing scope
// This is disposed via HttpClient
@@ -59,23 +60,22 @@ namespace EpicGames.Horde.Compute.Clients
_defaultHttpClient = new HttpClient(handler, true);
_defaultHttpClient.BaseAddress = serverUri;
_defaultHttpClient.DefaultRequestHeaders.Authorization = authHeader;
#pragma warning restore CA2000 // Dispose objects before losing scope
_createHttpClient = GetDefaultHttpClient;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<ServerComputeClient>();
_logger = logger;
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="createHttpClient">Creates an HTTP client with the correct base address for the server</param>
/// <param name="loggerFactory">Logger for diagnostic messages</param>
public ServerComputeClient(Func<HttpClient> createHttpClient, ILoggerFactory loggerFactory)
/// <param name="logger">Logger for diagnostic messages</param>
public ServerComputeClient(Func<HttpClient> createHttpClient, ILogger logger)
{
_createHttpClient = createHttpClient;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<ServerComputeClient>();
_logger = logger;
}
/// <summary>
@@ -134,7 +134,7 @@ namespace EpicGames.Horde.Compute.Clients
// Pass the rest of the call over to the handler
byte[] key = StringUtils.ParseHexString(responseMessage.Key);
await using ComputeSocket computeSocket = new ComputeSocket(new TcpTransport(socket), _loggerFactory);
await using ClientComputeSocket computeSocket = new ClientComputeSocket(new TcpTransport(socket), _logger);
await using ComputeLease lease = new ComputeLease(responseMessage.Properties, responseMessage.AssignedResources, computeSocket);
return await handler(lease, cancellationToken);
}
@@ -1,112 +0,0 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace EpicGames.Horde.Compute
{
/// <summary>
/// Base implementation for <see cref="IComputeBuffer"/>, which offers reader and writer objects that redirect to the owner
/// </summary>
public abstract class ComputeBuffer : IComputeBuffer
{
class ReaderImpl : IComputeBufferReader
{
readonly ComputeBuffer _owner;
public ReaderImpl(ComputeBuffer owner) => _owner = owner;
/// <inheritdoc/>
public bool IsComplete => _owner.FinishedReading();
/// <inheritdoc/>
public void Advance(int size) => _owner.AdvanceReadPosition(size);
/// <inheritdoc/>
public ReadOnlyMemory<byte> GetMemory() => _owner.GetReadMemory();
/// <inheritdoc/>
public ValueTask WaitAsync(int currentLength, CancellationToken cancellationToken) => _owner.WaitAsync(currentLength, cancellationToken);
}
class WriterImpl : IComputeBufferWriter
{
readonly ComputeBuffer _owner;
public WriterImpl(ComputeBuffer owner) => _owner = owner;
/// <inheritdoc/>
public void MarkComplete() => _owner.FinishWriting();
/// <inheritdoc/>
public void Advance(int size) => _owner.AdvanceWritePosition(size);
/// <inheritdoc/>
public Memory<byte> GetMemory() => _owner.GetWriteMemory();
/// <inheritdoc/>
public ValueTask FlushAsync(CancellationToken cancellationToken) => _owner.FlushWritesAsync(cancellationToken);
}
/// <inheritdoc/>
public IComputeBufferReader Reader { get; }
/// <inheritdoc/>
public IComputeBufferWriter Writer { get; }
/// <summary>
/// Constructor
/// </summary>
protected ComputeBuffer()
{
Reader = new ReaderImpl(this);
Writer = new WriterImpl(this);
}
/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Standard dispose pattern
/// </summary>
protected virtual void Dispose(bool disposing)
{
}
#region Reader
/// <inheritdoc cref="IComputeBufferReader.IsComplete"/>
protected abstract bool FinishedReading();
/// <inheritdoc cref="IComputeBufferReader.Advance(Int32)"/>
protected abstract void AdvanceReadPosition(int size);
/// <inheritdoc cref="IComputeBufferReader.GetMemory"/>
protected abstract ReadOnlyMemory<byte> GetReadMemory();
/// <inheritdoc cref="IComputeBufferReader.WaitAsync(Int32, CancellationToken)"/>
protected abstract ValueTask WaitAsync(int currentLength, CancellationToken cancellationToken);
#endregion
#region Writer
/// <inheritdoc cref="IComputeBufferWriter.MarkComplete"/>
protected abstract void FinishWriting();
/// <inheritdoc cref="IComputeBufferWriter.Advance(Int32)"/>
protected abstract void AdvanceWritePosition(int size);
/// <inheritdoc cref="IComputeBufferWriter.GetMemory()"/>
protected abstract Memory<byte> GetWriteMemory();
/// <inheritdoc cref="IComputeBufferWriter.FlushAsync(CancellationToken)"/>
protected abstract ValueTask FlushWritesAsync(CancellationToken cancellationToken);
#endregion
}
}
@@ -0,0 +1,155 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace EpicGames.Horde.Compute
{
/// <summary>
/// Base implementation for <see cref="IComputeBuffer"/>, which offers reader and writer objects that redirect to the owner
/// </summary>
public abstract class ComputeBufferBase : IComputeBuffer
{
class ReaderImpl : IComputeBufferReader
{
readonly ComputeBufferBase _buffer;
/// <inheritdoc/>
public IComputeBuffer Buffer => _buffer;
public ReaderImpl(ComputeBufferBase outer) => _buffer = outer;
/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
///
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
}
/// <inheritdoc/>
public bool IsComplete => _buffer.FinishedReading();
/// <inheritdoc/>
public void Advance(int size) => _buffer.AdvanceReadPosition(size);
/// <inheritdoc/>
public ReadOnlyMemory<byte> GetMemory() => _buffer.GetReadMemory();
/// <inheritdoc/>
public ValueTask WaitForDataAsync(int currentLength, CancellationToken cancellationToken) => _buffer.WaitForDataAsync(currentLength, cancellationToken);
}
class WriterImpl : IComputeBufferWriter
{
readonly ComputeBufferBase _buffer;
/// <inheritdoc/>
public IComputeBuffer Buffer => _buffer;
public WriterImpl(ComputeBufferBase buffer) => _buffer = buffer;
/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
///
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
_buffer.FinishWriting();
}
/// <inheritdoc/>
public void MarkComplete() => _buffer.FinishWriting();
/// <inheritdoc/>
public void Advance(int size) => _buffer.AdvanceWritePosition(size);
/// <inheritdoc/>
public Memory<byte> GetMemory() => _buffer.GetWriteMemory();
/// <inheritdoc/>
public ValueTask FlushAsync(CancellationToken cancellationToken) => _buffer.FlushWritesAsync(cancellationToken);
}
/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <inheritdoc/>
public abstract long Length { get; }
/// <inheritdoc/>
public IComputeBufferReader Reader { get; }
/// <inheritdoc/>
public IComputeBufferWriter Writer { get; }
/// <summary>
/// Default constructor
/// </summary>
protected ComputeBufferBase()
{
Reader = new ReaderImpl(this);
Writer = new WriterImpl(this);
}
/// <summary>
/// Standard dispose pattern
/// </summary>
protected virtual void Dispose(bool disposing)
{
}
/// <inheritdoc/>
public abstract Memory<byte> GetMemory(long offset, int length);
#region Reader
/// <inheritdoc cref="IComputeBufferReader.IsComplete"/>
public abstract bool FinishedReading();
/// <inheritdoc cref="IComputeBufferReader.Advance(Int32)"/>
public abstract void AdvanceReadPosition(int size);
/// <inheritdoc cref="IComputeBufferReader.GetMemory"/>
public abstract ReadOnlyMemory<byte> GetReadMemory();
/// <inheritdoc cref="IComputeBufferReader.WaitForDataAsync(Int32, CancellationToken)"/>
public abstract ValueTask WaitForDataAsync(int currentLength, CancellationToken cancellationToken);
#endregion
#region Writer
/// <inheritdoc cref="IComputeBufferWriter.MarkComplete"/>
public abstract void FinishWriting();
/// <inheritdoc cref="IComputeBufferWriter.Advance(Int32)"/>
public abstract void AdvanceWritePosition(int size);
/// <inheritdoc cref="IComputeBufferWriter.GetMemory()"/>
public abstract Memory<byte> GetWriteMemory();
/// <inheritdoc cref="IComputeBufferWriter.FlushAsync(CancellationToken)"/>
public abstract ValueTask FlushWritesAsync(CancellationToken cancellationToken);
#endregion
}
}
@@ -13,7 +13,7 @@ namespace EpicGames.Horde.Compute
/// <summary>
/// Implementation of a compute channel
/// </summary>
public class ComputeChannel : IComputeChannel, IAsyncDisposable
public class ComputeMessageChannel : IComputeMessageChannel, IAsyncDisposable
{
// Length of a message header. Consists of a 1 byte type field, followed by 4 byte length field.
const int HeaderLength = 5;
@@ -59,7 +59,7 @@ namespace EpicGames.Horde.Compute
/// </summary>
class MessageBuilder : IComputeMessageBuilder
{
readonly ComputeChannel _channel;
readonly ComputeMessageChannel _channel;
readonly IComputeBufferWriter _sendBufferWriter;
readonly ComputeMessageType _type;
int _length;
@@ -67,7 +67,7 @@ namespace EpicGames.Horde.Compute
/// <inheritdoc/>
public int Length => _length;
public MessageBuilder(ComputeChannel channel, IComputeBufferWriter sendBufferWriter, ComputeMessageType type)
public MessageBuilder(ComputeMessageChannel channel, IComputeBufferWriter sendBufferWriter, ComputeMessageType type)
{
_channel = channel;
_sendBufferWriter = sendBufferWriter;
@@ -110,6 +110,8 @@ namespace EpicGames.Horde.Compute
readonly IComputeBufferReader _receiveBufferReader;
readonly IComputeBufferWriter _sendBufferWriter;
// Can lock chunked memory writer to acuqire pointer
readonly ILogger _logger;
#pragma warning disable CA2213
@@ -122,7 +124,7 @@ namespace EpicGames.Horde.Compute
/// <param name="receiveBufferReader">Reader for incoming messages</param>
/// <param name="sendBufferWriter">Writer for outgoing messages</param>
/// <param name="logger">Logger for diagnostic output</param>
public ComputeChannel(IComputeBufferReader receiveBufferReader, IComputeBufferWriter sendBufferWriter, ILogger logger)
public ComputeMessageChannel(IComputeBufferReader receiveBufferReader, IComputeBufferWriter sendBufferWriter, ILogger logger)
{
_receiveBufferReader = receiveBufferReader;
_sendBufferWriter = sendBufferWriter;
@@ -144,6 +146,7 @@ namespace EpicGames.Horde.Compute
_currentBuilder?.Dispose();
_sendBufferWriter.MarkComplete();
await _sendBufferWriter.FlushAsync(CancellationToken.None);
_sendBufferWriter.Dispose();
}
/// <inheritdoc/>
@@ -167,7 +170,7 @@ namespace EpicGames.Horde.Compute
return message;
}
}
await _receiveBufferReader.WaitAsync(memory.Length, cancellationToken);
await _receiveBufferReader.WaitForDataAsync(memory.Length, cancellationToken);
}
return new Message(ComputeMessageType.None, ReadOnlyMemory<byte>.Empty);
}
@@ -183,11 +186,11 @@ namespace EpicGames.Horde.Compute
{
bytes.Append("..");
}
_logger.LogTrace("{Verb} {Type,-22} [{Length,8:n0}] = {Bytes}", verb, type, data.Length, bytes.ToString());
_logger.LogTrace("{Verb} {Type,-22} [{Length,10:n0}] = {Bytes}", verb, type, data.Length, bytes.ToString());
}
/// <inheritdoc/>
public IComputeMessageBuilder CreateMessage(ComputeMessageType type, int sizeHint = 0)
public IComputeMessageBuilder CreateMessage(ComputeMessageType type, int maxSize)
{
if (_currentBuilder != null)
{
@@ -13,22 +13,21 @@ using Microsoft.Extensions.Logging;
namespace EpicGames.Horde.Compute
{
/// <summary>
/// Implementation of a compute lease using sockets to transfer data
/// Manages a set of readers and writers to buffers across a transport layer
/// </summary>
public sealed class ComputeSocket : IComputeSocket, IAsyncDisposable
public abstract class ComputeSocketBase : IComputeSocket, IAsyncDisposable
{
readonly object _lockObject = new object();
bool _complete;
readonly IComputeTransport _transport;
readonly ILoggerFactory _loggerFactory;
readonly ILogger _logger;
readonly Task _readTask;
CancellationTokenSource _cancellationSource = new CancellationTokenSource();
readonly Dictionary<int, IComputeBufferWriter> _receiveBuffers = new Dictionary<int, IComputeBufferWriter>();
readonly AsyncEvent _receiveBuffersChangedEvent = new AsyncEvent();
readonly AsyncEvent _recvBufferWritersChangedEvent = new AsyncEvent();
readonly SemaphoreSlim _sendSemaphore = new SemaphoreSlim(1, 1);
readonly Dictionary<int, IComputeBufferReader> _sendBuffers = new Dictionary<int, IComputeBufferReader>();
@@ -38,12 +37,11 @@ namespace EpicGames.Horde.Compute
/// Constructor
/// </summary>
/// <param name="transport">Transport to communicate with the remote</param>
/// <param name="loggerFactory">Logger for trace output</param>
public ComputeSocket(IComputeTransport transport, ILoggerFactory loggerFactory)
/// <param name="logger">Logger for trace output</param>
protected ComputeSocketBase(IComputeTransport transport, ILogger logger)
{
_transport = transport;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<ComputeSocket>();
_logger = logger;
_readTask = RunReaderAsync(transport, _cancellationSource.Token);
}
@@ -72,7 +70,7 @@ namespace EpicGames.Horde.Compute
}
/// <inheritdoc/>
public async ValueTask DisposeAsync()
public virtual async ValueTask DisposeAsync()
{
if (_cancellationSource != null)
{
@@ -86,6 +84,7 @@ namespace EpicGames.Horde.Compute
}
_sendSemaphore.Dispose();
GC.SuppressFinalize(this);
}
async Task RunReaderAsync(IComputeTransport transport, CancellationToken cancellationToken)
@@ -98,12 +97,13 @@ namespace EpicGames.Horde.Compute
// Maintain a local cache of buffers to be able to query for them without having to acquire a global lock
Dictionary<int, IComputeBufferWriter> cachedWriters = new Dictionary<int, IComputeBufferWriter>();
Memory<byte> last = Memory<byte>.Empty;
// Process messages from the remote
for (; ; )
{
// Read the next packet header
int bytesRead = await transport.ReadAsync(header, cancellationToken);
if (bytesRead == 0)
if (!await transport.ReadOptionalAsync(header, cancellationToken))
{
_logger.LogTrace("End of socket");
break;
@@ -112,7 +112,6 @@ namespace EpicGames.Horde.Compute
// Parse the target buffer and packet size
int id = BinaryPrimitives.ReadInt32LittleEndian(header);
int size = BinaryPrimitives.ReadInt32LittleEndian(header.AsSpan(4));
_logger.LogTrace("Read {ChannelId} -> {Size} bytes", id, size);
// If the size if negative, we're closing the entire connection
if (size == 0)
@@ -125,12 +124,12 @@ namespace EpicGames.Horde.Compute
if (size < 0)
{
_logger.LogTrace("Detaching buffer {Id}", id);
DetachReceiveBuffer(cachedWriters, id);
DetachRecvBufferWriter(cachedWriters, id);
}
else
{
IComputeBufferWriter writer = await GetReceiveBufferAsync(cachedWriters, id);
await transport.ReadAsync(writer.GetMemory().Slice(0, size), cancellationToken);
await transport.ReadFullAsync(writer.GetMemory().Slice(0, size), cancellationToken);
writer.Advance(size);
}
}
@@ -174,7 +173,7 @@ namespace EpicGames.Horde.Compute
cachedWriters.Add(id, writer);
return writer;
}
waitTask = _receiveBuffersChangedEvent.Task;
waitTask = _recvBufferWritersChangedEvent.Task;
}
timer ??= Stopwatch.StartNew();
@@ -217,7 +216,7 @@ namespace EpicGames.Horde.Compute
for (; ; )
{
// Wait for something to read
await reader.WaitAsync(0, cancellationToken);
await reader.WaitForDataAsync(0, cancellationToken);
// Acquire the semaphore
await _sendSemaphore.WaitAsync(cancellationToken);
@@ -230,7 +229,6 @@ namespace EpicGames.Horde.Compute
bodySegment.Set(memory, null, header.Length);
ReadOnlySequence<byte> sequence = new ReadOnlySequence<byte>(headerSegment, 0, bodySegment, memory.Length);
await _transport.WriteAsync(sequence, cancellationToken);
reader.Advance(memory.Length);
@@ -257,44 +255,39 @@ namespace EpicGames.Horde.Compute
}
/// <inheritdoc/>
public void AttachBuffers(int channelId, IComputeBufferReader? sendBufferReader, IComputeBufferWriter? receiveBufferWriter)
public abstract IComputeBuffer CreateBuffer(long capacity);
/// <summary>
/// Registers the write end of an attached receive buffer
/// </summary>
/// <param name="channelId">Channel id</param>
/// <param name="recvBufferWriter">Writer for the receive buffer</param>
public void AttachRecvBuffer(int channelId, IComputeBufferWriter recvBufferWriter)
{
bool complete;
lock (_lockObject)
{
complete = _complete;
if (sendBufferReader != null)
if (!complete)
{
_sendBuffers.Add(channelId, sendBufferReader);
_sendTasks.Add(channelId, Task.Run(() => SendFromBufferAsync(channelId, sendBufferReader, _cancellationSource.Token), _cancellationSource.Token));
}
if (receiveBufferWriter != null && !complete)
{
_receiveBuffers.Add(channelId, receiveBufferWriter);
_receiveBuffers.Add(channelId, recvBufferWriter);
}
}
if (receiveBufferWriter != null)
if (recvBufferWriter != null)
{
if (complete)
{
receiveBufferWriter.MarkComplete();
recvBufferWriter.MarkComplete();
}
else
{
_receiveBuffersChangedEvent.Set();
_recvBufferWritersChangedEvent.Set();
}
}
}
/// <inheritdoc/>
public IComputeChannel AttachMessageChannel(int channelId, IComputeBuffer sendBuffer, IComputeBuffer receiveBuffer)
{
AttachBuffers(channelId, sendBuffer.Reader, receiveBuffer.Writer);
return new ComputeChannel(receiveBuffer.Reader, sendBuffer.Writer, _loggerFactory.CreateLogger<ComputeChannel>());
}
void DetachReceiveBuffer(Dictionary<int, IComputeBufferWriter> cachedWriters, int id)
void DetachRecvBufferWriter(Dictionary<int, IComputeBufferWriter> cachedWriters, int id)
{
cachedWriters.Remove(id);
@@ -308,6 +301,21 @@ namespace EpicGames.Horde.Compute
if (writer != null)
{
writer.MarkComplete();
writer.Dispose();
}
}
/// <summary>
/// Registers the read end of an attached send buffer
/// </summary>
/// <param name="channelId">Channel id</param>
/// <param name="sendBufferReader">Writer for the receive buffer</param>
public void AttachSendBuffer(int channelId, IComputeBufferReader sendBufferReader)
{
lock (_lockObject)
{
_sendBuffers.Add(channelId, sendBufferReader);
_sendTasks.Add(channelId, Task.Run(() => SendFromBufferAsync(channelId, sendBufferReader, _cancellationSource.Token), _cancellationSource.Token));
}
}
}
@@ -15,14 +15,14 @@ namespace EpicGames.Horde.Compute
/// </summary>
public sealed class ComputeStorageClient : StorageClientBase, IDisposable
{
readonly IComputeChannel _channel;
readonly IComputeMessageChannel _channel;
readonly SemaphoreSlim _semaphore;
/// <summary>
/// Constructor
/// </summary>
/// <param name="channel"></param>
public ComputeStorageClient(IComputeChannel channel)
public ComputeStorageClient(IComputeMessageChannel channel)
{
_channel = channel;
_semaphore = new SemaphoreSlim(1);
@@ -1,6 +1,7 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -12,21 +13,36 @@ namespace EpicGames.Horde.Compute
public interface IComputeBuffer : IDisposable
{
/// <summary>
/// Reader from the buffer
/// Total length of this buffer
/// </summary>
long Length { get; }
/// <summary>
/// Reader for this buffer
/// </summary>
IComputeBufferReader Reader { get; }
/// <summary>
/// Writer to the buffer
/// Writer for this buffer
/// </summary>
IComputeBufferWriter Writer { get; }
/// <summary>
/// Access the underlying memory for the buffer
/// </summary>
Memory<byte> GetMemory(long offset, int length);
}
/// <summary>
/// Read interface for a compute buffer
/// </summary>
public interface IComputeBufferReader
public interface IComputeBufferReader : IDisposable
{
/// <summary>
/// Accessor for the buffer this is reading from
/// </summary>
IComputeBuffer Buffer { get; }
/// <summary>
/// Whether this buffer is complete (no more data will be added)
/// </summary>
@@ -49,14 +65,19 @@ namespace EpicGames.Horde.Compute
/// </summary>
/// <param name="currentLength">Current length of the buffer</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
ValueTask WaitAsync(int currentLength, CancellationToken cancellationToken);
ValueTask WaitForDataAsync(int currentLength, CancellationToken cancellationToken);
}
/// <summary>
/// Buffer that can receive data from a remote machine.
/// </summary>
public interface IComputeBufferWriter
public interface IComputeBufferWriter : IDisposable
{
/// <summary>
/// Accessor for the buffer we're writing to
/// </summary>
IComputeBuffer Buffer { get; }
/// <summary>
/// Mark the output to this buffer as complete
/// </summary>
@@ -81,18 +102,111 @@ namespace EpicGames.Horde.Compute
}
/// <summary>
/// Default environment variable names for compute channels
/// Extension methods for <see cref="IComputeBuffer"/>
/// </summary>
public static class ComputeEnvVarNames
public static class ComputeBufferExtensions
{
/// <summary>
/// Default name for a channel that can be used to send messages to the initiator
/// </summary>
public const string DefaultSendBuffer = "UE_HORDE_SEND_BUFFER";
sealed class RefCountedBuffer
{
int _refFlags = 1 | 2;
public IComputeBuffer Buffer { get; }
public RefCountedBuffer(IComputeBuffer buffer) => Buffer = buffer;
public void Release(int flag)
{
for (; ; )
{
int initialRefFlags = _refFlags;
if (Interlocked.CompareExchange(ref _refFlags, initialRefFlags & ~flag, initialRefFlags) != initialRefFlags)
{
continue;
}
if (initialRefFlags == flag)
{
Buffer.Dispose();
}
break;
}
}
}
class RefCountedReader : IComputeBufferReader
{
public readonly RefCountedBuffer _buffer;
public readonly IComputeBufferReader _reader;
public IComputeBuffer Buffer => _reader.Buffer;
public RefCountedReader(RefCountedBuffer refCountedBuffer)
{
_buffer = refCountedBuffer;
_reader = refCountedBuffer.Buffer.Reader;
}
public void Dispose() => _buffer.Release(1);
public bool IsComplete => _reader.IsComplete;
public void Advance(int size) => _reader.Advance(size);
public ReadOnlyMemory<byte> GetMemory() => _reader.GetMemory();
public ValueTask WaitForDataAsync(int currentLength, CancellationToken cancellationToken) => _reader.WaitForDataAsync(currentLength, cancellationToken);
}
class RefCountedWriter : IComputeBufferWriter
{
public readonly RefCountedBuffer _buffer;
public readonly IComputeBufferWriter _writer;
public IComputeBuffer Buffer => _writer.Buffer;
public RefCountedWriter(RefCountedBuffer refCountedBuffer)
{
_buffer = refCountedBuffer;
_writer = refCountedBuffer.Buffer.Writer;
}
public void Dispose() => _buffer.Release(2);
public void Advance(int size) => _writer.Advance(size);
public ValueTask FlushAsync(CancellationToken cancellationToken) => _writer.FlushAsync(cancellationToken);
public Memory<byte> GetMemory() => _writer.GetMemory();
public void MarkComplete() => _writer.MarkComplete();
}
/// <summary>
/// Default name for a channel that can be used to receive messages from the initiator
/// Converts a compute buffer to be disposable through its reader/writer
/// </summary>
public const string DefaultRecvBuffer = "UE_HORDE_RECV_BUFFER";
/// <param name="buffer"></param>
/// <returns></returns>
public static (IComputeBufferReader, IComputeBufferWriter) ToShared(this IComputeBuffer buffer)
{
RefCountedBuffer refCountedBuffer = new RefCountedBuffer(buffer);
return (new RefCountedReader(refCountedBuffer), new RefCountedWriter(refCountedBuffer));
}
/// <summary>
/// Waits until there is a block of the certain size in the buffer, and returns it
/// </summary>
/// <param name="reader">Instance to read from</param>
/// <param name="minLength">Minimum length of the memory</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static async ValueTask<ReadOnlyMemory<byte>> GetMemoryAsync(this IComputeBufferReader reader, int minLength, CancellationToken cancellationToken)
{
for (; ; )
{
bool complete = reader.IsComplete;
ReadOnlyMemory<byte> memory = reader.GetMemory();
if (memory.Length >= minLength)
{
return memory;
}
else if (complete)
{
throw new EndOfStreamException();
}
await reader.WaitForDataAsync(memory.Length, cancellationToken);
}
}
}
}
@@ -4,6 +4,7 @@ using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EpicGames.Core;
@@ -55,25 +56,20 @@ namespace EpicGames.Horde.Compute
/// </summary>
DeleteFiles = 0x12,
/// <summary>
/// Delete a directory in the remote (Initiator -> Remote)
/// </summary>
DeleteDirectory = 0x13,
/// <summary>
/// Execute a process in a sandbox (Initiator -> Remote)
/// </summary>
ExecuteProcess = 0x16,
Execute = 0x16,
/// <summary>
/// Returns the process exit code (Remote -> Initiator)
/// </summary>
ExecuteProcessResponse = 0x17,
ExecuteResult = 0x17,
/// <summary>
/// Returns output from the child process to the caller (Remote -> Initiator)
/// </summary>
ProcessOutput = 0x18,
ExecuteOutput = 0x18,
#endregion
@@ -94,14 +90,14 @@ namespace EpicGames.Horde.Compute
#region Buffers
/// <summary>
/// Creates a new receive buffer for messages
/// Registers a send buffer created by a child process
/// </summary>
CreateBufferRequest = 0x30,
AttachSendBuffer = 0x30,
/// <summary>
/// Receives information about the new receive buffer
/// Registers a receive buffer created by a child process
/// </summary>
CreateBufferResponse = 0x31,
AttachRecvBuffer = 0x31,
#endregion
@@ -180,14 +176,14 @@ namespace EpicGames.Horde.Compute
public record struct ReadBlobMessage(BlobLocator Locator, int Offset, int Length);
/// <summary>
/// Request creation of a new buffer, and attaching it to the lease
/// Request a receive buffer in a worker process
/// </summary>
public record struct CreateBufferRequest(int ChannelId, long Capacity, bool Send);
public record struct AttachSendBufferRequest(int ChannelId, string Handle);
/// <summary>
/// Response to create a new buffer
/// Request a receive buffer in a worker process
/// </summary>
public record struct CreateBufferResponse(string Handle);
public record struct AttachRecvBufferRequest(int ChannelId, string Handle);
/// <summary>
/// Message for running an XOR command
@@ -204,19 +200,17 @@ namespace EpicGames.Horde.Compute
/// <summary>
/// Sends an exception response to the remote
/// </summary>
public static void Exception(this IComputeChannel channel, Exception ex) => Exception(channel, ex.Message, ex.StackTrace);
public static void SendException(this IComputeMessageChannel channel, Exception ex) => SendException(channel, ex.Message, ex.StackTrace);
/// <summary>
/// Sends an exception response to the remote
/// </summary>
public static void Exception(this IComputeChannel channel, string description, string? trace)
public static void SendException(this IComputeMessageChannel channel, string description, string? trace)
{
using (IComputeMessageBuilder builder = channel.CreateMessage(ComputeMessageType.Exception))
{
builder.WriteString(description);
builder.WriteOptionalString(trace);
builder.Send();
}
using IComputeMessageBuilder message = channel.CreateMessage(ComputeMessageType.Exception);
message.WriteString(description);
message.WriteOptionalString(trace);
message.Send();
}
/// <summary>
@@ -228,9 +222,9 @@ namespace EpicGames.Horde.Compute
string description = message.ReadString();
return new ExceptionMessage(msg, description);
}
/*
/// <inheritdoc/>
public static void Fork(this IComputeChannel channel, int newChannelId)
public static void Fork(this IComputeMessageChannel channel, int newChannelId)
{
using (IComputeMessageBuilder builder = channel.CreateMessage(ComputeMessageType.Fork))
{
@@ -238,7 +232,7 @@ namespace EpicGames.Horde.Compute
builder.Send();
}
}
*/
/// <summary>
/// Parses a fork message from the given compute message
/// </summary>
@@ -252,19 +246,17 @@ namespace EpicGames.Horde.Compute
/// Sends untyped user data to the remote
/// </summary>
/// <param name="channel">Current channel</param>
/// <param name="span">Data to send</param>
public static void SendUserData(this IComputeChannel channel, ReadOnlySpan<byte> span)
/// <param name="memory">Data to send</param>
public static void SendUserData(this IComputeMessageChannel channel, ReadOnlyMemory<byte> memory)
{
using (IComputeMessageBuilder builder = channel.CreateMessage(ComputeMessageType.UserData))
{
builder.WriteFixedLengthBytes(span);
builder.Send();
}
using IComputeMessageBuilder message = channel.CreateMessage(ComputeMessageType.UserData, memory.Length);
message.WriteFixedLengthBytes(memory.Span);
message.Send();
}
#region Process
static async Task<IComputeMessage> RunStorageServer(this IComputeChannel channel, IStorageClient storage, CancellationToken cancellationToken)
static async Task<IComputeMessage> RunStorageServer(this IComputeMessageChannel channel, IStorageClient storage, CancellationToken cancellationToken)
{
for (; ; )
{
@@ -294,13 +286,13 @@ namespace EpicGames.Horde.Compute
/// <param name="locator">Location of a <see cref="DirectoryNode"/> describing contents of the sandbox</param>
/// <param name="storage">Storage for the sandbox data</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public static async Task UploadFilesAsync(this IComputeChannel channel, string path, NodeLocator locator, IStorageClient storage, CancellationToken cancellationToken)
public static async Task UploadFilesAsync(this IComputeMessageChannel channel, string path, NodeLocator locator, IStorageClient storage, CancellationToken cancellationToken)
{
using (IComputeMessageBuilder writer = channel.CreateMessage(ComputeMessageType.WriteFiles))
using (IComputeMessageBuilder request = channel.CreateMessage(ComputeMessageType.WriteFiles))
{
writer.WriteString(path);
writer.WriteNodeLocator(locator);
writer.Send();
request.WriteString(path);
request.WriteNodeLocator(locator);
request.Send();
}
using IComputeMessage response = await RunStorageServer(channel, storage, cancellationToken);
@@ -325,13 +317,11 @@ namespace EpicGames.Horde.Compute
/// </summary>
/// <param name="channel">Current channel</param>
/// <param name="paths">Paths of files or directories to delete</param>
public static void DeleteFiles(this IComputeChannel channel, IReadOnlyList<string> paths)
public static void DeleteFiles(this IComputeMessageChannel channel, IReadOnlyList<string> paths)
{
using (IComputeMessageBuilder writer = channel.CreateMessage(ComputeMessageType.DeleteDirectory))
{
writer.WriteList(paths, MemoryWriterExtensions.WriteString);
writer.Send();
}
IComputeMessageBuilder request = channel.CreateMessage(ComputeMessageType.DeleteFiles);
request.WriteList(paths, MemoryWriterExtensions.WriteString);
request.Send();
}
/// <summary>
@@ -343,6 +333,53 @@ namespace EpicGames.Horde.Compute
return new DeleteFilesMessage(files);
}
class StringOutputWriter
{
byte[] _buffer;
int _length;
readonly Action<string> _handler;
public StringOutputWriter(Action<string> handler)
{
_buffer = new byte[10];
_handler = handler;
}
public void WriteData(ReadOnlyMemory<byte> data)
{
int requiredLength = _length + data.Length;
if (requiredLength > _buffer.Length)
{
Array.Resize(ref _buffer, requiredLength);
}
data.CopyTo(_buffer.AsMemory(_length));
_length += data.Length;
ReadOnlySpan<byte> output = _buffer.AsSpan(0, _length);
for (; ; )
{
int newLineIdx = output.IndexOf((byte)'\n');
if (newLineIdx == -1)
{
break;
}
int length = newLineIdx;
if (length > 0 && output[length - 1] == (byte)'\r')
{
length--;
}
_handler(Encoding.UTF8.GetString(output.Slice(0, length)));
output = output.Slice(newLineIdx + 1);
}
_length = output.Length;
output.CopyTo(_buffer);
}
}
/// <summary>
/// Executes a remote process
/// </summary>
@@ -353,15 +390,31 @@ namespace EpicGames.Horde.Compute
/// <param name="outputHandler">Output callback for stdout</param>
/// <param name="envVars">Environment variables for the child process</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public static async Task<int> ExecuteProcessAsync(this IComputeChannel channel, string executable, IReadOnlyList<string> arguments, string? workingDir, IReadOnlyDictionary<string, string?>? envVars, Action<ReadOnlyMemory<byte>> outputHandler, CancellationToken cancellationToken)
public static Task<int> ExecuteAsync(this IComputeMessageChannel channel, string executable, IReadOnlyList<string> arguments, string? workingDir, IReadOnlyDictionary<string, string?>? envVars, Action<string> outputHandler, CancellationToken cancellationToken)
{
using (IComputeMessageBuilder writer = channel.CreateMessage(ComputeMessageType.ExecuteProcess))
StringOutputWriter writer = new StringOutputWriter(outputHandler);
return ExecuteAsync(channel, executable, arguments, workingDir, envVars, writer.WriteData, cancellationToken);
}
/// <summary>
/// Executes a remote process
/// </summary>
/// <param name="channel">Current channel</param>
/// <param name="executable">Executable to run, relative to the sandbox root</param>
/// <param name="arguments">Arguments for the child process</param>
/// <param name="workingDir">Working directory for the process</param>
/// <param name="outputHandler">Output callback for stdout</param>
/// <param name="envVars">Environment variables for the child process</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public static async Task<int> ExecuteAsync(this IComputeMessageChannel channel, string executable, IReadOnlyList<string> arguments, string? workingDir, IReadOnlyDictionary<string, string?>? envVars, Action<ReadOnlyMemory<byte>> outputHandler, CancellationToken cancellationToken)
{
using (IComputeMessageBuilder request = channel.CreateMessage(ComputeMessageType.Execute))
{
writer.WriteString(executable);
writer.WriteList(arguments, MemoryWriterExtensions.WriteString);
writer.WriteOptionalString(workingDir);
writer.WriteDictionary(envVars ?? new Dictionary<string, string?>(), MemoryWriterExtensions.WriteString, MemoryWriterExtensions.WriteOptionalString);
writer.Send();
request.WriteString(executable);
request.WriteList(arguments, MemoryWriterExtensions.WriteString);
request.WriteOptionalString(workingDir);
request.WriteDictionary(envVars ?? new Dictionary<string, string?>(), MemoryWriterExtensions.WriteString, MemoryWriterExtensions.WriteOptionalString);
request.Send();
}
for (; ; )
@@ -372,10 +425,10 @@ namespace EpicGames.Horde.Compute
case ComputeMessageType.Exception:
ExceptionMessage exception = message.ParseExceptionMessage();
throw new ComputeRemoteException(exception);
case ComputeMessageType.ProcessOutput:
case ComputeMessageType.ExecuteOutput:
outputHandler(message.Data);
break;
case ComputeMessageType.ExecuteProcessResponse:
case ComputeMessageType.ExecuteResult:
ExecuteProcessResponseMessage executeProcessResponse = message.ParseExecuteProcessResponse();
return executeProcessResponse.ExitCode;
default:
@@ -399,13 +452,11 @@ namespace EpicGames.Horde.Compute
/// <summary>
/// Sends output from a child process
/// </summary>
public static void SendProcessOutput(this IComputeChannel channel, ReadOnlyMemory<byte> data)
public static void SendExecuteOutput(this IComputeMessageChannel channel, ReadOnlyMemory<byte> data)
{
using (IComputeMessageBuilder builder = channel.CreateMessage(ComputeMessageType.ProcessOutput))
{
builder.WriteFixedLengthBytes(data.Span);
builder.Send();
}
using IComputeMessageBuilder message = channel.CreateMessage(ComputeMessageType.ExecuteOutput);
message.WriteFixedLengthBytes(data.Span);
message.Send();
}
/// <summary>
@@ -413,13 +464,11 @@ namespace EpicGames.Horde.Compute
/// </summary>
/// <param name="channel"></param>
/// <param name="exitCode">Exit code from the process</param>
public static void SendExecuteProcessResponse(this IComputeChannel channel, int exitCode)
public static void SendExecuteResult(this IComputeMessageChannel channel, int exitCode)
{
using (IComputeMessageBuilder builder = channel.CreateMessage(ComputeMessageType.ExecuteProcessResponse))
{
builder.WriteInt32(exitCode);
builder.Send();
}
using IComputeMessageBuilder builder = channel.CreateMessage(ComputeMessageType.ExecuteResult);
builder.WriteInt32(exitCode);
builder.Send();
}
/// <summary>
@@ -481,14 +530,14 @@ namespace EpicGames.Horde.Compute
/// <param name="length">Length of data to return</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
/// <returns>Stream containing the blob data</returns>
public static async Task<Stream> ReadBlobAsync(this IComputeChannel channel, BlobLocator locator, int offset, int length, CancellationToken cancellationToken)
public static async Task<Stream> ReadBlobAsync(this IComputeMessageChannel channel, BlobLocator locator, int offset, int length, CancellationToken cancellationToken)
{
using (IComputeMessageBuilder writer = channel.CreateMessage(ComputeMessageType.ReadBlob))
using (IComputeMessageBuilder request = channel.CreateMessage(ComputeMessageType.ReadBlob))
{
writer.WriteBlobLocator(locator);
writer.WriteUnsignedVarInt(offset);
writer.WriteUnsignedVarInt(length);
writer.Send();
request.WriteBlobLocator(locator);
request.WriteUnsignedVarInt(offset);
request.WriteUnsignedVarInt(length);
request.Send();
}
IComputeMessage response = await channel.ReceiveAsync(cancellationToken);
@@ -508,7 +557,7 @@ namespace EpicGames.Horde.Compute
/// <param name="message">The read request</param>
/// <param name="storage">Storage client to retrieve the blob from</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public static Task SendBlobDataAsync(this IComputeChannel channel, ReadBlobMessage message, IStorageClient storage, CancellationToken cancellationToken)
public static Task SendBlobDataAsync(this IComputeMessageChannel channel, ReadBlobMessage message, IStorageClient storage, CancellationToken cancellationToken)
{
return SendBlobDataAsync(channel, message.Locator, message.Offset, message.Length, storage, cancellationToken);
}
@@ -522,7 +571,7 @@ namespace EpicGames.Horde.Compute
/// <param name="length">Length of the data</param>
/// <param name="storage">Storage client to retrieve the blob from</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public static async Task SendBlobDataAsync(this IComputeChannel channel, BlobLocator locator, int offset, int length, IStorageClient storage, CancellationToken cancellationToken)
public static async Task SendBlobDataAsync(this IComputeMessageChannel channel, BlobLocator locator, int offset, int length, IStorageClient storage, CancellationToken cancellationToken)
{
byte[] data;
if (offset == 0 && length == 0)
@@ -544,131 +593,26 @@ namespace EpicGames.Horde.Compute
}
}
using (IComputeMessageBuilder writer = channel.CreateMessage(ComputeMessageType.ReadBlobResponse, length + 128))
using (IComputeMessageBuilder response = channel.CreateMessage(ComputeMessageType.ReadBlobResponse, data.Length + 128))
{
writer.WriteFixedLengthBytes(data);
writer.Send();
response.WriteFixedLengthBytes(data);
response.Send();
}
}
#endregion
#region Child process
/// <summary>
/// Wrapper around the lifetime of an <see cref="IpcBuffer"/> which only exposes the writer interface
/// </summary>
class WrappedBufferWriter : IComputeBufferWriter
{
readonly IpcBuffer _buffer;
public WrappedBufferWriter(IpcBuffer buffer) => _buffer = buffer;
public void Dispose() => _buffer.Dispose();
public void Advance(int size) => _buffer.Writer.Advance(size);
public ValueTask FlushAsync(CancellationToken cancellationToken) => _buffer.Writer.FlushAsync(cancellationToken);
public Memory<byte> GetMemory() => _buffer.Writer.GetMemory();
public void MarkComplete() => _buffer.Writer.MarkComplete();
}
/// <summary>
/// Creates a buffer which can be used to send data to the remote
/// </summary>
/// <param name="channel">Channel to write to</param>
/// <param name="id">Identifier for the new buffer</param>
/// <param name="capacity">Capacity of the buffer</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
/// <returns>New compute buffer</returns>
public static async Task<IComputeBufferWriter> CreateSendBufferAsync(this IComputeChannel channel, int id, long capacity, CancellationToken cancellationToken)
{
IpcBuffer buffer = await CreateBufferAsync(channel, id, capacity, true, cancellationToken);
return new WrappedBufferWriter(buffer);
}
/// <summary>
/// Wrapper around the lifetime of an <see cref="IpcBuffer"/> which only exposes the reader interface
/// </summary>
class WrappedBufferReader : IComputeBufferReader
{
readonly IpcBuffer _buffer;
public WrappedBufferReader(IpcBuffer buffer) => _buffer = buffer;
public void Dispose() => _buffer.Dispose();
public bool IsComplete => _buffer.Reader.IsComplete;
public void Advance(int size) => _buffer.Reader.Advance(size);
public ReadOnlyMemory<byte> GetMemory() => _buffer.Reader.GetMemory();
public ValueTask WaitAsync(int currentLength, CancellationToken cancellationToken) => _buffer.Reader.WaitAsync(currentLength, cancellationToken);
}
/// <summary>
/// Creates a buffer which can be used to receive data from the remote
/// </summary>
/// <param name="channel">Channel to write to</param>
/// <param name="id">Identifier for the new buffer</param>
/// <param name="capacity">Capacity of the buffer</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
/// <returns>New compute buffer</returns>
public static async Task<IComputeBufferReader> CreateReceiveBufferAsync(this IComputeChannel channel, int id, long capacity, CancellationToken cancellationToken)
{
IpcBuffer buffer = await CreateBufferAsync(channel, id, capacity, false, cancellationToken);
return new WrappedBufferReader(buffer);
}
static async Task<IpcBuffer> CreateBufferAsync(this IComputeChannel channel, int id, long capacity, bool send, CancellationToken cancellationToken)
{
using (IComputeMessageBuilder builder = channel.CreateMessage(ComputeMessageType.CreateBufferRequest))
{
builder.WriteInt32(id);
builder.WriteInt64(capacity);
builder.WriteBoolean(send);
builder.Send();
}
using IComputeMessage response = await channel.ReceiveAsync(cancellationToken);
if (response.Type != ComputeMessageType.CreateBufferResponse)
{
throw new NotSupportedException();
}
string handle = response.ReadString();
return IpcBuffer.OpenExisting(handle);
}
/// <summary>
/// Parse a <see cref="CreateBufferRequest"/> message
/// </summary>
public static CreateBufferRequest ParseCreateBufferRequest(this IComputeMessage message)
{
int channelId = message.ReadInt32();
long capacity = message.ReadInt64();
bool send = message.ReadBoolean();
return new CreateBufferRequest(channelId, capacity, send);
}
#endregion
#region Test Messages
/// <summary>
/// Send a message to request that a byte string be xor'ed with a particular value
/// </summary>
public static void XorRequest(this IComputeChannel channel, ReadOnlyMemory<byte> data, byte value)
public static void SendXorRequest(this IComputeMessageChannel channel, ReadOnlyMemory<byte> data, byte value)
{
using (IComputeMessageBuilder builder = channel.CreateMessage(ComputeMessageType.XorRequest))
{
builder.WriteFixedLengthBytes(data.Span);
builder.WriteUInt8(value);
builder.Send();
}
using IComputeMessageBuilder message = channel.CreateMessage(ComputeMessageType.XorRequest);
message.WriteFixedLengthBytes(data.Span);
message.WriteUInt8(value);
message.Send();
}
/// <summary>
@@ -10,22 +10,22 @@ namespace EpicGames.Horde.Compute
/// <summary>
/// Full-duplex channel for sending and receiving messages
/// </summary>
public interface IComputeChannel : IAsyncDisposable
public interface IComputeMessageChannel : IAsyncDisposable
{
/// <summary>
/// Reads a message from the channel
/// </summary>
/// <param name="cancellationToken">Cancellation token for the operation</param>
/// <returns>Data for a message that was read. Must be disposed.</returns>
ValueTask<IComputeMessage> ReceiveAsync(CancellationToken cancellationToken);
ValueTask<IComputeMessage> ReceiveAsync(CancellationToken cancellationToken = default);
/// <summary>
/// Creates a new builder for a message
/// </summary>
/// <param name="type">Type of the message</param>
/// <param name="sizeHint">Hint for the expected message size</param>
/// <param name="maxSize">Maximum size of the message that will be written</param>
/// <returns>New builder for messages</returns>
IComputeMessageBuilder CreateMessage(ComputeMessageType type, int sizeHint = 0);
IComputeMessageBuilder CreateMessage(ComputeMessageType type, int maxSize = 1024);
}
/// <summary>
@@ -49,7 +49,7 @@ namespace EpicGames.Horde.Compute
/// </summary>
/// <param name="channel">Channel to send on</param>
/// <param name="message">The message to be sent</param>
public static void Send(this IComputeChannel channel, IComputeMessage message)
public static void Send(this IComputeMessageChannel channel, IComputeMessage message)
{
using (IComputeMessageBuilder builder = channel.CreateMessage(message.Type, message.Data.Length))
{
@@ -1,9 +1,9 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using System.Threading;
using System.Threading.Tasks;
using EpicGames.Horde.Compute.Buffers;
using System.Globalization;
using EpicGames.Horde.Compute.Sockets;
using Microsoft.Extensions.Logging;
namespace EpicGames.Horde.Compute
{
@@ -13,88 +13,137 @@ namespace EpicGames.Horde.Compute
public interface IComputeSocket : IAsyncDisposable
{
/// <summary>
/// Attaches buffers to a channel.
/// Creates a new buffer of the given capacity, suitable for this socket type
/// </summary>
/// <param name="channelId">Identifier of the channel</param>
/// <param name="sendBufferReader">The reader to attach</param>
/// <param name="receiveBufferWriter">The writer to attach</param>
/// <returns>Identifier for the message channel</returns>
void AttachBuffers(int channelId, IComputeBufferReader? sendBufferReader, IComputeBufferWriter? receiveBufferWriter);
/// <param name="capacity">Capacity of the buffer</param>
IComputeBuffer CreateBuffer(long capacity);
/// <summary>
/// Creates a channel to receive data on the given channel id
/// Attaches a buffer to receive data.
/// </summary>
/// <param name="channelId">Identifier of the channel</param>
/// <param name="sendBuffer">Buffer for sending messages</param>
/// <param name="receiveBuffer">Buffer for receiving messages</param>
IComputeChannel AttachMessageChannel(int channelId, IComputeBuffer sendBuffer, IComputeBuffer receiveBuffer);
/// <param name="channelId">Channel to receive data on</param>
/// <param name="writer">Writer for the received data</param>
void AttachRecvBuffer(int channelId, IComputeBufferWriter writer);
/// <summary>
/// Attaches a buffer to receive data.
/// </summary>
/// <param name="channelId">Channel to receive data on</param>
/// <param name="reader">Reader for incoming data</param>
void AttachSendBuffer(int channelId, IComputeBufferReader reader);
}
/// <summary>
/// Extension methods for sockets
/// Utility methods for creating sockets
/// </summary>
public static class ComputeSocket
{
/// <summary>
/// Environment variable for channel info for communicating with the worker process
/// </summary>
public const string WorkerIpcEnvVar = "UE_HORDE_COMPUTE_IPC";
/// <summary>
/// Creates a socket for a worker
/// </summary>
/// <param name="logger">Logger for diagnostic messages</param>
public static IComputeSocket ConnectAsWorker(ILogger logger)
{
string? handle = Environment.GetEnvironmentVariable(WorkerIpcEnvVar);
if (handle == null)
{
throw new InvalidOperationException($"Environment variable {WorkerIpcEnvVar} is not defined; cannot connect as worker.");
}
try
{
IpcComputeMessageChannel channel = IpcComputeMessageChannel.FromStringHandle(handle, logger);
return new WorkerComputeSocket(channel, logger);
}
catch(Exception ex)
{
throw new Exception($"While connecting using '{handle}'", ex);
}
}
}
/// <summary>
/// Extension methods for <see cref="IComputeMessageChannel"/>
/// </summary>
public static class ComputeSocketExtensions
{
class HeapBufferChannel : IComputeChannel
/// <summary>
/// Attaches a buffer to receive data.
/// </summary>
/// <param name="socket">Socket to attach to</param>
/// <param name="channelId">Channel to receive data on</param>
/// <param name="capacity">Capacity for the buffer</param>
public static IComputeBufferReader AttachRecvBuffer(this IComputeSocket socket, int channelId, long capacity) => AttachRecvBuffer(socket, channelId, socket.CreateBuffer(capacity));
/// <summary>
/// Attaches a buffer to receive data.
/// </summary>
/// <param name="socket">Socket to attach to</param>
/// <param name="channelId">Channel to receive data on</param>
/// <param name="buffer">Buffer to attach to the socket</param>
public static IComputeBufferReader AttachRecvBuffer(this IComputeSocket socket, int channelId, IComputeBuffer buffer)
{
readonly IComputeChannel _inner;
readonly HeapBuffer _receiveBuffer;
readonly HeapBuffer _sendBuffer;
public HeapBufferChannel(IComputeChannel inner, HeapBuffer receiveBuffer, HeapBuffer sendBuffer)
{
_inner = inner;
_sendBuffer = sendBuffer;
_receiveBuffer = receiveBuffer;
}
/// <inheritdoc/>
public async ValueTask DisposeAsync()
{
await _inner.DisposeAsync();
_sendBuffer.Dispose();
_receiveBuffer.Dispose();
}
/// <inheritdoc/>
public ValueTask<IComputeMessage> ReceiveAsync(CancellationToken cancellationToken) => _inner.ReceiveAsync(cancellationToken);
/// <inheritdoc/>
public IComputeMessageBuilder CreateMessage(ComputeMessageType type, int sizeHint = 0) => _inner.CreateMessage(type, sizeHint);
(IComputeBufferReader reader, IComputeBufferWriter writer) = buffer.ToShared();
socket.AttachRecvBuffer(channelId, writer);
return reader;
}
/// <summary>
/// Creates a channel to receive data on the given channel id
/// Attaches a buffer from which to send data.
/// </summary>
/// <param name="socket">Socket to attach a channel on</param>
/// <param name="channelId">Identifier of the channel</param>
public static IComputeChannel AttachMessageChannel(this IComputeSocket socket, int channelId)
/// <param name="socket">Socket to attach to</param>
/// <param name="channelId">Channel to receive data on</param>
/// <param name="capacity">Capacity for the buffer</param>
public static IComputeBufferWriter AttachSendBuffer(this IComputeSocket socket, int channelId, long capacity) => AttachSendBuffer(socket, channelId, socket.CreateBuffer(capacity));
/// <summary>
/// Attaches a buffer from which to send data.
/// </summary>
/// <param name="socket">Socket to attach to</param>
/// <param name="channelId">Channel to receive data on</param>
/// <param name="buffer">Buffer for the data to send</param>
public static IComputeBufferWriter AttachSendBuffer(this IComputeSocket socket, int channelId, IComputeBuffer buffer)
{
const int MaxMessageSize = 64 * 1024;
const int BufferSize = MaxMessageSize * 3;
HeapBuffer receiveBuffer = new HeapBuffer(BufferSize);
HeapBuffer sendBuffer = new HeapBuffer(BufferSize);
IComputeChannel inner = socket.AttachMessageChannel(channelId, sendBuffer, receiveBuffer);
return new HeapBufferChannel(inner, receiveBuffer, sendBuffer);
(IComputeBufferReader reader, IComputeBufferWriter writer) = buffer.ToShared();
socket.AttachSendBuffer(channelId, reader);
return writer;
}
/// <summary>
/// Attaches a send buffer to a socket. Data will be read from this buffer and replicated a receive buffer attached with the same id on the remote.
/// Creates a message channel with the given identifier
/// </summary>
/// <param name="socket">Socket to attach to</param>
/// <param name="channelId">Identifier for the buffer</param>
/// <param name="reader">Source to read from</param>
public static void AttachSendBuffer(this IComputeSocket socket, int channelId, IComputeBufferReader reader) => socket.AttachBuffers(channelId, reader, null);
/// <param name="socket">Socket to create a channel for</param>
/// <param name="channelId">Identifier for the channel</param>
/// <param name="logger">Logger for the channel</param>
public static IComputeMessageChannel CreateMessageChannel(this IComputeSocket socket, int channelId, ILogger logger) => socket.CreateMessageChannel(channelId, 65536, logger);
/// <summary>
/// Attaches a receive buffer to a socket. Data will be read into this buffer from the other end of the lease.
/// Creates a message channel with the given identifier
/// </summary>
/// <param name="socket">Socket to attach to</param>
/// <param name="channelId">Identifier for the buffer</param>
/// <param name="writer">The buffer to attach</param>
public static void AttachReceiveBuffer(this IComputeSocket socket, int channelId, IComputeBufferWriter writer) => socket.AttachBuffers(channelId, null, writer);
/// <param name="socket">Socket to create a channel for</param>
/// <param name="channelId">Identifier for the channel</param>
/// <param name="bufferSize">Size of the send and receive buffer</param>
/// <param name="logger">Logger for the channel</param>
public static IComputeMessageChannel CreateMessageChannel(this IComputeSocket socket, int channelId, long bufferSize, ILogger logger) => CreateMessageChannel(socket, channelId, bufferSize, bufferSize, logger);
/// <summary>
/// Creates a message channel with the given identifier
/// </summary>
/// <param name="socket">Socket to create a channel for</param>
/// <param name="channelId">Identifier for the channel</param>
/// <param name="sendBufferSize">Size of the send buffer</param>
/// <param name="recvBufferSize">Size of the recieve buffer</param>
/// <param name="logger">Logger for the channel</param>
public static IComputeMessageChannel CreateMessageChannel(this IComputeSocket socket, int channelId, long sendBufferSize, long recvBufferSize, ILogger logger)
{
IComputeBufferReader reader = socket.AttachRecvBuffer(channelId, recvBufferSize);
IComputeBufferWriter writer = socket.AttachSendBuffer(channelId, sendBufferSize);
return new ComputeMessageChannel(reader, writer, logger);
}
}
}
@@ -2,6 +2,7 @@
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -12,19 +13,24 @@ namespace EpicGames.Horde.Compute
/// </summary>
public interface IComputeTransport
{
/// <summary>
/// Position in the stream; used for debugging
/// </summary>
long Position { get; }
/// <summary>
/// Reads data from the underlying transport into an output buffer
/// </summary>
/// <param name="buffer">Buffer to read into</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken);
ValueTask<int> ReadPartialAsync(Memory<byte> buffer, CancellationToken cancellationToken);
/// <summary>
/// Writes data to the underlying transport
/// </summary>
/// <param name="buffer">Buffer to be written</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken);
ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken);
}
/// <summary>
@@ -32,6 +38,46 @@ namespace EpicGames.Horde.Compute
/// </summary>
public static class ComputeTransportExtensions
{
/// <summary>
/// Fill the given buffer with data
/// </summary>
/// <param name="transport">Transport object</param>
/// <param name="buffer">Buffer to read into</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public static async ValueTask ReadFullAsync(this IComputeTransport transport, Memory<byte> buffer, CancellationToken cancellationToken)
{
int read = 0;
while (read < buffer.Length)
{
int partialRead = await transport.ReadPartialAsync(buffer.Slice(read, buffer.Length - read), cancellationToken);
if (partialRead == 0)
{
throw new EndOfStreamException();
}
read += partialRead;
}
}
/// <summary>
/// Fill the given buffer with data
/// </summary>
/// <param name="transport">Transport object</param>
/// <param name="buffer">Buffer to read into</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
public static async ValueTask<bool> ReadOptionalAsync(this IComputeTransport transport, Memory<byte> buffer, CancellationToken cancellationToken)
{
int read = await transport.ReadPartialAsync(buffer, cancellationToken);
if (read == 0)
{
return false;
}
if (read < buffer.Length)
{
await transport.ReadFullAsync(buffer.Slice(read), cancellationToken);
}
return true;
}
/// <summary>
/// Writes data to the underlying transport
/// </summary>
@@ -0,0 +1,136 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using System.Globalization;
using System.Threading.Tasks;
using EpicGames.Core;
using EpicGames.Horde.Compute.Buffers;
using Microsoft.Extensions.Logging;
namespace EpicGames.Horde.Compute
{
/// <summary>
/// Message channel for communicating with child processes without using a socket
/// </summary>
public class IpcComputeMessageChannel : ComputeMessageChannel
{
readonly IntPtr _parentProcessHandle;
readonly SharedMemoryBuffer _recvBuffer;
readonly SharedMemoryBuffer _sendBuffer;
static IntPtr s_currentProcessHandle;
/// <summary>
///
/// </summary>
/// <param name="capacity"></param>
/// <param name="logger"></param>
public IpcComputeMessageChannel(long capacity, ILogger logger)
: this(IntPtr.Zero, new SharedMemoryBuffer(capacity), new SharedMemoryBuffer(capacity), logger)
{
}
/// <summary>
/// Constructor
/// </summary>
internal IpcComputeMessageChannel(IntPtr parentProcessHandle, SharedMemoryBuffer recvBuffer, SharedMemoryBuffer sendBuffer, ILogger logger)
: base(recvBuffer.Reader, sendBuffer.Writer, logger)
{
_parentProcessHandle = parentProcessHandle;
_recvBuffer = recvBuffer;
_sendBuffer = sendBuffer;
}
/// <inheritdoc/>
protected override ValueTask DisposeAsync(bool disposing)
{
_recvBuffer.Dispose();
_sendBuffer.Dispose();
return new ValueTask();
}
/// <summary>
/// Creates an IPC channel from a string handle
/// </summary>
/// <param name="handle">Handle to parse</param>
/// <param name="logger">Logger for the new channel</param>
public static IpcComputeMessageChannel FromStringHandle(string handle, ILogger logger)
{
string[] handles = handle.Split('/');
if (handles.Length != 3)
{
throw new ArgumentException($"Invalid handle for IPC channel: {handle}", nameof(handle));
}
IntPtr parentProcessHandle = new IntPtr((long)UInt64.Parse(handles[0], NumberStyles.None, null));
SharedMemoryBuffer recvBuffer = SharedMemoryBuffer.OpenIpcHandle(handles[1]);
SharedMemoryBuffer sendBuffer = SharedMemoryBuffer.OpenIpcHandle(handles[2]);
return new IpcComputeMessageChannel(parentProcessHandle, sendBuffer, recvBuffer, logger); // Note: Swapping send/recv buffer here
}
/// <summary>
/// Gets a string handle that can be used to open this channel in another process
/// </summary>
public string GetStringHandle()
{
if (s_currentProcessHandle == IntPtr.Zero)
{
IntPtr currentProcessHandle = Native.GetCurrentProcess();
Native.DuplicateHandle(currentProcessHandle, currentProcessHandle, currentProcessHandle, out s_currentProcessHandle, 0, true, Native.DUPLICATE_SAME_ACCESS);
}
return $"{(ulong)s_currentProcessHandle.ToInt64()}/{_recvBuffer.GetIpcHandle()}/{_sendBuffer.GetIpcHandle()}";
}
/// <summary>
/// Force the stream closed
/// </summary>
public void ForceComplete() => _recvBuffer.FinishWriting();
/// <summary>
/// Registers a receive buffer with the agent process
/// </summary>
public void AttachRecvBuffer(int channelId, SharedMemoryBuffer buffer)
{
string handle = buffer.GetIpcHandle(_parentProcessHandle);
using IComputeMessageBuilder message = CreateMessage(ComputeMessageType.AttachRecvBuffer, handle.Length + 20);
message.WriteInt32(channelId);
message.WriteString(handle);
message.Send();
}
/// <summary>
/// Parse a <see cref="AttachRecvBufferRequest"/> message
/// </summary>
public static AttachRecvBufferRequest ParseAttachRecvBuffer(IComputeMessage message)
{
int channelId = message.ReadInt32();
string handle = message.ReadString();
return new AttachRecvBufferRequest(channelId, handle);
}
/// <summary>
/// Registers a send buffer with the agent process
/// </summary>
public void AttachSendBuffer(int channelId, SharedMemoryBuffer buffer)
{
string handle = buffer.GetIpcHandle(_parentProcessHandle);
using IComputeMessageBuilder message = CreateMessage(ComputeMessageType.AttachSendBuffer, handle.Length + 20);
message.WriteInt32(channelId);
message.WriteString(handle);
message.Send();
}
/// <summary>
/// Parse a <see cref="AttachSendBufferRequest"/> message
/// </summary>
public static AttachSendBufferRequest ParseAttachSendBuffer(IComputeMessage message)
{
int channelId = message.ReadInt32();
string handle = message.ReadString();
return new AttachSendBufferRequest(channelId, handle);
}
}
}
@@ -0,0 +1,37 @@
// Copyright Epic Games, Inc. All Rights Reserved.
using System;
using EpicGames.Horde.Compute.Buffers;
using Microsoft.Extensions.Logging;
namespace EpicGames.Horde.Compute
{
/// <summary>
/// Manages a set of readers and writers to buffers across a transport layer
/// </summary>
public sealed class ClientComputeSocket : ComputeSocketBase, IAsyncDisposable
{
/// <summary>
/// Constructor
/// </summary>
/// <param name="transport">Transport to communicate with the remote</param>
/// <param name="logger">Logger for trace output</param>
public ClientComputeSocket(IComputeTransport transport, ILogger logger)
: base(transport, logger)
{
}
/// <inheritdoc/>
public override IComputeBuffer CreateBuffer(long capacity)
{
if (capacity > Int32.MaxValue)
{
return new SharedMemoryBuffer(capacity);
}
else
{
return new PooledBuffer((int)capacity);
}
}
}
}

Some files were not shown because too many files have changed in this diff Show More