UnrealPak: Implement DDC2 async API for iostore compression.

BatchGet with max 128 inflight requests (or ~1 GiB in total) in batches of 8 items (or ~16 MiB each).
BatchPut with max 128 inflight requests (or ~256 MiB in total) in batches of 8 items (or ~1 MiB each).
Skip ddc for chunks smaller than CompressionMinBytesSaved (1 KiB by default).
Skip ddc for .umap to avoid cache churn since maps are known to cook non-deterministically.
Skip ddc for shaders that use a different code path in UnrealPak as well as at runtime.
Use a new DDC2 cache key (that includes the CompressionBufferSize) and cache bucket.
Use TArray64/FMemoryWriter64 for serializing the data to support chunks bigger than 2 GiB.
Postpone allocation of compression buffers until the ddc get request completes and the size is known.
Reduce memory buffer limits to 2 GiB and 3 GiB again (earlier temporary bumps to 3 GiB and 4 GiB are not needed after recent task/retraction changes).
Add logging of number of ddc hits and puts.

#jira UE-204758
#rb paul.chipchase, Per.Larsson
#tests identical binary output

[CL 34451300 by pj kack in ue5-main branch]
This commit is contained in:
pj kack
2024-06-18 01:30:02 -04:00
parent d507d097ec
commit 7d49f13480
3 changed files with 566 additions and 140 deletions
@@ -7,6 +7,9 @@
#include "Algo/TopologicalSort.h"
#include "Async/AsyncWork.h"
#include "CookMetadata.h"
#include "DerivedDataCache.h"
#include "DerivedDataCacheInterface.h"
#include "GenericPlatform/GenericPlatformMisc.h"
#include "HAL/FileManager.h"
#include "HAL/PlatformFileManager.h"
#include "HAL/PlatformMemory.h"
@@ -3328,7 +3331,7 @@ public:
InitiatorThread = Async(EAsyncExecution::Thread, [this]() { InitiatorThreadFunc(); });
RetirerThread = Async(EAsyncExecution::Thread, [this]() { RetirerThreadFunc(); });
MaxSourceBufferMemory = 4ull << 30;
MaxSourceBufferMemory = 3ull << 30;
FParse::Value(FCommandLine::Get(), TEXT("MaxSourceBufferMemory="), MaxSourceBufferMemory);
MaxConcurrentSourceReads = uint32(FMath::Clamp(FPlatformMisc::NumberOfCoresIncludingHyperthreads()/2, 4, 32));
@@ -5708,9 +5711,14 @@ int32 CreateTarget(const FIoStoreArguments& Arguments, const FIoStoreWriterSetti
{
ProgressStringBuilder.Appendf(TEXT(" [%llu compressed]"), Progress.CompressedChunksCount);
}
if (Progress.CompressionDDCHitCount || Progress.CompressionDDCPutCount)
{
ProgressStringBuilder.Appendf(TEXT(" [DDC: %llu hits, %llu puts]"),
Progress.CompressionDDCHitCount, Progress.CompressionDDCPutCount);
}
if (Progress.ScheduledCompressionTasksCount)
{
ProgressStringBuilder.Appendf(TEXT(" [%llu compression tasks scheduled]"), Progress.ScheduledCompressionTasksCount);
ProgressStringBuilder.Appendf(TEXT(" [%llu compression tasks]"), Progress.ScheduledCompressionTasksCount);
}
}
else
@@ -5726,6 +5734,24 @@ int32 CreateTarget(const FIoStoreArguments& Arguments, const FIoStoreWriterSetti
}
}
if (GeneralIoWriterSettings.bCompressionEnableDDC)
{
TRACE_CPUPROFILER_EVENT_SCOPE(WaitForDDC);
UE_LOG(LogIoStore, Display, TEXT(""));
UE_LOG(LogIoStore, Display, TEXT("Waiting for DDC..."));
GetDerivedDataCacheRef().WaitForQuiescence(true);
FIoStoreWriterContext::FProgress Progress = IoStoreWriterContext->GetProgress();
const uint64 TotalDDCAttempts = Progress.CompressionDDCHitCount + Progress.CompressionDDCMissCount;
const double DDCHitRate = double(Progress.CompressionDDCHitCount) / TotalDDCAttempts * 100.0;
const double DDCPutRate = double(Progress.CompressionDDCPutCount) / TotalDDCAttempts * 100.0;
UE_LOG(LogIoStore, Display, TEXT("Compression DDC hits: %llu/%llu (%.2lf%%)"),
Progress.CompressionDDCHitCount, TotalDDCAttempts, DDCHitRate);
UE_LOG(LogIoStore, Display, TEXT("Compression DDC puts: %llu/%llu (%.2lf%%) [%llu failed]"),
Progress.CompressionDDCPutCount, TotalDDCAttempts, DDCPutRate, Progress.CompressionDDCPutErrorCount);
UE_LOG(LogIoStore, Display, TEXT(""));
}
{
FIoStoreWriterContext::FProgress Progress = IoStoreWriterContext->GetProgress();
if (Progress.HashDbChunksCount)
@@ -5765,6 +5791,40 @@ int32 CreateTarget(const FIoStoreArguments& Arguments, const FIoStoreWriterSetti
}
}
}
if (Progress.CompressionDDCHitCount)
{
UE_LOG(LogIoStore, Display, TEXT("%s / %s chunks for %s bytes were loaded from DDC, by type:"),
*NumberString(Progress.CompressionDDCHitCount),
*NumberString(Progress.TotalChunksCount),
*NumberString(Progress.CompressionDDCGetBytes));
for (uint8 i = 0; i < (uint8)EIoChunkType::MAX; i++)
{
if (Progress.CompressionDDCHitsByType[i])
{
UE_LOG(LogIoStore, Display, TEXT(" %-26s %s / %s"), *LexToString((EIoChunkType)i),
*NumberString(Progress.CompressionDDCHitsByType[i]),
*NumberString(Progress.BeginCompressChunksByType[i]));
}
}
}
if (Progress.CompressionDDCPutCount)
{
UE_LOG(LogIoStore, Display, TEXT("%s / %s chunks for %s bytes were stored in DDC, by type:"),
*NumberString(Progress.CompressionDDCPutCount),
*NumberString(Progress.TotalChunksCount),
*NumberString(Progress.CompressionDDCPutBytes));
for (uint8 i = 0; i < (uint8)EIoChunkType::MAX; i++)
{
if (Progress.CompressionDDCPutsByType[i])
{
UE_LOG(LogIoStore, Display, TEXT(" %-26s %s / %s"), *LexToString((EIoChunkType)i),
*NumberString(Progress.CompressionDDCPutsByType[i]),
*NumberString(Progress.BeginCompressChunksByType[i]));
}
}
}
UE_LOG(LogIoStore, Display, TEXT("Source bytes read:"));
uint64 ZenTotalBytes = 0;
for (uint64 b : WriteRequestManager.ZenSourceBytes)
@@ -5810,14 +5870,6 @@ int32 CreateTarget(const FIoStoreArguments& Arguments, const FIoStoreWriterSetti
}
}
if (GeneralIoWriterSettings.bCompressionEnableDDC)
{
FIoStoreWriterContext::FProgress Progress = IoStoreWriterContext->GetProgress();
uint64 TotalDDCAttempts = Progress.CompressionDDCHitCount + Progress.CompressionDDCMissCount;
double DDCHitRate = double(Progress.CompressionDDCHitCount) / TotalDDCAttempts * 100.0;
UE_LOG(LogIoStore, Display, TEXT("Compression DDC hits: %llu/%llu (%.2f%%)"), Progress.CompressionDDCHitCount, TotalDDCAttempts, DDCHitRate);
}
if (Arguments.WriteBackMetadataToAssetRegistry != EAssetRegistryWritebackMethod::Disabled)
{
DoAssetRegistryWritebackDuringStage(
File diff suppressed because it is too large Load Diff
@@ -70,8 +70,14 @@ public:
uint64 CompressedChunksByType[(int8)EIoChunkType::MAX] = { 0 };
uint64 SerializedChunksCount = 0;
uint64 ScheduledCompressionTasksCount = 0;
uint64 CompressionDDCHitsByType[(int8)EIoChunkType::MAX] = { 0 };
uint64 CompressionDDCPutsByType[(int8)EIoChunkType::MAX] = { 0 };
uint64 CompressionDDCHitCount = 0;
uint64 CompressionDDCMissCount = 0;
uint64 CompressionDDCPutCount = 0;
uint64 CompressionDDCPutErrorCount = 0;
uint64 CompressionDDCGetBytes = 0;
uint64 CompressionDDCPutBytes = 0;
// The number of chunks retrieved from the reference cache database, and their types.
uint64 RefDbChunksCount{ 0 };