Files
UnrealEngineUWP/Engine/Source/Runtime/StorageServerClient/Private/StorageServerConnection.cpp
brian white cebe325639 [StorageServer] Change fatal error to log when connecting to potential storage server addresses. This allows connection attempts to continue, iterating through the specified list of host addresses
#jira UE-127311
#rb CarlMagnus.Nordin Per.Larsson
#preflight 61571465998a2100011bdb8b

#ROBOMERGE-AUTHOR: brian.white
#ROBOMERGE-SOURCE: CL 17692405 in //UE5/Main/...
#ROBOMERGE-BOT: STARSHIP (Main -> Release-Engine-Test) (v875-17642767)
#ROBOMERGE[STARSHIP]: UE5-Release-Engine-Staging Release-5.0

[CL 17692419 by brian white in ue5-release-engine-test branch]
2021-10-01 11:35:46 -04:00

559 lines
15 KiB
C++

// Copyright Epic Games, Inc. All Rights Reserved.
#include "StorageServerConnection.h"
#include "IPAddress.h"
#include "SocketSubsystem.h"
#include "Sockets.h"
#include "Misc/App.h"
#include "Misc/Paths.h"
#include "IO/IoDispatcher.h"
#include "Misc/StringBuilder.h"
#include "Misc/ScopeLock.h"
#include "Serialization/CompactBinary.h"
#include "Serialization/CompactBinarySerialization.h"
#if !UE_BUILD_SHIPPING
DEFINE_LOG_CATEGORY_STATIC(LogStorageServerConnection, Log, All);
/**
 * Turns a list of host strings into socket addresses that all carry the given port.
 *
 * Each host is first handed to SocketSubsystem.GetAddressFromString(); if that does not
 * yield a valid address, SocketSubsystem.GetAddressInfo() is queried and its first result
 * is used. Hosts for which neither step produces a valid address are left out.
 *
 * @param SocketSubsystem  Socket subsystem used for both lookups.
 * @param HostAddresses    Host strings to convert.
 * @param Port             Port assigned to every returned address.
 * @return The valid addresses, in the order of the hosts they came from.
 */
static TArray<TSharedPtr<FInternetAddr>> GetAddressFromString(ISocketSubsystem& SocketSubsystem, TArrayView<const FString> HostAddresses, const int32 Port)
{
	// An address is usable when the pointer is set and the address reports itself as valid.
	const auto IsUsable = [](const TSharedPtr<FInternetAddr>& Candidate)
	{
		return Candidate.IsValid() && Candidate->IsValid();
	};

	TArray<TSharedPtr<FInternetAddr>> ResolvedAddresses;
	for (const FString& HostName : HostAddresses)
	{
		TSharedPtr<FInternetAddr> Resolved = SocketSubsystem.GetAddressFromString(HostName);
		if (!IsUsable(Resolved))
		{
			// Fall back to an address info query and take the first entry it reports.
			FAddressInfoResult Lookup = SocketSubsystem.GetAddressInfo(*HostName, nullptr, EAddressInfoFlags::Default, NAME_None);
			if (Lookup.ReturnCode == SE_NO_ERROR && Lookup.Results.Num() > 0)
			{
				Resolved = Lookup.Results[0].Address;
			}
		}

		if (!IsUsable(Resolved))
		{
			continue;
		}

		Resolved->SetPort(Port);
		ResolvedAddresses.Emplace(MoveTemp(Resolved));
	}
	return ResolvedAddresses;
}
/**
 * Starts an HTTP/1.1 request by writing the request line plus the Host and
 * Connection headers into HeaderBuffer, and puts the archive into saving mode.
 *
 * @param Verb      HTTP verb written at the start of the request line.
 * @param Resource  Resource path written after the verb.
 * @param Hostname  Value of the Host header.
 */
