// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Reactive.Concurrency;
namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class ReplaySubject<T> : ISubject<T>, IDisposable
    {
        // Element-count limit used by the constructors that do not take an explicit buffer size.
        private const int InfiniteBufferSize = int.MaxValue;

        // Maximum number of elements kept in the replay buffer (see Trim).
        private readonly int _bufferSize;

        // Maximum age, measured on _stopwatch, of an element kept in the replay buffer (see Trim).
        private readonly TimeSpan _window;

        // Scheduler handed to every ScheduledObserver created in Subscribe.
        private readonly IScheduler _scheduler;

        // Started from _scheduler at construction; its Elapsed value timestamps buffered elements.
        private readonly IStopwatch _stopwatch;

        // Replay buffer: each element paired with the stopwatch reading at which it was received.
        private readonly Queue<TimeInterval<T>> _queue;

        // Set once OnError or OnCompleted has been accepted; later notifications are ignored.
        private bool _isStopped;

        // Terminal error, or null when the subject has not failed.
        private Exception _error;

        // Snapshot list of current observers; replaced (never mutated in place) under _gate and set to null by Dispose.
        private ImmutableList<ScheduledObserver<T>> _observers;

        // Set by Dispose; makes the notification and subscription members throw ObjectDisposedException.
        private bool _isDisposed;

        // Lock held while the fields above are mutated.
        private readonly object _gate = new object();

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size, window and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            if (bufferSize < 0)
                throw new ArgumentOutOfRangeException("bufferSize");
            if (window < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException("window");
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");

            _bufferSize = bufferSize;
            _window = window;
            _scheduler = scheduler;
            _stopwatch = _scheduler.StartStopwatch();
            _queue = new Queue<TimeInterval<T>>();
            _isStopped = false;
            _error = null;
            _observers = new ImmutableList<ScheduledObserver<T>>();
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size and window.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero. -or- <paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window)
            : this(bufferSize, window, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class.
        /// </summary>
        public ReplaySubject()
            : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified scheduler.
        /// </summary>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(IScheduler scheduler)
            : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size and scheduler.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize, IScheduler scheduler)
            : this(bufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified buffer size.
        /// </summary>
        /// <param name="bufferSize">Maximum element count of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is less than zero.</exception>
        public ReplaySubject(int bufferSize)
            : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified window and scheduler.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <param name="scheduler">Scheduler the observers are invoked on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window, IScheduler scheduler)
            : this(InfiniteBufferSize, window, scheduler)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ReplaySubject{T}"/> class with the specified window.
        /// </summary>
        /// <param name="window">Maximum time length of the replay buffer.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="window"/> is less than TimeSpan.Zero.</exception>
        public ReplaySubject(TimeSpan window)
            : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
        {
        }

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        public bool HasObservers
        {
            get
            {
                // Read the field once: Dispose sets it to null, and this getter does not take _gate.
                var observers = _observers;
                return observers != null && observers.Data.Length > 0;
            }
        }

        /// <summary>
        /// Drops elements from the front of the replay buffer until it satisfies both the
        /// element-count limit and the time window. Callers invoke this while holding the gate.
        /// </summary>
        /// <param name="now">Current stopwatch reading against which element age is measured.</param>
        private void Trim(TimeSpan now)
        {
            // Count-based policy: keep at most _bufferSize elements.
            while (_queue.Count > _bufferSize)
                _queue.Dequeue();

            // Time-based policy: drop elements whose age exceeds _window.
            while (_queue.Count > 0 && now - _queue.Peek().Interval > _window)
                _queue.Dequeue();
        }

        /// <summary>
        /// Calls EnsureActive on every observer of a snapshot taken under the gate.
        /// The notification members call this after releasing the gate.
        /// </summary>
        /// <param name="observers">Observer snapshot, or null when the notification was ignored.</param>
        private static void EnsureAllActive(ScheduledObserver<T>[] observers)
        {
            if (observers == null)
                return;

            foreach (var observer in observers)
                observer.EnsureActive();
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all observers.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnNext(T value)
        {
            var o = default(ScheduledObserver<T>[]);
            lock (_gate)
            {
                CheckDisposed();
                if (!_isStopped)
                {
                    // Timestamp and buffer the element first so that Trim sees it.
                    var now = _stopwatch.Elapsed;
                    _queue.Enqueue(new TimeInterval<T>(value, now));
                    Trim(now);

                    o = _observers.Data;
                    foreach (var observer in o)
                        observer.OnNext(value);
                }
            }

            EnsureAllActive(o);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the specified exception.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnError(Exception error)
        {
            if (error == null)
                throw new ArgumentNullException("error");

            var o = default(ScheduledObserver<T>[]);
            lock (_gate)
            {
                CheckDisposed();
                if (!_isStopped)
                {
                    var now = _stopwatch.Elapsed;
                    _isStopped = true;
                    _error = error;
                    Trim(now);

                    o = _observers.Data;
                    foreach (var observer in o)
                        observer.OnError(error);

                    // The sequence has terminated: start over with an empty observer list.
                    _observers = new ImmutableList<ScheduledObserver<T>>();
                }
            }

            EnsureAllActive(o);
        }

        /// <summary>
        /// Notifies all subscribed and future observers about the end of the sequence.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnCompleted()
        {
            var o = default(ScheduledObserver<T>[]);
            lock (_gate)
            {
                CheckDisposed();
                if (!_isStopped)
                {
                    var now = _stopwatch.Elapsed;
                    _isStopped = true;
                    Trim(now);

                    o = _observers.Data;
                    foreach (var observer in o)
                        observer.OnCompleted();

                    // The sequence has terminated: start over with an empty observer list.
                    _observers = new ImmutableList<ScheduledObserver<T>>();
                }
            }

            EnsureAllActive(o);
        }

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
                throw new ArgumentNullException("observer");

            var so = new ScheduledObserver<T>(_scheduler, observer);

            // Number of notifications handed to 'so' below; passed on to EnsureActive(n).
            var n = 0;

            var subscription = new RemovableDisposable(this, so);
            lock (_gate)
            {
                CheckDisposed();

                //
                // Notice the v1.x behavior of always calling Trim is preserved here.
                //
                // This may be subject (pun intended) of debate: should this policy
                // only be applied while the sequence is active? With the current
                // behavior, a sequence will "die out" after it has terminated by
                // continuing to drop OnNext notifications from the queue.
                //
                // In v1.x, this behavior was due to trimming based on the clock value
                // returned by scheduler.Now, applied to all but the terminal message
                // in the queue. Using the IStopwatch has the same effect. Either way,
                // we guarantee the final notification will be observed, but there's
                // no way to retain the buffer directly. One approach is to use the
                // time-based TakeLast operator and apply an unbounded ReplaySubject
                // to it.
                //
                // To conclude, we're keeping the behavior as-is for compatibility
                // reasons with v1.x.
                //
                Trim(_stopwatch.Elapsed);
                _observers = _observers.Add(so);

                // Replay the buffered elements, followed by the terminal notification if there is one.
                n = _queue.Count;
                foreach (var item in _queue)
                    so.OnNext(item.Value);

                if (_error != null)
                {
                    n++;
                    so.OnError(_error);
                }
                else if (_isStopped)
                {
                    n++;
                    so.OnCompleted();
                }
            }

            so.EnsureActive(n);
            return subscription;
        }

        /// <summary>
        /// Removes the specified observer from the observer list, unless the subject has been disposed.
        /// </summary>
        /// <param name="observer">Observer to remove.</param>
        private void Unsubscribe(ScheduledObserver<T> observer)
        {
            lock (_gate)
            {
                // After Dispose the list is null, so there is nothing to remove from.
                if (!_isDisposed)
                    _observers = _observers.Remove(observer);
            }
        }

        /// <summary>
        /// Subscription handle returned by Subscribe: disposes the scheduled observer and
        /// removes it from the owning subject.
        /// </summary>
        private sealed class RemovableDisposable : IDisposable
        {
            private readonly ReplaySubject<T> _subject;
            private readonly ScheduledObserver<T> _observer;

            public RemovableDisposable(ReplaySubject<T> subject, ScheduledObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            public void Dispose()
            {
                _observer.Dispose();
                _subject.Unsubscribe(_observer);
            }
        }

        /// <summary>
        /// Throws if the subject has been disposed. Callers invoke this while holding the gate.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        private void CheckDisposed()
        {
            if (_isDisposed)
                throw new ObjectDisposedException(string.Empty);
        }

        /// <summary>
        /// Releases all resources used by the current instance of the <see cref="ReplaySubject{T}"/> class and unsubscribe all observers.
        /// </summary>
        public void Dispose()
        {
            lock (_gate)
            {
                _isDisposed = true;
                _observers = null;

                // The buffer can no longer be replayed (Subscribe throws once disposed),
                // so release the buffered elements instead of keeping them reachable.
                _queue.Clear();
            }
        }
    }
}