// Copyright Epic Games, Inc. All Rights Reserved. #include "DerivedDataLegacyCacheStore.h" #include "Experimental/ZenServerInterface.h" #if UE_WITH_ZEN #include "BatchView.h" #include "DerivedDataBackendInterface.h" #include "DerivedDataCachePrivate.h" #include "DerivedDataCacheRecord.h" #include "DerivedDataCacheUsageStats.h" #include "DerivedDataChunk.h" #include "DerivedDataRequest.h" #include "DerivedDataRequestOwner.h" #include "Experimental/ZenStatistics.h" #include "HAL/FileManager.h" #include "Http/HttpClient.h" #include "Math/UnrealMathUtility.h" #include "Misc/App.h" #include "Misc/ConfigCacheIni.h" #include "Misc/Optional.h" #include "ProfilingDebugging/CountersTrace.h" #include "ProfilingDebugging/CpuProfilerTrace.h" #include "Serialization/CompactBinary.h" #include "Serialization/CompactBinaryPackage.h" #include "Serialization/CompactBinaryWriter.h" #include "Serialization/LargeMemoryWriter.h" #include "Serialization/MemoryReader.h" #include "Templates/Function.h" #include "ZenBackendUtils.h" #include "ZenSerialization.h" TRACE_DECLARE_INT_COUNTER(ZenDDC_Get, TEXT("ZenDDC Get")); TRACE_DECLARE_INT_COUNTER(ZenDDC_GetHit, TEXT("ZenDDC Get Hit")); TRACE_DECLARE_INT_COUNTER(ZenDDC_Put, TEXT("ZenDDC Put")); TRACE_DECLARE_INT_COUNTER(ZenDDC_PutHit, TEXT("ZenDDC Put Hit")); TRACE_DECLARE_INT_COUNTER(ZenDDC_BytesReceived, TEXT("ZenDDC Bytes Received")); TRACE_DECLARE_INT_COUNTER(ZenDDC_BytesSent, TEXT("ZenDDC Bytes Sent")); TRACE_DECLARE_INT_COUNTER(ZenDDC_CacheRecordRequestCountInFlight, TEXT("ZenDDC CacheRecord Request Count")); TRACE_DECLARE_INT_COUNTER(ZenDDC_ChunkRequestCountInFlight, TEXT("ZenDDC Chunk Request Count")); namespace UE::DerivedData { template void ForEachBatch(const int32 BatchSize, const int32 TotalCount, T&& Fn) { check(BatchSize > 0); if (TotalCount > 0) { const int32 BatchCount = FMath::DivideAndRoundUp(TotalCount, BatchSize); const int32 Last = TotalCount - 1; for (int32 BatchIndex = 0; BatchIndex < BatchCount; BatchIndex++) { const 
int32 BatchFirstIndex = BatchIndex * BatchSize; const int32 BatchLastIndex = FMath::Min(BatchFirstIndex + BatchSize - 1, Last); Fn(BatchFirstIndex, BatchLastIndex); } } } /** * Backend for a HTTP based caching service (Zen) */ class FZenCacheStore final : public ILegacyCacheStore { public: /** * Creates the cache store client, checks health status and attempts to acquire an access token. * * @param ServiceUrl Base url to the service including scheme. * @param Namespace Namespace to use. */ FZenCacheStore(const TCHAR* ServiceUrl, const TCHAR* Namespace); FZenCacheStore(UE::Zen::FServiceSettings&& InSettings, const TCHAR* Namespace); inline FString GetName() const { return ZenService.GetInstance().GetURL(); } /** * Checks if cache service is usable (reachable and accessible). * @return true if usable */ inline bool IsUsable() const { return bIsUsable; } // ICacheStore void Put( TConstArrayView Requests, IRequestOwner& Owner, FOnCachePutComplete&& OnComplete = FOnCachePutComplete()) final; void Get( TConstArrayView Requests, IRequestOwner& Owner, FOnCacheGetComplete&& OnComplete) final; void PutValue( TConstArrayView Requests, IRequestOwner& Owner, FOnCachePutValueComplete&& OnComplete = FOnCachePutValueComplete()) final; void GetValue( TConstArrayView Requests, IRequestOwner& Owner, FOnCacheGetValueComplete&& OnComplete = FOnCacheGetValueComplete()) final; void GetChunks( TConstArrayView Requests, IRequestOwner& Owner, FOnCacheGetChunkComplete&& OnComplete) final; // ILegacyCacheStore void LegacyStats(FDerivedDataCacheStatsNode& OutNode) final; bool LegacyDebugOptions(FBackendDebugOptions& Options) final; const Zen::FZenServiceInstance& GetServiceInstance() const { return ZenService.GetInstance(); } private: void Initialize(const TCHAR* Namespace); bool IsServiceReady(); static FCompositeBuffer SaveRpcPackage(const FCbPackage& Package); THttpUniquePtr CreateRpcRequest(); using FOnRpcComplete = TUniqueFunction& HttpResponse, FCbPackage& Response)>; void 
EnqueueAsyncRpc(IRequestOwner& Owner, FCbObject RequestObject, FOnRpcComplete&& OnComplete); void EnqueueAsyncRpc(IRequestOwner& Owner, const FCbPackage& RequestPackage, FOnRpcComplete&& OnComplete); template static TRefCountPtr MakeAsyncOp(ArgTypes&&... Args) { // TODO: This should in-place construct from a pre-allocated memory pool return TRefCountPtr(new T(Forward(Args)...)); } private: class FPutOp; class FGetOp; class FPutValueOp; class FGetValueOp; class FGetChunksOp; class FCbPackageReceiver; class FAsyncCbPackageReceiver; FString Namespace; UE::Zen::FScopeZenService ZenService; mutable FDerivedDataCacheUsageStats UsageStats; THttpUniquePtr ConnectionPool; FHttpRequestQueue RequestQueue; bool bIsUsable = false; bool bIsLocalConnection = false; int32 BatchPutMaxBytes = 1024*1024; int32 CacheRecordBatchSize = 8; int32 CacheChunksBatchSize = 8; FBackendDebugOptions DebugOptions; TAnsiStringBuilder<256> RpcUri; }; class FZenCacheStore::FPutOp final : public FThreadSafeRefCountedObject { public: FPutOp(FZenCacheStore& InCacheStore, IRequestOwner& InOwner, const TConstArrayView InRequests, FOnCachePutComplete&& InOnComplete) : CacheStore(InCacheStore) , Owner(InOwner) , Requests(InRequests) , Batches(Requests, [this](const FCachePutRequest& NextRequest) {return BatchGroupingFilter(NextRequest);}) , OnComplete(MoveTemp(InOnComplete)) { TRACE_COUNTER_ADD(ZenDDC_Put, (int64)Requests.Num()); COOK_STAT(Timers.Reserve(Requests.Num())); COOK_STAT(for (const FCachePutRequest& Request : Requests) { Timers.Add(Request.Record.GetKey(), CacheStore.UsageStats.TimePut()); }) } void IssueRequests() { FRequestBarrier Barrier(Owner); for (TArrayView Batch : Batches) { FCbPackage BatchPackage; FCbWriter BatchWriter; BatchWriter.BeginObject(); { BatchWriter << ANSITEXTVIEW("Method") << "PutCacheRecords"; BatchWriter.AddInteger(ANSITEXTVIEW("Accept"), Zen::Http::kCbPkgMagic); BatchWriter.BeginObject(ANSITEXTVIEW("Params")); { ECachePolicy BatchDefaultPolicy = 
Batch[0].Policy.GetRecordPolicy(); BatchWriter << ANSITEXTVIEW("DefaultPolicy") << *WriteToString<128>(BatchDefaultPolicy); BatchWriter.AddString(ANSITEXTVIEW("Namespace"), CacheStore.Namespace); BatchWriter.BeginArray(ANSITEXTVIEW("Requests")); for (const FCachePutRequest& Request : Batch) { const FCacheRecord& Record = Request.Record; BatchWriter.BeginObject(); { BatchWriter.SetName(ANSITEXTVIEW("Record")); Record.Save(BatchPackage, BatchWriter); if (!Request.Policy.IsDefault()) { BatchWriter << ANSITEXTVIEW("Policy") << Request.Policy; } } BatchWriter.EndObject(); } BatchWriter.EndArray(); } BatchWriter.EndObject(); } BatchWriter.EndObject(); BatchPackage.SetObject(BatchWriter.Save().AsObject()); auto OnRpcComplete = [this, OpRef = TRefCountPtr(this), Batch](THttpUniquePtr& HttpResponse, FCbPackage& Response) { int32 RequestIndex = 0; if (HttpResponse->GetErrorCode() == EHttpErrorCode::None) { const FCbObject& ResponseObj = Response.GetObject(); for (FCbField ResponseField : ResponseObj[ANSITEXTVIEW("Result")]) { if (RequestIndex >= Batch.Num()) { ++RequestIndex; continue; } const FCachePutRequest& Request = Batch[RequestIndex++]; const FCacheKey& Key = Request.Record.GetKey(); bool bPutSucceeded = ResponseField.AsBool(); if (CacheStore.DebugOptions.ShouldSimulatePutMiss(Key)) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Simulated miss for put of %s from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Key), *Request.Name); bPutSucceeded = false; } bPutSucceeded ? 
OnHit(Request) : OnMiss(Request); } if (RequestIndex != Batch.Num()) { UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Invalid response received from PutCacheRecords RPC: %d results expected, received %d, from %s"), *CacheStore.GetName(), Batch.Num(), RequestIndex, *WriteToString<256>(*HttpResponse)); } } for (const FCachePutRequest& Request : Batch.RightChop(RequestIndex)) { OnMiss(Request); } }; CacheStore.EnqueueAsyncRpc(Owner, BatchPackage, MoveTemp(OnRpcComplete)); } } private: EBatchView BatchGroupingFilter(const FCachePutRequest& NextRequest) { const FCacheRecord& Record = NextRequest.Record; uint64 RecordSize = sizeof(FCacheKey) + Record.GetMeta().GetSize(); for (const FValueWithId& Value : Record.GetValues()) { RecordSize += Value.GetData().GetCompressedSize(); } BatchSize += RecordSize; if (BatchSize > CacheStore.BatchPutMaxBytes) { BatchSize = RecordSize; return EBatchView::NewBatch; } return EBatchView::Continue; } void OnHit(const FCachePutRequest& Request) { const FCacheKey& Key = Request.Record.GetKey(); UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache Put complete for %s from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Key), *Request.Name); int64 SentSize = 0; for (const FValueWithId& Value : Request.Record.GetValues()) { SentSize += Value.GetData().GetCompressed().GetSize(); } TRACE_COUNTER_ADD(ZenDDC_BytesSent, SentSize); TRACE_COUNTER_INCREMENT(ZenDDC_PutHit); COOK_STAT(Timers[Key].AddHit(SentSize)); OnComplete(Request.MakeResponse(EStatus::Ok)); }; void OnMiss(const FCachePutRequest& Request) { const FCacheKey& Key = Request.Record.GetKey(); UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache Put miss for '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Key), *Request.Name); int64 SentSize = 0; for (const FValueWithId& Value : Request.Record.GetValues()) { SentSize += Value.GetData().GetCompressed().GetSize(); } COOK_STAT(Timers[Key].AddMiss(SentSize)); OnComplete(Request.MakeResponse(EStatus::Error)); }; FZenCacheStore& 
CacheStore; IRequestOwner& Owner; const TArray> Requests; uint64 BatchSize = 0; TBatchView Batches; FOnCachePutComplete OnComplete; COOK_STAT(TMap Timers); }; class FZenCacheStore::FGetOp final : public FThreadSafeRefCountedObject { public: FGetOp(FZenCacheStore& InCacheStore, IRequestOwner& InOwner, const TConstArrayView InRequests, FOnCacheGetComplete&& InOnComplete) : CacheStore(InCacheStore) , Owner(InOwner) , Requests(InRequests) , OnComplete(MoveTemp(InOnComplete)) { TRACE_COUNTER_ADD(ZenDDC_Get, (int64)Requests.Num()); TRACE_COUNTER_ADD(ZenDDC_CacheRecordRequestCountInFlight, int64(Requests.Num())); COOK_STAT(Timers.Reserve(Requests.Num())); COOK_STAT(for (const FCacheGetRequest& Request : Requests) { Timers.Add(Request.Key, CacheStore.UsageStats.TimeGet()); }) } virtual ~FGetOp() { TRACE_COUNTER_SUBTRACT(ZenDDC_CacheRecordRequestCountInFlight, int64(Requests.Num())); } void IssueRequests() { FRequestBarrier Barrier(Owner); ForEachBatch(CacheStore.CacheRecordBatchSize, Requests.Num(), [this](int32 BatchFirst, int32 BatchLast) { TArrayView Batch(Requests.GetData() + BatchFirst, BatchLast - BatchFirst + 1); FCbWriter BatchRequest; BatchRequest.BeginObject(); { BatchRequest << ANSITEXTVIEW("Method") << ANSITEXTVIEW("GetCacheRecords"); BatchRequest.AddInteger(ANSITEXTVIEW("Accept"), Zen::Http::kCbPkgMagic); if (CacheStore.bIsLocalConnection) { BatchRequest.AddInteger(ANSITEXTVIEW("AcceptFlags"), static_cast(Zen::Http::RpcAcceptOptions::kAllowLocalReferences)); BatchRequest.AddInteger(ANSITEXTVIEW("Pid"), FPlatformProcess::GetCurrentProcessId()); } BatchRequest.BeginObject(ANSITEXTVIEW("Params")); { ECachePolicy BatchDefaultPolicy = Batch[0].Policy.GetRecordPolicy(); BatchRequest << ANSITEXTVIEW("DefaultPolicy") << *WriteToString<128>(BatchDefaultPolicy); BatchRequest.AddString(ANSITEXTVIEW("Namespace"), CacheStore.Namespace); BatchRequest.BeginArray(ANSITEXTVIEW("Requests")); for (const FCacheGetRequest& Request : Batch) { BatchRequest.BeginObject(); { 
BatchRequest << ANSITEXTVIEW("Key") << Request.Key; if (!Request.Policy.IsDefault()) { BatchRequest << ANSITEXTVIEW("Policy") << Request.Policy; } } BatchRequest.EndObject(); } BatchRequest.EndArray(); } BatchRequest.EndObject(); } BatchRequest.EndObject(); FGetOp* OriginalOp = this; auto OnRpcComplete = [this, OpRef = TRefCountPtr(OriginalOp), Batch](THttpUniquePtr& HttpResponse, FCbPackage& Response) { int32 RequestIndex = 0; if (HttpResponse->GetErrorCode() == EHttpErrorCode::None) { const FCbObject& ResponseObj = Response.GetObject(); for (FCbField RecordField : ResponseObj[ANSITEXTVIEW("Result")]) { if (RequestIndex >= Batch.Num()) { ++RequestIndex; continue; } const FCacheGetRequest& Request = Batch[RequestIndex++]; const FCacheKey& Key = Request.Key; FOptionalCacheRecord Record; if (CacheStore.DebugOptions.ShouldSimulateGetMiss(Key)) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Simulated miss for put of '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Key), *Request.Name); } else if (!RecordField.IsNull()) { Record = FCacheRecord::Load(Response, RecordField.AsObject()); } Record ? 
OnHit(Request, MoveTemp(Record).Get()) : OnMiss(Request); } if (RequestIndex != Batch.Num()) { UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Invalid response received from GetCacheRecords RPC: %d results expected, received %d, from %s"), *CacheStore.GetName(), Batch.Num(), RequestIndex, *WriteToString<256>(*HttpResponse)); } } for (const FCacheGetRequest& Request : Batch.RightChop(RequestIndex)) { OnMiss(Request); } }; CacheStore.EnqueueAsyncRpc(Owner, BatchRequest.Save().AsObject(), MoveTemp(OnRpcComplete)); }); } private: void OnHit(const FCacheGetRequest& Request, FCacheRecord&& Record) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache hit for '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key), *Request.Name); TRACE_COUNTER_INCREMENT(ZenDDC_GetHit); int64 ReceivedSize = Private::GetCacheRecordCompressedSize(Record); TRACE_COUNTER_ADD(ZenDDC_BytesReceived, ReceivedSize); COOK_STAT(Timers[Request.Key].AddHit(ReceivedSize)); OnComplete({Request.Name, MoveTemp(Record), Request.UserData, EStatus::Ok}); }; void OnMiss(const FCacheGetRequest& Request) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache miss for '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key), *Request.Name); COOK_STAT(Timers[Request.Key].AddMiss()); OnComplete(Request.MakeResponse(EStatus::Error)); }; FZenCacheStore& CacheStore; IRequestOwner& Owner; const TArray> Requests; FOnCacheGetComplete OnComplete; COOK_STAT(TMap Timers); }; class FZenCacheStore::FPutValueOp final : public FThreadSafeRefCountedObject { public: FPutValueOp(FZenCacheStore& InCacheStore, IRequestOwner& InOwner, const TConstArrayView InRequests, FOnCachePutValueComplete&& InOnComplete) : CacheStore(InCacheStore) , Owner(InOwner) , Requests(InRequests) , Batches(Requests, [this](const FCachePutValueRequest& NextRequest) {return BatchGroupingFilter(NextRequest);}) , OnComplete(MoveTemp(InOnComplete)) { TRACE_COUNTER_ADD(ZenDDC_Put, (int64)Requests.Num()); 
COOK_STAT(Timers.Reserve(Requests.Num())); COOK_STAT(for (const FCachePutValueRequest& Request : Requests) { Timers.Add(Request.Key, CacheStore.UsageStats.TimePut()); }) } void IssueRequests() { FRequestBarrier Barrier(Owner); for (TArrayView Batch : Batches) { FCbPackage BatchPackage; FCbWriter BatchWriter; BatchWriter.BeginObject(); { BatchWriter << ANSITEXTVIEW("Method") << ANSITEXTVIEW("PutCacheValues"); BatchWriter.AddInteger(ANSITEXTVIEW("Accept"), Zen::Http::kCbPkgMagic); BatchWriter.BeginObject(ANSITEXTVIEW("Params")); { ECachePolicy BatchDefaultPolicy = Batch[0].Policy; BatchWriter << ANSITEXTVIEW("DefaultPolicy") << *WriteToString<128>(BatchDefaultPolicy); BatchWriter.AddString(ANSITEXTVIEW("Namespace"), CacheStore.Namespace); BatchWriter.BeginArray("Requests"); for (const FCachePutValueRequest& Request : Batch) { BatchWriter.BeginObject(); { BatchWriter << ANSITEXTVIEW("Key") << Request.Key; const FValue& Value = Request.Value; BatchWriter.AddBinaryAttachment("RawHash", Value.GetRawHash()); if (Value.HasData()) { BatchPackage.AddAttachment(FCbAttachment(Value.GetData())); } if (Request.Policy != BatchDefaultPolicy) { BatchWriter << ANSITEXTVIEW("Policy") << WriteToString<128>(Request.Policy); } } BatchWriter.EndObject(); } BatchWriter.EndArray(); } BatchWriter.EndObject(); } BatchWriter.EndObject(); BatchPackage.SetObject(BatchWriter.Save().AsObject()); auto OnRpcComplete = [this, OpRef = TRefCountPtr(this), Batch](THttpUniquePtr& HttpResponse, FCbPackage& Response) { int32 RequestIndex = 0; if (HttpResponse->GetErrorCode() == EHttpErrorCode::None) { const FCbObject& ResponseObj = Response.GetObject(); for (FCbField ResponseField : ResponseObj[ANSITEXTVIEW("Result")]) { if (RequestIndex >= Batch.Num()) { ++RequestIndex; continue; } const FCachePutValueRequest& Request = Batch[RequestIndex++]; bool bPutSucceeded = ResponseField.AsBool(); if (CacheStore.DebugOptions.ShouldSimulatePutMiss(Request.Key)) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: 
Simulated miss for PutValue of %s from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key), *Request.Name); bPutSucceeded = false; } bPutSucceeded ? OnHit(Request) : OnMiss(Request); } if (RequestIndex != Batch.Num()) { UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Invalid response received from PutCacheValues RPC: %d results expected, received %d, from %s"), *CacheStore.GetName(), Batch.Num(), RequestIndex, *WriteToString<256>(*HttpResponse)); } } for (const FCachePutValueRequest& Request : Batch.RightChop(RequestIndex)) { OnMiss(Request); } }; CacheStore.EnqueueAsyncRpc(Owner, BatchPackage, MoveTemp(OnRpcComplete)); } } private: EBatchView BatchGroupingFilter(const FCachePutValueRequest& NextRequest) { uint64 ValueSize = sizeof(FCacheKey) + NextRequest.Value.GetData().GetCompressedSize(); BatchSize += ValueSize; if (BatchSize > CacheStore.BatchPutMaxBytes) { BatchSize = ValueSize; return EBatchView::NewBatch; } return EBatchView::Continue; } void OnHit(const FCachePutValueRequest& Request) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache PutValue complete for %s from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key), *Request.Name); TRACE_COUNTER_INCREMENT(ZenDDC_PutHit); int64 SentSize = Request.Value.GetData().GetCompressedSize(); TRACE_COUNTER_ADD(ZenDDC_BytesSent, SentSize); COOK_STAT(Timers[Request.Key].AddHit(SentSize)); OnComplete(Request.MakeResponse(EStatus::Ok)); }; void OnMiss(const FCachePutValueRequest& Request) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache PutValue miss for '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key), *Request.Name); COOK_STAT(Timers[Request.Key].AddMiss()); OnComplete(Request.MakeResponse(EStatus::Error)); }; FZenCacheStore& CacheStore; IRequestOwner& Owner; const TArray> Requests; uint64 BatchSize = 0; TBatchView Batches; FOnCachePutValueComplete OnComplete; COOK_STAT(TMap Timers); }; class FZenCacheStore::FGetValueOp final : public FThreadSafeRefCountedObject { 
public: FGetValueOp(FZenCacheStore& InCacheStore, IRequestOwner& InOwner, const TConstArrayView InRequests, FOnCacheGetValueComplete&& InOnComplete) : CacheStore(InCacheStore) , Owner(InOwner) , Requests(InRequests) , OnComplete(MoveTemp(InOnComplete)) { TRACE_COUNTER_ADD(ZenDDC_Get, (int64)Requests.Num()); TRACE_COUNTER_ADD(ZenDDC_CacheRecordRequestCountInFlight, int64(Requests.Num())); COOK_STAT(Timers.Reserve(Requests.Num())); COOK_STAT(for (const FCacheGetValueRequest& Request : Requests) { Timers.Add(Request.Key, CacheStore.UsageStats.TimeGet()); }) } virtual ~FGetValueOp() { TRACE_COUNTER_SUBTRACT(ZenDDC_CacheRecordRequestCountInFlight, int64(Requests.Num())); } void IssueRequests() { FRequestBarrier Barrier(Owner); ForEachBatch(CacheStore.CacheRecordBatchSize, Requests.Num(), [this](int32 BatchFirst, int32 BatchLast) { TArrayView Batch(Requests.GetData() + BatchFirst, BatchLast - BatchFirst + 1); FCbWriter BatchRequest; BatchRequest.BeginObject(); { BatchRequest << ANSITEXTVIEW("Method") << ANSITEXTVIEW("GetCacheValues"); BatchRequest.AddInteger(ANSITEXTVIEW("Accept"), Zen::Http::kCbPkgMagic); if (CacheStore.bIsLocalConnection) { BatchRequest.AddInteger(ANSITEXTVIEW("AcceptFlags"), static_cast(Zen::Http::RpcAcceptOptions::kAllowLocalReferences)); BatchRequest.AddInteger(ANSITEXTVIEW("Pid"), FPlatformProcess::GetCurrentProcessId()); } BatchRequest.BeginObject(ANSITEXTVIEW("Params")); { ECachePolicy BatchDefaultPolicy = Batch[0].Policy; BatchRequest << ANSITEXTVIEW("DefaultPolicy") << *WriteToString<128>(BatchDefaultPolicy); BatchRequest.AddString(ANSITEXTVIEW("Namespace"), CacheStore.Namespace); BatchRequest.BeginArray("Requests"); for (const FCacheGetValueRequest& Request : Batch) { BatchRequest.BeginObject(); { BatchRequest << ANSITEXTVIEW("Key") << Request.Key; if (Request.Policy != BatchDefaultPolicy) { BatchRequest << ANSITEXTVIEW("Policy") << WriteToString<128>(Request.Policy); } } BatchRequest.EndObject(); } BatchRequest.EndArray(); } 
BatchRequest.EndObject(); } BatchRequest.EndObject(); FGetValueOp* OriginalOp = this; auto OnRpcComplete = [this, OpRef = TRefCountPtr(OriginalOp), Batch](THttpUniquePtr& HttpResponse, FCbPackage& Response) { int32 RequestIndex = 0; if (HttpResponse->GetErrorCode() == EHttpErrorCode::None) { const FCbObject& ResponseObj = Response.GetObject(); for (FCbFieldView ResultField : ResponseObj[ANSITEXTVIEW("Result")]) { if (RequestIndex >= Batch.Num()) { ++RequestIndex; continue; } const FCacheGetValueRequest& Request = Batch[RequestIndex++]; FCbObjectView ResultObj = ResultField.AsObjectView(); TOptional Value; if (CacheStore.DebugOptions.ShouldSimulateGetMiss(Request.Key)) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Simulated miss for GetValue of '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key), *Request.Name); } else { FCbFieldView RawHashField = ResultObj["RawHash"]; FIoHash RawHash = RawHashField.AsHash(); if (const FCbAttachment* Attachment = Response.FindAttachment(RawHash)) { Value.Emplace(Attachment->AsCompressedBinary()); } else { FCbFieldView RawSizeField = ResultObj["RawSize"]; uint64 RawSize = RawSizeField.AsUInt64(); if (!RawSizeField.HasError() && !RawHashField.HasError()) { Value.Emplace(RawHash, RawSize); } } } (bool)Value ? 
OnHit(Request, MoveTemp(*Value)) : OnMiss(Request); } if (RequestIndex != Batch.Num()) { UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Invalid response received from GetCacheValues RPC: %d results expected, received %d from %s"), *CacheStore.GetName(), Batch.Num(), RequestIndex, *WriteToString<256>(*HttpResponse)); } } for (const FCacheGetValueRequest& Request : Batch.RightChop(RequestIndex)) { OnMiss(Request); } }; CacheStore.EnqueueAsyncRpc(Owner, BatchRequest.Save().AsObject(), MoveTemp(OnRpcComplete)); }); } private: void OnHit(const FCacheGetValueRequest& Request, FValue&& Value) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache hit for '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key), *Request.Name); TRACE_COUNTER_INCREMENT(ZenDDC_GetHit); int64 ReceivedSize = Value.GetData().GetCompressedSize(); TRACE_COUNTER_ADD(ZenDDC_BytesReceived, ReceivedSize); COOK_STAT(Timers[Request.Key].AddHit(ReceivedSize)); OnComplete({Request.Name, Request.Key, MoveTemp(Value), Request.UserData, EStatus::Ok}); }; void OnMiss(const FCacheGetValueRequest& Request) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Cache miss for '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key), *Request.Name); COOK_STAT(Timers[Request.Key].AddMiss()); OnComplete(Request.MakeResponse(EStatus::Error)); }; FZenCacheStore& CacheStore; IRequestOwner& Owner; const TArray> Requests; FOnCacheGetValueComplete OnComplete; COOK_STAT(TMap Timers); }; class FZenCacheStore::FGetChunksOp final : public FThreadSafeRefCountedObject { public: FGetChunksOp(FZenCacheStore& InCacheStore, IRequestOwner& InOwner, const TConstArrayView InRequests, FOnCacheGetChunkComplete&& InOnComplete) : CacheStore(InCacheStore) , Owner(InOwner) , Requests(InRequests) , OnComplete(MoveTemp(InOnComplete)) { TRACE_COUNTER_ADD(ZenDDC_ChunkRequestCountInFlight, int64(Requests.Num())); TRACE_COUNTER_ADD(ZenDDC_Get, int64(Requests.Num())); Requests.StableSort(TChunkLess()); 
COOK_STAT(Timers.Reserve(Requests.Num())); COOK_STAT(for (const FCacheGetChunkRequest& Request : Requests) { Timers.Add(Request.Key, CacheStore.UsageStats.TimeGet()); }) } virtual ~FGetChunksOp() { TRACE_COUNTER_SUBTRACT(ZenDDC_ChunkRequestCountInFlight, int64(Requests.Num())); } void IssueRequests() { FRequestBarrier Barrier(Owner); ForEachBatch(CacheStore.CacheChunksBatchSize, Requests.Num(), [this](int32 BatchFirst, int32 BatchLast) { TArrayView Batch(Requests.GetData() + BatchFirst, BatchLast - BatchFirst + 1); FCbWriter BatchRequest; BatchRequest.BeginObject(); { BatchRequest << ANSITEXTVIEW("Method") << "GetCacheChunks"; BatchRequest.AddInteger(ANSITEXTVIEW("Accept"), Zen::Http::kCbPkgMagic); if (CacheStore.bIsLocalConnection) { BatchRequest.AddInteger(ANSITEXTVIEW("AcceptFlags"), static_cast(Zen::Http::RpcAcceptOptions::kAllowLocalReferences)); BatchRequest.AddInteger(ANSITEXTVIEW("Pid"), FPlatformProcess::GetCurrentProcessId()); } BatchRequest.BeginObject(ANSITEXTVIEW("Params")); { ECachePolicy DefaultPolicy = Batch[0].Policy; BatchRequest << ANSITEXTVIEW("DefaultPolicy") << WriteToString<128>(DefaultPolicy); BatchRequest.AddString(ANSITEXTVIEW("Namespace"), CacheStore.Namespace); BatchRequest.BeginArray(ANSITEXTVIEW("ChunkRequests")); for (const FCacheGetChunkRequest& Request : Batch) { BatchRequest.BeginObject(); { BatchRequest << ANSITEXTVIEW("Key") << Request.Key; if (Request.Id.IsValid()) { BatchRequest.AddObjectId(ANSITEXTVIEW("ValueId"), Request.Id); } if (Request.RawOffset != 0) { BatchRequest << ANSITEXTVIEW("RawOffset") << Request.RawOffset; } if (Request.RawSize != MAX_uint64) { BatchRequest << ANSITEXTVIEW("RawSize") << Request.RawSize; } if (!Request.RawHash.IsZero()) { BatchRequest << ANSITEXTVIEW("ChunkId") << Request.RawHash; } if (Request.Policy != DefaultPolicy) { BatchRequest << ANSITEXTVIEW("Policy") << WriteToString<128>(Request.Policy); } } BatchRequest.EndObject(); } BatchRequest.EndArray(); } BatchRequest.EndObject(); } 
BatchRequest.EndObject(); FGetChunksOp* OriginalOp = this; auto OnRpcComplete = [this, OpRef = TRefCountPtr(OriginalOp), Batch](THttpUniquePtr& HttpResponse, FCbPackage& Response) { int32 RequestIndex = 0; if (HttpResponse->GetErrorCode() == EHttpErrorCode::None) { const FCbObject& ResponseObj = Response.GetObject(); for (FCbFieldView ResultView : ResponseObj[ANSITEXTVIEW("Result")]) { if (RequestIndex >= Batch.Num()) { ++RequestIndex; continue; } const FCacheGetChunkRequest& Request = Batch[RequestIndex++]; FIoHash RawHash; bool Succeeded = false; uint64 RawSize = 0; FCbObjectView ResultObject = ResultView.AsObjectView(); FSharedBuffer RequestedBytes; if (CacheStore.DebugOptions.ShouldSimulateGetMiss(Request.Key)) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: Simulated miss for get of '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key, '/', Request.Id), *Request.Name); } else { FCbFieldView HashView = ResultObject[ANSITEXTVIEW("RawHash")]; RawHash = HashView.AsHash(); if (!HashView.HasError()) { if (const FCbAttachment* Attachment = Response.FindAttachment(HashView.AsHash())) { FCompressedBuffer CompressedBuffer = Attachment->AsCompressedBinary(); if (CompressedBuffer) { TRACE_COUNTER_ADD(ZenDDC_BytesReceived, CompressedBuffer.GetCompressedSize()); RequestedBytes = FCompressedBufferReader(CompressedBuffer).Decompress(Request.RawOffset, Request.RawSize); RawSize = RequestedBytes.GetSize(); Succeeded = true; } } else { FCbFieldView RawSizeField = ResultObject[ANSITEXTVIEW("RawSize")]; uint64 TotalSize = RawSizeField.AsUInt64(); Succeeded = !RawSizeField.HasError(); if (Succeeded) { RawSize = FMath::Min(Request.RawSize, TotalSize - FMath::Min(Request.RawOffset, TotalSize)); } } } } Succeeded ? 
OnHit(Request, MoveTemp(RawHash), RawSize, MoveTemp(RequestedBytes)) : OnMiss(Request); } } for (const FCacheGetChunkRequest& Request : Batch.RightChop(RequestIndex)) { OnMiss(Request); } }; CacheStore.EnqueueAsyncRpc(Owner, BatchRequest.Save().AsObject(), MoveTemp(OnRpcComplete)); }); } private: void OnHit(const FCacheGetChunkRequest& Request, FIoHash&& RawHash, uint64 RawSize, FSharedBuffer&& RequestedBytes) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: CacheChunk hit for '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<96>(Request.Key, '/', Request.Id), *Request.Name); TRACE_COUNTER_INCREMENT(ZenDDC_GetHit); int64 ReceivedSize = RequestedBytes.GetSize(); TRACE_COUNTER_ADD(ZenDDC_BytesReceived, ReceivedSize); COOK_STAT(Timers[Request.Key].AddHit(RawSize)); OnComplete({Request.Name, Request.Key, Request.Id, Request.RawOffset, RawSize, MoveTemp(RawHash), MoveTemp(RequestedBytes), Request.UserData, EStatus::Ok}); }; void OnMiss(const FCacheGetChunkRequest& Request) { UE_LOG(LogDerivedDataCache, Verbose, TEXT("%s: CacheChunk miss with missing value '%s' for '%s' from '%s'"), *CacheStore.GetName(), *WriteToString<16>(Request.Id), *WriteToString<96>(Request.Key), *Request.Name); COOK_STAT(Timers[Request.Key].AddMiss()); OnComplete(Request.MakeResponse(EStatus::Error)); }; FZenCacheStore& CacheStore; IRequestOwner& Owner; TArray> Requests; FOnCacheGetChunkComplete OnComplete; COOK_STAT(TMap Timers); }; class FZenCacheStore::FCbPackageReceiver final : public IHttpReceiver { public: FCbPackageReceiver(const FCbPackageReceiver&) = delete; FCbPackageReceiver& operator=(const FCbPackageReceiver&) = delete; explicit FCbPackageReceiver(FCbPackage& OutPackage, IHttpReceiver* InNext = nullptr) : Package(OutPackage) , Next(InNext) { } private: IHttpReceiver* OnCreate(IHttpResponse& Response) final { return &BodyReceiver; } IHttpReceiver* OnComplete(IHttpResponse& Response) final { FMemoryView MemoryView = MakeMemoryView(BodyArray); { FMemoryReaderView Ar(MemoryView); if 
(Zen::Http::TryLoadCbPackage(Package, Ar)) { return Next; } } FMemoryReaderView Ar(MemoryView); Package.TryLoad(Ar); return Next; } private: FCbPackage& Package; IHttpReceiver* Next; TArray64 BodyArray; FHttpByteArrayReceiver BodyReceiver{BodyArray, this}; }; class FZenCacheStore::FAsyncCbPackageReceiver final : public FRequestBase, public IHttpReceiver { public: FAsyncCbPackageReceiver(const FAsyncCbPackageReceiver&) = delete; FAsyncCbPackageReceiver& operator=(const FAsyncCbPackageReceiver&) = delete; FAsyncCbPackageReceiver( THttpUniquePtr&& InRequest, IRequestOwner* InOwner, Zen::FZenServiceInstance& InZenServiceInstance, FOnRpcComplete&& InOnRpcComplete) : Request(MoveTemp(InRequest)) , Owner(InOwner) , ZenServiceInstance(InZenServiceInstance) , BaseReceiver(Package, this) , OnRpcComplete(MoveTemp(InOnRpcComplete)) { Request->SendAsync(this, Response); } private: // IRequest Interface void SetPriority(EPriority Priority) final {} void Cancel() final { Monitor->Cancel(); } void Wait() final { Monitor->Wait(); } // IHttpReceiver Interface IHttpReceiver* OnCreate(IHttpResponse& LocalResponse) final { Monitor = LocalResponse.GetMonitor(); Owner->Begin(this); return &BaseReceiver; } bool ShouldRecoverAndRetry(IHttpResponse& LocalResponse) { if (!ZenServiceInstance.IsServiceRunningLocally()) { return false; } if ((LocalResponse.GetErrorCode() == EHttpErrorCode::Connect) || (LocalResponse.GetErrorCode() == EHttpErrorCode::TlsConnect) || (LocalResponse.GetErrorCode() == EHttpErrorCode::TimedOut)) { return true; } return false; } IHttpReceiver* OnComplete(IHttpResponse& LocalResponse) final { Owner->End(this, [Self = this, &LocalResponse] { if (Self->ShouldRecoverAndRetry(LocalResponse) && Self->ZenServiceInstance.TryRecovery()) { new FAsyncCbPackageReceiver(MoveTemp(Self->Request), Self->Owner, Self->ZenServiceInstance, MoveTemp(Self->OnRpcComplete)); return; } Self->Request.Reset(); if (Self->OnRpcComplete) { // Launch a task for the completion function since it can 
execute arbitrary code. Self->Owner->LaunchTask(TEXT("ZenHttpComplete"), [Self = TRefCountPtr(Self)] { Self->OnRpcComplete(Self->Response, Self->Package); }); } }); return nullptr; } private: THttpUniquePtr Request; THttpUniquePtr Response; TRefCountPtr Monitor; IRequestOwner* Owner; Zen::FZenServiceInstance& ZenServiceInstance; FCbPackage Package; FCbPackageReceiver BaseReceiver; FOnRpcComplete OnRpcComplete; }; FZenCacheStore::FZenCacheStore( const TCHAR* InServiceUrl, const TCHAR* InNamespace) : ZenService(InServiceUrl) { Initialize(InNamespace); } FZenCacheStore::FZenCacheStore( UE::Zen::FServiceSettings&& InSettings, const TCHAR* InNamespace) : ZenService(MoveTemp(InSettings)) { Initialize(InNamespace); } void FZenCacheStore::Initialize( const TCHAR* InNamespace) { Namespace = InNamespace; if (IsServiceReady()) { RpcUri << ZenService.GetInstance().GetURL() << ANSITEXTVIEW("/z$/$rpc"); const uint32 MaxConnections = uint32(FMath::Clamp(FPlatformMisc::NumberOfCoresIncludingHyperthreads(), 8, 64)); constexpr uint32 RequestPoolSize = 128; constexpr uint32 RequestPoolOverflowSize = 128; FHttpConnectionPoolParams ConnectionPoolParams; ConnectionPoolParams.MaxConnections = MaxConnections; ConnectionPoolParams.MinConnections = MaxConnections; ConnectionPool = IHttpManager::Get().CreateConnectionPool(ConnectionPoolParams); FHttpClientParams ClientParams; ClientParams.Version = EHttpVersion::V2; ClientParams.MaxRequests = RequestPoolSize + RequestPoolOverflowSize; ClientParams.MinRequests = RequestPoolSize; RequestQueue = FHttpRequestQueue(*ConnectionPool, ClientParams); bIsUsable = true; bIsLocalConnection = ZenService.GetInstance().IsServiceRunningLocally(); // Issue a request for stats as it will be fetched asynchronously and issuing now makes them available sooner for future callers. 
Zen::FZenStats ZenStats;
		ZenService.GetInstance().GetStats(ZenStats);
	}

	// Read regardless of whether the service was ready.
	GConfig->GetInt(TEXT("Zen"), TEXT("BatchPutMaxBytes"), BatchPutMaxBytes, GEngineIni);
	GConfig->GetInt(TEXT("Zen"), TEXT("CacheRecordBatchSize"), CacheRecordBatchSize, GEngineIni);
	GConfig->GetInt(TEXT("Zen"), TEXT("CacheChunksBatchSize"), CacheChunksBatchSize, GEngineIni);
}

/** Returns whether the underlying Zen service instance reports itself ready. */
bool FZenCacheStore::IsServiceReady()
{
	return ZenService.GetInstance().IsServiceReady();
}

/**
 * Serializes a package with Zen::Http::SaveCbPackage and returns the bytes as a composite buffer.
 * The returned buffer takes ownership of the writer's memory and frees it with FMemory::Free.
 */
FCompositeBuffer FZenCacheStore::SaveRpcPackage(const FCbPackage& Package)
{
	FLargeMemoryWriter Memory;
	Zen::Http::SaveCbPackage(Package, Memory);
	// The size is captured before ownership of the memory is released from the writer.
	uint64 PackageMemorySize = Memory.TotalSize();
	return FCompositeBuffer(FSharedBuffer::TakeOwnership(Memory.ReleaseOwnership(), PackageMemorySize, FMemory::Free));
}

/** Creates a POST request to the RPC URI from the request queue that accepts a CbPackage response. */
THttpUniquePtr FZenCacheStore::CreateRpcRequest()
{
	THttpUniquePtr Request = RequestQueue.CreateRequest({});
	Request->SetUri(RpcUri);
	Request->SetMethod(EHttpMethod::Post);
	Request->AddAcceptType(EHttpMediaType::CbPackage);
	return Request;
}

/**
 * Sends a compact binary object as an asynchronous RPC.
 *
 * @param Owner          Owner that the created request registers with.
 * @param RequestObject  Request payload; its buffer is passed through MakeOwned() before being set as the body.
 * @param OnComplete     Callback forwarded to the receiver.
 */
void FZenCacheStore::EnqueueAsyncRpc(IRequestOwner& Owner, FCbObject RequestObject, FOnRpcComplete&& OnComplete)
{
	THttpUniquePtr Request = CreateRpcRequest();
	Request->SetContentType(EHttpMediaType::CbObject);
	Request->SetBody(RequestObject.GetBuffer().MakeOwned());
	// The receiver starts the send from its constructor; the pointer is intentionally not kept here.
	new FAsyncCbPackageReceiver(MoveTemp(Request), &Owner, ZenService.GetInstance(), MoveTemp(OnComplete));
}

/**
 * Sends a compact binary package as an asynchronous RPC.
 *
 * @param Owner           Owner that the created request registers with.
 * @param RequestPackage  Request payload; serialized with SaveRpcPackage.
 * @param OnComplete      Callback forwarded to the receiver.
 */
void FZenCacheStore::EnqueueAsyncRpc(IRequestOwner& Owner, const FCbPackage& RequestPackage, FOnRpcComplete&& OnComplete)
{
	THttpUniquePtr Request = CreateRpcRequest();
	Request->SetContentType(EHttpMediaType::CbPackage);
	Request->SetBody(SaveRpcPackage(RequestPackage));
	// The receiver starts the send from its constructor; the pointer is intentionally not kept here.
	new FAsyncCbPackageReceiver(MoveTemp(Request), &Owner, ZenService.GetInstance(), MoveTemp(OnComplete));
}

/**
 * Fills OutNode with a single "Zen" stats node carrying this store's usage stats.
 * The upstream/local split further down is compiled out with `#if 0`.
 */
void FZenCacheStore::LegacyStats(FDerivedDataCacheStatsNode& OutNode)
{
	// NOTE(review): CacheStatus is not referenced by any active code in this function.
	EDerivedDataCacheStatus CacheStatus = EDerivedDataCacheStatus::None;
	OutNode = { TEXT("Zen"), ZenService.GetInstance().GetURL(), /*bIsLocal*/ ZenService.GetInstance().IsServiceRunningLocally()};
	OutNode.UsageStats.Add(TEXT(""), UsageStats);

#if 0
	// DE: 20230213 We might need to revisit this if we change so Zen handles upstream again
	Zen::FZenStats ZenStats;

	FDerivedDataCacheUsageStats LocalStats;
	FDerivedDataCacheUsageStats RemoteStats;

#if ENABLE_COOK_STATS
	using EHitOrMiss = FCookStats::CallStats::EHitOrMiss;
	using ECacheStatType = FCookStats::CallStats::EStatType;

	ZenService.GetInstance().GetStats(ZenStats);

	const int64 RemotePutSize = int64(ZenStats.UpstreamStats.TotalUploadedMB * 1024 * 1024);
	const int64 RemoteGetSize = int64(ZenStats.UpstreamStats.TotalDownloadedMB * 1024 * 1024);
	const int64 LocalGetSize = FMath::Max(0, UsageStats.GetStats.GetAccumulatedValueAnyThread(EHitOrMiss::Hit, ECacheStatType::Bytes) - RemoteGetSize);

	LocalStats.PutStats = UsageStats.PutStats;
	LocalStats.ExistsStats = UsageStats.ExistsStats;
	LocalStats.PrefetchStats = UsageStats.PrefetchStats;

	LocalStats.GetStats.Accumulate(EHitOrMiss::Hit, ECacheStatType::Counter, ZenStats.CacheStats.Hits - ZenStats.CacheStats.UpstreamHits, /*bIsInGameThread*/ false);
	LocalStats.GetStats.Accumulate(EHitOrMiss::Miss, ECacheStatType::Counter, ZenStats.CacheStats.Misses + ZenStats.CacheStats.UpstreamHits, /*bIsInGameThread*/ false);
	RemoteStats.GetStats.Accumulate(EHitOrMiss::Hit, ECacheStatType::Counter, ZenStats.CacheStats.UpstreamHits, /*bIsInGameThread*/ false);
	RemoteStats.GetStats.Accumulate(EHitOrMiss::Miss, ECacheStatType::Counter, ZenStats.CacheStats.Misses, /*bIsInGameThread*/ false);

	LocalStats.GetStats.Accumulate(EHitOrMiss::Hit, ECacheStatType::Bytes, LocalGetSize, /*bIsInGameThread*/ false);
	RemoteStats.GetStats.Accumulate(EHitOrMiss::Hit, ECacheStatType::Bytes, RemoteGetSize, /*bIsInGameThread*/ false);
	RemoteStats.PutStats.Accumulate(EHitOrMiss::Hit, ECacheStatType::Bytes, RemotePutSize, /*bIsInGameThread*/ false);
#endif

	if (ZenStats.UpstreamStats.EndPointStats.IsEmpty())
	{
		OutNode = {TEXT("Zen"), ZenService.GetInstance().GetURL(), /*bIsLocal*/ true};
		OutNode.UsageStats.Add(TEXT(""), LocalStats);
		return;
	}

	TSharedRef LocalNode = MakeShared(TEXT("Zen"), ZenService.GetInstance().GetURL(), /*bIsLocal*/ true);
	LocalNode->UsageStats.Add(TEXT(""), LocalStats);

	TSharedRef RemoteNode = MakeShared(ZenStats.UpstreamStats.EndPointStats[0].Name, ZenStats.UpstreamStats.EndPointStats[0].Url, /*bIsLocal*/ false);
	RemoteNode->UsageStats.Add(TEXT(""), RemoteStats);

	OutNode = {TEXT("Zen Group"), TEXT(""), /*bIsLocal*/ true};
	OutNode.Children.Add(LocalNode);
	OutNode.Children.Add(RemoteNode);
#endif // 0
}

/** Stores a copy of the supplied debug options and reports success. */
bool FZenCacheStore::LegacyDebugOptions(FBackendDebugOptions& InOptions)
{
	DebugOptions = InOptions;
	return true;
}

/** Creates an async op for the put requests via MakeAsyncOp and issues it. */
void FZenCacheStore::Put(
	const TConstArrayView Requests,
	IRequestOwner& Owner,
	FOnCachePutComplete&& OnComplete)
{
	TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC::PutCachedRecord);
	TRefCountPtr PutOp = MakeAsyncOp(*this, Owner, Requests, MoveTemp(OnComplete));
	PutOp->IssueRequests();
}

/** Creates an async op for the get requests via MakeAsyncOp and issues it. */
void FZenCacheStore::Get(
	const TConstArrayView Requests,
	IRequestOwner& Owner,
	FOnCacheGetComplete&& OnComplete)
{
	TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC::GetCacheRecord);
	TRefCountPtr GetOp = MakeAsyncOp(*this, Owner, Requests, MoveTemp(OnComplete));
	GetOp->IssueRequests();
}

/** Creates an async op for the put-value requests via MakeAsyncOp and issues it. */
void FZenCacheStore::PutValue(
	TConstArrayView Requests,
	IRequestOwner& Owner,
	FOnCachePutValueComplete&& OnComplete)
{
	TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC::PutValue);
	TRefCountPtr PutValueOp = MakeAsyncOp(*this, Owner, Requests, MoveTemp(OnComplete));
	PutValueOp->IssueRequests();
}

/** Creates an async op for the get-value requests via MakeAsyncOp and issues it. */
void FZenCacheStore::GetValue(
	TConstArrayView Requests,
	IRequestOwner& Owner,
	FOnCacheGetValueComplete&& OnComplete)
{
	TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC::GetValue);
	TRefCountPtr GetValueOp = MakeAsyncOp(*this, Owner, Requests, MoveTemp(OnComplete));
	GetValueOp->IssueRequests();
}

/** Creates an async op for the chunk requests via MakeAsyncOp and issues it. */
void FZenCacheStore::GetChunks(
	TConstArrayView Requests,
	IRequestOwner& Owner,
	FOnCacheGetChunkComplete&& OnComplete)
{
TRACE_CPUPROFILER_EVENT_SCOPE(ZenDDC::GetChunks); TRefCountPtr GetChunksOp = MakeAsyncOp(*this, Owner, Requests, MoveTemp(OnComplete)); GetChunksOp->IssueRequests(); } TTuple CreateZenCacheStore(const TCHAR* NodeName, const TCHAR* Config) { FString ServiceUrl; FParse::Value(Config, TEXT("Host="), ServiceUrl); FString OverrideName; if (FParse::Value(Config, TEXT("EnvHostOverride="), OverrideName)) { FString ServiceUrlEnv = FPlatformMisc::GetEnvironmentVariable(*OverrideName); if (!ServiceUrlEnv.IsEmpty()) { ServiceUrl = ServiceUrlEnv; UE_LOG(LogDerivedDataCache, Log, TEXT("%s: Found environment override for Host %s=%s"), NodeName, *OverrideName, *ServiceUrl); } } if (FParse::Value(Config, TEXT("CommandLineHostOverride="), OverrideName)) { if (FParse::Value(FCommandLine::Get(), *(OverrideName + TEXT("=")), ServiceUrl)) { UE_LOG(LogDerivedDataCache, Log, TEXT("%s: Found command line override for Host %s=%s"), NodeName, *OverrideName, *ServiceUrl); } } if (ServiceUrl == TEXT("None")) { UE_LOG(LogDerivedDataCache, Log, TEXT("Disabling %s data cache - host set to 'None'."), NodeName); return MakeTuple(nullptr, ECacheStoreFlags::None); } FString Namespace; if (!FParse::Value(Config, TEXT("StructuredNamespace="), Namespace) && !FParse::Value(Config, TEXT("Namespace="), Namespace)) { Namespace = FApp::GetProjectName(); UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Missing required parameter 'Namespace', falling back to '%s'"), NodeName, *Namespace); } FString Sandbox; FParse::Value(Config, TEXT("Sandbox="), Sandbox); bool bHasSandbox = !Sandbox.IsEmpty(); bool bUseLocalDataCachePathOverrides = !bHasSandbox; FString CachePathOverride; if (bUseLocalDataCachePathOverrides && UE::Zen::Private::IsLocalAutoLaunched(ServiceUrl) && UE::Zen::Private::GetLocalDataCachePathOverride(CachePathOverride)) { if (CachePathOverride == TEXT("None")) { UE_LOG(LogDerivedDataCache, Log, TEXT("Disabling %s data cache - path set to 'None'."), NodeName); return MakeTuple(nullptr, 
ECacheStoreFlags::None); } } TUniquePtr Backend; bool bFlush = false; FParse::Bool(Config, TEXT("Flush="), bFlush); if (bHasSandbox) { Zen::FServiceSettings DefaultServiceSettings; DefaultServiceSettings.ReadFromConfig(); if (!DefaultServiceSettings.IsAutoLaunch()) { UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Attempting to use a sandbox when there is no default autolaunch configured to interhit settings from. Cache will be disabled."), NodeName); return MakeTuple(nullptr, ECacheStoreFlags::None); } // Make a unique local instance (not the default local instance) of ZenServer Zen::FServiceSettings ServiceSettings; ServiceSettings.SettingsVariant.Emplace(); Zen::FServiceAutoLaunchSettings& AutoLaunchSettings = ServiceSettings.SettingsVariant.Get(); const Zen::FServiceAutoLaunchSettings& DefaultAutoLaunchSettings = DefaultServiceSettings.SettingsVariant.Get(); AutoLaunchSettings = DefaultAutoLaunchSettings; // Default as one more than the default port to not collide. Multiple sandboxes will share a desired port, but will get differing effective ports. AutoLaunchSettings.DesiredPort++; FPaths::NormalizeDirectoryName(AutoLaunchSettings.DataPath); AutoLaunchSettings.DataPath += TEXT("_"); AutoLaunchSettings.DataPath += Sandbox; // The unique local instances will always limit process lifetime for now to avoid accumulating many of them AutoLaunchSettings.bLimitProcessLifetime = true; // Flush the cache if requested. 
if (bFlush) { bool bStopped = true; if (UE::Zen::IsLocalServiceRunning(*AutoLaunchSettings.DataPath)) { bStopped = UE::Zen::StopLocalService(*AutoLaunchSettings.DataPath); } if (bStopped) { IFileManager::Get().DeleteDirectory(*(AutoLaunchSettings.DataPath / TEXT("")), /*bRequireExists*/ false, /*bTree*/ true); } else { UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Zen DDC could not be flushed due to an existing instance not shutting down when requested."), NodeName); } } Backend = MakeUnique(MoveTemp(ServiceSettings), *Namespace); } else { Backend = MakeUnique(*ServiceUrl, *Namespace); } if (!Backend->IsUsable()) { UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Failed to contact the service (%s), will not use it."), NodeName, *Backend->GetName()); Backend.Reset(); } bool bReadOnly = false; FParse::Bool(Config, TEXT("ReadOnly="), bReadOnly); // Default to locally launched service getting the Local cache store flag. Can be overridden by explicit value in config. const bool bServiceRunningLocally = Backend->GetServiceInstance().IsServiceRunningLocally(); bool bLocal = bServiceRunningLocally; FParse::Bool(Config, TEXT("Local="), bLocal); // Default to non-locally launched service getting the Remote cache store flag. Can be overridden by explicit value in config. // In the future this could be extended to allow the Remote flag by default (even for locally launched instances) if they have upstreams configured. bool bRemote = !bServiceRunningLocally; FParse::Bool(Config, TEXT("Remote="), bRemote); ECacheStoreFlags Flags = ECacheStoreFlags::Query; Flags |= bReadOnly ? ECacheStoreFlags::None : ECacheStoreFlags::Store; Flags |= bLocal ? ECacheStoreFlags::Local : ECacheStoreFlags::None; Flags |= bRemote ? 
ECacheStoreFlags::Remote : ECacheStoreFlags::None; return MakeTuple(Backend.Release(), Flags); } } // namespace UE::DerivedData #else namespace UE::DerivedData { TTuple CreateZenCacheStore(const TCHAR* NodeName, const TCHAR* Config) { UE_LOG(LogDerivedDataCache, Warning, TEXT("%s: Zen cache is not yet supported in the current build configuration."), NodeName); return MakeTuple(nullptr, ECacheStoreFlags::None); } } // UE::DerivedData #endif // UE_WITH_ZEN