FStorageServerRequest::FStorageServerRequest(FAnsiStringView Verb, FAnsiStringView Resource, FAnsiStringView Hostname)
{
	SetIsSaving(true);

	// Request line.
	HeaderBuffer << Verb << " " << Resource << " HTTP/1.1\r\n";

	// Headers that are the same for every request.
	HeaderBuffer << "Host: " << Hostname << "\r\n";
	HeaderBuffer << "Connection: Keep-Alive\r\n";
}
/**
 * Finishes the header block and writes header + body to a socket obtained from Owner.
 *
 * A Content-Length header is added only when the body is non-empty. If writing fails the
 * socket is handed back to Owner with keep-alive disabled and a fresh socket is tried,
 * for at most ten attempts in total.
 *
 * @param Owner  Connection that provides and takes back sockets.
 * @return The socket the request was fully written to, or nullptr when no socket could be
 *         acquired or every attempt failed.
 */
FSocket* FStorageServerRequest::Send(FStorageServerConnection& Owner)
{
	if (BodyBuffer.Num())
	{
		HeaderBuffer.Append("Content-Length: ").Appendf("%d\r\n", BodyBuffer.Num());
	}
	// Blank line that terminates the header block.
	HeaderBuffer << "\r\n";

	// Keeps calling FSocket::Send until all Length bytes are written or a send fails.
	auto SendAll = [](FSocket* Socket, const uint8* Data, int32 Length)
	{
		int32 BytesLeft = Length;
		while (BytesLeft > 0)
		{
			int32 BytesSent = 0;
			if (!Socket->Send(Data, BytesLeft, BytesSent))
			{
				return false;
			}
			check(BytesSent >= 0);
			BytesLeft -= BytesSent;
			Data += BytesSent;
		}
		return true;
	};

	constexpr int32 MaxAttempts = 10;
	for (int32 Attempt = 0; Attempt < MaxAttempts; ++Attempt)
	{
		FSocket* Socket = Owner.AcquireSocket();
		if (!Socket)
		{
			// Owner could not supply a socket; retrying here would not help.
			break;
		}

		if (SendAll(Socket, reinterpret_cast<const uint8*>(HeaderBuffer.GetData()), HeaderBuffer.Len()) &&
			SendAll(Socket, BodyBuffer.GetData(), BodyBuffer.Num()))
		{
			return Socket;
		}

		UE_LOG(LogStorageServerConnection, Warning, TEXT("Failed to send request to storage server. Retrying..."));
		// Do not keep a socket alive after a failed write.
		Owner.ReleaseSocket(Socket, false);
	}

	return nullptr;
}
void FStorageServerRequest::Serialize(void* V, int64 Length)
{
int32 Index = BodyBuffer.AddUninitialized(Length);
uint8* Dest = BodyBuffer.GetData() + Index;
FMemory::Memcpy(Dest, V, Length);
}
/**
 * Reads the status line and headers of an HTTP response from InSocket.
 *
 * - bIsOk is set when the status line is exactly "HTTP/1.1 200 OK"; for any other
 *   "HTTP/1.1 " status line the numeric code is stored in ErrorCode.
 * - Only the Content-Length header is interpreted.
 * - For a non-OK response with a body, the body is read into ErrorMessage.
 * - When no body remains to be read, the socket is released with keep-alive.
 * - If the socket reports a receive failure while the headers are being read, the
 *   response is marked as failed and the socket is closed instead of waiting forever.
 *
 * @param InOwner   Connection the socket is returned to.
 * @param InSocket  Socket the request was sent on.
 */
