2022-05-30 18:14:02 -04:00
// Copyright Epic Games, Inc. All Rights Reserved.
# include "ZenServerInterface.h"
2022-05-31 23:25:37 -04:00
# include "DerivedDataLegacyCacheStore.h"
2022-05-30 18:14:02 -04:00
# if UE_WITH_ZEN
# include "BatchView.h"
2022-05-31 22:01:53 -04:00
# include "DerivedDataBackendInterface.h"
2022-05-30 18:14:02 -04:00
# include "DerivedDataCachePrivate.h"
# include "DerivedDataCacheRecord.h"
# include "DerivedDataCacheUsageStats.h"
# include "DerivedDataChunk.h"
2022-07-18 12:23:16 -04:00
# include "DerivedDataRequest.h"
2022-06-10 12:56:08 -04:00
# include "DerivedDataRequestOwner.h"
2022-05-31 12:42:17 -04:00
# include "Http/HttpClient.h"
2022-06-20 02:08:49 -04:00
# include "Math/UnrealMathUtility.h"
2022-05-31 22:01:53 -04:00
# include "Misc/App.h"
2022-05-30 18:14:02 -04:00
# include "Misc/ConfigCacheIni.h"
# include "Misc/Optional.h"
# include "ProfilingDebugging/CountersTrace.h"
# include "ProfilingDebugging/CpuProfilerTrace.h"
# include "Serialization/CompactBinary.h"
# include "Serialization/CompactBinaryPackage.h"
# include "Serialization/CompactBinaryWriter.h"
2022-05-31 12:42:17 -04:00
# include "Serialization/LargeMemoryWriter.h"
2022-07-18 12:23:16 -04:00
# include "Serialization/MemoryReader.h"
2022-05-31 12:42:17 -04:00
# include "Templates/Function.h"
2022-05-30 18:14:02 -04:00
# include "ZenBackendUtils.h"
# include "ZenSerialization.h"
# include "ZenStatistics.h"
TRACE_DECLARE_INT_COUNTER ( ZenDDC_Get , TEXT ( " ZenDDC Get " ) ) ;
TRACE_DECLARE_INT_COUNTER ( ZenDDC_GetHit , TEXT ( " ZenDDC Get Hit " ) ) ;
TRACE_DECLARE_INT_COUNTER ( ZenDDC_Put , TEXT ( " ZenDDC Put " ) ) ;
TRACE_DECLARE_INT_COUNTER ( ZenDDC_PutHit , TEXT ( " ZenDDC Put Hit " ) ) ;
TRACE_DECLARE_INT_COUNTER ( ZenDDC_BytesReceived , TEXT ( " ZenDDC Bytes Received " ) ) ;
TRACE_DECLARE_INT_COUNTER ( ZenDDC_BytesSent , TEXT ( " ZenDDC Bytes Sent " ) ) ;
TRACE_DECLARE_INT_COUNTER ( ZenDDC_CacheRecordRequestCount , TEXT ( " ZenDDC CacheRecord Request Count " ) ) ;
TRACE_DECLARE_INT_COUNTER ( ZenDDC_ChunkRequestCount , TEXT ( " ZenDDC Chunk Request Count " ) ) ;
namespace UE : : DerivedData
{
template < typename T >
void ForEachBatch ( const int32 BatchSize , const int32 TotalCount , T & & Fn )
{
check ( BatchSize > 0 ) ;
if ( TotalCount > 0 )
{
const int32 BatchCount = FMath : : DivideAndRoundUp ( TotalCount , BatchSize ) ;
const int32 Last = TotalCount - 1 ;
for ( int32 BatchIndex = 0 ; BatchIndex < BatchCount ; BatchIndex + + )
{
const int32 BatchFirstIndex = BatchIndex * BatchSize ;
const int32 BatchLastIndex = FMath : : Min ( BatchFirstIndex + BatchSize - 1 , Last ) ;
Fn ( BatchFirstIndex , BatchLastIndex ) ;
}
}
}
/**
* Backend for a HTTP based caching service ( Zen )
*/
2022-05-31 22:01:53 -04:00
class FZenCacheStore final : public ILegacyCacheStore
2022-05-30 18:14:02 -04:00
{
public :
/**
2022-05-31 22:01:53 -04:00
* Creates the cache store client , checks health status and attempts to acquire an access token .
2022-05-30 18:14:02 -04:00
*
2022-07-18 12:23:16 -04:00
* @ param ServiceUrl Base url to the service including scheme .
2022-05-30 18:14:02 -04:00
* @ param Namespace Namespace to use .
*/
2022-07-18 12:23:16 -04:00
FZenCacheStore ( const TCHAR * ServiceUrl , const TCHAR * Namespace ) ;
2022-05-30 18:14:02 -04:00
2022-05-31 22:01:53 -04:00
inline FString GetName ( ) const { return ZenService . GetInstance ( ) . GetURL ( ) ; }
2022-05-30 18:14:02 -04:00
/**
2022-05-31 22:01:53 -04:00
* Checks if cache service is usable ( reachable and accessible ) .
2022-05-30 18:14:02 -04:00
* @ return true if usable
*/
2022-05-31 22:01:53 -04:00
inline bool IsUsable ( ) const { return bIsUsable ; }
2022-05-30 18:14:02 -04:00
// ICacheStore
2022-05-31 22:01:53 -04:00
void Put (
2022-05-30 18:14:02 -04:00
TConstArrayView < FCachePutRequest > Requests ,
IRequestOwner & Owner ,
2022-05-31 22:01:53 -04:00
FOnCachePutComplete & & OnComplete = FOnCachePutComplete ( ) ) final ;
2022-05-30 18:14:02 -04:00
2022-05-31 22:01:53 -04:00
void Get (
2022-05-30 18:14:02 -04:00
TConstArrayView < FCacheGetRequest > Requests ,
IRequestOwner & Owner ,
2022-05-31 22:01:53 -04:00
FOnCacheGetComplete & & OnComplete ) final ;
2022-05-30 18:14:02 -04:00
2022-05-31 22:01:53 -04:00
void PutValue (
2022-05-30 18:14:02 -04:00
TConstArrayView < FCachePutValueRequest > Requests ,
IRequestOwner & Owner ,
2022-05-31 22:01:53 -04:00
FOnCachePutValueComplete & & OnComplete = FOnCachePutValueComplete ( ) ) final ;
2022-05-30 18:14:02 -04:00
2022-05-31 22:01:53 -04:00
void GetValue (
2022-05-30 18:14:02 -04:00
TConstArrayView < FCacheGetValueRequest > Requests ,
IRequestOwner & Owner ,
2022-05-31 22:01:53 -04:00
FOnCacheGetValueComplete & & OnComplete = FOnCacheGetValueComplete ( ) ) final ;
2022-05-30 18:14:02 -04:00
2022-05-31 22:01:53 -04:00
void GetChunks (
2022-05-30 18:14:02 -04:00
TConstArrayView < FCacheGetChunkRequest > Requests ,
IRequestOwner & Owner ,
2022-05-31 22:01:53 -04:00
FOnCacheGetChunkComplete & & OnComplete ) final ;
// ILegacyCacheStore
void LegacyStats ( FDerivedDataCacheStatsNode & OutNode ) final ;
bool LegacyDebugOptions ( FBackendDebugOptions & Options ) final ;
2022-05-30 18:14:02 -04:00
private :
bool IsServiceReady ( ) ;
2022-07-18 12:23:16 -04:00
static FCompositeBuffer SaveRpcPackage ( const FCbPackage & Package ) ;
THttpUniquePtr < IHttpRequest > CreateRpcRequest ( ) ;
THttpUniquePtr < IHttpResponse > PerformBlockingRpc ( FCbObject RequestObject , FCbPackage & OutResponse ) ;
THttpUniquePtr < IHttpResponse > PerformBlockingRpc ( const FCbPackage & RequestPackage , FCbPackage & OutResponse ) ;
using FOnRpcComplete = TUniqueFunction < void ( THttpUniquePtr < IHttpResponse > & HttpResponse , FCbPackage & Response ) > ;
void EnqueueAsyncRpc ( IRequestOwner & Owner , FCbObject RequestObject , FOnRpcComplete & & OnComplete ) ;
void EnqueueAsyncRpc ( IRequestOwner & Owner , const FCbPackage & RequestPackage , FOnRpcComplete & & OnComplete ) ;
2022-06-10 12:56:08 -04:00
template < typename T , typename . . . ArgTypes >
static TRefCountPtr < T > MakeAsyncOp ( ArgTypes & & . . . Args )
{
// TODO: This should in-place construct from a pre-allocated memory pool
return TRefCountPtr < T > ( new T ( Forward < ArgTypes > ( Args ) . . . ) ) ;
}
2022-07-18 12:23:16 -04:00
2022-05-30 18:14:02 -04:00
private :
2022-07-18 12:23:16 -04:00
class FPutOp ;
class FGetOp ;
class FPutValueOp ;
class FGetValueOp ;
class FGetChunksOp ;
class FCbPackageReceiver ;
class FAsyncCbPackageReceiver ;
2022-06-10 12:56:08 -04:00
2022-05-30 18:14:02 -04:00
FString Namespace ;
UE : : Zen : : FScopeZenService ZenService ;
mutable FDerivedDataCacheUsageStats UsageStats ;
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpConnectionPool > ConnectionPool ;
FHttpRequestQueue RequestQueue ;
2022-05-30 18:14:02 -04:00
bool bIsUsable = false ;
int32 BatchPutMaxBytes = 1024 * 1024 ;
int32 CacheRecordBatchSize = 8 ;
int32 CacheChunksBatchSize = 8 ;
FBackendDebugOptions DebugOptions ;
2022-07-18 12:23:16 -04:00
TAnsiStringBuilder < 256 > RpcUri ;
2022-05-30 18:14:02 -04:00
} ;
2022-07-18 12:23:16 -04:00
class FZenCacheStore : : FPutOp final : public FThreadSafeRefCountedObject
2022-06-10 12:56:08 -04:00
{
public :
FPutOp ( FZenCacheStore & InCacheStore ,
IRequestOwner & InOwner ,
const TConstArrayView < FCachePutRequest > InRequests ,
FOnCachePutComplete & & InOnComplete )
: CacheStore ( InCacheStore )
, Owner ( InOwner )
, Requests ( InRequests )
, Batches ( Requests , [ this ] ( const FCachePutRequest & NextRequest ) { return BatchGroupingFilter ( NextRequest ) ; } )
, OnComplete ( MoveTemp ( InOnComplete ) )
{
}
void IssueRequests ( )
{
FRequestBarrier Barrier ( Owner ) ;
for ( TArrayView < const FCachePutRequest > Batch : Batches )
{
FCbPackage BatchPackage ;
FCbWriter BatchWriter ;
BatchWriter . BeginObject ( ) ;
{
BatchWriter < < ANSITEXTVIEW ( " Method " ) < < " PutCacheRecords " ;
BatchWriter . BeginObject ( ANSITEXTVIEW ( " Params " ) ) ;
{
ECachePolicy BatchDefaultPolicy = Batch [ 0 ] . Policy . GetRecordPolicy ( ) ;
BatchWriter < < ANSITEXTVIEW ( " DefaultPolicy " ) < < * WriteToString < 128 > ( BatchDefaultPolicy ) ;
2022-07-18 12:23:16 -04:00
BatchWriter . AddString ( ANSITEXTVIEW ( " Namespace " ) , CacheStore . Namespace ) ;
2022-06-10 12:56:08 -04:00
BatchWriter . BeginArray ( ANSITEXTVIEW ( " Requests " ) ) ;
for ( const FCachePutRequest & Request : Batch )
{
const FCacheRecord & Record = Request . Record ;
BatchWriter . BeginObject ( ) ;
{
BatchWriter . SetName ( ANSITEXTVIEW ( " Record " ) ) ;
Record . Save ( BatchPackage , BatchWriter ) ;
2022-06-20 23:40:25 -04:00
if ( ! Request . Policy . IsDefault ( ) )
2022-06-10 12:56:08 -04:00
{
2022-06-20 23:40:25 -04:00
BatchWriter < < ANSITEXTVIEW ( " Policy " ) < < Request . Policy ;
2022-06-10 12:56:08 -04:00
}
}
BatchWriter . EndObject ( ) ;
}
BatchWriter . EndArray ( ) ;
}
BatchWriter . EndObject ( ) ;
}
BatchWriter . EndObject ( ) ;
BatchPackage . SetObject ( BatchWriter . Save ( ) . AsObject ( ) ) ;
2022-07-18 12:23:16 -04:00
auto OnRpcComplete = [ this , OpRef = TRefCountPtr < FPutOp > ( this ) , Batch ] ( THttpUniquePtr < IHttpResponse > & HttpResponse , FCbPackage & Response )
2022-06-10 12:56:08 -04:00
{
int32 RequestIndex = 0 ;
2022-07-18 12:23:16 -04:00
if ( HttpResponse - > GetErrorCode ( ) = = EHttpErrorCode : : None )
2022-06-10 12:56:08 -04:00
{
const FCbObject & ResponseObj = Response . GetObject ( ) ;
for ( FCbField ResponseField : ResponseObj [ ANSITEXTVIEW ( " Result " ) ] )
{
if ( RequestIndex > = Batch . Num ( ) )
{
+ + RequestIndex ;
continue ;
}
const FCachePutRequest & Request = Batch [ RequestIndex + + ] ;
const FCacheKey & Key = Request . Record . GetKey ( ) ;
bool bPutSucceeded = ResponseField . AsBool ( ) ;
if ( CacheStore . DebugOptions . ShouldSimulatePutMiss ( Key ) )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Simulated miss for put of %s from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Key ) , * Request . Name ) ;
bPutSucceeded = false ;
}
bPutSucceeded ? OnHit ( Request ) : OnMiss ( Request ) ;
}
if ( RequestIndex ! = Batch . Num ( ) )
{
UE_LOG ( LogDerivedDataCache , Warning ,
2022-07-21 23:01:51 -04:00
TEXT ( " %s: Invalid response received from PutCacheRecords RPC: %d results expected, received %d, from %s " ) ,
* CacheStore . GetName ( ) , Batch . Num ( ) , RequestIndex , * WriteToString < 256 > ( * HttpResponse ) ) ;
2022-06-10 12:56:08 -04:00
}
}
for ( const FCachePutRequest & Request : Batch . RightChop ( RequestIndex ) )
{
OnMiss ( Request ) ;
}
} ;
2022-07-18 12:23:16 -04:00
CacheStore . EnqueueAsyncRpc ( Owner , BatchPackage , MoveTemp ( OnRpcComplete ) ) ;
2022-06-10 12:56:08 -04:00
}
}
private :
EBatchView BatchGroupingFilter ( const FCachePutRequest & NextRequest )
{
const FCacheRecord & Record = NextRequest . Record ;
uint64 RecordSize = sizeof ( FCacheKey ) + Record . GetMeta ( ) . GetSize ( ) ;
for ( const FValueWithId & Value : Record . GetValues ( ) )
{
RecordSize + = Value . GetData ( ) . GetCompressedSize ( ) ;
}
BatchSize + = RecordSize ;
if ( BatchSize > CacheStore . BatchPutMaxBytes )
{
BatchSize = RecordSize ;
return EBatchView : : NewBatch ;
}
return EBatchView : : Continue ;
}
void OnHit ( const FCachePutRequest & Request )
{
const FCacheKey & Key = Request . Record . GetKey ( ) ;
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Cache Put complete for %s from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Key ) , * Request . Name ) ;
COOK_STAT ( Timer . AddHit ( Private : : GetCacheRecordCompressedSize ( Request . Record ) ) ) ;
OnComplete ( Request . MakeResponse ( EStatus : : Ok ) ) ;
} ;
void OnMiss ( const FCachePutRequest & Request )
{
const FCacheKey & Key = Request . Record . GetKey ( ) ;
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Cache Put miss for '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Key ) , * Request . Name ) ;
COOK_STAT ( Timer . AddMiss ( ) ) ;
OnComplete ( Request . MakeResponse ( EStatus : : Error ) ) ;
} ;
FZenCacheStore & CacheStore ;
IRequestOwner & Owner ;
const TArray < FCachePutRequest , TInlineAllocator < 1 > > Requests ;
uint64 BatchSize = 0 ;
TBatchView < const FCachePutRequest > Batches ;
FOnCachePutComplete OnComplete ;
COOK_STAT ( FCookStats : : FScopedStatsCounter Timer = CacheStore . UsageStats . TimePut ( ) ) ;
} ;
2022-07-18 12:23:16 -04:00
class FZenCacheStore : : FGetOp final : public FThreadSafeRefCountedObject
2022-06-10 12:56:08 -04:00
{
public :
FGetOp ( FZenCacheStore & InCacheStore ,
IRequestOwner & InOwner ,
const TConstArrayView < FCacheGetRequest > InRequests ,
FOnCacheGetComplete & & InOnComplete )
: CacheStore ( InCacheStore )
, Owner ( InOwner )
, Requests ( InRequests )
, OnComplete ( MoveTemp ( InOnComplete ) )
{
TRACE_COUNTER_ADD ( ZenDDC_Get , ( int64 ) Requests . Num ( ) ) ;
TRACE_COUNTER_ADD ( ZenDDC_CacheRecordRequestCount , int64 ( Requests . Num ( ) ) ) ;
}
virtual ~ FGetOp ( )
{
TRACE_COUNTER_SUBTRACT ( ZenDDC_CacheRecordRequestCount , int64 ( Requests . Num ( ) ) ) ;
}
void IssueRequests ( )
{
FRequestBarrier Barrier ( Owner ) ;
ForEachBatch ( CacheStore . CacheRecordBatchSize , Requests . Num ( ) ,
[ this ] ( int32 BatchFirst , int32 BatchLast )
{
TArrayView < const FCacheGetRequest > Batch ( Requests . GetData ( ) + BatchFirst , BatchLast - BatchFirst + 1 ) ;
FCbWriter BatchRequest ;
BatchRequest . BeginObject ( ) ;
{
BatchRequest < < ANSITEXTVIEW ( " Method " ) < < ANSITEXTVIEW ( " GetCacheRecords " ) ;
BatchRequest . BeginObject ( ANSITEXTVIEW ( " Params " ) ) ;
{
ECachePolicy BatchDefaultPolicy = Batch [ 0 ] . Policy . GetRecordPolicy ( ) ;
BatchRequest < < ANSITEXTVIEW ( " DefaultPolicy " ) < < * WriteToString < 128 > ( BatchDefaultPolicy ) ;
2022-07-18 12:23:16 -04:00
BatchRequest . AddString ( ANSITEXTVIEW ( " Namespace " ) , CacheStore . Namespace ) ;
2022-06-10 12:56:08 -04:00
BatchRequest . BeginArray ( ANSITEXTVIEW ( " Requests " ) ) ;
for ( const FCacheGetRequest & Request : Batch )
{
BatchRequest . BeginObject ( ) ;
{
BatchRequest < < ANSITEXTVIEW ( " Key " ) < < Request . Key ;
2022-06-20 23:40:25 -04:00
if ( ! Request . Policy . IsDefault ( ) )
2022-06-10 12:56:08 -04:00
{
2022-06-20 23:40:25 -04:00
BatchRequest < < ANSITEXTVIEW ( " Policy " ) < < Request . Policy ;
2022-06-10 12:56:08 -04:00
}
}
BatchRequest . EndObject ( ) ;
}
BatchRequest . EndArray ( ) ;
}
BatchRequest . EndObject ( ) ;
}
BatchRequest . EndObject ( ) ;
FGetOp * OriginalOp = this ;
2022-07-18 12:23:16 -04:00
auto OnRpcComplete = [ this , OpRef = TRefCountPtr < FGetOp > ( OriginalOp ) , Batch ] ( THttpUniquePtr < IHttpResponse > & HttpResponse , FCbPackage & Response )
2022-06-10 12:56:08 -04:00
{
int32 RequestIndex = 0 ;
2022-07-18 12:23:16 -04:00
if ( HttpResponse - > GetErrorCode ( ) = = EHttpErrorCode : : None )
2022-06-10 12:56:08 -04:00
{
const FCbObject & ResponseObj = Response . GetObject ( ) ;
for ( FCbField RecordField : ResponseObj [ ANSITEXTVIEW ( " Result " ) ] )
{
if ( RequestIndex > = Batch . Num ( ) )
{
+ + RequestIndex ;
continue ;
}
const FCacheGetRequest & Request = Batch [ RequestIndex + + ] ;
const FCacheKey & Key = Request . Key ;
FOptionalCacheRecord Record ;
if ( CacheStore . DebugOptions . ShouldSimulateGetMiss ( Key ) )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Simulated miss for put of '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Key ) , * Request . Name ) ;
}
else if ( ! RecordField . IsNull ( ) )
{
Record = FCacheRecord : : Load ( Response , RecordField . AsObject ( ) ) ;
}
Record ? OnHit ( Request , MoveTemp ( Record ) . Get ( ) ) : OnMiss ( Request ) ;
}
if ( RequestIndex ! = Batch . Num ( ) )
{
UE_LOG ( LogDerivedDataCache , Warning ,
2022-07-21 23:01:51 -04:00
TEXT ( " %s: Invalid response received from GetCacheRecords RPC: %d results expected, received %d, from %s " ) ,
* CacheStore . GetName ( ) , Batch . Num ( ) , RequestIndex , * WriteToString < 256 > ( * HttpResponse ) ) ;
2022-06-10 12:56:08 -04:00
}
}
for ( const FCacheGetRequest & Request : Batch . RightChop ( RequestIndex ) )
{
OnMiss ( Request ) ;
}
} ;
2022-07-18 12:23:16 -04:00
CacheStore . EnqueueAsyncRpc ( Owner , BatchRequest . Save ( ) . AsObject ( ) , MoveTemp ( OnRpcComplete ) ) ;
2022-06-10 12:56:08 -04:00
} ) ;
}
private :
void OnHit ( const FCacheGetRequest & Request , FCacheRecord & & Record )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Cache hit for '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
TRACE_COUNTER_ADD ( ZenDDC_GetHit , int64 ( 1 ) ) ;
int64 ReceivedSize = Private : : GetCacheRecordCompressedSize ( Record ) ;
TRACE_COUNTER_ADD ( ZenDDC_BytesReceived , ReceivedSize ) ;
COOK_STAT ( Timer . AddHit ( ReceivedSize ) ) ;
OnComplete ( { Request . Name , MoveTemp ( Record ) , Request . UserData , EStatus : : Ok } ) ;
} ;
void OnMiss ( const FCacheGetRequest & Request )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Cache miss for '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
OnComplete ( Request . MakeResponse ( EStatus : : Error ) ) ;
} ;
FZenCacheStore & CacheStore ;
IRequestOwner & Owner ;
const TArray < FCacheGetRequest , TInlineAllocator < 1 > > Requests ;
FOnCacheGetComplete OnComplete ;
COOK_STAT ( FCookStats : : FScopedStatsCounter Timer = CacheStore . UsageStats . TimeGet ( ) ) ;
} ;
2022-07-18 12:23:16 -04:00
class FZenCacheStore : : FPutValueOp final : public FThreadSafeRefCountedObject
2022-06-10 12:56:08 -04:00
{
public :
FPutValueOp ( FZenCacheStore & InCacheStore ,
IRequestOwner & InOwner ,
const TConstArrayView < FCachePutValueRequest > InRequests ,
FOnCachePutValueComplete & & InOnComplete )
: CacheStore ( InCacheStore )
, Owner ( InOwner )
, Requests ( InRequests )
, Batches ( Requests , [ this ] ( const FCachePutValueRequest & NextRequest ) { return BatchGroupingFilter ( NextRequest ) ; } )
, OnComplete ( MoveTemp ( InOnComplete ) )
{
}
void IssueRequests ( )
{
FRequestBarrier Barrier ( Owner ) ;
for ( TArrayView < const FCachePutValueRequest > Batch : Batches )
{
FCbPackage BatchPackage ;
FCbWriter BatchWriter ;
BatchWriter . BeginObject ( ) ;
{
BatchWriter < < ANSITEXTVIEW ( " Method " ) < < ANSITEXTVIEW ( " PutCacheValues " ) ;
BatchWriter . BeginObject ( ANSITEXTVIEW ( " Params " ) ) ;
{
ECachePolicy BatchDefaultPolicy = Batch [ 0 ] . Policy ;
BatchWriter < < ANSITEXTVIEW ( " DefaultPolicy " ) < < * WriteToString < 128 > ( BatchDefaultPolicy ) ;
2022-07-18 12:23:16 -04:00
BatchWriter . AddString ( ANSITEXTVIEW ( " Namespace " ) , CacheStore . Namespace ) ;
2022-06-10 12:56:08 -04:00
BatchWriter . BeginArray ( " Requests " ) ;
for ( const FCachePutValueRequest & Request : Batch )
{
BatchWriter . BeginObject ( ) ;
{
BatchWriter < < ANSITEXTVIEW ( " Key " ) < < Request . Key ;
const FValue & Value = Request . Value ;
BatchWriter . AddBinaryAttachment ( " RawHash " , Value . GetRawHash ( ) ) ;
if ( Value . HasData ( ) )
{
BatchPackage . AddAttachment ( FCbAttachment ( Value . GetData ( ) ) ) ;
}
if ( Request . Policy ! = BatchDefaultPolicy )
{
BatchWriter < < ANSITEXTVIEW ( " Policy " ) < < WriteToString < 128 > ( Request . Policy ) ;
}
}
BatchWriter . EndObject ( ) ;
}
BatchWriter . EndArray ( ) ;
}
BatchWriter . EndObject ( ) ;
}
BatchWriter . EndObject ( ) ;
BatchPackage . SetObject ( BatchWriter . Save ( ) . AsObject ( ) ) ;
2022-07-18 12:23:16 -04:00
auto OnRpcComplete = [ this , OpRef = TRefCountPtr < FPutValueOp > ( this ) , Batch ] ( THttpUniquePtr < IHttpResponse > & HttpResponse , FCbPackage & Response )
2022-06-10 12:56:08 -04:00
{
int32 RequestIndex = 0 ;
2022-07-18 12:23:16 -04:00
if ( HttpResponse - > GetErrorCode ( ) = = EHttpErrorCode : : None )
2022-06-10 12:56:08 -04:00
{
const FCbObject & ResponseObj = Response . GetObject ( ) ;
for ( FCbField ResponseField : ResponseObj [ ANSITEXTVIEW ( " Result " ) ] )
{
if ( RequestIndex > = Batch . Num ( ) )
{
+ + RequestIndex ;
continue ;
}
const FCachePutValueRequest & Request = Batch [ RequestIndex + + ] ;
bool bPutSucceeded = ResponseField . AsBool ( ) ;
if ( CacheStore . DebugOptions . ShouldSimulatePutMiss ( Request . Key ) )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Simulated miss for PutValue of %s from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
bPutSucceeded = false ;
}
bPutSucceeded ? OnHit ( Request ) : OnMiss ( Request ) ;
}
if ( RequestIndex ! = Batch . Num ( ) )
{
UE_LOG ( LogDerivedDataCache , Warning ,
2022-07-21 23:01:51 -04:00
TEXT ( " %s: Invalid response received from PutCacheValues RPC: %d results expected, received %d, from %s " ) ,
* CacheStore . GetName ( ) , Batch . Num ( ) , RequestIndex , * WriteToString < 256 > ( * HttpResponse ) ) ;
2022-06-10 12:56:08 -04:00
}
}
for ( const FCachePutValueRequest & Request : Batch . RightChop ( RequestIndex ) )
{
OnMiss ( Request ) ;
}
} ;
2022-07-18 12:23:16 -04:00
CacheStore . EnqueueAsyncRpc ( Owner , BatchPackage , MoveTemp ( OnRpcComplete ) ) ;
2022-06-10 12:56:08 -04:00
}
}
private :
EBatchView BatchGroupingFilter ( const FCachePutValueRequest & NextRequest )
{
uint64 ValueSize = sizeof ( FCacheKey ) + NextRequest . Value . GetData ( ) . GetCompressedSize ( ) ;
BatchSize + = ValueSize ;
if ( BatchSize > CacheStore . BatchPutMaxBytes )
{
BatchSize = ValueSize ;
return EBatchView : : NewBatch ;
}
return EBatchView : : Continue ;
}
void OnHit ( const FCachePutValueRequest & Request )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Cache PutValue complete for %s from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
COOK_STAT ( Timer . AddHit ( Request . Value . GetData ( ) . GetCompressedSize ( ) ) ) ;
OnComplete ( Request . MakeResponse ( EStatus : : Ok ) ) ;
} ;
void OnMiss ( const FCachePutValueRequest & Request )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Cache PutValue miss for '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
COOK_STAT ( Timer . AddMiss ( ) ) ;
OnComplete ( Request . MakeResponse ( EStatus : : Error ) ) ;
} ;
FZenCacheStore & CacheStore ;
IRequestOwner & Owner ;
const TArray < FCachePutValueRequest , TInlineAllocator < 1 > > Requests ;
uint64 BatchSize = 0 ;
TBatchView < const FCachePutValueRequest > Batches ;
FOnCachePutValueComplete OnComplete ;
COOK_STAT ( FCookStats : : FScopedStatsCounter Timer = CacheStore . UsageStats . TimePut ( ) ) ;
} ;
2022-07-18 12:23:16 -04:00
class FZenCacheStore : : FGetValueOp final : public FThreadSafeRefCountedObject
2022-06-10 12:56:08 -04:00
{
public :
FGetValueOp ( FZenCacheStore & InCacheStore ,
IRequestOwner & InOwner ,
const TConstArrayView < FCacheGetValueRequest > InRequests ,
FOnCacheGetValueComplete & & InOnComplete )
: CacheStore ( InCacheStore )
, Owner ( InOwner )
, Requests ( InRequests )
, OnComplete ( MoveTemp ( InOnComplete ) )
{
TRACE_COUNTER_ADD ( ZenDDC_Get , ( int64 ) Requests . Num ( ) ) ;
TRACE_COUNTER_ADD ( ZenDDC_CacheRecordRequestCount , int64 ( Requests . Num ( ) ) ) ;
}
virtual ~ FGetValueOp ( )
{
TRACE_COUNTER_SUBTRACT ( ZenDDC_CacheRecordRequestCount , int64 ( Requests . Num ( ) ) ) ;
}
void IssueRequests ( )
{
FRequestBarrier Barrier ( Owner ) ;
ForEachBatch ( CacheStore . CacheRecordBatchSize , Requests . Num ( ) ,
[ this ] ( int32 BatchFirst , int32 BatchLast )
{
TArrayView < const FCacheGetValueRequest > Batch ( Requests . GetData ( ) + BatchFirst , BatchLast - BatchFirst + 1 ) ;
FCbWriter BatchRequest ;
BatchRequest . BeginObject ( ) ;
{
BatchRequest < < ANSITEXTVIEW ( " Method " ) < < ANSITEXTVIEW ( " GetCacheValues " ) ;
BatchRequest . BeginObject ( ANSITEXTVIEW ( " Params " ) ) ;
{
ECachePolicy BatchDefaultPolicy = Batch [ 0 ] . Policy ;
BatchRequest < < ANSITEXTVIEW ( " DefaultPolicy " ) < < * WriteToString < 128 > ( BatchDefaultPolicy ) ;
2022-07-18 12:23:16 -04:00
BatchRequest . AddString ( ANSITEXTVIEW ( " Namespace " ) , CacheStore . Namespace ) ;
2022-06-10 12:56:08 -04:00
BatchRequest . BeginArray ( " Requests " ) ;
for ( const FCacheGetValueRequest & Request : Batch )
{
BatchRequest . BeginObject ( ) ;
{
BatchRequest < < ANSITEXTVIEW ( " Key " ) < < Request . Key ;
if ( Request . Policy ! = BatchDefaultPolicy )
{
BatchRequest < < ANSITEXTVIEW ( " Policy " ) < < WriteToString < 128 > ( Request . Policy ) ;
}
}
BatchRequest . EndObject ( ) ;
}
BatchRequest . EndArray ( ) ;
}
BatchRequest . EndObject ( ) ;
}
BatchRequest . EndObject ( ) ;
FGetValueOp * OriginalOp = this ;
2022-07-18 12:23:16 -04:00
auto OnRpcComplete = [ this , OpRef = TRefCountPtr < FGetValueOp > ( OriginalOp ) , Batch ] ( THttpUniquePtr < IHttpResponse > & HttpResponse , FCbPackage & Response )
2022-06-10 12:56:08 -04:00
{
int32 RequestIndex = 0 ;
2022-07-18 12:23:16 -04:00
if ( HttpResponse - > GetErrorCode ( ) = = EHttpErrorCode : : None )
2022-06-10 12:56:08 -04:00
{
const FCbObject & ResponseObj = Response . GetObject ( ) ;
for ( FCbFieldView ResultField : ResponseObj [ ANSITEXTVIEW ( " Result " ) ] )
{
if ( RequestIndex > = Batch . Num ( ) )
{
+ + RequestIndex ;
continue ;
}
const FCacheGetValueRequest & Request = Batch [ RequestIndex + + ] ;
FCbObjectView ResultObj = ResultField . AsObjectView ( ) ;
TOptional < FValue > Value ;
if ( CacheStore . DebugOptions . ShouldSimulateGetMiss ( Request . Key ) )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Simulated miss for GetValue of '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
}
else
{
FCbFieldView RawHashField = ResultObj [ " RawHash " ] ;
FIoHash RawHash = RawHashField . AsHash ( ) ;
if ( const FCbAttachment * Attachment = Response . FindAttachment ( RawHash ) )
{
Value . Emplace ( Attachment - > AsCompressedBinary ( ) ) ;
}
else
{
FCbFieldView RawSizeField = ResultObj [ " RawSize " ] ;
uint64 RawSize = RawSizeField . AsUInt64 ( ) ;
if ( ! RawSizeField . HasError ( ) & & ! RawHashField . HasError ( ) )
{
Value . Emplace ( RawHash , RawSize ) ;
}
}
}
( bool ) Value ? OnHit ( Request , MoveTemp ( * Value ) ) : OnMiss ( Request ) ;
}
if ( RequestIndex ! = Batch . Num ( ) )
{
UE_LOG ( LogDerivedDataCache , Warning ,
2022-07-21 23:01:51 -04:00
TEXT ( " %s: Invalid response received from GetCacheValues RPC: %d results expected, received %d from %s " ) ,
* CacheStore . GetName ( ) , Batch . Num ( ) , RequestIndex , * WriteToString < 256 > ( * HttpResponse ) ) ;
2022-06-10 12:56:08 -04:00
}
}
for ( const FCacheGetValueRequest & Request : Batch . RightChop ( RequestIndex ) )
{
OnMiss ( Request ) ;
}
} ;
2022-07-18 12:23:16 -04:00
CacheStore . EnqueueAsyncRpc ( Owner , BatchRequest . Save ( ) . AsObject ( ) , MoveTemp ( OnRpcComplete ) ) ;
2022-06-10 12:56:08 -04:00
} ) ;
}
private :
void OnHit ( const FCacheGetValueRequest & Request , FValue & & Value )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Cache hit for '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
TRACE_COUNTER_ADD ( ZenDDC_GetHit , int64 ( 1 ) ) ;
int64 ReceivedSize = Value . GetData ( ) . GetCompressedSize ( ) ;
TRACE_COUNTER_ADD ( ZenDDC_BytesReceived , ReceivedSize ) ;
COOK_STAT ( Timer . AddHit ( ReceivedSize ) ) ;
OnComplete ( { Request . Name , Request . Key , MoveTemp ( Value ) , Request . UserData , EStatus : : Ok } ) ;
} ;
void OnMiss ( const FCacheGetValueRequest & Request )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Cache miss for '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
OnComplete ( Request . MakeResponse ( EStatus : : Error ) ) ;
} ;
FZenCacheStore & CacheStore ;
IRequestOwner & Owner ;
const TArray < FCacheGetValueRequest , TInlineAllocator < 1 > > Requests ;
FOnCacheGetValueComplete OnComplete ;
COOK_STAT ( FCookStats : : FScopedStatsCounter Timer = CacheStore . UsageStats . TimeGet ( ) ) ;
} ;
2022-07-18 12:23:16 -04:00
class FZenCacheStore : : FGetChunksOp final : public FThreadSafeRefCountedObject
2022-06-10 12:56:08 -04:00
{
public :
FGetChunksOp ( FZenCacheStore & InCacheStore ,
IRequestOwner & InOwner ,
const TConstArrayView < FCacheGetChunkRequest > InRequests ,
FOnCacheGetChunkComplete & & InOnComplete )
: CacheStore ( InCacheStore )
, Owner ( InOwner )
, Requests ( InRequests )
, OnComplete ( MoveTemp ( InOnComplete ) )
{
TRACE_COUNTER_ADD ( ZenDDC_ChunkRequestCount , int64 ( Requests . Num ( ) ) ) ;
TRACE_COUNTER_ADD ( ZenDDC_Get , int64 ( Requests . Num ( ) ) ) ;
Requests . StableSort ( TChunkLess ( ) ) ;
}
virtual ~ FGetChunksOp ( )
{
TRACE_COUNTER_SUBTRACT ( ZenDDC_ChunkRequestCount , int64 ( Requests . Num ( ) ) ) ;
}
void IssueRequests ( )
{
FRequestBarrier Barrier ( Owner ) ;
ForEachBatch ( CacheStore . CacheChunksBatchSize , Requests . Num ( ) ,
[ this ] ( int32 BatchFirst , int32 BatchLast )
{
TArrayView < const FCacheGetChunkRequest > Batch ( Requests . GetData ( ) + BatchFirst , BatchLast - BatchFirst + 1 ) ;
FCbWriter BatchRequest ;
BatchRequest . BeginObject ( ) ;
{
BatchRequest < < ANSITEXTVIEW ( " Method " ) < < " GetCacheChunks " ;
BatchRequest . AddInteger ( ANSITEXTVIEW ( " MethodVersion " ) , 1 ) ;
BatchRequest . BeginObject ( ANSITEXTVIEW ( " Params " ) ) ;
{
ECachePolicy DefaultPolicy = Batch [ 0 ] . Policy ;
BatchRequest < < ANSITEXTVIEW ( " DefaultPolicy " ) < < WriteToString < 128 > ( DefaultPolicy ) ;
2022-07-18 12:23:16 -04:00
BatchRequest . AddString ( ANSITEXTVIEW ( " Namespace " ) , CacheStore . Namespace ) ;
2022-06-10 12:56:08 -04:00
BatchRequest . BeginArray ( ANSITEXTVIEW ( " ChunkRequests " ) ) ;
for ( const FCacheGetChunkRequest & Request : Batch )
{
BatchRequest . BeginObject ( ) ;
{
BatchRequest < < ANSITEXTVIEW ( " Key " ) < < Request . Key ;
if ( Request . Id . IsValid ( ) )
{
BatchRequest . AddObjectId ( ANSITEXTVIEW ( " ValueId " ) , Request . Id ) ;
}
if ( Request . RawOffset ! = 0 )
{
BatchRequest < < ANSITEXTVIEW ( " RawOffset " ) < < Request . RawOffset ;
}
if ( Request . RawSize ! = MAX_uint64 )
{
BatchRequest < < ANSITEXTVIEW ( " RawSize " ) < < Request . RawSize ;
}
if ( ! Request . RawHash . IsZero ( ) )
{
BatchRequest < < ANSITEXTVIEW ( " ChunkId " ) < < Request . RawHash ;
}
if ( Request . Policy ! = DefaultPolicy )
{
BatchRequest < < ANSITEXTVIEW ( " Policy " ) < < WriteToString < 128 > ( Request . Policy ) ;
}
}
BatchRequest . EndObject ( ) ;
}
BatchRequest . EndArray ( ) ;
}
BatchRequest . EndObject ( ) ;
}
BatchRequest . EndObject ( ) ;
FCbPackage BatchResponse ;
2022-07-18 12:23:16 -04:00
EHttpErrorCode HttpErrorCode = EHttpErrorCode : : Unknown ;
2022-06-10 12:56:08 -04:00
{
LLM_SCOPE_BYTAG ( UntaggedDDCResult ) ;
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpResponse > HttpResponse = CacheStore . PerformBlockingRpc ( BatchRequest . Save ( ) . AsObject ( ) , BatchResponse ) ;
HttpErrorCode = HttpResponse - > GetErrorCode ( ) ;
2022-06-10 12:56:08 -04:00
}
int32 RequestIndex = 0 ;
2022-07-18 12:23:16 -04:00
if ( HttpErrorCode = = EHttpErrorCode : : None )
2022-06-10 12:56:08 -04:00
{
const FCbObject & ResponseObj = BatchResponse . GetObject ( ) ;
for ( FCbFieldView ResultView : ResponseObj [ ANSITEXTVIEW ( " Result " ) ] )
{
if ( RequestIndex > = Batch . Num ( ) )
{
+ + RequestIndex ;
continue ;
}
const FCacheGetChunkRequest & Request = Batch [ RequestIndex + + ] ;
FIoHash RawHash ;
bool Succeeded = false ;
uint64 RawSize = 0 ;
FCbObjectView ResultObject = ResultView . AsObjectView ( ) ;
FSharedBuffer RequestedBytes ;
if ( CacheStore . DebugOptions . ShouldSimulateGetMiss ( Request . Key ) )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: Simulated miss for get of '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key , ' / ' , Request . Id ) , * Request . Name ) ;
}
else
{
FCbFieldView HashView = ResultObject [ ANSITEXTVIEW ( " RawHash " ) ] ;
RawHash = HashView . AsHash ( ) ;
if ( ! HashView . HasError ( ) )
{
if ( const FCbAttachment * Attachment = BatchResponse . FindAttachment ( HashView . AsHash ( ) ) )
{
FCompressedBuffer CompressedBuffer = Attachment - > AsCompressedBinary ( ) ;
if ( CompressedBuffer )
{
TRACE_COUNTER_ADD ( ZenDDC_BytesReceived , CompressedBuffer . GetCompressedSize ( ) ) ;
RequestedBytes = FCompressedBufferReader ( CompressedBuffer ) . Decompress ( Request . RawOffset , Request . RawSize ) ;
RawSize = RequestedBytes . GetSize ( ) ;
Succeeded = true ;
}
}
else
{
FCbFieldView RawSizeField = ResultObject [ ANSITEXTVIEW ( " RawSize " ) ] ;
uint64 TotalSize = RawSizeField . AsUInt64 ( ) ;
Succeeded = ! RawSizeField . HasError ( ) ;
if ( Succeeded )
{
RawSize = FMath : : Min ( Request . RawSize , TotalSize - FMath : : Min ( Request . RawOffset , TotalSize ) ) ;
}
}
}
}
Succeeded ? OnHit ( Request , MoveTemp ( RawHash ) , RawSize , MoveTemp ( RequestedBytes ) ) : OnMiss ( Request ) ;
}
}
for ( const FCacheGetChunkRequest & Request : Batch . RightChop ( RequestIndex ) )
{
OnMiss ( Request ) ;
}
} ) ;
}
private :
void OnHit ( const FCacheGetChunkRequest & Request , FIoHash & & RawHash , uint64 RawSize , FSharedBuffer & & RequestedBytes )
{
UE_LOG ( LogDerivedDataCache , Verbose , TEXT ( " %s: CacheChunk hit for '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 96 > ( Request . Key , ' / ' , Request . Id ) , * Request . Name ) ;
TRACE_COUNTER_ADD ( ZenDDC_GetHit , int64 ( 1 ) ) ;
COOK_STAT ( Timer . AddHit ( RawSize ) ) ;
OnComplete ( { Request . Name , Request . Key , Request . Id , Request . RawOffset ,
RawSize , MoveTemp ( RawHash ) , MoveTemp ( RequestedBytes ) , Request . UserData , EStatus : : Ok } ) ;
} ;
void OnMiss ( const FCacheGetChunkRequest & Request )
{
UE_LOG ( LogDerivedDataCache , Display , TEXT ( " %s: CacheChunk miss with missing value '%s' for '%s' from '%s' " ) ,
* CacheStore . GetName ( ) , * WriteToString < 16 > ( Request . Id ) , * WriteToString < 96 > ( Request . Key ) , * Request . Name ) ;
OnComplete ( Request . MakeResponse ( EStatus : : Error ) ) ;
} ;
FZenCacheStore & CacheStore ;
IRequestOwner & Owner ;
TArray < FCacheGetChunkRequest , TInlineAllocator < 1 > > Requests ;
FOnCacheGetChunkComplete OnComplete ;
COOK_STAT ( FCookStats : : FScopedStatsCounter Timer = CacheStore . UsageStats . TimeGet ( ) ) ;
} ;
2022-07-18 12:23:16 -04:00
class FZenCacheStore : : FCbPackageReceiver final : public IHttpReceiver
{
public :
FCbPackageReceiver ( const FCbPackageReceiver & ) = delete ;
FCbPackageReceiver & operator = ( const FCbPackageReceiver & ) = delete ;
explicit FCbPackageReceiver ( FCbPackage & OutPackage , IHttpReceiver * InNext = nullptr )
: Package ( OutPackage )
, Next ( InNext )
{
}
private :
IHttpReceiver * OnCreate ( IHttpResponse & Response ) final
{
return & BodyReceiver ;
}
IHttpReceiver * OnComplete ( IHttpResponse & Response ) final
{
FMemoryReaderView Ar ( MakeMemoryView ( BodyArray ) ) ;
Package . TryLoad ( Ar ) ;
return Next ;
}
private :
FCbPackage & Package ;
IHttpReceiver * Next ;
TArray64 < uint8 > BodyArray ;
FHttpByteArrayReceiver BodyReceiver { BodyArray , this } ;
} ;
class FZenCacheStore : : FAsyncCbPackageReceiver final : public FRequestBase , public IHttpReceiver
{
public :
FAsyncCbPackageReceiver ( const FAsyncCbPackageReceiver & ) = delete ;
FAsyncCbPackageReceiver & operator = ( const FAsyncCbPackageReceiver & ) = delete ;
FAsyncCbPackageReceiver (
THttpUniquePtr < IHttpRequest > & & InRequest ,
IRequestOwner * InOwner ,
FOnRpcComplete & & InOnRpcComplete )
: Request ( MoveTemp ( InRequest ) )
, Owner ( InOwner )
, BaseReceiver ( Package , this )
, OnRpcComplete ( MoveTemp ( InOnRpcComplete ) )
{
Request - > SendAsync ( this , Response ) ;
}
private :
// IRequest Interface
void SetPriority ( EPriority Priority ) final { }
void Cancel ( ) final { Monitor - > Cancel ( ) ; }
void Wait ( ) final { Monitor - > Wait ( ) ; }
// IHttpReceiver Interface
IHttpReceiver * OnCreate ( IHttpResponse & LocalResponse ) final
{
Monitor = LocalResponse . GetMonitor ( ) ;
Owner - > Begin ( this ) ;
return & BaseReceiver ;
}
IHttpReceiver * OnComplete ( IHttpResponse & LocalResponse ) final
{
Request . Reset ( ) ;
Owner - > End ( this , [ Self = this ]
{
if ( Self - > OnRpcComplete )
{
// Launch a task for the completion function since it can execute arbitrary code.
Self - > Owner - > LaunchTask ( TEXT ( " ZenHttpComplete " ) , [ Self = TRefCountPtr ( Self ) ]
{
Self - > OnRpcComplete ( Self - > Response , Self - > Package ) ;
} ) ;
}
} ) ;
return nullptr ;
}
private :
THttpUniquePtr < IHttpRequest > Request ;
THttpUniquePtr < IHttpResponse > Response ;
TRefCountPtr < IHttpResponseMonitor > Monitor ;
IRequestOwner * Owner ;
FCbPackage Package ;
FCbPackageReceiver BaseReceiver ;
FOnRpcComplete OnRpcComplete ;
} ;
2022-05-30 18:14:02 -04:00
FZenCacheStore : : FZenCacheStore (
const TCHAR * InServiceUrl ,
2022-07-18 12:23:16 -04:00
const TCHAR * InNamespace )
2022-05-30 18:14:02 -04:00
: Namespace ( InNamespace )
, ZenService ( InServiceUrl )
{
if ( IsServiceReady ( ) )
{
2022-07-18 12:23:16 -04:00
RpcUri < < ZenService . GetInstance ( ) . GetURL ( ) < < ANSITEXTVIEW ( " /z$/$rpc " ) ;
2022-09-07 10:37:16 -04:00
const uint32 MaxConnections = uint32 ( FMath : : Clamp ( FPlatformMisc : : NumberOfCoresIncludingHyperthreads ( ) , 8 , 64 ) ) ;
2022-07-18 12:23:16 -04:00
constexpr uint32 RequestPoolSize = 128 ;
constexpr uint32 RequestPoolOverflowSize = 128 ;
FHttpConnectionPoolParams ConnectionPoolParams ;
ConnectionPoolParams . MaxConnections = MaxConnections ;
ConnectionPoolParams . MinConnections = MaxConnections ;
ConnectionPool = IHttpManager : : Get ( ) . CreateConnectionPool ( ConnectionPoolParams ) ;
FHttpClientParams ClientParams ;
ClientParams . MaxRequests = RequestPoolSize + RequestPoolOverflowSize ;
ClientParams . MinRequests = RequestPoolSize ;
RequestQueue = FHttpRequestQueue ( * ConnectionPool , ClientParams ) ;
2022-05-30 18:14:02 -04:00
bIsUsable = true ;
// Issue a request for stats as it will be fetched asynchronously and issuing now makes them available sooner for future callers.
Zen : : FZenStats ZenStats ;
ZenService . GetInstance ( ) . GetStats ( ZenStats ) ;
}
GConfig - > GetInt ( TEXT ( " Zen " ) , TEXT ( " BatchPutMaxBytes " ) , BatchPutMaxBytes , GEngineIni ) ;
GConfig - > GetInt ( TEXT ( " Zen " ) , TEXT ( " CacheRecordBatchSize " ) , CacheRecordBatchSize , GEngineIni ) ;
GConfig - > GetInt ( TEXT ( " Zen " ) , TEXT ( " CacheChunksBatchSize " ) , CacheChunksBatchSize , GEngineIni ) ;
}
bool FZenCacheStore : : IsServiceReady ( )
{
return ZenService . GetInstance ( ) . IsServiceReady ( ) ;
}
2022-07-18 12:23:16 -04:00
FCompositeBuffer FZenCacheStore : : SaveRpcPackage ( const FCbPackage & Package )
2022-05-31 12:42:17 -04:00
{
2022-07-18 12:23:16 -04:00
FLargeMemoryWriter Memory ;
Zen : : Http : : SaveCbPackage ( Package , Memory ) ;
uint64 PackageMemorySize = Memory . TotalSize ( ) ;
return FCompositeBuffer ( FSharedBuffer : : TakeOwnership ( Memory . ReleaseOwnership ( ) , PackageMemorySize , FMemory : : Free ) ) ;
2022-05-31 12:42:17 -04:00
}
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpRequest > FZenCacheStore : : CreateRpcRequest ( )
2022-05-31 12:42:17 -04:00
{
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpRequest > Request = RequestQueue . CreateRequest ( { } ) ;
Request - > SetUri ( RpcUri ) ;
Request - > SetMethod ( EHttpMethod : : Post ) ;
Request - > AddAcceptType ( EHttpMediaType : : CbPackage ) ;
return Request ;
2022-05-31 12:42:17 -04:00
}
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpResponse > FZenCacheStore : : PerformBlockingRpc ( FCbObject RequestObject , FCbPackage & OutResponse )
2022-05-31 12:42:17 -04:00
{
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpRequest > Request = CreateRpcRequest ( ) ;
Request - > SetContentType ( EHttpMediaType : : CbObject ) ;
Request - > SetBody ( RequestObject . GetBuffer ( ) . MakeOwned ( ) ) ;
2022-05-31 12:42:17 -04:00
2022-07-18 12:23:16 -04:00
FCbPackageReceiver Receiver ( OutResponse ) ;
THttpUniquePtr < IHttpResponse > Response ;
Request - > Send ( & Receiver , Response ) ;
return Response ;
2022-05-31 12:42:17 -04:00
}
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpResponse > FZenCacheStore : : PerformBlockingRpc ( const FCbPackage & RequestPackage , FCbPackage & OutResponse )
2022-05-31 12:42:17 -04:00
{
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpRequest > Request = CreateRpcRequest ( ) ;
Request - > SetContentType ( EHttpMediaType : : CbPackage ) ;
Request - > SetBody ( SaveRpcPackage ( RequestPackage ) ) ;
FCbPackageReceiver Receiver ( OutResponse ) ;
THttpUniquePtr < IHttpResponse > Response ;
Request - > Send ( & Receiver , Response ) ;
return Response ;
2022-05-31 12:42:17 -04:00
}
2022-07-18 12:23:16 -04:00
void FZenCacheStore : : EnqueueAsyncRpc ( IRequestOwner & Owner , FCbObject RequestObject , FOnRpcComplete & & OnComplete )
2022-05-31 12:42:17 -04:00
{
2022-07-18 12:23:16 -04:00
THttpUniquePtr < IHttpRequest > Request = CreateRpcRequest ( ) ;
Request - > SetContentType ( EHttpMediaType : : CbObject ) ;
Request - > SetBody ( RequestObject . GetBuffer ( ) . MakeOwned ( ) ) ;
new FAsyncCbPackageReceiver ( MoveTemp ( Request ) , & Owner , MoveTemp ( OnComplete ) ) ;
}
void FZenCacheStore : : EnqueueAsyncRpc ( IRequestOwner & Owner , const FCbPackage & RequestPackage , FOnRpcComplete & & OnComplete )
{
THttpUniquePtr < IHttpRequest > Request = CreateRpcRequest ( ) ;
Request - > SetContentType ( EHttpMediaType : : CbPackage ) ;
Request - > SetBody ( SaveRpcPackage ( RequestPackage ) ) ;
new FAsyncCbPackageReceiver ( MoveTemp ( Request ) , & Owner , MoveTemp ( OnComplete ) ) ;
2022-05-31 12:42:17 -04:00
}
2022-05-31 22:01:53 -04:00
void FZenCacheStore : : LegacyStats ( FDerivedDataCacheStatsNode & OutNode )
2022-05-30 18:14:02 -04:00
{
Zen : : FZenStats ZenStats ;
FDerivedDataCacheUsageStats LocalStats ;
FDerivedDataCacheUsageStats RemoteStats ;
# if ENABLE_COOK_STATS
using EHitOrMiss = FCookStats : : CallStats : : EHitOrMiss ;
using ECacheStatType = FCookStats : : CallStats : : EStatType ;
Zen : : GetDefaultServiceInstance ( ) . GetStats ( ZenStats ) ;
const int64 RemotePutSize = int64 ( ZenStats . UpstreamStats . TotalUploadedMB * 1024 * 1024 ) ;
const int64 RemoteGetSize = int64 ( ZenStats . UpstreamStats . TotalDownloadedMB * 1024 * 1024 ) ;
const int64 LocalGetSize = FMath : : Max < int64 > ( 0 , UsageStats . GetStats . GetAccumulatedValueAnyThread ( EHitOrMiss : : Hit , ECacheStatType : : Bytes ) - RemoteGetSize ) ;
LocalStats . PutStats = UsageStats . PutStats ;
LocalStats . ExistsStats = UsageStats . ExistsStats ;
LocalStats . PrefetchStats = UsageStats . PrefetchStats ;
LocalStats . GetStats . Accumulate ( EHitOrMiss : : Hit , ECacheStatType : : Counter , ZenStats . CacheStats . Hits - ZenStats . CacheStats . UpstreamHits , /*bIsInGameThread*/ false ) ;
LocalStats . GetStats . Accumulate ( EHitOrMiss : : Miss , ECacheStatType : : Counter , ZenStats . CacheStats . Misses + ZenStats . CacheStats . UpstreamHits , /*bIsInGameThread*/ false ) ;
RemoteStats . GetStats . Accumulate ( EHitOrMiss : : Hit , ECacheStatType : : Counter , ZenStats . CacheStats . UpstreamHits , /*bIsInGameThread*/ false ) ;
RemoteStats . GetStats . Accumulate ( EHitOrMiss : : Miss , ECacheStatType : : Counter , ZenStats . CacheStats . Misses , /*bIsInGameThread*/ false ) ;
LocalStats . GetStats . Accumulate ( EHitOrMiss : : Hit , ECacheStatType : : Bytes , LocalGetSize , /*bIsInGameThread*/ false ) ;
RemoteStats . GetStats . Accumulate ( EHitOrMiss : : Hit , ECacheStatType : : Bytes , RemoteGetSize , /*bIsInGameThread*/ false ) ;
RemoteStats . PutStats . Accumulate ( EHitOrMiss : : Hit , ECacheStatType : : Bytes , RemotePutSize , /*bIsInGameThread*/ false ) ;
# endif
2022-05-31 22:01:53 -04:00
if ( ZenStats . UpstreamStats . EndPointStats . IsEmpty ( ) )
{
OutNode = { TEXT ( " Zen " ) , ZenService . GetInstance ( ) . GetURL ( ) , /*bIsLocal*/ true } ;
OutNode . UsageStats . Add ( TEXT ( " " ) , LocalStats ) ;
return ;
}
2022-05-30 18:14:02 -04:00
TSharedRef < FDerivedDataCacheStatsNode > LocalNode =
MakeShared < FDerivedDataCacheStatsNode > ( TEXT ( " Zen " ) , ZenService . GetInstance ( ) . GetURL ( ) , /*bIsLocal*/ true ) ;
2022-05-31 05:11:05 -04:00
LocalNode - > UsageStats . Add ( TEXT ( " " ) , LocalStats ) ;
2022-05-30 18:14:02 -04:00
TSharedRef < FDerivedDataCacheStatsNode > RemoteNode =
MakeShared < FDerivedDataCacheStatsNode > ( ZenStats . UpstreamStats . EndPointStats [ 0 ] . Name , ZenStats . UpstreamStats . EndPointStats [ 0 ] . Url , /*bIsLocal*/ false ) ;
2022-05-31 05:11:05 -04:00
RemoteNode - > UsageStats . Add ( TEXT ( " " ) , RemoteStats ) ;
2022-05-30 18:14:02 -04:00
2022-05-31 22:01:53 -04:00
OutNode = { TEXT ( " Zen Group " ) , TEXT ( " " ) , /*bIsLocal*/ true } ;
OutNode . Children . Add ( LocalNode ) ;
OutNode . Children . Add ( RemoteNode ) ;
2022-05-30 18:14:02 -04:00
}
2022-05-31 22:01:53 -04:00
bool FZenCacheStore : : LegacyDebugOptions ( FBackendDebugOptions & InOptions )
2022-05-30 18:14:02 -04:00
{
DebugOptions = InOptions ;
return true ;
}
void FZenCacheStore : : Put (
const TConstArrayView < FCachePutRequest > Requests ,
IRequestOwner & Owner ,
FOnCachePutComplete & & OnComplete )
{
TRACE_CPUPROFILER_EVENT_SCOPE ( ZenDDC : : PutCachedRecord ) ;
2022-06-10 12:56:08 -04:00
TRefCountPtr < FPutOp > PutOp = MakeAsyncOp < FPutOp > ( * this , Owner , Requests , MoveTemp ( OnComplete ) ) ;
PutOp - > IssueRequests ( ) ;
2022-05-30 18:14:02 -04:00
}
void FZenCacheStore : : Get (
const TConstArrayView < FCacheGetRequest > Requests ,
IRequestOwner & Owner ,
FOnCacheGetComplete & & OnComplete )
{
TRACE_CPUPROFILER_EVENT_SCOPE ( ZenDDC : : GetCacheRecord ) ;
2022-06-10 12:56:08 -04:00
TRefCountPtr < FGetOp > GetOp = MakeAsyncOp < FGetOp > ( * this , Owner , Requests , MoveTemp ( OnComplete ) ) ;
GetOp - > IssueRequests ( ) ;
2022-05-30 18:14:02 -04:00
}
void FZenCacheStore : : PutValue (
TConstArrayView < FCachePutValueRequest > Requests ,
IRequestOwner & Owner ,
FOnCachePutValueComplete & & OnComplete )
{
TRACE_CPUPROFILER_EVENT_SCOPE ( ZenDDC : : PutValue ) ;
2022-06-10 12:56:08 -04:00
TRefCountPtr < FPutValueOp > PutValueOp = MakeAsyncOp < FPutValueOp > ( * this , Owner , Requests , MoveTemp ( OnComplete ) ) ;
PutValueOp - > IssueRequests ( ) ;
2022-05-30 18:14:02 -04:00
}
void FZenCacheStore : : GetValue (
TConstArrayView < FCacheGetValueRequest > Requests ,
IRequestOwner & Owner ,
FOnCacheGetValueComplete & & OnComplete )
2022-06-10 12:56:08 -04:00
{
2022-05-30 18:14:02 -04:00
TRACE_CPUPROFILER_EVENT_SCOPE ( ZenDDC : : GetValue ) ;
2022-06-10 12:56:08 -04:00
TRefCountPtr < FGetValueOp > GetValueOp = MakeAsyncOp < FGetValueOp > ( * this , Owner , Requests , MoveTemp ( OnComplete ) ) ;
GetValueOp - > IssueRequests ( ) ;
2022-05-30 18:14:02 -04:00
}
void FZenCacheStore : : GetChunks (
TConstArrayView < FCacheGetChunkRequest > Requests ,
IRequestOwner & Owner ,
FOnCacheGetChunkComplete & & OnComplete )
{
TRACE_CPUPROFILER_EVENT_SCOPE ( ZenDDC : : GetChunks ) ;
2022-06-10 12:56:08 -04:00
TRefCountPtr < FGetChunksOp > GetChunksOp = MakeAsyncOp < FGetChunksOp > ( * this , Owner , Requests , MoveTemp ( OnComplete ) ) ;
GetChunksOp - > IssueRequests ( ) ;
2022-05-30 18:14:02 -04:00
}
2022-05-31 22:01:53 -04:00
TTuple < ILegacyCacheStore * , ECacheStoreFlags > CreateZenCacheStore ( const TCHAR * NodeName , const TCHAR * Config )
2022-05-30 18:14:02 -04:00
{
2022-05-31 22:01:53 -04:00
FString ServiceUrl ;
FParse : : Value ( Config , TEXT ( " Host= " ) , ServiceUrl ) ;
FString Namespace ;
if ( ! FParse : : Value ( Config , TEXT ( " Namespace= " ) , Namespace ) )
2022-05-30 18:14:02 -04:00
{
2022-05-31 22:01:53 -04:00
Namespace = FApp : : GetProjectName ( ) ;
UE_LOG ( LogDerivedDataCache , Warning , TEXT ( " %s: Missing required parameter 'Namespace', falling back to '%s' " ) , NodeName , * Namespace ) ;
2022-05-30 18:14:02 -04:00
}
2022-05-31 22:01:53 -04:00
FString StructuredNamespace ;
if ( ! FParse : : Value ( Config , TEXT ( " StructuredNamespace= " ) , StructuredNamespace ) )
{
StructuredNamespace = Namespace ;
UE_LOG ( LogDerivedDataCache , Warning , TEXT ( " %s: Missing required parameter 'StructuredNamespace', falling back to Namespace: '%s' " ) , NodeName , * StructuredNamespace ) ;
}
2022-07-18 12:23:16 -04:00
TUniquePtr < FZenCacheStore > Backend = MakeUnique < FZenCacheStore > ( * ServiceUrl , * StructuredNamespace ) ;
2022-05-31 22:01:53 -04:00
if ( ! Backend - > IsUsable ( ) )
{
UE_LOG ( LogDerivedDataCache , Warning , TEXT ( " %s: Failed to contact the service (%s), will not use it. " ) , NodeName , * Backend - > GetName ( ) ) ;
Backend . Reset ( ) ;
}
return MakeTuple ( Backend . Release ( ) , ECacheStoreFlags : : Local | ECacheStoreFlags : : Remote | ECacheStoreFlags : : Query | ECacheStoreFlags : : Store ) ;
2022-05-30 18:14:02 -04:00
}
} // namespace UE::DerivedData
# else
namespace UE : : DerivedData
{
2022-05-31 22:01:53 -04:00
TTuple < ILegacyCacheStore * , ECacheStoreFlags > CreateZenCacheStore ( const TCHAR * NodeName , const TCHAR * Config )
2022-05-30 18:14:02 -04:00
{
2022-05-31 22:01:53 -04:00
UE_LOG ( LogDerivedDataCache , Warning , TEXT ( " %s: Zen cache is not yet supported in the current build configuration. " ) , NodeName ) ;
return MakeTuple ( nullptr , ECacheStoreFlags : : None ) ;
2022-05-30 18:14:02 -04:00
}
} // UE::DerivedData
2022-02-02 05:33:16 -05:00
# endif // UE_WITH_ZEN