Files
UnrealEngineUWP/Engine/Source/Developer/DerivedDataCache/Private/ZenDerivedDataBackend.cpp
zousar shaker c27625b056 Integrating //UE5/Dev-Cooker [at] 16678003 to //UE5/Main (Zousar.Shaker-YEG-0943-Quaternary)
Non-DevIteration_ShooterGame


#ROBOMERGE-SOURCE: CL 16678907 in //UE5/Main/...
#ROBOMERGE-BOT: STARSHIP (Main -> Release-Engine-Test) (v835-16672529)

[CL 16678928 by zousar shaker in ue5-release-engine-test branch]
2021-06-15 16:38:03 -04:00

1795 lines
52 KiB
C++

// Copyright Epic Games, Inc. All Rights Reserved.
#include "ZenDerivedDataBackend.h"
#if WITH_ZEN_DDC_BACKEND
#if PLATFORM_WINDOWS || PLATFORM_HOLOLENS
# include "Windows/WindowsHWrapper.h"
# include "Windows/AllowWindowsPlatformTypes.h"
#endif
#include "curl/curl.h"
#if PLATFORM_WINDOWS || PLATFORM_HOLOLENS
# include "Windows/HideWindowsPlatformTypes.h"
#endif
#include "Algo/Accumulate.h"
#include "Containers/StaticArray.h"
#include "Containers/Ticker.h"
#include "DerivedDataCacheRecord.h"
#include "Dom/JsonObject.h"
#include "GenericPlatform/GenericPlatformFile.h"
#include "HAL/PlatformFileManager.h"
#include "Misc/ConfigCacheIni.h"
#include "Misc/FileHelper.h"
#include "Misc/ScopeLock.h"
#include "Misc/SecureHash.h"
#include "ProfilingDebugging/CpuProfilerTrace.h"
#include "ProfilingDebugging/CountersTrace.h"
#include "Serialization/CompactBinary.h"
#include "Serialization/CompactBinaryPackage.h"
#include "Serialization/CompactBinaryValidation.h"
#include "Serialization/CompactBinaryWriter.h"
#include "Serialization/JsonReader.h"
#include "Serialization/JsonSerializer.h"
#include "Serialization/MemoryReader.h"
#include "ZenBackendUtils.h"
#define UE_ZENDDC_BACKEND_WAIT_INTERVAL 0.01f
#define UE_ZENDDC_HTTP_REQUEST_TIMEOUT_SECONDS 30L
#define UE_ZENDDC_HTTP_REQUEST_TIMOUT_ENABLED 1
#define UE_ZENDDC_HTTP_DEBUG 0
#define UE_ZENDDC_REQUEST_POOL_SIZE 16
#define UE_ZENDDC_MAX_FAILED_LOGIN_ATTEMPTS 16
#define UE_ZENDDC_MAX_ATTEMPTS 4
TRACE_DECLARE_INT_COUNTER(ZenDDC_Exist, TEXT("ZenDDC Exist"));
TRACE_DECLARE_INT_COUNTER(ZenDDC_ExistHit, TEXT("ZenDDC Exist Hit"));
TRACE_DECLARE_INT_COUNTER(ZenDDC_Get, TEXT("ZenDDC Get"));
TRACE_DECLARE_INT_COUNTER(ZenDDC_GetHit, TEXT("ZenDDC Get Hit"));
TRACE_DECLARE_INT_COUNTER(ZenDDC_Put, TEXT("ZenDDC Put"));
TRACE_DECLARE_INT_COUNTER(ZenDDC_PutHit, TEXT("ZenDDC Put Hit"));
TRACE_DECLARE_INT_COUNTER(ZenDDC_BytesReceived, TEXT("ZenDDC Bytes Received"));
TRACE_DECLARE_INT_COUNTER(ZenDDC_BytesSent, TEXT("ZenDDC Bytes Sent"));
namespace zen {
static bool IsSuccessCode(int64 ResponseCode)
{
return 200 <= ResponseCode && ResponseCode < 300;
}
enum class EContentType
{
Binary,
CompactBinary,
CompactBinaryPackage
};
/**
* Encapsulation for access token shared by all requests.
*/
/**
* Minimal HTTP request type wrapping CURL without the need for managers. This request
* is written to allow reuse of request objects, in order to allow connections to be reused.
*
* CURL has a global library initialization (curl_global_init). We rely on this happening in
* the Online/HTTP library which is a dependency on this module.
*/
class FRequest
{
public:
/**
* Supported request verbs
*/
enum RequestVerb
{
Get,
Put,
Post,
PostJson,
Delete,
Head
};
/**
* Convenience result type interpreted from HTTP response code.
*/
enum Result
{
Success,
Failed,
FailedTimeout
};
FRequest(const TCHAR* InDomain, bool bInLogErrors)
: bLogErrors(bInLogErrors)
, Domain(InDomain)
{
Curl = curl_easy_init();
Reset();
}
~FRequest()
{
curl_easy_cleanup(Curl);
}
/**
* Resets all options on the request except those that should always be set.
*/
void Reset()
{
Headers.Reset();
ResponseHeader.Reset();
ResponseBuffer.Reset();
ResponseCode = 0;
bResponseFormatValid = true;
ReadDataView = nullptr;
WriteDataBufferPtr = nullptr;
WriteHeaderBufferPtr = nullptr;
BytesSent = 0;
BytesReceived = 0;
CurlResult = CURL_LAST;
curl_easy_reset(Curl);
// Options that are always set for all connections.
#if UE_ZENDDC_HTTP_REQUEST_TIMOUT_ENABLED
curl_easy_setopt(Curl, CURLOPT_CONNECTTIMEOUT, UE_ZENDDC_HTTP_REQUEST_TIMEOUT_SECONDS);
#endif
curl_easy_setopt(Curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(Curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(Curl, CURLOPT_BUFFERSIZE, 256 * 1024L);
//curl_easy_setopt(Curl, CURLOPT_UPLOAD_BUFFERSIZE, 256 * 1024L);
// Response functions
curl_easy_setopt(Curl, CURLOPT_HEADERDATA, this);
curl_easy_setopt(Curl, CURLOPT_HEADERFUNCTION, &FRequest::StaticWriteHeaderFn);
curl_easy_setopt(Curl, CURLOPT_WRITEDATA, this);
curl_easy_setopt(Curl, CURLOPT_WRITEFUNCTION, StaticWriteBodyFn);
// Rewind method, handle special error case where request need to rewind data stream
curl_easy_setopt(Curl, CURLOPT_SEEKFUNCTION, StaticSeekFn);
curl_easy_setopt(Curl, CURLOPT_SEEKDATA, this);
// Debug hooks
#if UE_ZENDDC_HTTP_DEBUG
curl_easy_setopt(Curl, CURLOPT_DEBUGDATA, this);
curl_easy_setopt(Curl, CURLOPT_DEBUGFUNCTION, StaticDebugCallback);
curl_easy_setopt(Curl, CURLOPT_VERBOSE, 1L);
#endif
}
/** Gets the domain name for this request */
const FString& GetDomain() const
{
return Domain;
}
/** Returns the HTTP response code.*/
const int64 GetResponseCode() const
{
return ResponseCode;
}
const bool GetResponseFormatValid() const
{
return bResponseFormatValid;
}
/** Returns the number of bytes received this request (headers withstanding). */
const size_t GetBytesReceived() const
{
return BytesReceived;
}
/** Returns the number of bytes sent during this request (headers withstanding). */
const size_t GetBytesSent() const
{
return BytesSent;
}
/**
* Upload buffer using the request, using either "Put" or "Post" verbs.
* @param Uri Url to use.
* @param Buffer Data to upload
* @return Result of the request
*/
template<RequestVerb V>
Result PerformBlockingUpload(const TCHAR* Uri, const FCompositeBuffer& Buffer, EContentType ContentType)
{
static_assert(V == Put || V == Post || V == PostJson, "Upload should use either Put or Post verbs.");
uint32 ContentLength = 0u;
if (V == Put)
{
ContentLength = Buffer.GetSize();
curl_easy_setopt(Curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(Curl, CURLOPT_INFILESIZE, ContentLength);
curl_easy_setopt(Curl, CURLOPT_READDATA, this);
curl_easy_setopt(Curl, CURLOPT_READFUNCTION, StaticReadFn);
switch (ContentType)
{
case EContentType::Binary:
Headers.Add(FString(TEXT("Content-Type: application/octet-stream")));
break;
case EContentType::CompactBinary:
Headers.Add(FString(TEXT("Content-Type: application/x-ue-cb")));
break;
case EContentType::CompactBinaryPackage:
Headers.Add(FString(TEXT("Content-Type: application/x-ue-cbpkg")));
break;
default:
checkNoEntry();
break;
}
ReadDataView = &Buffer;
}
else if (V == Post || V == PostJson)
{
ContentLength = Buffer.GetSize();
curl_easy_setopt(Curl, CURLOPT_POST, 1L);
curl_easy_setopt(Curl, CURLOPT_INFILESIZE, ContentLength);
curl_easy_setopt(Curl, CURLOPT_READDATA, this);
curl_easy_setopt(Curl, CURLOPT_READFUNCTION, StaticReadFn);
switch (ContentType)
{
case EContentType::Binary:
Headers.Add(V == Post ? FString(TEXT("Content-Type: application/x-www-form-urlencoded")) : FString(TEXT("Content-Type: application/json")));
break;
case EContentType::CompactBinary:
Headers.Add(FString(TEXT("Content-Type: application/x-ue-cb")));
break;
case EContentType::CompactBinaryPackage:
Headers.Add(FString(TEXT("Content-Type: application/x-ue-cbpkg")));
break;
default:
checkNoEntry();
break;
}
ReadDataView = &Buffer;
}
return PerformBlocking(Uri, V, ContentLength);
}
/**
* Download an url into a buffer using the request.
* @param Uri Url to use.
* @param Buffer Optional buffer where data should be downloaded to. If empty downloaded data will
* be stored in an internal buffer and accessed GetResponse* methods.
* @return Result of the request
*/
Result PerformBlockingDownload(const TCHAR* Uri, TArray<uint8>* Buffer)
{
curl_easy_setopt(Curl, CURLOPT_HTTPGET, 1L);
WriteDataBufferPtr = Buffer;
return PerformBlocking(Uri, Get, 0u);
}
Result PerformBlockingDownload(const TCHAR* Uri, FCbPackage& OutPackage)
{
curl_easy_setopt(Curl, CURLOPT_HTTPGET, 1L);
OutPackage.Reset();
// TODO: When PackageBytes can be written in segments directly, set the WritePtr to the OutPackage and use that
TArray<uint8> PackageBytes;
WriteDataBufferPtr = &PackageBytes;
Result LocalResult = PerformBlocking(Uri, Get, 0u);
if (IsSuccessCode(ResponseCode))
{
if (ValidateCompactBinaryPackage(FMemoryView(PackageBytes.GetData(), PackageBytes.Num()), ECbValidateMode::All) != ECbValidateError::None)
{
bResponseFormatValid = false;
}
else
{
FMemoryReader PackageLoader(PackageBytes);
bResponseFormatValid = OutPackage.TryLoad(PackageLoader);
}
}
return LocalResult;
}
/**
* Query an url using the request. Queries can use either "Head" or "Delete" verbs.
* @param Uri Url to use.
* @return Result of the request
*/
template<RequestVerb V>
Result PerformBlockingQuery(const TCHAR* Uri)
{
static_assert(V == Head || V == Delete, "Queries should use either Head or Delete verbs.");
if (V == Delete)
{
curl_easy_setopt(Curl, CURLOPT_POST, 1L);
curl_easy_setopt(Curl, CURLOPT_CUSTOMREQUEST, "DELETE");
}
else if (V == Head)
{
curl_easy_setopt(Curl, CURLOPT_NOBODY, 1L);
}
return PerformBlocking(Uri, V, 0u);
}
/**
* Set a header to send with the request.
*/
void SetHeader(const TCHAR* Header, const TCHAR* Value)
{
check(CurlResult == CURL_LAST); // Cannot set header after request is sent
Headers.Add(FString::Printf(TEXT("%s: %s"), Header, Value));
}
/**
* Attempts to find the header from the response. Returns false if header is not present.
*/
bool GetHeader(const ANSICHAR* Header, FString& OutValue) const
{
check(CurlResult != CURL_LAST); // Cannot query headers before request is sent
const ANSICHAR* HeadersBuffer = (const ANSICHAR*)ResponseHeader.GetData();
size_t HeaderLen = strlen(Header);
// Find the header key in the (ANSI) response buffer. If not found we can exist immediately
if (const ANSICHAR* Found = strstr(HeadersBuffer, Header))
{
const ANSICHAR* Linebreak = strchr(Found, '\r');
const ANSICHAR* ValueStart = Found + HeaderLen + 2; //colon and space
const size_t ValueSize = Linebreak - ValueStart;
FUTF8ToTCHAR TCHARData(ValueStart, ValueSize);
OutValue = FString(TCHARData.Length(), TCHARData.Get());
return true;
}
return false;
}
/**
* Returns the response buffer. Note that is the request is performed
* with an external buffer as target buffer this string will be empty.
*/
const TArray<uint8>& GetResponseBuffer() const
{
return ResponseBuffer;
}
/**
* Returns the response buffer as a string. Note that is the request is performed
* with an external buffer as target buffer this string will be empty.
*/
FString GetResponseAsString() const
{
return GetAnsiBufferAsString(ResponseBuffer);
}
/**
* Returns the response header as a string.
*/
FString GetResponseHeaderAsString()
{
return GetAnsiBufferAsString(ResponseHeader);
}
/**
* Tries to parse the response buffer as a JsonObject. Return empty pointer if
* parse error occurs.
*/
TSharedPtr<FJsonObject> GetResponseAsJsonObject() const
{
FString Response = GetAnsiBufferAsString(ResponseBuffer);
TSharedPtr<FJsonObject> JsonObject;
TSharedRef<TJsonReader<> > JsonReader = TJsonReaderFactory<>::Create(Response);
if (!FJsonSerializer::Deserialize(JsonReader, JsonObject) || !JsonObject.IsValid())
{
return TSharedPtr<FJsonObject>(nullptr);
}
return JsonObject;
}
private:
CURL* Curl;
CURLcode CurlResult;
long ResponseCode;
size_t BytesSent;
size_t BytesReceived;
bool bLogErrors;
bool bResponseFormatValid;
const FCompositeBuffer* ReadDataView;
TArray<uint8>* WriteDataBufferPtr;
FCbPackage* WriteDataPackage;
TArray<uint8>* WriteHeaderBufferPtr;
TArray<uint8> ResponseHeader;
TArray<uint8> ResponseBuffer;
TArray<FString> Headers;
FString Domain;
/**
* Performs the request, blocking until finished.
* @param Uri Address on the domain to query
* @param Verb HTTP verb to use
* @param Buffer Optional buffer to directly receive the result of the request.
* If unset the response body will be stored in the request.
*/
Result PerformBlocking(const TCHAR* Uri, RequestVerb Verb, uint32 ContentLength)
{
static const char* CommonHeaders[] = {
"User-Agent: UE4",
nullptr
};
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_CurlPerform);
// Setup request options
FString Url = FString::Printf(TEXT("%s/%s"), *Domain, Uri);
curl_easy_setopt(Curl, CURLOPT_URL, TCHAR_TO_ANSI(*Url));
// Setup response header buffer. If caller has not setup a response data buffer, use internal.
WriteHeaderBufferPtr = &ResponseHeader;
if (WriteDataBufferPtr == nullptr)
{
WriteDataBufferPtr = &ResponseBuffer;
}
// Content-Length should always be set
Headers.Add(FString::Printf(TEXT("Content-Length: %d"), ContentLength));
// Build headers list
curl_slist* CurlHeaders = nullptr;
// Add common headers
for (uint8 i = 0; CommonHeaders[i] != nullptr; ++i)
{
CurlHeaders = curl_slist_append(CurlHeaders, CommonHeaders[i]);
}
// Setup added headers
for (const FString& Header : Headers)
{
CurlHeaders = curl_slist_append(CurlHeaders, TCHAR_TO_ANSI(*Header));
}
curl_easy_setopt(Curl, CURLOPT_HTTPHEADER, CurlHeaders);
// Shots fired!
CurlResult = curl_easy_perform(Curl);
// Get response code
bool bRedirected = false;
if (CURLE_OK == curl_easy_getinfo(Curl, CURLINFO_RESPONSE_CODE, &ResponseCode))
{
bRedirected = (ResponseCode >= 300 && ResponseCode < 400);
}
LogResult(CurlResult, Uri, Verb);
// Clean up
curl_slist_free_all(CurlHeaders);
return CurlResult == CURLE_OK ? Success : Failed;
}
void LogResult(CURLcode Result, const TCHAR* Uri, RequestVerb Verb) const
{
if (Result == CURLE_OK)
{
bool bSuccess = false;
const TCHAR* VerbStr = nullptr;
FString AdditionalInfo;
const bool Is400 = (ResponseCode >= 400) && (ResponseCode <= 499);
const bool Is200 = (ResponseCode >= 200) && (ResponseCode <= 299);
switch (Verb)
{
case Head:
bSuccess = Is400 || Is200;
VerbStr = TEXT("querying");
break;
case Get:
bSuccess = Is400 || Is200;
VerbStr = TEXT("fetching");
AdditionalInfo = FString::Printf(TEXT("Received: %d bytes."), BytesReceived);
break;
case Put:
bSuccess = Is200;
VerbStr = TEXT("updating");
AdditionalInfo = FString::Printf(TEXT("Sent: %d bytes."), BytesSent);
break;
case Post:
case PostJson:
bSuccess = Is200;
VerbStr = TEXT("posting");
break;
case Delete:
bSuccess = Is200;
VerbStr = TEXT("deleting");
break;
}
if (bSuccess)
{
UE_LOG(
LogDerivedDataCache,
Verbose,
TEXT("Finished %s HTTP cache entry (response %d) from %s. %s"),
VerbStr,
ResponseCode,
Uri,
*AdditionalInfo
);
}
else if (bLogErrors)
{
// Print the response body if we got one, otherwise print header.
FString Response = GetAnsiBufferAsString(ResponseBuffer.Num() > 0 ? ResponseBuffer : ResponseHeader);
Response.ReplaceCharInline('\n', ' ');
Response.ReplaceCharInline('\r', ' ');
UE_LOG(
LogDerivedDataCache,
Error,
TEXT("Failed %s HTTP cache entry (response %d) from %s. Response: %s"),
VerbStr,
ResponseCode,
Uri,
*Response
);
}
}
else if (bLogErrors)
{
UE_LOG(
LogDerivedDataCache,
Error,
TEXT("Error while connecting to %s: %s"),
*Domain,
ANSI_TO_TCHAR(curl_easy_strerror(Result))
);
}
}
FString GetAnsiBufferAsString(const TArray<uint8>& Buffer) const
{
// Content is NOT null-terminated; we need to specify lengths here
FUTF8ToTCHAR TCHARData(reinterpret_cast<const ANSICHAR*>(Buffer.GetData()), Buffer.Num());
return FString(TCHARData.Length(), TCHARData.Get());
}
static size_t StaticDebugCallback(CURL* Handle, curl_infotype DebugInfoType, char* DebugInfo, size_t DebugInfoSize, void* UserData)
{
FRequest* Request = static_cast<FRequest*>(UserData);
switch (DebugInfoType)
{
case CURLINFO_TEXT:
{
// Truncate at 1023 characters. This is just an arbitrary number based on a buffer size seen in
// the libcurl code.
DebugInfoSize = FMath::Min(DebugInfoSize, (size_t)1023);
// Calculate the actual length of the string due to incorrect use of snprintf() in lib/vtls/openssl.c.
char* FoundNulPtr = (char*)memchr(DebugInfo, 0, DebugInfoSize);
int CalculatedSize = FoundNulPtr != nullptr ? FoundNulPtr - DebugInfo : DebugInfoSize;
auto ConvertedString = StringCast<TCHAR>(static_cast<const ANSICHAR*>(DebugInfo), CalculatedSize);
FString DebugText(ConvertedString.Length(), ConvertedString.Get());
DebugText.ReplaceInline(TEXT("\n"), TEXT(""), ESearchCase::CaseSensitive);
DebugText.ReplaceInline(TEXT("\r"), TEXT(""), ESearchCase::CaseSensitive);
UE_LOG(LogDerivedDataCache, VeryVerbose, TEXT("%p: '%s'"), Request, *DebugText);
}
break;
case CURLINFO_HEADER_IN:
UE_LOG(LogDerivedDataCache, VeryVerbose, TEXT("%p: Received header (%d bytes)"), Request, DebugInfoSize);
break;
case CURLINFO_DATA_IN:
UE_LOG(LogDerivedDataCache, VeryVerbose, TEXT("%p: Received data (%d bytes)"), Request, DebugInfoSize);
break;
case CURLINFO_DATA_OUT:
UE_LOG(LogDerivedDataCache, VeryVerbose, TEXT("%p: Sent data (%d bytes)"), Request, DebugInfoSize);
break;
}
return 0;
}
static size_t StaticReadFn(void* Ptr, size_t SizeInBlocks, size_t BlockSizeInBytes, void* UserData)
{
FRequest* Request = static_cast<FRequest*>(UserData);
check(Request->ReadDataView);
const FCompositeBuffer& ReadDataView = *Request->ReadDataView;
const size_t Offset = Request->BytesSent;
const size_t ReadSize = FMath::Min((size_t)ReadDataView.GetSize() - Offset, SizeInBlocks * BlockSizeInBytes);
check(ReadDataView.GetSize() >= Offset + ReadSize);
Memcpy(Ptr, ReadDataView, Offset, ReadSize);
Request->BytesSent += ReadSize;
return ReadSize;
return 0;
}
static size_t StaticWriteHeaderFn(void* Ptr, size_t SizeInBlocks, size_t BlockSizeInBytes, void* UserData)
{
FRequest* Request = static_cast<FRequest*>(UserData);
const size_t WriteSize = SizeInBlocks * BlockSizeInBytes;
TArray<uint8>* WriteHeaderBufferPtr = Request->WriteHeaderBufferPtr;
if (WriteHeaderBufferPtr && WriteSize > 0)
{
const size_t CurrentBufferLength = WriteHeaderBufferPtr->Num();
if (CurrentBufferLength > 0)
{
// Remove the previous zero termination
(*WriteHeaderBufferPtr)[CurrentBufferLength - 1] = ' ';
}
// Write the header
WriteHeaderBufferPtr->Append((const uint8*)Ptr, WriteSize + 1);
(*WriteHeaderBufferPtr)[WriteHeaderBufferPtr->Num() - 1] = 0; // Zero terminate string
return WriteSize;
}
return 0;
}
static size_t StaticWriteBodyFn(void* Ptr, size_t SizeInBlocks, size_t BlockSizeInBytes, void* UserData)
{
FRequest* Request = static_cast<FRequest*>(UserData);
const size_t WriteSize = SizeInBlocks * BlockSizeInBytes;
TArray<uint8>* WriteDataBufferPtr = Request->WriteDataBufferPtr;
if (WriteDataBufferPtr && WriteSize > 0)
{
// If this is the first part of the body being received, try to reserve
// memory if content length is defined in the header.
if (Request->BytesReceived == 0 && Request->WriteHeaderBufferPtr)
{
static const ANSICHAR* ContentLengthHeaderStr = "Content-Length: ";
const ANSICHAR* Header = (const ANSICHAR*)Request->WriteHeaderBufferPtr->GetData();
if (const ANSICHAR* ContentLengthHeader = FCStringAnsi::Strstr(Header, ContentLengthHeaderStr))
{
size_t ContentLength = (size_t)FCStringAnsi::Atoi64(ContentLengthHeader + strlen(ContentLengthHeaderStr));
if (ContentLength > 0)
{
WriteDataBufferPtr->Reserve(ContentLength);
}
}
}
// Write to the target buffer
WriteDataBufferPtr->Append((const uint8*)Ptr, WriteSize);
Request->BytesReceived += WriteSize;
return WriteSize;
}
return 0;
}
static size_t StaticSeekFn(void* UserData, curl_off_t Offset, int Origin)
{
FRequest* Request = static_cast<FRequest*>(UserData);
size_t NewPosition = 0;
uint64 ReadDataSize = Request->ReadDataView ? Request->ReadDataView->GetSize() : 0;
switch (Origin)
{
case SEEK_SET: NewPosition = Offset; break;
case SEEK_CUR: NewPosition = Request->BytesSent + Offset; break;
case SEEK_END: NewPosition = ReadDataSize + Offset; break;
}
// Make sure we don't seek outside of the buffer
if (NewPosition < 0 || NewPosition >= ReadDataSize)
{
return CURL_SEEKFUNC_FAIL;
}
// Update the used offset
Request->BytesSent = NewPosition;
return CURL_SEEKFUNC_OK;
}
};
//----------------------------------------------------------------------------------------------------------
// Request pool
//----------------------------------------------------------------------------------------------------------
/**
* Pool that manages a fixed set of requests. Users are required to release requests that have been
* acquired. Usable with \ref zen::FScopedRequestPtr which handles this automatically.
*/
struct FRequestPool
{
FRequestPool(const TCHAR* InServiceUrl)
{
for (uint8 i = 0; i < Pool.Num(); ++i)
{
Pool[i].Usage = 0u;
Pool[i].Request = new FRequest(InServiceUrl, true);
}
}
~FRequestPool()
{
for (uint8 i = 0; i < Pool.Num(); ++i)
{
// No requests should be in use by now.
check(Pool[i].Usage.Load(EMemoryOrder::Relaxed) == 0u);
delete Pool[i].Request;
}
}
/**
* Block until a request is free. Once a request has been returned it is
* "owned by the caller and need to release it to the pool when work has been completed.
* @return Usable request instance.
*/
FRequest* WaitForFreeRequest()
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_WaitForConnPool);
while (true)
{
for (uint8 i = 0; i < Pool.Num(); ++i)
{
if (!Pool[i].Usage.Load(EMemoryOrder::Relaxed))
{
uint8 Expected = 0u;
if (Pool[i].Usage.CompareExchange(Expected, 1u))
{
return Pool[i].Request;
}
}
}
FPlatformProcess::Sleep(UE_ZENDDC_BACKEND_WAIT_INTERVAL);
}
}
/**
* Release request to the pool.
* @param Request Request that should be freed. Note that any buffer owened by the request can now be reset.
*/
void ReleaseRequestToPool(FRequest* Request)
{
for (uint8 i = 0; i < Pool.Num(); ++i)
{
if (Pool[i].Request == Request)
{
Request->Reset();
uint8 Expected = 1u;
Pool[i].Usage.CompareExchange(Expected, 0u);
return;
}
}
check(false);
}
private:
struct FEntry
{
TAtomic<uint8> Usage;
FRequest* Request;
};
TStaticArray<FEntry, UE_ZENDDC_REQUEST_POOL_SIZE> Pool;
FRequestPool() {}
};
//----------------------------------------------------------------------------------------------------------
// FScopedRequestPtr
//----------------------------------------------------------------------------------------------------------
/**
* Utility class to manage requesting and releasing requests from the \ref FRequestPool.
*/
struct FScopedRequestPtr
{
public:
FScopedRequestPtr(FRequestPool* InPool)
: Request(InPool->WaitForFreeRequest())
, Pool(InPool)
{}
~FScopedRequestPtr()
{
Pool->ReleaseRequestToPool(Request);
}
bool IsValid() const
{
return Request != nullptr;
}
FRequest* Get() const
{
check(IsValid());
return Request;
}
FRequest* operator->()
{
check(IsValid());
return Request;
}
private:
FRequest* Request;
FRequestPool* Pool;
};
}
namespace UE::DerivedData {
//----------------------------------------------------------------------------------------------------------
// FZenDerivedDataBackend
//----------------------------------------------------------------------------------------------------------
FZenDerivedDataBackend::FZenDerivedDataBackend(
ICacheFactory& InFactory,
const TCHAR* InServiceUrl,
const TCHAR* InNamespace)
: Factory(InFactory)
, Domain(InServiceUrl)
, Namespace(InNamespace)
{
if (IsServiceReady())
{
RequestPool = MakeUnique<zen::FRequestPool>(InServiceUrl);
bIsUsable = true;
}
bCacheRecordEndpointEnabled = false;
GConfig->GetBool(TEXT("Zen"), TEXT("CacheRecordEndpointEnabled"), bCacheRecordEndpointEnabled, GEngineIni);
}
FZenDerivedDataBackend::~FZenDerivedDataBackend()
{
}
FString FZenDerivedDataBackend::GetName() const
{
return Domain;
}
bool FZenDerivedDataBackend::IsServiceReady()
{
zen::FRequest Request(*Domain, false);
zen::FRequest::Result Result = Request.PerformBlockingDownload(TEXT("health/ready"), nullptr);
if (Result == zen::FRequest::Success && zen::IsSuccessCode(Request.GetResponseCode()))
{
UE_LOG(LogDerivedDataCache, Display, TEXT("Z$ HTTP DDC service status: %s."), *Request.GetResponseAsString());
return true;
}
else
{
UE_LOG(LogDerivedDataCache, Warning, TEXT("Unable to reach Z$ HTTP DDC service at %s. Status: %d . Response: %s"), *Domain, Request.GetResponseCode(), *Request.GetResponseAsString());
}
return false;
}
bool FZenDerivedDataBackend::ShouldRetryOnError(int64 ResponseCode)
{
// Access token might have expired, request a new token and try again.
if (ResponseCode == 401)
{
return true;
}
// Too many requests, make a new attempt
if (ResponseCode == 429)
{
return true;
}
return false;
}
bool FZenDerivedDataBackend::CachedDataProbablyExists(const TCHAR* CacheKey)
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_Exist);
TRACE_COUNTER_ADD(ZenDDC_Exist, int64(1));
COOK_STAT(auto Timer = UsageStats.TimeProbablyExists());
if (ShouldSimulateMiss(CacheKey))
{
return false;
}
FString Uri = MakeLegacyZenKey(CacheKey);
long ResponseCode = 0; uint32 Attempts = 0;
// Retry request until we get an accepted response or exhaust allowed number of attempts.
while (ResponseCode == 0 && ++Attempts < UE_ZENDDC_MAX_ATTEMPTS)
{
zen::FScopedRequestPtr Request(RequestPool.Get());
zen::FRequest::Result Result = Request->PerformBlockingQuery<zen::FRequest::Head>(*Uri);
ResponseCode = Request->GetResponseCode();
if (zen::IsSuccessCode(ResponseCode) || ResponseCode == 400)
{
const bool bIsHit = (Result == zen::FRequest::Success && zen::IsSuccessCode(ResponseCode));
if (bIsHit)
{
TRACE_COUNTER_ADD(ZenDDC_ExistHit, int64(1));
}
return bIsHit;
}
if (!ShouldRetryOnError(ResponseCode))
{
return false;
}
ResponseCode = 0;
}
return false;
}
bool FZenDerivedDataBackend::GetCachedData(const TCHAR* CacheKey, TArray<uint8>& OutData)
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_Get);
TRACE_COUNTER_ADD(ZenDDC_Get, int64(1));
COOK_STAT(auto Timer = UsageStats.TimeGet());
if (ShouldSimulateMiss(CacheKey))
{
return false;
}
double StartTime = FPlatformTime::Seconds();
EGetResult Result = GetZenData(*MakeLegacyZenKey(CacheKey), &OutData);
if (Result != EGetResult::Success)
{
switch (Result)
{
default:
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache miss on %s"), *GetName(), CacheKey);
break;
case EGetResult::Corrupted:
UE_LOG(LogDerivedDataCache, Warning,
TEXT("Checksum from server on %s did not match recieved data. Discarding cached result."), CacheKey);
break;
}
return false;
}
TRACE_COUNTER_ADD(ZenDDC_GetHit, int64(1));
TRACE_COUNTER_ADD(ZenDDC_BytesReceived, int64(OutData.Num()));
COOK_STAT(Timer.AddHit(OutData.Num()));
double ReadDuration = FPlatformTime::Seconds() - StartTime;
double ReadSpeed = (OutData.Num() / ReadDuration) / (1024.0 * 1024.0);
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache hit on %s (%d bytes, %.02f secs, %.2fMB/s)"),
*GetName(), CacheKey, OutData.Num(), ReadDuration, ReadSpeed);
return true;
}
FZenDerivedDataBackend::EGetResult
FZenDerivedDataBackend::GetZenData(const TCHAR* Uri, TArray<uint8>* OutData) const
{
// Retry request until we get an accepted response or exhaust allowed number of attempts.
EGetResult GetResult = EGetResult::NotFound;
for (uint32 Attempts = 0; Attempts < UE_ZENDDC_MAX_ATTEMPTS; ++Attempts)
{
zen::FScopedRequestPtr Request(RequestPool.Get());
if (Request.IsValid())
{
zen::FRequest::Result Result = Request->PerformBlockingDownload(Uri, OutData);
int64 ResponseCode = Request->GetResponseCode();
// Request was successful, make sure we got all the expected data.
if (zen::IsSuccessCode(ResponseCode))
{
return EGetResult::Success;
}
if (!ShouldRetryOnError(ResponseCode))
{
break;
}
}
}
if (OutData)
{
OutData->Reset();
}
return GetResult;
}
FZenDerivedDataBackend::EGetResult
FZenDerivedDataBackend::GetZenData(const FCacheKey& CacheKey, ECachePolicy CachePolicy, FCbPackage& OutPackage) const
{
TStringBuilder<256> QueryUri;
AppendZenUri(CacheKey, QueryUri);
AppendPolicyQueryString(CachePolicy, QueryUri);
// Retry request until we get an accepted response or exhaust allowed number of attempts.
EGetResult GetResult = EGetResult::NotFound;
for (uint32 Attempts = 0; Attempts < UE_ZENDDC_MAX_ATTEMPTS; ++Attempts)
{
zen::FScopedRequestPtr Request(RequestPool.Get());
if (Request.IsValid())
{
zen::FRequest::Result Result = Request->PerformBlockingDownload(QueryUri.ToString(), OutPackage);
int64 ResponseCode = Request->GetResponseCode();
bool bPackageValid = Request->GetResponseFormatValid();
// Request was successful, make sure we got all the expected data.
if (zen::IsSuccessCode(ResponseCode))
{
if (bPackageValid)
{
GetResult = EGetResult::Success;
}
else
{
GetResult = EGetResult::Corrupted;
}
break;
}
if (!ShouldRetryOnError(ResponseCode))
{
break;
}
}
}
if (GetResult != EGetResult::Success)
{
OutPackage.Reset();
}
return GetResult;
}
FDerivedDataBackendInterface::EPutStatus
FZenDerivedDataBackend::PutCachedData(const TCHAR* CacheKey, TArrayView<const uint8> InData, bool bPutEvenIfExists)
{
if (DidSimulateMiss(CacheKey))
{
return EPutStatus::NotCached;
}
FSharedBuffer DataBuffer = FSharedBuffer::MakeView(InData.GetData(), InData.Num());
return PutZenData(*MakeLegacyZenKey(CacheKey), FCompositeBuffer(DataBuffer), zen::EContentType::Binary);
}
FDerivedDataBackendInterface::EPutStatus
FZenDerivedDataBackend::PutZenData(const TCHAR* Uri, const FCompositeBuffer& InData, zen::EContentType ContentType)
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_Put);
COOK_STAT(auto Timer = UsageStats.TimePut());
int64 ResponseCode = 0; uint32 Attempts = 0;
// Retry request until we get an accepted response or exhaust allowed number of attempts.
while (ResponseCode == 0 && ++Attempts < UE_ZENDDC_MAX_ATTEMPTS)
{
zen::FScopedRequestPtr Request(RequestPool.Get());
if (Request.IsValid())
{
zen::FRequest::Result Result = Request->PerformBlockingUpload<zen::FRequest::Put>(Uri, InData, ContentType);
ResponseCode = Request->GetResponseCode();
if (zen::IsSuccessCode(ResponseCode))
{
TRACE_COUNTER_ADD(ZenDDC_BytesSent, int64(Request->GetBytesSent()));
COOK_STAT(Timer.AddHit(Request->GetBytesSent()));
return EPutStatus::Cached;
}
if (!ShouldRetryOnError(ResponseCode))
{
return EPutStatus::NotCached;
}
ResponseCode = 0;
}
}
return EPutStatus::NotCached;
}
FString FZenDerivedDataBackend::MakeLegacyZenKey(const TCHAR* CacheKey)
{
FIoHash KeyHash = FIoHash::HashBuffer(CacheKey, FCString::Strlen(CacheKey) * sizeof(TCHAR));
return FString::Printf(TEXT("/z$/legacy/%s"), *LexToString(KeyHash));
}
void FZenDerivedDataBackend::AppendZenUri(const FCacheKey& CacheKey, FStringBuilderBase& Out)
{
FStringView BucketStr = CacheKey.Bucket.ToString<TCHAR>();
Out.Appendf(TEXT("/z$/%.*s/"), BucketStr.Len(), BucketStr.GetData());
Out << CacheKey.Hash;
}
void FZenDerivedDataBackend::AppendZenUri(const FCacheKey& CacheKey, const FPayloadId& PayloadId, FStringBuilderBase& Out)
{
AppendZenUri(CacheKey, Out);
Out << TEXT("/");
UE::String::BytesToHexLower(PayloadId.GetBytes(), Out);
}
void FZenDerivedDataBackend::AppendPolicyQueryString(ECachePolicy Policy, FStringBuilderBase& Uri)
{
bool bQueryEmpty = true;
bool bValueEmpty = true;
auto AppendKey = [&Uri, &bQueryEmpty, &bValueEmpty](const TCHAR* Key)
{
if (bQueryEmpty)
{
TCHAR LastChar = Uri.Len() == 0 ? '\0' : Uri.LastChar();
if (LastChar != '?' && LastChar != '&')
{
Uri << '?';
}
bQueryEmpty = false;
}
else
{
Uri << '&';
}
bValueEmpty = true;
Uri << Key;
};
auto AppendValue = [&Uri, &bValueEmpty](const TCHAR* Value)
{
if (bValueEmpty)
{
bValueEmpty = false;
}
else
{
Uri << ',';
}
Uri << Value;
};
if (!EnumHasAllFlags(Policy, ECachePolicy::Query))
{
AppendKey(TEXT("query="));
if (EnumHasAnyFlags(Policy, ECachePolicy::QueryLocal)) { AppendValue(TEXT("local")); }
if (EnumHasAnyFlags(Policy, ECachePolicy::QueryRemote)) { AppendValue(TEXT("remote")); }
if (!EnumHasAnyFlags(Policy, ECachePolicy::Query)) { AppendValue(TEXT("none")); }
}
if (!EnumHasAllFlags(Policy, ECachePolicy::Store))
{
AppendKey(TEXT("store="));
if (EnumHasAnyFlags(Policy, ECachePolicy::StoreLocal)) { AppendValue(TEXT("local")); }
if (EnumHasAnyFlags(Policy, ECachePolicy::StoreRemote)) { AppendValue(TEXT("remote")); }
if (!EnumHasAnyFlags(Policy, ECachePolicy::Store)) { AppendValue(TEXT("none")); }
}
if (EnumHasAnyFlags(Policy, ECachePolicy::SkipData))
{
AppendKey(TEXT("skip="));
if (EnumHasAllFlags(Policy, ECachePolicy::SkipData)) { AppendValue(TEXT("data")); }
else
{
if (EnumHasAnyFlags(Policy, ECachePolicy::SkipMeta)) { AppendValue(TEXT("meta")); }
if (EnumHasAnyFlags(Policy, ECachePolicy::SkipValue)) { AppendValue(TEXT("value")); }
if (EnumHasAnyFlags(Policy, ECachePolicy::SkipAttachments)) { AppendValue(TEXT("attachments")); }
}
}
}
uint64 FZenDerivedDataBackend::MeasureCacheRecord(const FCacheRecord& Record)
{
return Record.GetMeta().GetSize() +
Record.GetValuePayload().GetRawSize() +
Algo::TransformAccumulate(Record.GetAttachmentPayloads(), &FPayload::GetRawSize, uint64(0));
}
void FZenDerivedDataBackend::RemoveCachedData(const TCHAR* CacheKey, bool bTransient)
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_Remove);
FString Uri = MakeLegacyZenKey(CacheKey);
int64 ResponseCode = 0; uint32 Attempts = 0;
// Retry request until we get an accepted response or exhaust allowed number of attempts.
while (ResponseCode == 0 && ++Attempts < UE_ZENDDC_MAX_ATTEMPTS)
{
zen::FScopedRequestPtr Request(RequestPool.Get());
if (Request.IsValid())
{
zen::FRequest::Result Result = Request->PerformBlockingQuery<zen::FRequest::Delete>(*Uri);
ResponseCode = Request->GetResponseCode();
if (zen::IsSuccessCode(ResponseCode))
{
return;
}
if (!ShouldRetryOnError(ResponseCode))
{
return;
}
ResponseCode = 0;
}
}
}
bool FZenDerivedDataBackend::IsWritable() const
{
return true;
}
FDerivedDataBackendInterface::ESpeedClass
FZenDerivedDataBackend::GetSpeedClass() const
{
return ESpeedClass::Fast;
}
TSharedRef<FDerivedDataCacheStatsNode> FZenDerivedDataBackend::GatherUsageStats() const
{
return MakeShared<FDerivedDataCacheStatsNode>(this, "foo");
}
bool FZenDerivedDataBackend::TryToPrefetch(TConstArrayView<FString> CacheKeys)
{
return GetSpeedClass() >= ESpeedClass::Fast && CachedDataProbablyExistsBatch(CacheKeys).CountSetBits() == CacheKeys.Num();
}
bool FZenDerivedDataBackend::WouldCache(const TCHAR* CacheKey, TArrayView<const uint8> InData)
{
return true;
}
bool FZenDerivedDataBackend::ApplyDebugOptions(FBackendDebugOptions& InOptions)
{
DebugOptions = InOptions;
return true;
}
bool FZenDerivedDataBackend::DidSimulateMiss(const TCHAR* InKey)
{
if (DebugOptions.RandomMissRate == 0 || DebugOptions.SimulateMissTypes.Num() == 0)
{
return false;
}
FScopeLock Lock(&MissedKeysCS);
return DebugMissedKeys.Contains(FName(InKey));
}
bool FZenDerivedDataBackend::ShouldSimulateMiss(const TCHAR* InKey)
{
// once missed, always missed
if (DidSimulateMiss(InKey))
{
return true;
}
if (DebugOptions.ShouldSimulateMiss(InKey))
{
FScopeLock Lock(&MissedKeysCS);
UE_LOG(LogDerivedDataCache, Verbose, TEXT("Simulating miss in %s for %s"), *GetName(), InKey);
DebugMissedKeys.Add(FName(InKey));
return true;
}
return false;
}
FRequest FZenDerivedDataBackend::Put(
TConstArrayView<UE::DerivedData::FCacheRecord> Records,
FStringView Context,
ECachePolicy Policy,
EPriority Priority,
FOnCachePutComplete&& OnComplete)
{
for (const FCacheRecord& Record : Records)
{
COOK_STAT(auto Timer = UsageStats.TimePut());
bool bResult;
if (bCacheRecordEndpointEnabled)
{
bResult = PutCacheRecord(Record, Context, Policy);
}
else
{
bResult = LegacyPutCacheRecord(Record, Context, Policy);
}
if (bResult)
{
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache put complete for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Record.GetKey()), Context.Len(), Context.GetData());
COOK_STAT(Timer.AddHit(MeasureCacheRecord(Record)));
if (OnComplete)
{
OnComplete({ Record.GetKey(), EStatus::Ok });
}
}
else
{
COOK_STAT(Timer.AddMiss(MeasureCacheRecord(Record)));
if (OnComplete)
{
OnComplete({ Record.GetKey(), EStatus::Error });
}
}
}
return FRequest();
}
FRequest FZenDerivedDataBackend::Get(
TConstArrayView<UE::DerivedData::FCacheKey> Keys,
FStringView Context,
UE::DerivedData::ECachePolicy Policy,
UE::DerivedData::EPriority Priority,
UE::DerivedData::FOnCacheGetComplete&& OnComplete)
{
for (const UE::DerivedData::FCacheKey& Key : Keys)
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_Get);
TRACE_COUNTER_ADD(ZenDDC_Get, int64(1));
COOK_STAT(auto Timer = UsageStats.TimeGet());
FOptionalCacheRecord Record;
if (bCacheRecordEndpointEnabled)
{
Record = GetCacheRecord(Key, Context, Policy);
}
else
{
Record = LegacyGetCacheRecord(Key, Context, Policy);
}
if (Record)
{
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache hit for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Key), Context.Len(), Context.GetData());
TRACE_COUNTER_ADD(ZenDDC_GetHit, int64(1));
int64 ReceivedSize = MeasureCacheRecord(Record.Get());
TRACE_COUNTER_ADD(ZenDDC_BytesReceived, ReceivedSize);
COOK_STAT(Timer.AddHit(ReceivedSize));
if (OnComplete)
{
OnComplete({ MoveTemp(Record).Get(), EStatus::Ok });
}
}
else
{
if (OnComplete)
{
OnComplete({ Factory.CreateRecord(Key).Build(), EStatus::Error });
}
}
}
return FRequest();
}
FRequest FZenDerivedDataBackend::GetPayload(
TConstArrayView<UE::DerivedData::FCachePayloadKey> Keys,
FStringView Context,
UE::DerivedData::ECachePolicy Policy,
UE::DerivedData::EPriority Priority,
UE::DerivedData::FOnCacheGetPayloadComplete&& OnComplete)
{
if (bCacheRecordEndpointEnabled)
{
for (const FCachePayloadKey& Key : Keys)
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_Get);
TRACE_COUNTER_ADD(ZenDDC_Get, int64(1));
COOK_STAT(auto Timer = UsageStats.TimeGet());
TStringBuilder<256> QueryUri;
AppendZenUri(Key.CacheKey, Key.Id, QueryUri);
AppendPolicyQueryString(Policy, QueryUri);
TArray<uint8> PayloadData;
EGetResult GetResult = GetZenData(QueryUri.ToString(), &PayloadData);
FCompressedBuffer CompressedBuffer;
if (GetResult == EGetResult::Success)
{
CompressedBuffer = FCompressedBuffer::FromCompressed(MakeSharedBufferFromArray(MoveTemp(PayloadData)));
}
if (CompressedBuffer)
{
FPayload Payload = FPayload(Key.Id, MoveTemp(CompressedBuffer));
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache hit for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Key), Context.Len(), Context.GetData());
TRACE_COUNTER_ADD(ZenDDC_GetHit, int64(1));
TRACE_COUNTER_ADD(ZenDDC_BytesReceived, Payload.GetRawSize());
COOK_STAT(Timer.AddHit(Payload.GetRawSize()));
if (OnComplete)
{
OnComplete({ Key.CacheKey, MoveTemp(Payload), EStatus::Ok });
}
}
else
{
UE_LOG(LogDerivedDataCache, Display, TEXT("%s: Cache miss with %s payload %s for %s from '%.*s'"),
*GetName(), (GetResult == EGetResult::NotFound ? TEXT("missing") : TEXT("corrupted")),
*WriteToString<16>(Key.Id), *WriteToString<96>(Key), Context.Len(), Context.GetData());
if (OnComplete)
{
OnComplete({ Key.CacheKey, FPayload(Key.Id), EStatus::Error });
}
}
}
}
else
{
// We have to load the CacheRecord for each Payload, so sort all the Payloads sharing
// the same key to occur together and we can load the CacheRecord at the start of each run of Payloads
TArray<FCachePayloadKey, TInlineAllocator<16>> SortedKeys(Keys);
SortedKeys.StableSort();
FOptionalCacheRecord Record;
for (const FCachePayloadKey& Key : SortedKeys)
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC_Get);
TRACE_COUNTER_ADD(ZenDDC_Get, int64(1));
COOK_STAT(auto Timer = UsageStats.TimeGet());
if (!Record || Record.Get().GetKey() != Key.CacheKey)
{
Record = LegacyGetCacheRecord(Key.CacheKey, Context, Policy | ECachePolicy::SkipData, /*bAlwaysLoadInlineData*/ true);
}
if (FPayload Payload = Record ? LegacyGetCachePayload(Key.CacheKey, Context, Policy, Record.Get().GetPayload(Key.Id)) : FPayload::Null)
{
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache hit for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Key), Context.Len(), Context.GetData());
TRACE_COUNTER_ADD(ZenDDC_GetHit, int64(1));
TRACE_COUNTER_ADD(ZenDDC_BytesReceived, Payload.GetRawSize());
COOK_STAT(Timer.AddHit(Payload.GetRawSize()));
if (OnComplete)
{
OnComplete({ Key.CacheKey, MoveTemp(Payload), EStatus::Ok });
}
}
else
{
if (OnComplete)
{
OnComplete({ Key.CacheKey, FPayload(Key.Id), EStatus::Error });
}
}
}
}
return FRequest();
}
void FZenDerivedDataBackend::CancelAll()
{
}
bool FZenDerivedDataBackend::PutCacheRecord(const FCacheRecord& Record, FStringView Context, ECachePolicy Policy)
{
const FCacheKey& Key = Record.GetKey();
FCbPackage Package = Factory.SaveRecord(Record);
FCbWriter PackageWriter;
Package.Save(PackageWriter);
FUniqueBuffer Buffer = FUniqueBuffer::Alloc(PackageWriter.GetSaveSize());
PackageWriter.Save(Buffer);
TStringBuilder<256> Uri;
AppendZenUri(Record.GetKey(), Uri);
AppendPolicyQueryString(Policy, Uri);
if (PutZenData(Uri.ToString(), FCompositeBuffer(Buffer.MoveToShared()), zen::EContentType::CompactBinaryPackage)
!= FDerivedDataBackendInterface::EPutStatus::Cached)
{
return false;
}
return true;
}
FOptionalCacheRecord FZenDerivedDataBackend::GetCacheRecord(const FCacheKey& Key, FStringView Context,
ECachePolicy Policy) const
{
FCbPackage Package;
EGetResult GetResult = GetZenData(Key, Policy, Package);
if (GetResult != EGetResult::Success)
{
switch (GetResult)
{
default:
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache miss with missing record for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Key), Context.Len(), Context.GetData());
break;
case EGetResult::Corrupted:
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache miss with corrupted record (corrupted package) for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Key), Context.Len(), Context.GetData());
break;
}
return FOptionalCacheRecord();
}
FOptionalCacheRecord Record = Factory.LoadRecord(Package);
if (!Record)
{
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache miss with corrupted record (corrupted record) for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Key), Context.Len(), Context.GetData());
}
return Record;
}
bool FZenDerivedDataBackend::LegacyPutCacheRecord(const FCacheRecord& Record, FStringView Context, ECachePolicy Policy)
{
const FCacheKey& Key = Record.GetKey();
// Save the payloads and build the cache object.
const TConstArrayView<FPayload> Attachments = Record.GetAttachmentPayloads();
TCbWriter<512> Writer;
Writer.BeginObject();
if (const FCbObject& Meta = Record.GetMeta())
{
Writer.AddObject("Meta"_ASV, Meta);
Writer.AddHash("MetaHash"_ASV, Meta.GetHash());
}
if (const FPayload& Value = Record.GetValuePayload())
{
Writer.SetName("Value"_ASV);
if (!LegacyPutCachePayload(Key, Context, Value, Writer))
{
return false;
}
}
if (!Attachments.IsEmpty())
{
Writer.BeginArray("Attachments"_ASV);
for (const FPayload& Attachment : Attachments)
{
if (!LegacyPutCachePayload(Key, Context, Attachment, Writer))
{
return false;
}
}
Writer.EndArray();
}
Writer.EndObject();
// Save the record to storage.
FUniqueBuffer Buffer = FUniqueBuffer::Alloc(Writer.GetSaveSize());
Writer.Save(Buffer);
TStringBuilder<256> Uri;
LegacyMakeZenKey(Key, Uri);
if (PutZenData(Uri.ToString(), FCompositeBuffer(Buffer.MoveToShared()), zen::EContentType::CompactBinary)
!= FDerivedDataBackendInterface::EPutStatus::Cached)
{
return false;
}
return true;
}
bool FZenDerivedDataBackend::LegacyPutCachePayload(const FCacheKey& Key, FStringView Context, const FPayload& Payload, FCbWriter& Writer)
{
const FIoHash& RawHash = Payload.GetRawHash();
const FCompositeBuffer CompressedBuffer = Payload.GetData().GetCompressed();
const bool bStoreInline = !Payload.HasData();
if (!bStoreInline)
{
TStringBuilder<256> Uri;
LegacyMakePayloadKey(Key, RawHash, Uri);
if (PutZenData(Uri.ToString(), CompressedBuffer, zen::EContentType::Binary) != FDerivedDataBackendInterface::EPutStatus::Cached)
{
return false;
}
}
Writer.BeginObject();
Writer.AddObjectId("Id"_ASV, FCbObjectId(Payload.GetId().GetView()));
Writer.AddInteger("RawSize"_ASV, Payload.GetRawSize());
if (bStoreInline)
{
Writer.AddHash("RawHash"_ASV, RawHash);
Writer.AddHash("CompressedHash"_ASV, FIoHash::HashBuffer(CompressedBuffer));
Writer.AddBinary("CompressedData"_ASV, CompressedBuffer);
}
else
{
Writer.AddBinaryAttachment("RawHash"_ASV, RawHash);
}
Writer.EndObject();
return true;
}
FOptionalCacheRecord FZenDerivedDataBackend::LegacyGetCacheRecord(const FCacheKey& Key,
FStringView Context, ECachePolicy Policy, bool bAlwaysLoadInlineData) const
{
TStringBuilder<256> Uri;
LegacyMakeZenKey(Key, Uri);
TArray<uint8> Buffer;
EGetResult Result = GetZenData(Uri.ToString(), &Buffer);
if (Result != EGetResult::Success)
{
return FOptionalCacheRecord();
}
return FZenDerivedDataBackend::LegacyCreateRecord(MakeSharedBufferFromArray(MoveTemp(Buffer)),
Key, Context, Policy, bAlwaysLoadInlineData);
}
FPayload FZenDerivedDataBackend::LegacyGetCachePayload(const FCacheKey& Key, FStringView Context, ECachePolicy Policy,
const FPayload& PayloadIdOnly) const
{
auto GetDescriptor = [&PayloadIdOnly, &Key, &Context]()
{
return FString::Printf(TEXT("%s with hash %s for %s from '%.*s'"),
*WriteToString<16>(PayloadIdOnly.GetId()), *WriteToString<48>(PayloadIdOnly.GetRawHash()),
*WriteToString<96>(Key), Context.Len(), Context.GetData());
};
if (PayloadIdOnly.HasData())
{
return PayloadIdOnly;
}
TStringBuilder<256> Uri;
LegacyMakePayloadKey(Key, PayloadIdOnly.GetRawHash(), Uri);
bool bSkipData = EnumHasAllFlags(Policy, ECachePolicy::SkipData);
EGetResult GetResult;
TArray<uint8> ArrayBuffer;
GetResult = GetZenData(Uri.ToString(), bSkipData ? nullptr : &ArrayBuffer);
if (GetResult != EGetResult::Success)
{
switch (GetResult)
{
default:
UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache miss with missing payload %s"),
*GetName(), *GetDescriptor());
break;
case EGetResult::Corrupted:
UE_LOG(LogDerivedDataCache, Display, TEXT("%s: Cache miss with corrupted payload %s"),
*GetName(), *GetDescriptor());
break;
}
return FPayload();
}
if (bSkipData)
{
return PayloadIdOnly;
}
FCompressedBuffer CompressedBuffer = FCompressedBuffer::FromCompressed(
MakeSharedBufferFromArray(MoveTemp(ArrayBuffer)));
if (!CompressedBuffer || CompressedBuffer.GetRawHash() != PayloadIdOnly.GetRawHash())
{
UE_LOG(LogDerivedDataCache, Display, TEXT("%s: Cache miss with corrupted payload %s"),
*GetName(), *GetDescriptor());
return FPayload();
}
return FPayload(PayloadIdOnly.GetId(), MoveTemp(CompressedBuffer));
}
FOptionalCacheRecord FZenDerivedDataBackend::LegacyCreateRecord(FSharedBuffer&& RecordBytes, const FCacheKey& Key,
FStringView Context, ECachePolicy Policy, bool bAlwaysLoadInlineData) const
{
// TODO: Temporary function copied from FFileSystemDerivedDataBackend.
// Eventually We will instead send the Policy to zen sever along with the request for the FCacheKey,
// and it will end back all payloads in the response rather than requiring separate requests.
const FCbObject RecordObject(MoveTemp(RecordBytes));
FCacheRecordBuilder RecordBuilder = Factory.CreateRecord(Key);
if (!EnumHasAnyFlags(Policy, ECachePolicy::SkipMeta))
{
if (FCbFieldView MetaHash = RecordObject.FindView("MetaHash"_ASV))
{
if (FCbObject MetaObject = RecordObject["Meta"_ASV].AsObject(); MetaObject.GetHash() == MetaHash.AsHash())
{
RecordBuilder.SetMeta(MoveTemp(MetaObject));
}
else
{
UE_LOG(LogDerivedDataCache, Display, TEXT("%s: Cache miss with corrupted metadata for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Key), Context.Len(), Context.GetData());
return FOptionalCacheRecord();
}
}
}
if (FCbObject ValueObject = RecordObject["Value"_ASV].AsObject())
{
const ECachePolicy ValuePolicy = Policy | (ECachePolicy::SkipData & ~ECachePolicy::SkipValue);
FPayload Payload = LegacyGetCachePayload(Key, Context, ValuePolicy, ValueObject, bAlwaysLoadInlineData);
if (Payload.IsNull())
{
return FOptionalCacheRecord();
}
RecordBuilder.SetValue(MoveTemp(Payload));
}
for (FCbField AttachmentField : RecordObject["Attachments"_ASV])
{
const ECachePolicy AttachmentsPolicy = Policy | (ECachePolicy::SkipData & ~ECachePolicy::SkipAttachments);
FPayload Payload = LegacyGetCachePayload(Key, Context, AttachmentsPolicy, AttachmentField.AsObject(),
bAlwaysLoadInlineData);
if (Payload.IsNull())
{
return FOptionalCacheRecord();
}
RecordBuilder.AddAttachment(MoveTemp(Payload));
}
return RecordBuilder.Build();
}
FPayload FZenDerivedDataBackend::LegacyGetCachePayload(const FCacheKey& Key, FStringView Context, ECachePolicy Policy,
const FCbObject& Object, bool bAlwaysLoadInlineData) const
{
// TODO: Temporary function copied from FFileSystemDerivedDataBackend, helper for CreateRecord
const FPayloadId Id(Object.FindView("Id"_ASV).AsObjectId().GetView());
const uint64 RawSize = Object.FindView("RawSize"_ASV).AsUInt64(MAX_uint64);
const FIoHash RawHash = Object.FindView("RawHash"_ASV).AsHash();
FIoHash CompressedHash = Object.FindView("CompressedHash"_ASV).AsHash();
FSharedBuffer CompressedData = Object["CompressedData"_ASV].AsBinary();
if (Id.IsNull() || RawSize == MAX_uint64 || RawHash.IsZero() || !(CompressedHash.IsZero() == CompressedData.IsNull()))
{
UE_LOG(LogDerivedDataCache, Display, TEXT("%s: Cache miss with invalid record format for %s from '%.*s'"),
*GetName(), *WriteToString<96>(Key), Context.Len(), Context.GetData());
return FPayload();
}
FPayload Payload(Id, RawHash, RawSize);
if (CompressedData)
{
if (EnumHasAllFlags(Policy, ECachePolicy::SkipData) && !bAlwaysLoadInlineData)
{
return Payload;
}
else
{
return LegacyValidateCachePayload(Key, Context, Payload, CompressedHash, MoveTemp(CompressedData));
}
}
return LegacyGetCachePayload(Key, Context, Policy, Payload);
}
FPayload FZenDerivedDataBackend::LegacyValidateCachePayload(const FCacheKey& Key, FStringView Context,
const FPayload& Payload, const FIoHash& CompressedHash, FSharedBuffer&& CompressedData) const
{
// TODO: Temporary function copied from FFileSystemDerivedDataBackend, helper for CreateRecord
if (FCompressedBuffer CompressedBuffer = FCompressedBuffer::FromCompressed(MoveTemp(CompressedData));
CompressedBuffer &&
CompressedBuffer.GetRawHash() == Payload.GetRawHash() &&
FIoHash::HashBuffer(CompressedBuffer.GetCompressed()) == CompressedHash)
{
return FPayload(Payload.GetId(), MoveTemp(CompressedBuffer));
}
UE_LOG(LogDerivedDataCache, Display,
TEXT("%s: Cache miss with corrupted payload %s with hash %s for %s from '%.*s'"),
*GetName(), *WriteToString<16>(Payload.GetId()), *WriteToString<48>(Payload.GetRawHash()),
*WriteToString<96>(Key), Context.Len(), Context.GetData());
return FPayload();
}
void FZenDerivedDataBackend::LegacyMakeZenKey(const FCacheKey& CacheKey, FStringBuilderBase& Out) const
{
Out.Reset();
FStringView Bucket = CacheKey.Bucket.ToString<TCHAR>();
Out.Appendf(TEXT("/z$/%.*s/%s"), Bucket.Len(), Bucket.GetData(), *LexToString(CacheKey.Hash));
}
void FZenDerivedDataBackend::LegacyMakePayloadKey(const FCacheKey& CacheKey, const FIoHash& RawHash, FStringBuilderBase& Out) const
{
Out.Reset();
FStringView Bucket = CacheKey.Bucket.ToString<TCHAR>();
Out.Appendf(TEXT("/z$/%.*s/%s/%s"), Bucket.Len(), Bucket.GetData(), *LexToString(CacheKey.Hash), *LexToString(RawHash));
}
} // namespace UE::DerivedData
#endif //WITH_HTTP_DDC_BACKEND