FStorageServerResponse::FStorageServerResponse(FStorageServerConnection& InOwner, FSocket& InSocket)
	: Owner(InOwner)
	, Socket(&InSocket)
{
	SetIsLoading(true);

	uint8 Buffer[1024];
	bool bReceiveFailed = false;

	// Peeks at the pending data until a complete CRLF-terminated line is visible, then
	// consumes exactly that line (including CRLF) and returns it without the terminator.
	// Returns an empty view and sets bReceiveFailed when the socket reports a failure.
	auto ReadResponseLine = [&Buffer, &InSocket, &bReceiveFailed]() -> FAnsiStringView
	{
		for (;;)
		{
			int32 BytesRead = 0;
			if (!InSocket.Recv(Buffer, static_cast<int32>(sizeof(Buffer)), BytesRead, ESocketReceiveFlags::Peek))
			{
				// Without this check a closed connection would make this loop spin forever.
				bReceiveFailed = true;
				return FAnsiStringView();
			}
			FAnsiStringView ResponseView(reinterpret_cast<const ANSICHAR*>(Buffer), BytesRead);
			int32 LineEndIndex;
			if (ResponseView.FindChar('\r', LineEndIndex) && BytesRead >= LineEndIndex + 2)
			{
				check(ResponseView[LineEndIndex + 1] == '\n');
				InSocket.Recv(Buffer, LineEndIndex + 2, BytesRead, ESocketReceiveFlags::None);
				check(BytesRead == LineEndIndex + 2);
				return ResponseView.Left(LineEndIndex);
			}
		}
	};

	FAnsiStringView ResponseLine = ReadResponseLine();
	if (ResponseLine == "HTTP/1.1 200 OK")
	{
		bIsOk = true;
	}
	else if (ResponseLine.StartsWith("HTTP/1.1 "))
	{
		// Skip the 9 characters of "HTTP/1.1 " to reach the status code.
		ErrorCode = TCString<ANSICHAR>::Atoi64(ResponseLine.GetData() + 9);
	}

	// Headers end at the first empty line.
	while (!ResponseLine.IsEmpty())
	{
		ResponseLine = ReadResponseLine();
		if (ResponseLine.StartsWith("Content-Length: "))
		{
			// Skip the 16 characters of "Content-Length: ".
			ContentLength = TCString<ANSICHAR>::Atoi64(ResponseLine.GetData() + 16);
		}
	}

	if (bReceiveFailed)
	{
		// The header block is incomplete, so nothing parsed so far can be trusted.
		// Clear the status code so a dropped connection is not mistaken for an HTTP error.
		bIsOk = false;
		ErrorCode = 0;
		ContentLength = 0;
		ErrorMessage = TEXT("Connection closed or failed while reading the response headers");
		ReleaseSocket(false);
		return;
	}

	if (!bIsOk && ContentLength)
	{
		// Reserve one extra byte so the text can be null-terminated; ANSI_TO_TCHAR
		// expects a null-terminated string and would otherwise read past the buffer.
		const int32 ErrorLength = static_cast<int32>(ContentLength);
		TArray<uint8> ErrorBuffer;
		ErrorBuffer.SetNumUninitialized(ErrorLength + 1);
		int32 BytesRead = 0;
		InSocket.Recv(ErrorBuffer.GetData(), ErrorLength, BytesRead, ESocketReceiveFlags::WaitAll);
		const int32 MessageLength = FMath::Clamp(BytesRead, 0, ErrorLength);
		ErrorBuffer[MessageLength] = 0;
		ErrorMessage = FString(ANSI_TO_TCHAR(reinterpret_cast<ANSICHAR*>(ErrorBuffer.GetData())));
		ContentLength = 0;
	}

	if (ContentLength == 0)
	{
		ReleaseSocket(true);
	}
}
/**
 * Creates a POST request and writes the batch header into the body: a magic value,
 * a chunk count (initially zero) and two zeroed reserved words, each a uint32.
 * The body offset of the chunk count is stored in ChunkCountOffset.
 *
 * @param InOwner   Connection used when the request is issued.
 * @param Resource  Resource path of the request.
 * @param Hostname  Value of the Host header.
 */
FStorageServerChunkBatchRequest::FStorageServerChunkBatchRequest(FStorageServerConnection& InOwner, FAnsiStringView Resource, FAnsiStringView Hostname)
	: FStorageServerRequest("POST", Resource, Hostname)
	, Owner(InOwner)
{
	uint32 HeaderMagic = 0xAAAA'77AC;
	*this << HeaderMagic;

	// The count is written directly after the magic; remember where it sits in the body.
	ChunkCountOffset = BodyBuffer.Num();
	uint32 InitialChunkCount = 0;
	*this << InitialChunkCount;

	uint32 ReservedA = 0;
	uint32 ReservedB = 0;
	*this << ReservedA << ReservedB;
}
/**
 * Appends one chunk entry (chunk id, index of the entry, offset, size) to the request
 * body and increments the chunk count stored in the batch header.
 *
 * @param ChunkId  Id of the chunk to request.
 * @param Offset   Offset written into the entry.
 * @param Size     Size written into the entry.
 * @return *this, so calls can be chained.
 */
