Horde: Implement ranged reads for storage backends. Remove code to emulate ranged reads from StorageClientBase.

#preflight none

[CL 22227699 by Ben Marsh in ue5-main branch]
This commit is contained in:
Ben Marsh
2022-09-28 14:37:09 -04:00
parent ffd2c74a66
commit f1b55f3565
19 changed files with 125 additions and 295 deletions
@@ -27,7 +27,7 @@ namespace EpicGames.Horde.Tests
{
using IMemoryCache cache = new MemoryCache(new MemoryCacheOptions());
InMemoryBlobStore blobStore = new InMemoryBlobStore(cache, NullLogger.Instance);
MemoryBlobStore blobStore = new MemoryBlobStore(cache, NullLogger.Instance);
await TestTreeAsync(blobStore, new TreeOptions { MaxBlobSize = 1024 * 1024 });
Assert.AreEqual(1, blobStore.Blobs.Count);
@@ -39,7 +39,7 @@ namespace EpicGames.Horde.Tests
{
using IMemoryCache cache = new MemoryCache(new MemoryCacheOptions());
InMemoryBlobStore blobStore = new InMemoryBlobStore(cache, NullLogger.Instance);
MemoryBlobStore blobStore = new MemoryBlobStore(cache, NullLogger.Instance);
await TestTreeAsync(blobStore, new TreeOptions { MaxBlobSize = 1 });
Assert.AreEqual(5, blobStore.Blobs.Count);
@@ -70,7 +70,7 @@ namespace EpicGames.Horde.Tests
}
}
static async Task TestTreeAsync(InMemoryBlobStore store, TreeOptions options)
static async Task TestTreeAsync(MemoryBlobStore store, TreeOptions options)
{
// Generate a tree
{
@@ -132,7 +132,7 @@ namespace EpicGames.Horde.Tests
public async Task DirectoryNodesAsync()
{
using IMemoryCache cache = new MemoryCache(new MemoryCacheOptions());
InMemoryBlobStore store = new InMemoryBlobStore(cache, NullLogger.Instance);
MemoryBlobStore store = new MemoryBlobStore(cache, NullLogger.Instance);
// Generate a tree
{
@@ -168,7 +168,7 @@ namespace EpicGames.Horde.Tests
public async Task FileNodesAsync()
{
using IMemoryCache cache = new MemoryCache(new MemoryCacheOptions());
InMemoryBlobStore store = new InMemoryBlobStore(cache, NullLogger.Instance);
MemoryBlobStore store = new MemoryBlobStore(cache, NullLogger.Instance);
// Generate a tree
{
@@ -213,7 +213,7 @@ namespace EpicGames.Horde.Tests
{
using IMemoryCache cache = new MemoryCache(new MemoryCacheOptions());
InMemoryBlobStore store = new InMemoryBlobStore(cache, NullLogger.Instance);
MemoryBlobStore store = new MemoryBlobStore(cache, NullLogger.Instance);
TreeWriter writer = new TreeWriter(store, new TreeOptions());
byte[] chunk = RandomNumberGenerator.GetBytes(256);
@@ -289,7 +289,7 @@ namespace EpicGames.Horde.Tests
public async Task LargeFileTestAsync()
{
using IMemoryCache cache = new MemoryCache(new MemoryCacheOptions());
InMemoryBlobStore store = new InMemoryBlobStore(cache, NullLogger.Instance);
MemoryBlobStore store = new MemoryBlobStore(cache, NullLogger.Instance);
const int length = 1024;
const int copies = 4096;
@@ -20,12 +20,12 @@ namespace EpicGames.Horde.Tests
public sealed class BundleTests : IDisposable
{
readonly IMemoryCache _cache;
readonly InMemoryBlobStore _storage;
readonly MemoryBlobStore _storage;
public BundleTests()
{
_cache = new MemoryCache(new MemoryCacheOptions());
_storage = new InMemoryBlobStore(_cache, NullLogger.Instance);
_storage = new MemoryBlobStore(_cache, NullLogger.Instance);
}
public void Dispose()
@@ -166,7 +166,7 @@ namespace EpicGames.Horde.Tests
[TestMethod]
public async Task BasicTestDirectory()
{
InMemoryBlobStore store = _storage;
MemoryBlobStore store = _storage;
DirectoryNode root = new DirectoryNode(DirectoryFlags.None);
DirectoryNode node = root.AddDirectory("hello");
@@ -16,7 +16,7 @@ namespace EpicGames.Horde.Storage.Backends
/// <summary>
/// Implementation of <see cref="IStorageClient"/> which stores data in memory. Not intended for production use.
/// </summary>
public class InMemoryBlobStore : StorageClientBase
public class MemoryBlobStore : StorageClientBase
{
/// <summary>
/// Map of blob id to blob data
@@ -37,7 +37,7 @@ namespace EpicGames.Horde.Storage.Backends
/// <summary>
/// Constructor
/// </summary>
public InMemoryBlobStore(IMemoryCache cache, ILogger logger)
public MemoryBlobStore(IMemoryCache cache, ILogger logger)
: base(cache, logger)
{
}
@@ -51,6 +51,13 @@ namespace EpicGames.Horde.Storage.Backends
return Task.FromResult(stream);
}
/// <inheritdoc/>
public override Task<Stream> ReadBlobRangeAsync(BlobLocator locator, int offset, int length, CancellationToken cancellationToken = default)
{
Stream stream = new ReadOnlySequenceStream(_blobs[locator].AsSequence().Slice(offset));
return Task.FromResult(stream);
}
/// <inheritdoc/>
public override async Task<BlobLocator> WriteBlobAsync(Stream stream, Utf8String prefix = default, CancellationToken cancellationToken = default)
{
@@ -9,7 +9,6 @@ using System.Threading;
using System.Threading.Tasks;
using EpicGames.Core;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.CSharp.Syntax;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
@@ -20,85 +19,6 @@ namespace EpicGames.Horde.Storage
/// </summary>
public abstract class StorageClientBase : IStorageClient
{
/// <summary>
/// Wraps a stream and returns a portion of it
/// </summary>
class RangedReadStream : Stream, IDisposable
{
readonly Stream _inner;
readonly int _length;
int _position;
public RangedReadStream(Stream inner, int length)
{
_inner = inner;
_length = length;
}
/// <inheritdoc/>
public override bool CanRead => true;
/// <inheritdoc/>
public override bool CanSeek => false;
/// <inheritdoc/>
public override bool CanWrite => false;
/// <inheritdoc/>
public override long Length => throw new NotSupportedException();
/// <inheritdoc/>
public override long Position { get => _position; set => throw new NotSupportedException(); }
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
_inner.Dispose();
}
}
/// <inheritdoc/>
public override void Flush() => _inner.Flush();
/// <inheritdoc/>
public override int Read(Span<byte> buffer)
{
buffer = buffer.Slice(0, Math.Min(buffer.Length, _length - _position));
int read = _inner.Read(buffer);
_position += read;
return read;
}
/// <inheritdoc/>
public override int Read(byte[] buffer, int offset, int count) => Read(buffer.AsSpan(offset, count));
/// <inheritdoc/>
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
buffer = buffer.Slice(0, Math.Min(buffer.Length, _length - _position));
int read = await _inner.ReadAsync(buffer, cancellationToken);
_position += read;
return read;
}
/// <inheritdoc/>
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => await ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
/// <inheritdoc/>
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
/// <inheritdoc/>
public override void SetLength(long value) => throw new NotSupportedException();
/// <inheritdoc/>
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
/// <summary>
/// Describes a bundle export
/// </summary>
@@ -222,14 +142,16 @@ namespace EpicGames.Horde.Storage
public abstract Task<Stream> ReadBlobAsync(BlobLocator locator, CancellationToken cancellationToken = default);
/// <inheritdoc/>
public virtual async Task<Stream> ReadBlobRangeAsync(BlobLocator locator, int offset, int length, CancellationToken cancellationToken = default)
{
_logger.LogInformation("Reading {Locator} [{Offset},{Length}]", locator, offset, length);
Stream stream = await ReadBlobAsync(locator, cancellationToken);
stream.Seek(offset, SeekOrigin.Begin);
return new RangedReadStream(stream, Math.Min((int)(stream.Length - offset), length));
}
public abstract Task<Stream> ReadBlobRangeAsync(BlobLocator locator, int offset, int length, CancellationToken cancellationToken = default);
/// <summary>
/// Utility method to read a blob into a buffer
/// </summary>
/// <param name="locator">Blob location</param>
/// <param name="offset">Offset within the blob</param>
/// <param name="memory">Buffer to read into</param>
/// <param name="cancellationToken">Cancellation token for the operation</param>
/// <returns>The data that was read</returns>
async Task<Memory<byte>> ReadBlobRangeAsync(BlobLocator locator, int offset, Memory<byte> memory, CancellationToken cancellationToken = default)
{
using (Stream stream = await ReadBlobRangeAsync(locator, offset, memory.Length, cancellationToken))