diff --git a/Engine/Source/Programs/Horde/Horde.Agent/Commands/Bundles/CreateCommand.cs b/Engine/Source/Programs/Horde/Horde.Agent/Commands/Bundles/CreateCommand.cs index a13344f3eaab..39580c75fba8 100644 --- a/Engine/Source/Programs/Horde/Horde.Agent/Commands/Bundles/CreateCommand.cs +++ b/Engine/Source/Programs/Horde/Horde.Agent/Commands/Bundles/CreateCommand.cs @@ -1,10 +1,12 @@ // Copyright Epic Games, Inc. All Rights Reserved. using System; +using System.Diagnostics; using System.Globalization; using System.IO; using System.Threading; using System.Threading.Tasks; +using System.Timers; using EpicGames.Core; using EpicGames.Horde.Storage; using EpicGames.Horde.Storage.Nodes; @@ -118,12 +120,19 @@ namespace Horde.Agent.Commands.Bundles { using IMemoryCache cache = new MemoryCache(new MemoryCacheOptions()); IStorageClient store = CreateStorageClient(cache, logger); + TreeWriter writer = new TreeWriter(store, prefix: RefName.Text); DirectoryNode node = new DirectoryNode(DirectoryFlags.None); - await node.CopyFromDirectoryAsync(InputDir.ToDirectoryInfo(), new ChunkingOptions(), writer, logger, CancellationToken.None); + + Stopwatch timer = Stopwatch.StartNew(); + + ChunkingOptions options = new ChunkingOptions(); + await node.CopyFromDirectoryAsync(InputDir.ToDirectoryInfo(), options, writer, logger, CancellationToken.None); await writer.WriteRefAsync(RefName, node); + + logger.LogInformation("Time: {Time}", timer.Elapsed.TotalSeconds); return 0; } } diff --git a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/Nodes/DirectoryNode.cs b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/Nodes/DirectoryNode.cs index c07cda8412a4..4bfbab089a94 100644 --- a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/Nodes/DirectoryNode.cs +++ b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/Nodes/DirectoryNode.cs @@ -1,8 +1,8 @@ // Copyright Epic Games, Inc. All Rights Reserved. using EpicGames.Core; -using EpicGames.Horde.Storage.Git; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using System; using System.Buffers; using System.Collections.Generic; @@ -10,8 +10,10 @@ using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Linq; +using System.Reflection; using System.Threading; using System.Threading.Tasks; +using System.Xml.Linq; namespace EpicGames.Horde.Storage.Nodes { @@ -75,7 +77,15 @@ namespace EpicGames.Horde.Storage.Nodes /// /// Cached length of this node /// - long _cachedLength; + readonly long _cachedLength; + + /// + /// Constructor + /// + public FileEntry(Utf8String name, FileEntryFlags flags) + : this(name, flags, new LeafFileNode()) + { + } /// /// Constructor @@ -135,36 +145,8 @@ namespace EpicGames.Horde.Storage.Nodes /// Cancellation token for the operation public async Task AppendAsync(Stream stream, ChunkingOptions options, TreeWriter writer, CancellationToken cancellationToken) { - const int BufferLength = 32 * 1024; - - using IMemoryOwner owner = MemoryPool.Shared.Rent(BufferLength * 2); - Memory buffer = owner.Memory; - - int readBufferOffset = 0; - Memory appendBuffer = Memory.Empty; - for (; ; ) - { - // Start a read into memory - Memory readBuffer = buffer.Slice(readBufferOffset, BufferLength); - Task readTask = Task.Run(async () => await stream.ReadAsync(readBuffer, cancellationToken), cancellationToken); - - // In the meantime, append the last data that was read to the tree - if (appendBuffer.Length > 0) - { - await AppendAsync(appendBuffer, options, writer, cancellationToken); - } - - // Wait for the read to finish - int numBytes = await readTask; - if (numBytes == 0) - { - break; - } - - // Switch the buffers around - appendBuffer = readBuffer.Slice(0, numBytes); - readBufferOffset ^= BufferLength; - } + FileNode node = await ExpandAsync(cancellationToken); + Target = await node.AppendAsync(stream, options, writer, cancellationToken); } /// @@ -346,7 +328,22 @@ namespace EpicGames.Horde.Storage.Nodes /// True if the entry exists public bool Contains(Utf8String name) => TryGetFileEntry(name, out _) || TryGetDirectoryEntry(name, out _); -#region File operations + #region File operations + + /// + /// Adds a new file entry to this directory + /// + /// The entry to add + public void AddFile(FileEntry entry) + { + if (TryGetDirectoryEntry(entry.Name, out _)) + { + throw new ArgumentException($"A directory with the name {entry.Name} already exists"); + } + + _nameToFileEntry.Add(entry.Name, entry); + MarkAsDirty(); + } /// /// Adds a new directory with the given name @@ -356,17 +353,8 @@ namespace EpicGames.Horde.Storage.Nodes /// The new directory object public FileEntry AddFile(Utf8String name, FileEntryFlags flags) { - if (TryGetDirectoryEntry(name, out _)) - { - throw new ArgumentException($"A directory with the name {name} already exists"); - } - - FileNode newNode = new LeafFileNode(); - - FileEntry entry = new FileEntry(name, flags, newNode); - _nameToFileEntry[name] = entry; - MarkAsDirty(); - + FileEntry entry = new FileEntry(name, flags); + AddFile(entry); return entry; } @@ -573,7 +561,7 @@ namespace EpicGames.Horde.Storage.Nodes return false; } -#endregion + #endregion /// /// Adds files from a directory on disk @@ -585,21 +573,81 @@ namespace EpicGames.Horde.Storage.Nodes /// Cancellation token for the operation /// public async Task CopyFromDirectoryAsync(DirectoryInfo directoryInfo, ChunkingOptions options, TreeWriter writer, ILogger logger, CancellationToken cancellationToken) + { + const int MaxWriters = 32; + const long MinSizePerWriter = 1024 * 1024; + + // Enumerate all the files below this directory + List<(DirectoryNode DirectoryNode, FileInfo FileInfo)> files = new List<(DirectoryNode, FileInfo)>(); + FindFilesToCopy(directoryInfo, files); + + // Compute the total size + long totalSize = files.Sum(x => x.Item2.Length); + long chunkSize = Math.Max(MinSizePerWriter, totalSize / MaxWriters); + + List tasks = new List(); + long currentSize = 0; + long targetSize = chunkSize; + FileEntry[] fileEntries = new FileEntry[files.Count]; + + // Split it into separate writers + for (int minIdx = 0; minIdx < files.Count; ) + { + currentSize += files[minIdx].FileInfo.Length; + + int maxIdx = minIdx + 1; + while (maxIdx < files.Count && currentSize + files[maxIdx].FileInfo.Length <= targetSize) + { + currentSize += files[maxIdx].FileInfo.Length; + maxIdx++; + } + + int minIdxCopy = minIdx; + tasks.Add(Task.Run(() => CopyFilesAsync(files, minIdxCopy, maxIdx, fileEntries, options, writer, logger, cancellationToken), cancellationToken)); + + targetSize += chunkSize; + minIdx = maxIdx; + } + + // Wait for them all to finish + await Task.WhenAll(tasks); + + // Update the directory with all the output entries + for (int idx = 0; idx < files.Count; idx++) + { + files[idx].DirectoryNode.AddFile(fileEntries[idx]); + } + } + + void FindFilesToCopy(DirectoryInfo directoryInfo, List<(DirectoryNode, FileInfo)> files) { foreach (DirectoryInfo subDirectoryInfo in directoryInfo.EnumerateDirectories()) { - logger.LogInformation("Adding {Directory}", subDirectoryInfo.FullName); - DirectoryNode subDirectoryNode = AddDirectory(subDirectoryInfo.Name); - await subDirectoryNode.CopyFromDirectoryAsync(subDirectoryInfo, options, writer, logger, cancellationToken); + AddDirectory(subDirectoryInfo.Name).FindFilesToCopy(subDirectoryInfo, files); } foreach (FileInfo fileInfo in directoryInfo.EnumerateFiles()) { - logger.LogInformation("Adding {File}", fileInfo.FullName); - using Stream stream = fileInfo.OpenRead(); - await AddFileAsync(fileInfo.Name, 0, stream, options, writer, cancellationToken); + files.Add((this, fileInfo)); } } + static async Task CopyFilesAsync(List<(DirectoryNode DirectoryNode, FileInfo FileInfo)> files, int minIdx, int maxIdx, FileEntry[] entries, ChunkingOptions options, TreeWriter baseWriter, ILogger logger, CancellationToken cancellationToken) + { + TreeWriter writer = new TreeWriter(baseWriter); + for(int idx = minIdx; idx < maxIdx; idx++) + { + FileInfo fileInfo = files[idx].FileInfo; + + FileEntry fileEntry = new FileEntry(fileInfo.Name, FileEntryFlags.None); + using (Stream stream = fileInfo.OpenRead()) + { + await fileEntry.AppendAsync(stream, options, writer, cancellationToken); + } + entries[idx] = fileEntry; + } + await writer.FlushAsync(cancellationToken); + } + /// /// Utility function to allow extracting a packed directory to disk /// diff --git a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/Nodes/FileNode.cs b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/Nodes/FileNode.cs index 3d9b136fc7f6..4ddd2e8042be 100644 --- a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/Nodes/FileNode.cs +++ b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/Nodes/FileNode.cs @@ -132,6 +132,63 @@ namespace EpicGames.Horde.Storage.Nodes return AppendAsync(this, input, options, writer, cancellationToken); } + /// + /// Appends data to this file + /// + /// Data to append to the file + /// Options for chunking the data + /// Writer for new node data + /// Cancellation token for the operation + /// New node at the root of this file + public ValueTask AppendAsync(Stream stream, ChunkingOptions options, TreeWriter writer, CancellationToken cancellationToken) + { + return AppendAsync(this, stream, options, writer, cancellationToken); + } + + /// + /// Appends data to this file + /// + /// Node to append to + /// Data to append to the file + /// Options for chunking the data + /// Writer for new node data + /// Cancellation token for the operation + /// New node at the root of this file + static async ValueTask AppendAsync(FileNode node, Stream stream, ChunkingOptions options, TreeWriter writer, CancellationToken cancellationToken) + { + const int BufferLength = 32 * 1024; + + using IMemoryOwner owner = MemoryPool.Shared.Rent(BufferLength * 2); + Memory buffer = owner.Memory; + + int readBufferOffset = 0; + Memory appendBuffer = Memory.Empty; + for (; ; ) + { + // Start a read into memory + Memory readBuffer = buffer.Slice(readBufferOffset, BufferLength); + Task readTask = Task.Run(async () => await stream.ReadAsync(readBuffer, cancellationToken), cancellationToken); + + // In the meantime, append the last data that was read to the tree + if (appendBuffer.Length > 0) + { + node = await node.AppendAsync(appendBuffer, options, writer, cancellationToken); + } + + // Wait for the read to finish + int numBytes = await readTask; + if (numBytes == 0) + { + break; + } + + // Switch the buffers around + appendBuffer = readBuffer.Slice(0, numBytes); + readBufferOffset ^= BufferLength; + } + return node; + } + static async ValueTask AppendAsync(FileNode root, ReadOnlyMemory input, ChunkingOptions options, TreeWriter writer, CancellationToken cancellationToken) { for (; ; ) @@ -303,23 +360,33 @@ namespace EpicGames.Horde.Storage.Nodes return newData; } - // Fast path for appending data to the buffer up to the chunk window size - int windowSize = options.LeafOptions.MinSize; - if (Length < windowSize) + // If the target option sizes are fixed, just chunk the data along fixed boundaries + if (options.LeafOptions.MinSize == options.LeafOptions.TargetSize && options.LeafOptions.MaxSize == options.LeafOptions.TargetSize) { - int appendLength = Math.Min(windowSize - (int)Length, newData.Length); - AppendLeafData(newData.Span.Slice(0, appendLength)); - newData = newData.Slice(appendLength); - } - - // Cap the maximum amount of data to append to this node - int maxLength = Math.Min(newData.Length, options.LeafOptions.MaxSize - (int)Length); - if (maxLength > 0) - { - ReadOnlySpan inputSpan = newData.Span.Slice(0, maxLength); - int length = AppendLeafDataToChunkBoundary(inputSpan, options); + int length = Math.Min(newData.Length, options.LeafOptions.MaxSize - (int)Length); + AppendLeafData(newData.Span.Slice(0, length), 0); newData = newData.Slice(length); } + else + { + // Fast path for appending data to the buffer up to the chunk window size + int windowSize = options.LeafOptions.MinSize; + if (Length < windowSize) + { + int appendLength = Math.Min(windowSize - (int)Length, newData.Length); + AppendLeafData(newData.Span.Slice(0, appendLength)); + newData = newData.Slice(appendLength); + } + + // Cap the maximum amount of data to append to this node + int maxLength = Math.Min(newData.Length, options.LeafOptions.MaxSize - (int)Length); + if (maxLength > 0) + { + ReadOnlySpan inputSpan = newData.Span.Slice(0, maxLength); + int length = AppendLeafDataToChunkBoundary(inputSpan, options); + newData = newData.Slice(length); + } + } // Mark this node as complete if we've reached the max size if (Length == options.LeafOptions.MaxSize) @@ -518,7 +585,7 @@ namespace EpicGames.Horde.Storage.Nodes } // Collapse the final node - await writer.WriteAsync(Children[^1].Target!, cancellationToken); + await writer.WriteAsync(Children[^1], cancellationToken); } } diff --git a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/StorageClientBase.cs b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/StorageClientBase.cs index c3b579695a3f..2a93fa807b45 100644 --- a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/StorageClientBase.cs +++ b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/StorageClientBase.cs @@ -343,8 +343,8 @@ namespace EpicGames.Horde.Storage public void ReadRef(TreeNodeRef treeNodeRef) { - treeNodeRef._store = _store; - (treeNodeRef._hash, treeNodeRef._locator) = _refs[(int)this.ReadUnsignedVarInt()]; + (IoHash hash, NodeLocator locator) = _refs[(int)this.ReadUnsignedVarInt()]; + treeNodeRef.MarkAsClean(_store, hash, locator, 0); } } diff --git a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeNode.cs b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeNode.cs index 37e7d0e2e2e9..fdf2960b713c 100644 --- a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeNode.cs +++ b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeNode.cs @@ -1,18 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. using System; -using System.Buffers; -using System.Collections.Generic; -using System.Diagnostics; using System.Reflection; using System.Reflection.Emit; -using System.Reflection.Metadata; -using System.Threading; -using System.Threading.Tasks; -using System.Xml.Linq; using EpicGames.Core; -using EpicGames.Serialization; -using JetBrains.Annotations; namespace EpicGames.Horde.Storage { @@ -22,26 +13,30 @@ namespace EpicGames.Horde.Storage public abstract class TreeNode { /// - /// Whether the node is dirty or not + /// Revision number for the node. Incremented whenever the node is modified; used to detect changes between a serialized ref and live instance. /// - public bool IsDirty { get; private set; } + int _revision; + + /// + /// Revision number for this node. Incremented Whether the node is dirty or not + /// + public int Revision => _revision; /// /// Default constructor /// protected TreeNode() { - IsDirty = true; + _revision = 1; } /// - /// Serialization constructor. Clears the dirty flag by default. + /// Serialization constructor. Leaves the revision number zeroed by default. /// /// protected TreeNode(IMemoryReader reader) { _ = reader; - IsDirty = false; } /// @@ -49,7 +44,7 @@ namespace EpicGames.Horde.Storage /// protected void MarkAsDirty() { - IsDirty = true; + _revision++; } /// diff --git a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeNodeRef.cs b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeNodeRef.cs index 0aa97a9f5dc4..5cb89b2cf8f8 100644 --- a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeNodeRef.cs +++ b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeNodeRef.cs @@ -1,7 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. using System; -using System.Reflection; using System.Threading; using System.Threading.Tasks; using EpicGames.Core; @@ -9,35 +8,49 @@ using EpicGames.Core; namespace EpicGames.Horde.Storage { /// - /// Stores a reference from a parent to child node, which can be resurrected after the child node is flushed to storage if subsequently modified. + /// Stores a reference from a parent to child node. The reference may be to a node in memory, or to a node in the storage system. /// - public abstract class TreeNodeRef + public class TreeNodeRef { /// /// Store containing the node data. May be null for nodes in memory. /// - internal IStorageClient? _store; + public IStorageClient? Store { get; private set; } /// /// Hash of the referenced node. Invalid for nodes in memory. /// - internal IoHash _hash; + public IoHash Hash { get; private set; } /// /// Locator for the blob containing this node. Invalid for nodes in memory. /// - internal NodeLocator _locator; + public NodeLocator Locator { get; private set; } + + /// + /// Revision number of the target node + /// + private int _revision; + + /// + /// The target node in memory + /// + private TreeNode? _target; /// /// The target node, or null if the node is not resident in memory. /// - public TreeNode? Target { get; set; } + public TreeNode? Target + { + get => _target; + set => MarkAsDirty(value); + } /// /// Creates a reference to a node in memory. /// /// Node to reference - protected TreeNodeRef(TreeNode target) + protected internal TreeNodeRef(TreeNode target) { Target = target; } @@ -50,13 +63,13 @@ namespace EpicGames.Horde.Storage /// Locator for the node internal TreeNodeRef(IStorageClient store, IoHash hash, NodeLocator locator) { - _store = store; - _hash = hash; - _locator = locator; + Store = store; + Hash = hash; + Locator = locator; } /// - /// + /// Deserialization constructor /// /// public TreeNodeRef(ITreeNodeReader reader) @@ -64,18 +77,62 @@ namespace EpicGames.Horde.Storage reader.ReadRef(this); } + /// + /// Determines whether the the referenced node has modified from the last version written to storage + /// + /// + public bool IsDirty() => _target != null && _revision != _target.Revision; + + /// + /// Update the reference to refer to a node in memory. + /// + /// The target node + public void MarkAsDirty(TreeNode? target) + { + if (target == null) + { + if (!Locator.IsValid()) + { + throw new InvalidOperationException("Node has not been serialized to disk; cannot clear target reference."); + } + } + else + { + Store = null; + Hash = default; + Locator = default; + _revision = 0; + } + + _target = target; + } + + /// + /// Update the reference to refer to a location in storage. + /// + /// The storage client + /// Hash of the node + /// Location of the node + /// Revision number for the node + public bool MarkAsClean(IStorageClient store, IoHash hash, NodeLocator locator, int revision) + { + bool result = false; + if (_target == null || _target.Revision == revision) + { + Store = store; + Hash = hash; + Locator = locator; + _revision = revision; + result = true; + } + return result; + } + /// /// Serialize the node to the given writer /// /// public void Serialize(ITreeNodeWriter writer) => writer.WriteRef(this); - - /// - /// Resolve this reference to a concrete node - /// - /// Cancellation token for the operation - /// - public abstract ValueTask ExpandBaseAsync(CancellationToken cancellationToken = default); } /// @@ -121,16 +178,6 @@ namespace EpicGames.Horde.Storage { } - /// - /// Resolve this reference to a concrete node - /// - /// Cancellation token for the operation - /// - public override async ValueTask ExpandBaseAsync(CancellationToken cancellationToken = default) - { - return await ExpandAsync(cancellationToken); - } - /// /// Resolve this reference to a concrete node /// @@ -141,7 +188,7 @@ namespace EpicGames.Horde.Storage T? result = Target; if (result == null) { - Target = await _store!.ReadNodeAsync(_locator, cancellationToken); + Target = await Store!.ReadNodeAsync(Locator, cancellationToken); result = Target; } return result; diff --git a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeWriter.cs b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeWriter.cs index d59fc1336be7..91e0b2d0d02a 100644 --- a/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeWriter.cs +++ b/Engine/Source/Programs/Shared/EpicGames.Horde/Storage/TreeWriter.cs @@ -41,13 +41,17 @@ namespace EpicGames.Horde.Storage { public readonly IoHash Hash; public readonly int Length; + public readonly int Revision; public readonly IReadOnlyList RefHashes; + public readonly TreeNodeRef NodeRef; - public NodeInfo(IoHash hash, int length, IReadOnlyList refs) + public NodeInfo(IoHash hash, int length, IReadOnlyList refs, TreeNodeRef nodeRef, int revision) { Hash = hash; Length = length; RefHashes = refs; + NodeRef = nodeRef; + Revision = revision; } } @@ -90,8 +94,8 @@ namespace EpicGames.Horde.Storage // Queue of nodes for the current bundle readonly List _queue = new List(); - // List of fences for particular nodes to be written - readonly Dictionary> _fences = new Dictionary>(); + // Queue of nodes that are already scheduled to be written elsewhere + readonly List _secondaryQueue = new List(); // List of packets in the current bundle readonly List _packets = new List(); @@ -102,6 +106,11 @@ namespace EpicGames.Horde.Storage // Total size of compressed packets in the current bundle int _currentBundleLength; + /// + /// Accessor for the store backing this writer + /// + public IStorageClient Store => _store; + /// /// Constructor /// @@ -117,20 +126,12 @@ namespace EpicGames.Horde.Storage } /// - /// Returns a task that can be used to wait for a particular node to be flushed to storage. + /// Copies settings from another tree writer /// - /// Hash of the node to wait for - /// - public Task WaitForFlushAsync(IoHash hash) + /// Other instance to copy from + public TreeWriter(TreeWriter other) + : this(other._store, other._options, other._prefix) { - TaskCompletionSource? tcs; - if (!_fences.TryGetValue(hash, out tcs)) - { - _ = _hashToNode[hash]; // Make sure it exists! - tcs = new TaskCompletionSource(); - _fences.Add(hash, tcs); - } - return tcs.Task; } /// @@ -149,37 +150,53 @@ namespace EpicGames.Horde.Storage public bool TryGetLocator(IoHash hash, out NodeLocator locator) => _hashToNode.TryGetValue(hash, out locator); /// - public async Task WriteAsync(TreeNode node, CancellationToken cancellationToken = default) + public async Task WriteAsync(TreeNodeRef nodeRef, CancellationToken cancellationToken = default) { + // If the target node hasn't been modified, use the existing state. + if (!nodeRef.IsDirty()) + { + Debug.Assert(nodeRef.Locator.IsValid()); + _hashToNode.TryAdd(nodeRef.Hash, nodeRef.Locator); + nodeRef.Target = null; + return nodeRef.Hash; + } + // Serialize the node NodeWriter nodeWriter = new NodeWriter(); - node.Serialize(nodeWriter); + nodeRef.Target!.Serialize(nodeWriter); // Write all the references first IoHash[] nextRefs = new IoHash[nodeWriter.Refs.Count]; for (int idx = 0; idx < nodeWriter.Refs.Count; idx++) { TreeNodeRef reference = nodeWriter.Refs[idx]; - if (reference.Target == null || !reference.Target.IsDirty) - { - nextRefs[idx] = reference._hash; - } - else - { - nextRefs[idx] = await WriteAsync(reference.Target, cancellationToken); - } + nextRefs[idx] = await WriteAsync(reference, cancellationToken); } // Get the hash for the new blob ReadOnlySequence data = nodeWriter.Data.AsSequence(); IoHash hash = ComputeHash(data, nextRefs); - // Write the node if we don't already have it - if (!_hashToNode.ContainsKey(hash)) + // Check if we're already tracking a node with the same hash + NodeInfo nodeInfo = new NodeInfo(hash, (int)data.Length, nextRefs, nodeRef, nodeRef.Target.Revision); + if (_hashToNode.TryGetValue(hash, out NodeLocator locator)) { + // If the node is in the lookup but not currently valid, it's already queued for writing. Add this ref to the list of refs that needs fixing up, + // so we can update it after flushing. + if (locator.IsValid()) + { + nodeRef.MarkAsClean(_store, hash, locator, nodeRef.Target.Revision); + } + else + { + _secondaryQueue.Add(nodeInfo); + } + } + else + { + // Write the node if we don't already have it _packetWriter.WriteFixedLengthBytes(data); - NodeInfo nodeInfo = new NodeInfo(hash, (int)data.Length, nextRefs); _queue.Add(nodeInfo); _hashToNode.Add(hash, default); @@ -205,12 +222,13 @@ namespace EpicGames.Horde.Storage /// public async Task WriteRefAsync(RefName name, TreeNode node, CancellationToken cancellationToken = default) { - IoHash hash = await WriteAsync(node, cancellationToken); + TreeNodeRef nodeRef = new TreeNodeRef(node); + await WriteAsync(nodeRef, cancellationToken); await FlushAsync(cancellationToken); - NodeLocator locator = GetLocator(hash); - await _store.WriteRefTargetAsync(name, locator, cancellationToken); - return locator; + Debug.Assert(nodeRef.Locator.IsValid()); + await _store.WriteRefTargetAsync(name, nodeRef.Locator, cancellationToken); + return nodeRef.Locator; } /// @@ -313,24 +331,25 @@ namespace EpicGames.Horde.Storage BlobLocator locator = await _store.WriteBundleAsync(bundle, _prefix, cancellationToken); for (int idx = 0; idx < _queue.Count; idx++) { + NodeLocator nodeLocator = new NodeLocator(locator, idx); + NodeInfo nodeInfo = _queue[idx]; - _hashToNode[nodeInfo.Hash] = new NodeLocator(locator, idx); + nodeInfo.NodeRef.MarkAsClean(_store, nodeInfo.Hash, nodeLocator, nodeInfo.Revision); + + _hashToNode[nodeInfo.Hash] = nodeLocator; } - // Fire all the alerts for any nodes that have been written - List removeFences = new List(_fences.Keys.Where(x => nodeToIndex.ContainsKey(x))); - foreach ((IoHash hash, TaskCompletionSource tcs) in _fences) + // Update any refs with their target locator + int refIdx = 0; + for (; refIdx < _secondaryQueue.Count; refIdx++) { - if (nodeToIndex.ContainsKey(hash)) + NodeInfo nodeInfo = _secondaryQueue[refIdx]; + if (_hashToNode.TryGetValue(nodeInfo.Hash, out NodeLocator refLocator)) { - tcs.SetResult(_hashToNode[hash]); - removeFences.Add(hash); + nodeInfo.NodeRef.MarkAsClean(_store, nodeInfo.Hash, refLocator, nodeInfo.Revision); } } - foreach (IoHash removeFence in removeFences) - { - _hashToNode.Remove(removeFence); - } + _secondaryQueue.RemoveRange(0, refIdx); // Reset the output state _packets.Clear();