FStorageServerChunkBatchRequest& FStorageServerChunkBatchRequest::AddChunk(const FIoChunkId& ChunkId, int64 Offset, int64 Size)
{
	// Copy the current count out of the body instead of keeping a pointer into it:
	// serializing below appends to BodyBuffer, which may reallocate its storage and
	// would leave such a pointer dangling for both the read and the increment.
	uint32 ChunkIndex = 0;
	FMemory::Memcpy(&ChunkIndex, BodyBuffer.GetData() + ChunkCountOffset, sizeof(ChunkIndex));

	*this << const_cast<FIoChunkId&>(ChunkId) << ChunkIndex << Offset << Size;

	// Re-resolve the header location after the append and store the incremented count.
	const uint32 NewChunkCount = ChunkIndex + 1;
	FMemory::Memcpy(BodyBuffer.GetData() + ChunkCountOffset, &NewChunkCount, sizeof(NewChunkCount));
	return *this;
}
/**
 * Sends the batch request, reads the batch response header and per-chunk table, and
 * passes them to OnResponse together with the response stream.
 *
 * @param OnResponse  Called with the chunk count, the chunk indices, the chunk sizes and
 *                    the response the chunk data can be read from.
 * @return true when OnResponse was invoked; false after any of the logged failures.
 */
