// Copyright Epic Games, Inc. All Rights Reserved.
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EpicGames.Horde.Users;
using Horde.Server.Server;
using Horde.Server.Users;
using Horde.Server.Utilities;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
namespace Horde.Server.Notifications
{
///
/// Collection of notification triggers
///
public class NotificationTriggerCollection : INotificationTriggerCollection
{
class SubscriptionDocument : INotificationSubscription
{
[BsonIgnoreIfNull]
public string? User { get; set; }
public UserId UserId { get; set; }
public bool Email { get; set; }
public bool Slack { get; set; }
[BsonConstructor]
private SubscriptionDocument()
{
User = null!;
}
public SubscriptionDocument(UserId userId, bool email, bool slack)
{
UserId = userId;
Email = email;
Slack = slack;
}
public SubscriptionDocument(INotificationSubscription subscription)
{
UserId = subscription.UserId;
Email = subscription.Email;
Slack = subscription.Slack;
}
}
class TriggerDocument : INotificationTrigger
{
public ObjectId Id { get; set; }
public bool Fired { get; set; }
public List Subscriptions { get; set; } = new List();
public int UpdateIndex { get; set; }
IReadOnlyList INotificationTrigger.Subscriptions => Subscriptions;
}
readonly IMongoCollection _triggers;
readonly IUserCollection _userCollection;
///
/// Constructor
///
/// The database singleton
///
public NotificationTriggerCollection(MongoService mongoService, IUserCollection userCollection)
{
_triggers = mongoService.GetCollection("NotificationTriggers");
_userCollection = userCollection;
}
///
public async Task GetAsync(ObjectId triggerId, CancellationToken cancellationToken)
{
TriggerDocument? trigger = await _triggers.Find(x => x.Id == triggerId).FirstOrDefaultAsync(cancellationToken);
if (trigger != null)
{
for (int idx = 0; idx < trigger.Subscriptions.Count; idx++)
{
SubscriptionDocument subscription = trigger.Subscriptions[idx];
if (subscription.User != null)
{
IUser? user = await _userCollection.FindUserByLoginAsync(subscription.User, cancellationToken);
if (user == null)
{
trigger.Subscriptions.RemoveAt(idx);
idx--;
}
else
{
subscription.UserId = user.Id;
subscription.User = null;
}
}
}
}
return trigger;
}
///
public async Task FindOrAddAsync(ObjectId triggerId, CancellationToken cancellationToken)
{
for (; ; )
{
// Find an existing trigger
INotificationTrigger? existing = await GetAsync(triggerId, cancellationToken);
if (existing != null)
{
return existing;
}
// Try to insert a new document
try
{
TriggerDocument newDocument = new TriggerDocument();
newDocument.Id = triggerId;
await _triggers.InsertOneAsync(newDocument, (InsertOneOptions?)null, cancellationToken);
return newDocument;
}
catch (MongoWriteException ex)
{
if (ex.WriteError.Category != ServerErrorCategory.DuplicateKey)
{
throw;
}
}
}
}
///
/// Updates an existing document
///
/// The trigger to update
/// The update definition
/// Cancellation token for the operation
/// The updated document
async Task TryUpdateAsync(INotificationTrigger trigger, TransactionBuilder transaction, CancellationToken cancellationToken)
{
TriggerDocument document = (TriggerDocument)trigger;
int nextUpdateIndex = document.UpdateIndex + 1;
FilterDefinition filter = Builders.Filter.Expr(x => x.Id == trigger.Id && x.UpdateIndex == document.UpdateIndex);
UpdateDefinition update = transaction.ToUpdateDefinition().Set(x => x.UpdateIndex, nextUpdateIndex);
UpdateResult result = await _triggers.UpdateOneAsync(filter, update, null, cancellationToken);
if (result.ModifiedCount > 0)
{
transaction.ApplyTo(document);
document.UpdateIndex = nextUpdateIndex;
return document;
}
return null;
}
///
public async Task DeleteAsync(ObjectId triggerId, CancellationToken cancellationToken)
{
await _triggers.DeleteOneAsync(x => x.Id == triggerId, cancellationToken);
}
///
public async Task DeleteAsync(List triggerIds, CancellationToken cancellationToken)
{
FilterDefinition filter = Builders.Filter.In(x => x.Id, triggerIds);
await _triggers.DeleteManyAsync(filter, cancellationToken);
}
///
public async Task FireAsync(INotificationTrigger trigger, CancellationToken cancellationToken)
{
if (trigger.Fired)
{
return null;
}
for (; ; )
{
TransactionBuilder transaction = new TransactionBuilder();
transaction.Set(x => x.Fired, true);
INotificationTrigger? newTrigger = await TryUpdateAsync(trigger, transaction, cancellationToken);
if (newTrigger != null)
{
return newTrigger;
}
newTrigger = await FindOrAddAsync(trigger.Id, cancellationToken); // Need to add to prevent race condition on triggering vs adding
if (newTrigger == null || newTrigger.Fired)
{
return null;
}
}
}
///
public async Task UpdateSubscriptionsAsync(INotificationTrigger trigger, UserId userId, bool? email, bool? slack, CancellationToken cancellationToken)
{
for (; ; )
{
// If the trigger has already fired, don't add a new subscription to it
if (trigger.Fired)
{
return trigger;
}
// Try to update the trigger
List newSubscriptions = new List();
newSubscriptions.AddRange(trigger.Subscriptions.Select(x => new SubscriptionDocument(x)));
SubscriptionDocument? newSubscription = newSubscriptions.FirstOrDefault(x => x.UserId == userId);
if (newSubscription == null)
{
newSubscription = new SubscriptionDocument(userId, email ?? false, slack ?? false);
newSubscriptions.Add(newSubscription);
}
else
{
newSubscription.Email = email ?? newSubscription.Email;
newSubscription.Slack = slack ?? newSubscription.Slack;
}
TransactionBuilder transaction = new TransactionBuilder();
transaction.Set(x => x.Subscriptions, newSubscriptions);
INotificationTrigger? newTrigger = await TryUpdateAsync(trigger, transaction, cancellationToken);
if (newTrigger != null)
{
return newTrigger;
}
newTrigger = await GetAsync(trigger.Id, cancellationToken);
if (newTrigger == null)
{
return null;
}
}
}
}
}