2023-04-24 15:21:04 -04:00
|
|
|
// Copyright Epic Games, Inc. All Rights Reserved.
|
|
|
|
|
|
2023-11-22 12:25:59 -05:00
|
|
|
#include "Compute/ComputeSocket.h"
|
|
|
|
|
#include "Compute/ComputePlatform.h"
|
2023-04-24 15:21:04 -04:00
|
|
|
#include <iostream>
|
|
|
|
|
#include <assert.h>
|
2023-05-19 21:50:03 -04:00
|
|
|
#include <unordered_set>
|
|
|
|
|
#include <unordered_map>
|
|
|
|
|
#include <vector>
|
2023-07-22 14:17:04 -04:00
|
|
|
#include <thread>
|
|
|
|
|
#include <mutex>
|
2023-09-07 13:34:00 -04:00
|
|
|
#include <chrono>
|
2023-11-24 09:24:22 -05:00
|
|
|
#include "../HordePlatform.h"
|
2023-07-22 14:17:04 -04:00
|
|
|
|
|
|
|
|
FComputeSocket::FComputeSocket()
|
|
|
|
|
{
|
|
|
|
|
}
|
2023-04-24 15:21:04 -04:00
|
|
|
|
|
|
|
|
FComputeSocket::~FComputeSocket()
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-22 14:09:38 -05:00
|
|
|
TSharedPtr<FComputeChannel> FComputeSocket::CreateChannel(int ChannelId)
|
2023-07-22 14:17:04 -04:00
|
|
|
{
|
|
|
|
|
FComputeBuffer RecvBuffer;
|
|
|
|
|
if (!RecvBuffer.CreateNew(FComputeBuffer::FParams()))
|
|
|
|
|
{
|
2023-11-22 14:09:38 -05:00
|
|
|
return MakeShared<FComputeChannel>(FComputeChannel());
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
FComputeBuffer SendBuffer;
|
|
|
|
|
if (!SendBuffer.CreateNew(FComputeBuffer::FParams()))
|
|
|
|
|
{
|
2023-11-22 14:09:38 -05:00
|
|
|
return MakeShared<FComputeChannel>(FComputeChannel());
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
|
|
|
|
|
2023-07-28 11:51:34 -04:00
|
|
|
return CreateChannel(ChannelId, std::move(RecvBuffer), std::move(SendBuffer));
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
|
|
|
|
|
2023-11-22 14:09:38 -05:00
|
|
|
TSharedPtr<FComputeChannel> FComputeSocket::CreateChannel(int ChannelId, FComputeBuffer RecvBuffer, FComputeBuffer SendBuffer)
|
2023-07-22 14:17:04 -04:00
|
|
|
{
|
2023-11-22 14:09:38 -05:00
|
|
|
TSharedPtr<FComputeChannel> Channel = MakeShared<FComputeChannel>(RecvBuffer.CreateReader(), SendBuffer.CreateWriter());
|
2023-07-22 14:17:04 -04:00
|
|
|
|
2023-07-28 11:51:34 -04:00
|
|
|
AttachRecvBuffer(ChannelId, std::move(RecvBuffer));
|
|
|
|
|
AttachSendBuffer(ChannelId, std::move(SendBuffer));
|
|
|
|
|
|
|
|
|
|
return Channel;
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
2023-05-19 21:50:03 -04:00
|
|
|
|
|
|
|
|
//////////////////////////////////////////////////////
|
|
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
const char* const FWorkerComputeSocket::IpcEnvVar = "UE_HORDE_COMPUTE_IPC";
|
2023-05-19 21:50:03 -04:00
|
|
|
|
|
|
|
|
enum class FWorkerComputeSocket::EMessageType
|
|
|
|
|
{
|
|
|
|
|
AttachRecvBuffer = 0,
|
|
|
|
|
AttachSendBuffer = 1,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
FWorkerComputeSocket::FWorkerComputeSocket()
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
FWorkerComputeSocket::~FWorkerComputeSocket()
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
|
|
|
|
Close();
|
|
|
|
|
}
|
|
|
|
|
|
2023-09-07 13:34:00 -04:00
|
|
|
void FWorkerComputeSocket::StartCommunication()
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
2023-05-19 21:50:03 -04:00
|
|
|
bool FWorkerComputeSocket::Open()
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
2023-07-27 11:14:50 -04:00
|
|
|
char EnvVar[FComputeBuffer::MaxNameLength];
|
2023-11-24 09:24:22 -05:00
|
|
|
if (!FHordePlatform::GetEnvironmentVariable(IpcEnvVar, EnvVar, sizeof(EnvVar) / sizeof(EnvVar[0])))
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
2023-05-19 21:50:03 -04:00
|
|
|
return Open(EnvVar);
|
2023-04-24 15:21:04 -04:00
|
|
|
}
|
|
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
bool FWorkerComputeSocket::Open(const char* CommandBufferName)
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-07-28 11:51:34 -04:00
|
|
|
FComputeBuffer CommandBuffer;
|
|
|
|
|
if (CommandBuffer.OpenExisting(CommandBufferName))
|
|
|
|
|
{
|
|
|
|
|
CommandBufferWriter = CommandBuffer.CreateWriter();
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void FWorkerComputeSocket::Close()
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
2023-07-28 11:51:34 -04:00
|
|
|
CommandBufferWriter.Close();
|
2023-04-24 15:21:04 -04:00
|
|
|
}
|
|
|
|
|
|
2023-07-28 11:51:34 -04:00
|
|
|
void FWorkerComputeSocket::AttachRecvBuffer(int ChannelId, FComputeBuffer RecvBuffer)
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
2023-07-28 11:51:34 -04:00
|
|
|
AttachBuffer(ChannelId, EMessageType::AttachRecvBuffer, RecvBuffer.GetName());
|
|
|
|
|
Buffers.push_back(std::move(RecvBuffer));
|
2023-04-24 15:21:04 -04:00
|
|
|
}
|
|
|
|
|
|
2023-07-28 11:51:34 -04:00
|
|
|
void FWorkerComputeSocket::AttachSendBuffer(int ChannelId, FComputeBuffer SendBuffer)
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
2023-07-28 11:51:34 -04:00
|
|
|
AttachBuffer(ChannelId, EMessageType::AttachSendBuffer, SendBuffer.GetName());
|
|
|
|
|
Buffers.push_back(std::move(SendBuffer));
|
2023-04-24 15:21:04 -04:00
|
|
|
}
|
|
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
void FWorkerComputeSocket::AttachBuffer(int ChannelId, EMessageType Type, const char* Name)
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
2023-07-28 11:51:34 -04:00
|
|
|
unsigned char* Data = CommandBufferWriter.WaitToWrite(1024);
|
2023-04-24 15:21:04 -04:00
|
|
|
|
|
|
|
|
size_t Len = 0;
|
2023-05-19 21:50:03 -04:00
|
|
|
Len += WriteVarUInt(Data + Len, (unsigned char)Type);
|
2023-04-24 15:21:04 -04:00
|
|
|
Len += WriteVarUInt(Data + Len, (unsigned int)ChannelId);
|
2023-05-19 21:50:03 -04:00
|
|
|
Len += WriteString(Data + Len, Name);
|
2023-04-24 15:21:04 -04:00
|
|
|
|
2023-07-28 11:51:34 -04:00
|
|
|
CommandBufferWriter.AdvanceWritePosition(Len);
|
2023-04-24 15:21:04 -04:00
|
|
|
}
|
|
|
|
|
|
2023-05-19 21:50:03 -04:00
|
|
|
void FWorkerComputeSocket::RunServer(FComputeBufferReader& CommandBufferReader, FComputeSocket& Socket)
|
|
|
|
|
{
|
|
|
|
|
const unsigned char* Message;
|
|
|
|
|
while ((Message = CommandBufferReader.WaitToRead(1)) != nullptr)
|
|
|
|
|
{
|
|
|
|
|
size_t Len = 0;
|
|
|
|
|
|
|
|
|
|
unsigned int Type;
|
|
|
|
|
Len += ReadVarUInt(Message + Len, &Type);
|
|
|
|
|
|
|
|
|
|
EMessageType MessageType = (EMessageType)*Message;
|
|
|
|
|
switch (MessageType)
|
|
|
|
|
{
|
|
|
|
|
case EMessageType::AttachSendBuffer:
|
|
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
unsigned int ChannelId;
|
|
|
|
|
Len += ReadVarUInt(Message + Len, &ChannelId);
|
|
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
char Name[FComputeBuffer::MaxNameLength];
|
2023-07-22 14:17:04 -04:00
|
|
|
Len += ReadString(Message + Len, Name, FComputeBuffer::MaxNameLength);
|
|
|
|
|
|
|
|
|
|
FComputeBuffer Buffer;
|
|
|
|
|
if (Buffer.OpenExisting(Name))
|
|
|
|
|
{
|
2023-07-28 11:51:34 -04:00
|
|
|
Socket.AttachSendBuffer(ChannelId, Buffer);
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2023-11-22 14:09:38 -05:00
|
|
|
check(false);
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
2023-07-22 14:17:04 -04:00
|
|
|
break;
|
2023-05-19 21:50:03 -04:00
|
|
|
case EMessageType::AttachRecvBuffer:
|
|
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
unsigned int ChannelId;
|
|
|
|
|
Len += ReadVarUInt(Message + Len, &ChannelId);
|
|
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
char Name[FComputeBuffer::MaxNameLength];
|
2023-07-22 14:17:04 -04:00
|
|
|
Len += ReadString(Message + Len, Name, FComputeBuffer::MaxNameLength);
|
|
|
|
|
|
|
|
|
|
FComputeBuffer Buffer;
|
|
|
|
|
if (Buffer.OpenExisting(Name))
|
|
|
|
|
{
|
2023-07-28 11:51:34 -04:00
|
|
|
Socket.AttachRecvBuffer(ChannelId, Buffer);
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2023-11-22 14:09:38 -05:00
|
|
|
check(false);
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
2023-07-22 14:17:04 -04:00
|
|
|
break;
|
2023-05-19 21:50:03 -04:00
|
|
|
default:
|
2023-11-22 14:09:38 -05:00
|
|
|
check(false);
|
2023-05-19 21:50:03 -04:00
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CommandBufferReader.AdvanceReadPosition(Len);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t FWorkerComputeSocket::ReadVarUInt(const unsigned char* Pos, unsigned int* OutValue)
|
|
|
|
|
{
|
2023-11-24 09:24:22 -05:00
|
|
|
size_t ByteCount = FHordePlatform::CountLeadingZeros((unsigned char)(~*static_cast<const unsigned char*>(Pos))) - 23;
|
2023-05-19 21:50:03 -04:00
|
|
|
|
|
|
|
|
unsigned int Value = *Pos++ & (unsigned char)(0xff >> ByteCount);
|
|
|
|
|
switch (ByteCount - 1)
|
|
|
|
|
{
|
|
|
|
|
case 8: Value <<= 8; Value |= *Pos++;
|
|
|
|
|
case 7: Value <<= 8; Value |= *Pos++;
|
|
|
|
|
case 6: Value <<= 8; Value |= *Pos++;
|
|
|
|
|
case 5: Value <<= 8; Value |= *Pos++;
|
|
|
|
|
case 4: Value <<= 8; Value |= *Pos++;
|
|
|
|
|
case 3: Value <<= 8; Value |= *Pos++;
|
|
|
|
|
case 2: Value <<= 8; Value |= *Pos++;
|
|
|
|
|
case 1: Value <<= 8; Value |= *Pos++;
|
|
|
|
|
default:
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*OutValue = Value;
|
|
|
|
|
return ByteCount;
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
size_t FWorkerComputeSocket::ReadString(const unsigned char* Pos, char* OutText, size_t OutTextMaxLen)
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
|
|
|
|
unsigned int TextLen;
|
|
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
size_t Len = ReadVarUInt(Pos, &TextLen);
|
2023-11-24 09:24:22 -05:00
|
|
|
FCStringAnsi::Strcpy(OutText, OutTextMaxLen, (const char*)Pos + Len);
|
2023-05-19 21:50:03 -04:00
|
|
|
|
|
|
|
|
return Len + TextLen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t FWorkerComputeSocket::WriteVarUInt(unsigned char* Pos, unsigned int Value)
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
|
|
|
|
// Use BSR to return the log2 of the integer
|
|
|
|
|
// return 0 if value is 0
|
2023-11-24 09:24:22 -05:00
|
|
|
unsigned int ByteCount = (unsigned int)(int(FHordePlatform::FloorLog2(Value)) / 7 + 1);
|
2023-04-24 15:21:04 -04:00
|
|
|
|
|
|
|
|
unsigned char* OutBytes = Pos + ByteCount - 1;
|
|
|
|
|
switch (ByteCount - 1)
|
|
|
|
|
{
|
2023-07-27 11:14:50 -04:00
|
|
|
case 4: *OutBytes-- = (unsigned char)(Value); Value >>= 8; [[fallthrough]];
|
|
|
|
|
case 3: *OutBytes-- = (unsigned char)(Value); Value >>= 8; [[fallthrough]];
|
|
|
|
|
case 2: *OutBytes-- = (unsigned char)(Value); Value >>= 8; [[fallthrough]];
|
|
|
|
|
case 1: *OutBytes-- = (unsigned char)(Value); Value >>= 8; [[fallthrough]];
|
|
|
|
|
default:
|
|
|
|
|
break;
|
2023-04-24 15:21:04 -04:00
|
|
|
}
|
|
|
|
|
*OutBytes = (unsigned char)(0xff << (9 - ByteCount)) | (unsigned char)(Value);
|
|
|
|
|
|
|
|
|
|
return ByteCount;
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
size_t FWorkerComputeSocket::WriteString(unsigned char* Pos, const char* Text)
|
2023-04-24 15:21:04 -04:00
|
|
|
{
|
2023-07-27 11:14:50 -04:00
|
|
|
size_t TextLen = strlen(Text);
|
2023-05-19 21:50:03 -04:00
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
size_t Len = WriteVarUInt(Pos, (int)TextLen);
|
|
|
|
|
memcpy((char*)Pos + Len, Text, TextLen);
|
2023-07-22 14:17:04 -04:00
|
|
|
|
2023-07-27 11:14:50 -04:00
|
|
|
return Len + TextLen;
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//////////////////////////////////////////////////////
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
class FRemoteComputeSocket : public FComputeSocket
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
public:
|
|
|
|
|
enum class EControlMessageType
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
Detach = -2,
|
2023-05-19 21:50:03 -04:00
|
|
|
};
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
struct FFrameHeader
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
int Channel;
|
|
|
|
|
int Size;
|
2023-05-19 21:50:03 -04:00
|
|
|
};
|
|
|
|
|
|
2023-11-22 14:09:38 -05:00
|
|
|
TUniquePtr<FComputeTransport> Transport;
|
2023-07-22 14:17:04 -04:00
|
|
|
const EComputeSocketEndpoint Endpoint;
|
|
|
|
|
std::mutex CriticalSection;
|
2023-05-19 21:50:03 -04:00
|
|
|
|
2023-09-07 13:34:00 -04:00
|
|
|
bool bPingThreadFinish;
|
|
|
|
|
std::mutex PingThreadFinishMutex;
|
|
|
|
|
std::condition_variable PingThreadFinishCV;
|
|
|
|
|
std::thread PingThread;
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
std::thread RecvThread;
|
2023-05-19 21:50:03 -04:00
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
std::unordered_map<int, FComputeBufferWriter> Writers;
|
|
|
|
|
std::vector<FComputeBufferReader> Readers;
|
|
|
|
|
std::unordered_map<int, std::thread> SendThreads;
|
2023-05-19 21:50:03 -04:00
|
|
|
|
2023-11-22 14:09:38 -05:00
|
|
|
FRemoteComputeSocket(TUniquePtr<FComputeTransport> InTransport, EComputeSocketEndpoint InEndpoint)
|
|
|
|
|
: Transport(MoveTemp(InTransport))
|
2023-07-22 14:17:04 -04:00
|
|
|
, Endpoint(InEndpoint)
|
|
|
|
|
, CriticalSection()
|
2023-09-07 13:34:00 -04:00
|
|
|
, bPingThreadFinish(0)
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
2023-09-07 13:34:00 -04:00
|
|
|
~FRemoteComputeSocket() override
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-09-07 13:34:00 -04:00
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> Lock(PingThreadFinishMutex);
|
|
|
|
|
bPingThreadFinish = 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
PingThreadFinishCV.notify_all();
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
for (FComputeBufferReader& Reader : Readers)
|
|
|
|
|
{
|
2023-07-24 17:25:42 -04:00
|
|
|
Reader.Detach();
|
2023-07-22 14:17:04 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (std::pair<const int, std::thread>& Pair : SendThreads)
|
|
|
|
|
{
|
|
|
|
|
Pair.second.join();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Transport->Close();
|
|
|
|
|
RecvThread.join();
|
2023-09-07 13:34:00 -04:00
|
|
|
PingThread.join();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
virtual void StartCommunication()
|
|
|
|
|
{
|
|
|
|
|
bPingThreadFinish = 0;
|
|
|
|
|
|
|
|
|
|
// Initialize the receiver thread after having attached channel 0
|
|
|
|
|
RecvThread = std::thread(&FRemoteComputeSocket::RecvThreadProc, this);
|
|
|
|
|
PingThread = std::thread(&FRemoteComputeSocket::PingThreadProc, this);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PingThreadProc()
|
|
|
|
|
{
|
|
|
|
|
for (;;)
|
|
|
|
|
{
|
|
|
|
|
{ // Send the ping message
|
|
|
|
|
std::lock_guard<std::mutex> Lock(CriticalSection);
|
|
|
|
|
|
|
|
|
|
FFrameHeader Header;
|
|
|
|
|
Header.Channel = 0;
|
|
|
|
|
Header.Size = -3; // Ping control message.
|
|
|
|
|
|
|
|
|
|
Transport->SendMessage(&Header, sizeof(Header));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::unique_lock<std::mutex> Lock(PingThreadFinishMutex);
|
|
|
|
|
if (PingThreadFinishCV.wait_for(Lock, std::chrono::seconds(2), [this]() { return bPingThreadFinish; }))
|
|
|
|
|
{
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
void RecvThreadProc()
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
std::unordered_map<int, FComputeBufferWriter> CachedWriters;
|
2023-05-19 21:50:03 -04:00
|
|
|
|
|
|
|
|
// Process messages from the remote
|
|
|
|
|
FFrameHeader Header;
|
2023-07-22 14:17:04 -04:00
|
|
|
while (Transport->RecvMessage(&Header, sizeof(Header)))
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
|
|
|
|
if (Header.Size >= 0)
|
|
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
ReadFrame(CachedWriters, Header.Channel, Header.Size);
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
|
|
|
|
else if (Header.Size == (int)EControlMessageType::Detach)
|
|
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
DetachRecvBuffer(CachedWriters, Header.Channel);
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
2023-11-22 14:09:38 -05:00
|
|
|
check(false);
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
void SendThreadProc(int Channel, FComputeBufferReader Reader)
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
FFrameHeader Header;
|
|
|
|
|
Header.Channel = Channel;
|
|
|
|
|
|
|
|
|
|
const unsigned char* Data;
|
|
|
|
|
while ((Data = Reader.WaitToRead(1)) != nullptr)
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> Lock(CriticalSection);
|
|
|
|
|
Header.Size = (int)Reader.GetMaxReadSize();
|
|
|
|
|
Transport->SendMessage(&Header, sizeof(Header));
|
|
|
|
|
Transport->SendMessage(Data, Header.Size);
|
|
|
|
|
Reader.AdvanceReadPosition(Header.Size);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (Reader.IsComplete())
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> Lock(CriticalSection);
|
|
|
|
|
Header.Size = (int)EControlMessageType::Detach;
|
|
|
|
|
Transport->SendMessage(&Header, sizeof(Header));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool ReadFrame(std::unordered_map<int, FComputeBufferWriter>& CachedWriters, int Channel, int Size)
|
|
|
|
|
{
|
|
|
|
|
std::unordered_map<int, FComputeBufferWriter>::iterator Iter = CachedWriters.find(Channel);
|
2023-05-19 21:50:03 -04:00
|
|
|
if (Iter == CachedWriters.end())
|
|
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
std::lock_guard<std::mutex> Lock(CriticalSection);
|
2023-05-19 21:50:03 -04:00
|
|
|
|
|
|
|
|
Iter = Writers.find(Channel);
|
|
|
|
|
if (Iter == Writers.end())
|
|
|
|
|
{
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Iter = CachedWriters.insert(*Iter).first;
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
FComputeBufferWriter& Writer = Iter->second;
|
2023-05-19 21:50:03 -04:00
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
unsigned char* Data = Writer.WaitToWrite(Size);
|
|
|
|
|
if (!Transport->RecvMessage(Data, Size))
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
Writer.AdvanceWritePosition(Size);
|
2023-05-19 21:50:03 -04:00
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
2023-07-28 11:51:34 -04:00
|
|
|
void AttachRecvBuffer(int ChannelId, FComputeBuffer RecvBuffer) override
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
std::lock_guard<std::mutex> Lock(CriticalSection);
|
2023-07-28 11:51:34 -04:00
|
|
|
FComputeBufferWriter Writer = RecvBuffer.CreateWriter();
|
2023-07-22 14:17:04 -04:00
|
|
|
Writers.insert(std::pair<int, FComputeBufferWriter>(ChannelId, std::move(Writer)));
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
|
|
|
|
|
2023-07-28 11:51:34 -04:00
|
|
|
void AttachSendBuffer(int ChannelId, FComputeBuffer SendBuffer) override
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
std::lock_guard<std::mutex> Lock(CriticalSection);
|
2023-07-28 11:51:34 -04:00
|
|
|
FComputeBufferReader Reader = SendBuffer.CreateReader();
|
2023-07-22 14:17:04 -04:00
|
|
|
Readers.push_back(Reader);
|
|
|
|
|
SendThreads.insert(std::make_pair(ChannelId, std::thread(&FRemoteComputeSocket::SendThreadProc, this, ChannelId, std::move(Reader))));
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
void DetachRecvBuffer(std::unordered_map<int, FComputeBufferWriter>& CachedWriters, int Channel)
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
|
|
|
|
CachedWriters.erase(Channel);
|
|
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
std::lock_guard<std::mutex> Lock(CriticalSection);
|
2023-05-19 21:50:03 -04:00
|
|
|
|
2023-07-22 14:17:04 -04:00
|
|
|
std::unordered_map<int, FComputeBufferWriter>::iterator Iter = Writers.find(Channel);
|
2023-05-19 21:50:03 -04:00
|
|
|
if (Iter != Writers.end())
|
|
|
|
|
{
|
2023-07-22 14:17:04 -04:00
|
|
|
Iter->second.MarkComplete();
|
2023-05-19 21:50:03 -04:00
|
|
|
Writers.erase(Iter);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2023-11-22 14:09:38 -05:00
|
|
|
TUniquePtr<FComputeSocket> CreateComputeSocket(TUniquePtr<FComputeTransport> Transport, EComputeSocketEndpoint Endpoint)
|
2023-05-19 21:50:03 -04:00
|
|
|
{
|
2023-11-22 14:09:38 -05:00
|
|
|
return TUniquePtr<FComputeSocket>(new FRemoteComputeSocket(MoveTemp(Transport), Endpoint));
|
2023-05-19 21:50:03 -04:00
|
|
|
}
|