bool FStorageServerChunkBatchRequest::Issue(TFunctionRef<void(uint32 ChunkCount, uint32* ChunkIndices, uint64* ChunkSizes, FStorageServerResponse& ChunkDataStream)> OnResponse)
{
	FSocket* const RequestSocket = Send(Owner);
	if (RequestSocket == nullptr)
	{
		UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to send chunk batch request to storage server."));
		return false;
	}

	FStorageServerResponse Response(Owner, *RequestSocket);
	if (!Response.IsOk())
	{
		UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to read chunk batch from storage server. '%s'"), *Response.GetErrorMessage());
		return false;
	}

	// Response header: magic, chunk count, two reserved words.
	uint32 ResponseMagic;
	Response << ResponseMagic;
	if (ResponseMagic != 0xbada'b00f)
	{
		UE_LOG(LogStorageServerConnection, Fatal, TEXT("Invalid magic in chunk batch response from storage server."));
		return false;
	}

	uint32 ChunkCount;
	uint32 Reserved1;
	uint32 Reserved2;
	Response << ChunkCount << Reserved1 << Reserved2;

	TArray<uint32, TInlineAllocator<64>> ChunkIndices;
	TArray<uint64, TInlineAllocator<64>> ChunkSizes;
	ChunkIndices.Reserve(ChunkCount);
	ChunkSizes.Reserve(ChunkCount);

	// One (index, flags, size) record per chunk; the flags are read but not used here.
	for (uint32 Remaining = ChunkCount; Remaining > 0; --Remaining)
	{
		uint32 ChunkIndex;
		uint32 Flags;
		int64 ChunkSize;
		Response << ChunkIndex << Flags << ChunkSize;

		ChunkIndices.Add(ChunkIndex);
		ChunkSizes.Emplace(ChunkSize);
	}

	OnResponse(ChunkCount, ChunkIndices.GetData(), ChunkSizes.GetData(), Response);
	return true;
}
void FStorageServerResponse::ReleaseSocket(bool bKeepAlive)
{
Owner.ReleaseSocket(Socket, bKeepAlive);
Socket = nullptr;
}
void FStorageServerResponse::Serialize(void* V, int64 Length)
{
if (Length == 0)
{
return;
}
if (!Socket)
{
UE_LOG(LogStorageServerConnection, Fatal, TEXT("Trying to read %lld bytes from released socket"), Length);
return;
}
if (Position + Length > ContentLength)
{
UE_LOG(LogStorageServerConnection, Fatal, TEXT("Trying to read %lld bytes from socket with only %lld available"), Length, ContentLength - Position);
return;
}
uint64 RemainingBytesToRead = Length;
uint8* Destination = reinterpret_cast<uint8*>(V);
while (RemainingBytesToRead)
{
uint64 BytesToRead32 = FMath::Min(RemainingBytesToRead, static_cast<uint64>(INT32_MAX));
int32 BytesRead;
if (!Socket->Recv(Destination, static_cast<int32>(BytesToRead32), BytesRead, ESocketReceiveFlags::WaitAll))
{
UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed reading %d bytes from socket"), BytesToRead32);
return;
}
RemainingBytesToRead -= BytesRead;
Destination += BytesRead;
Position += BytesRead;
}
if (Position == ContentLength)
{
ReleaseSocket(true);
}
}
/** Binds the connection to the socket subsystem returned by ISocketSubsystem::Get(). */
FStorageServerConnection::FStorageServerConnection() : SocketSubsystem(*ISocketSubsystem::Get())
{
}
/** Closes and deletes every socket that is still sitting in the pool. */
FStorageServerConnection::~FStorageServerConnection()
{
	for (FSocket* PooledSocket : SocketPool)
	{
		PooledSocket->Close();
		delete PooledSocket;
	}
}
/**
 * Resolves the host list, builds the oplog resource path
 * ("/prj/<project>/oplog/<platform>") and handshakes with the hosts.
 *
 * @param InHostAddresses         Candidate host strings.
 * @param InPort                  Port used for every host.
 * @param InProjectNameOverride   Project name to use; FApp::GetProjectName() when null.
 * @param InPlatformNameOverride  Platform name to use; FPlatformProperties::PlatformName() when null.
 * @return true when the handshake reported version 1; false otherwise.
 */
bool FStorageServerConnection::Initialize(TArrayView<const FString> InHostAddresses, int32 InPort, const TCHAR* InProjectNameOverride, const TCHAR* InPlatformNameOverride)
{
	TArray<TSharedPtr<FInternetAddr>> ResolvedHosts = GetAddressFromString(SocketSubsystem, InHostAddresses, InPort);
	if (ResolvedHosts.Num() == 0)
	{
		UE_LOG(LogStorageServerConnection, Fatal, TEXT("No valid Zen store host address specified"));
		return false;
	}

	// Project part of the path.
	const TCHAR* const ProjectName = InProjectNameOverride ? InProjectNameOverride : FApp::GetProjectName();
	OplogPath.Append("/prj/");
	OplogPath.Append(TCHAR_TO_ANSI(ProjectName));

	// Platform part of the path. The two sources have different character types,
	// so only the override goes through the TCHAR conversion.
	OplogPath.Append("/oplog/");
	if (InPlatformNameOverride)
	{
		OplogPath.Append(TCHAR_TO_ANSI(InPlatformNameOverride));
	}
	else
	{
		OplogPath.Append(FPlatformProperties::PlatformName());
	}

	if (HandshakeRequest(ResolvedHosts) != 1)
	{
		return false;
	}

	UE_LOG(LogStorageServerConnection, Display, TEXT("Connected to Zen storage server at '%s'"), *ServerAddr->ToString(true));
	return true;
}
/**
 * Tries each host in order by issuing a GET for the oplog path.
 *
 * Hostname and ServerAddr are set to the candidate before every attempt and stay set to
 * the host that answered with an OK response. They are reset when no host succeeds.
 *
 * @param HostAddresses  Candidate server addresses.
 * @return 1 when a host answered with an OK response, -1 otherwise.
 */
int32 FStorageServerConnection::HandshakeRequest(TArrayView<const TSharedPtr<FInternetAddr>> HostAddresses)
{
	TAnsiStringBuilder<256> ResourceBuilder;
	ResourceBuilder.Append(OplogPath);

	for (const TSharedPtr<FInternetAddr>& Candidate : HostAddresses)
	{
		// Sending the request reads ServerAddr and Hostname, so point them at the candidate first.
		Hostname.Reset();
		Hostname.Append(TCHAR_TO_ANSI(*Candidate->ToString(false)));
		ServerAddr = Candidate;

		UE_LOG(LogStorageServerConnection, Display, TEXT("Trying to handshake with Zen at '%s'"), *Candidate->ToString(true));

		FStorageServerRequest Request("GET", *ResourceBuilder, Hostname);
		FSocket* const Socket = Request.Send(*this);
		if (!Socket)
		{
			continue;
		}

		FStorageServerResponse Response(*this, *Socket);
		if (!Response.IsOk())
		{
			UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to handshake with Zen at %s. '%s'"), *ServerAddr->ToString(true), *Response.GetErrorMessage());
			continue;
		}

		// The object itself is not inspected, but it is still fetched from the response.
		FCbObject ResponseObj = Response.GetResponseObject();
		// we currently don't have any concept of protocol versioning, if
		// we succeed in communicating with the endpoint we're good since
		// any breaking API change would need to be done in a backward
		// compatible manner
		return 1;
	}

	Hostname.Reset();
	ServerAddr.Reset();
	return -1;
}
/**
 * Requests "<oplog>/files?filter=client" and invokes Callback once per entry of the
 * "files" array in the response, passing the chunk id built from the entry's "id" and
 * the entry's "clientpath" converted to TCHAR.
 *
 * @param Callback  Receives (chunk id, client path) for every listed file.
 */
void FStorageServerConnection::FileManifestRequest(TFunctionRef<void(FIoChunkId Id, FStringView Path)> Callback)
{
	TAnsiStringBuilder<256> ResourceBuilder;
	ResourceBuilder.Append(OplogPath).Append("/files?filter=client");

	FStorageServerRequest Request("GET", *ResourceBuilder, Hostname);
	FSocket* const RequestSocket = Request.Send(*this);
	if (RequestSocket == nullptr)
	{
		UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to send file manifest request to storage server at %s."), *ServerAddr->ToString(true));
		return;
	}

	FStorageServerResponse Response(*this, *RequestSocket);
	if (!Response.IsOk())
	{
		UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to read file manifest from storage server at %s. '%s'"), *ServerAddr->ToString(true), *Response.GetErrorMessage());
		return;
	}

	FCbObject ResponseObj = Response.GetResponseObject();
	for (FCbField& FileArrayEntry : ResponseObj["files"].AsArray())
	{
		FCbObject FileEntry = FileArrayEntry.AsObject();

		// Chunk id comes from the raw bytes of the entry's object id.
		FCbObjectId ObjectId = FileEntry["id"].AsObjectId();
		FIoChunkId ChunkId;
		ChunkId.Set(ObjectId.GetView());

		TStringBuilder<128> ClientPath;
		ClientPath.Append(FUTF8ToTCHAR(FileEntry["clientpath"].AsString()));

		Callback(ChunkId, ClientPath);
	}
}
/**
 * Requests "<oplog>/<chunk id>/info" and returns the "size" field of the response.
 *
 * @param ChunkId  Chunk to query.
 * @return The reported size (0 when the field cannot be read as an int64), or -1 when
 *         the request could not be sent or the response was not OK. A 404 response
 *         returns -1 without logging; other failures are logged as fatal.
 */
int64 FStorageServerConnection::ChunkSizeRequest(const FIoChunkId& ChunkId)
{
	TAnsiStringBuilder<256> ResourceBuilder;
	ResourceBuilder.Append(OplogPath);
	ResourceBuilder << "/" << ChunkId << "/info";

	FStorageServerRequest Request("GET", *ResourceBuilder, Hostname);
	FSocket* const RequestSocket = Request.Send(*this);
	if (RequestSocket == nullptr)
	{
		UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to send chunk size request to storage server at %s."), *ServerAddr->ToString(true));
		return -1;
	}

	FStorageServerResponse Response(*this, *RequestSocket);
	if (Response.IsOk())
	{
		FCbObject ResponseObj = Response.GetResponseObject();
		return ResponseObj["size"].AsInt64(0);
	}

	// 404 is returned quietly; every other failure is logged.
	if (Response.GetErrorCode() != 404)
	{
		UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to get chunk size from storage server at %s. '%s'"), *ServerAddr->ToString(true), *Response.GetErrorMessage());
	}
	return -1;
}
bool FStorageServerConnection::ReadChunkRequest(const FIoChunkId& ChunkId, uint64 Offset, uint64 Size, TFunctionRef<void(FStorageServerResponse&)> OnResponse)
{
TAnsiStringBuilder<256> ResourceBuilder;
ResourceBuilder.Append(OplogPath) << "/" << ChunkId;
bool HaveQuery = false;
auto AppendQueryDelimiter = [&]
{
if (HaveQuery)
{
ResourceBuilder.Append("&"_ASV);
}
else
{
ResourceBuilder.Append("?"_ASV);
HaveQuery = true;
}
};
if (Offset)
{
AppendQueryDelimiter();
ResourceBuilder.Appendf("offset=%" UINT64_FMT, Offset);
}
if (Size != ~uint64(0))
{
AppendQueryDelimiter();
ResourceBuilder.Appendf("size=%" UINT64_FMT, Size);
}
FStorageServerRequest Request("GET", *ResourceBuilder, Hostname);
FSocket* Socket = Request.Send(*this);
if (!Socket)
{
UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to send chunk read request to storage server at %s."), *ServerAddr->ToString(true));
return false;
}
FStorageServerResponse Response(*this, *Socket);
if (Response.IsOk())
{
OnResponse(Response);
return true;
}
else if (Response.GetErrorCode() == 404)
{
return false;
}
else
{
UE_LOG(LogStorageServerConnection, Fatal, TEXT("Failed to read chunk from storage server at %s. '%s'"), *ServerAddr->ToString(true), *Response.GetErrorMessage());
return false;
}
}
/** Creates a chunk batch request addressed to "<oplog>/batch" on the current host. */
FStorageServerChunkBatchRequest FStorageServerConnection::NewChunkBatchRequest()
{
	TAnsiStringBuilder<256> BatchResource;
	BatchResource.Append(OplogPath);
	BatchResource.Append("/batch");
	return FStorageServerChunkBatchRequest(*this, *BatchResource, Hostname);
}
/**
 * Returns a socket connected to ServerAddr.
 *
 * A pooled socket is reused when one is available; otherwise up to ten new sockets are
 * created and connected in turn until one succeeds.
 *
 * @return A connected socket, or nullptr when every connection attempt failed.
 */
FSocket* FStorageServerConnection::AcquireSocket()
{
	{
		// The pool is only touched while holding its lock.
		FScopeLock PoolLock(&SocketPoolCritical);
		if (!SocketPool.IsEmpty())
		{
			return SocketPool.Pop(false);
		}
	}

	constexpr int32 MaxAttempts = 10;
	int32 AttemptsLeft = MaxAttempts;
	while (AttemptsLeft-- > 0)
	{
		FSocket* NewSocket = SocketSubsystem.CreateSocket(NAME_Stream, TEXT("StorageServer"), ServerAddr->GetProtocolType());
		check(NewSocket);
		if (!NewSocket->Connect(*ServerAddr))
		{
			// Discard the failed socket and try again with a fresh one.
			delete NewSocket;
			continue;
		}
		return NewSocket;
	}

	UE_LOG(LogStorageServerConnection, Log, TEXT("Failed to connect to storage server at %s."), *ServerAddr->ToString(true));
	return nullptr;
}
/** Returns the server address as produced by ToString(false), or an empty string when no server address is set. */
FString FStorageServerConnection::GetHostAddr() const
{
	if (!ServerAddr.IsValid())
	{
		return FString();
	}
	return ServerAddr->ToString(false);
}
/**
 * Takes a socket back from a request or response.
 *
 * With bKeepAlive set and no data pending on the socket, it is pushed into the pool for
 * reuse. Pending data on a keep-alive socket is logged as fatal. In every other case the
 * socket is closed and deleted.
 *
 * @param Socket      Socket being returned.
 * @param bKeepAlive  Whether the socket may be pooled for reuse.
 */
void FStorageServerConnection::ReleaseSocket(FSocket* Socket, bool bKeepAlive)
{
	if (bKeepAlive)
	{
		uint32 UnreadByteCount;
		const bool bHasUnreadData = Socket->HasPendingData(UnreadByteCount);
		if (bHasUnreadData)
		{
			UE_LOG(LogStorageServerConnection, Fatal, TEXT("Socket was not fully drained"));
		}
		else
		{
			FScopeLock PoolLock(&SocketPoolCritical);
			SocketPool.Push(Socket);
			return;
		}
	}

	Socket->Close();
	delete Socket;
}
#endif