// Copyright Epic Games, Inc. All Rights Reserved. using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using EpicGames.Horde.Users; using Horde.Server.Server; using Horde.Server.Users; using Horde.Server.Utilities; using MongoDB.Bson; using MongoDB.Bson.Serialization.Attributes; using MongoDB.Driver; namespace Horde.Server.Notifications { /// /// Collection of notification triggers /// public class NotificationTriggerCollection : INotificationTriggerCollection { class SubscriptionDocument : INotificationSubscription { [BsonIgnoreIfNull] public string? User { get; set; } public UserId UserId { get; set; } public bool Email { get; set; } public bool Slack { get; set; } [BsonConstructor] private SubscriptionDocument() { User = null!; } public SubscriptionDocument(UserId userId, bool email, bool slack) { UserId = userId; Email = email; Slack = slack; } public SubscriptionDocument(INotificationSubscription subscription) { UserId = subscription.UserId; Email = subscription.Email; Slack = subscription.Slack; } } class TriggerDocument : INotificationTrigger { public ObjectId Id { get; set; } public bool Fired { get; set; } public List Subscriptions { get; set; } = new List(); public int UpdateIndex { get; set; } IReadOnlyList INotificationTrigger.Subscriptions => Subscriptions; } readonly IMongoCollection _triggers; readonly IUserCollection _userCollection; /// /// Constructor /// /// The database singleton /// public NotificationTriggerCollection(MongoService mongoService, IUserCollection userCollection) { _triggers = mongoService.GetCollection("NotificationTriggers"); _userCollection = userCollection; } /// public async Task GetAsync(ObjectId triggerId, CancellationToken cancellationToken) { TriggerDocument? trigger = await _triggers.Find(x => x.Id == triggerId).FirstOrDefaultAsync(cancellationToken); if (trigger != null) { for (int idx = 0; idx < trigger.Subscriptions.Count; idx++) { SubscriptionDocument subscription = trigger.Subscriptions[idx]; if (subscription.User != null) { IUser? user = await _userCollection.FindUserByLoginAsync(subscription.User, cancellationToken); if (user == null) { trigger.Subscriptions.RemoveAt(idx); idx--; } else { subscription.UserId = user.Id; subscription.User = null; } } } } return trigger; } /// public async Task FindOrAddAsync(ObjectId triggerId, CancellationToken cancellationToken) { for (; ; ) { // Find an existing trigger INotificationTrigger? existing = await GetAsync(triggerId, cancellationToken); if (existing != null) { return existing; } // Try to insert a new document try { TriggerDocument newDocument = new TriggerDocument(); newDocument.Id = triggerId; await _triggers.InsertOneAsync(newDocument, (InsertOneOptions?)null, cancellationToken); return newDocument; } catch (MongoWriteException ex) { if (ex.WriteError.Category != ServerErrorCategory.DuplicateKey) { throw; } } } } /// /// Updates an existing document /// /// The trigger to update /// The update definition /// Cancellation token for the operation /// The updated document async Task TryUpdateAsync(INotificationTrigger trigger, TransactionBuilder transaction, CancellationToken cancellationToken) { TriggerDocument document = (TriggerDocument)trigger; int nextUpdateIndex = document.UpdateIndex + 1; FilterDefinition filter = Builders.Filter.Expr(x => x.Id == trigger.Id && x.UpdateIndex == document.UpdateIndex); UpdateDefinition update = transaction.ToUpdateDefinition().Set(x => x.UpdateIndex, nextUpdateIndex); UpdateResult result = await _triggers.UpdateOneAsync(filter, update, null, cancellationToken); if (result.ModifiedCount > 0) { transaction.ApplyTo(document); document.UpdateIndex = nextUpdateIndex; return document; } return null; } /// public async Task DeleteAsync(ObjectId triggerId, CancellationToken cancellationToken) { await _triggers.DeleteOneAsync(x => x.Id == triggerId, cancellationToken); } /// public async Task DeleteAsync(List triggerIds, CancellationToken cancellationToken) { FilterDefinition filter = Builders.Filter.In(x => x.Id, triggerIds); await _triggers.DeleteManyAsync(filter, cancellationToken); } /// public async Task FireAsync(INotificationTrigger trigger, CancellationToken cancellationToken) { if (trigger.Fired) { return null; } for (; ; ) { TransactionBuilder transaction = new TransactionBuilder(); transaction.Set(x => x.Fired, true); INotificationTrigger? newTrigger = await TryUpdateAsync(trigger, transaction, cancellationToken); if (newTrigger != null) { return newTrigger; } newTrigger = await FindOrAddAsync(trigger.Id, cancellationToken); // Need to add to prevent race condition on triggering vs adding if (newTrigger == null || newTrigger.Fired) { return null; } } } /// public async Task UpdateSubscriptionsAsync(INotificationTrigger trigger, UserId userId, bool? email, bool? slack, CancellationToken cancellationToken) { for (; ; ) { // If the trigger has already fired, don't add a new subscription to it if (trigger.Fired) { return trigger; } // Try to update the trigger List newSubscriptions = new List(); newSubscriptions.AddRange(trigger.Subscriptions.Select(x => new SubscriptionDocument(x))); SubscriptionDocument? newSubscription = newSubscriptions.FirstOrDefault(x => x.UserId == userId); if (newSubscription == null) { newSubscription = new SubscriptionDocument(userId, email ?? false, slack ?? false); newSubscriptions.Add(newSubscription); } else { newSubscription.Email = email ?? newSubscription.Email; newSubscription.Slack = slack ?? newSubscription.Slack; } TransactionBuilder transaction = new TransactionBuilder(); transaction.Set(x => x.Subscriptions, newSubscriptions); INotificationTrigger? newTrigger = await TryUpdateAsync(trigger, transaction, cancellationToken); if (newTrigger != null) { return newTrigger; } newTrigger = await GetAsync(trigger.Id, cancellationToken); if (newTrigger == null) { return null; } } } } }