// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Reactive.Concurrency;
namespace System.Reactive.Subjects
{
///
/// Represents an object that is both an observable sequence as well as an observer.
/// Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
///
/// The type of the elements processed by the subject.
public sealed class ReplaySubject : ISubject, IDisposable
{
private const int InfiniteBufferSize = int.MaxValue;
private readonly int _bufferSize;
private readonly TimeSpan _window;
private readonly IScheduler _scheduler;
private readonly IStopwatch _stopwatch;
private readonly Queue> _queue;
private bool _isStopped;
private Exception _error;
private ImmutableList> _observers;
private bool _isDisposed;
private readonly object _gate = new object();
///
/// Initializes a new instance of the class with the specified buffer size, window and scheduler.
///
/// Maximum element count of the replay buffer.
/// Maximum time length of the replay buffer.
/// Scheduler the observers are invoked on.
/// is less than zero. -or- is less than TimeSpan.Zero.
/// is null.
public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
if (bufferSize < 0)
throw new ArgumentOutOfRangeException("bufferSize");
if (window < TimeSpan.Zero)
throw new ArgumentOutOfRangeException("window");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
_bufferSize = bufferSize;
_window = window;
_scheduler = scheduler;
_stopwatch = _scheduler.StartStopwatch();
_queue = new Queue>();
_isStopped = false;
_error = null;
_observers = new ImmutableList>();
}
///
/// Initializes a new instance of the class with the specified buffer size and window.
///
/// Maximum element count of the replay buffer.
/// Maximum time length of the replay buffer.
/// is less than zero. -or- is less than TimeSpan.Zero.
public ReplaySubject(int bufferSize, TimeSpan window)
: this(bufferSize, window, SchedulerDefaults.Iteration)
{
}
///
/// Initializes a new instance of the class.
///
public ReplaySubject()
: this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{
}
///
/// Initializes a new instance of the class with the specified scheduler.
///
/// Scheduler the observers are invoked on.
/// is null.
public ReplaySubject(IScheduler scheduler)
: this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
{
}
///
/// Initializes a new instance of the class with the specified buffer size and scheduler.
///
/// Maximum element count of the replay buffer.
/// Scheduler the observers are invoked on.
/// is null.
/// is less than zero.
public ReplaySubject(int bufferSize, IScheduler scheduler)
: this(bufferSize, TimeSpan.MaxValue, scheduler)
{
}
///
/// Initializes a new instance of the class with the specified buffer size.
///
/// Maximum element count of the replay buffer.
/// is less than zero.
public ReplaySubject(int bufferSize)
: this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{
}
///
/// Initializes a new instance of the class with the specified window and scheduler.
///
/// Maximum time length of the replay buffer.
/// Scheduler the observers are invoked on.
/// is null.
/// is less than TimeSpan.Zero.
public ReplaySubject(TimeSpan window, IScheduler scheduler)
: this(InfiniteBufferSize, window, scheduler)
{
}
///
/// Initializes a new instance of the class with the specified window.
///
/// Maximum time length of the replay buffer.
/// is less than TimeSpan.Zero.
public ReplaySubject(TimeSpan window)
: this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{
}
///
/// Indicates whether the subject has observers subscribed to it.
///
public bool HasObservers
{
get
{
var observers = _observers;
return observers != null && observers.Data.Length > 0;
}
}
void Trim(TimeSpan now)
{
while (_queue.Count > _bufferSize)
_queue.Dequeue();
while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
_queue.Dequeue();
}
///
/// Notifies all subscribed and future observers about the arrival of the specified element in the sequence.
///
/// The value to send to all observers.
public void OnNext(T value)
{
var o = default(ScheduledObserver[]);
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
var now = _stopwatch.Elapsed;
_queue.Enqueue(new TimeInterval(value, now));
Trim(now);
o = _observers.Data;
foreach (var observer in o)
observer.OnNext(value);
}
}
if (o != null)
foreach (var observer in o)
observer.EnsureActive();
}
///
/// Notifies all subscribed and future observers about the specified exception.
///
/// The exception to send to all observers.
/// is null.
public void OnError(Exception error)
{
if (error == null)
throw new ArgumentNullException("error");
var o = default(ScheduledObserver[]);
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
var now = _stopwatch.Elapsed;
_isStopped = true;
_error = error;
Trim(now);
o = _observers.Data;
foreach (var observer in o)
observer.OnError(error);
_observers = new ImmutableList>();
}
}
if (o != null)
foreach (var observer in o)
observer.EnsureActive();
}
///
/// Notifies all subscribed and future observers about the end of the sequence.
///
public void OnCompleted()
{
var o = default(ScheduledObserver[]);
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
var now = _stopwatch.Elapsed;
_isStopped = true;
Trim(now);
o = _observers.Data;
foreach (var observer in o)
observer.OnCompleted();
_observers = new ImmutableList>();
}
}
if (o != null)
foreach (var observer in o)
observer.EnsureActive();
}
///
/// Subscribes an observer to the subject.
///
/// Observer to subscribe to the subject.
/// Disposable object that can be used to unsubscribe the observer from the subject.
/// is null.
public IDisposable Subscribe(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
var so = new ScheduledObserver(_scheduler, observer);
var n = 0;
var subscription = new RemovableDisposable(this, so);
lock (_gate)
{
CheckDisposed();
//
// Notice the v1.x behavior of always calling Trim is preserved here.
//
// This may be subject (pun intended) of debate: should this policy
// only be applied while the sequence is active? With the current
// behavior, a sequence will "die out" after it has terminated by
// continuing to drop OnNext notifications from the queue.
//
// In v1.x, this behavior was due to trimming based on the clock value
// returned by scheduler.Now, applied to all but the terminal message
// in the queue. Using the IStopwatch has the same effect. Either way,
// we guarantee the final notification will be observed, but there's
// no way to retain the buffer directly. One approach is to use the
// time-based TakeLast operator and apply an unbounded ReplaySubject
// to it.
//
// To conclude, we're keeping the behavior as-is for compatibility
// reasons with v1.x.
//
Trim(_stopwatch.Elapsed);
_observers = _observers.Add(so);
n = _queue.Count;
foreach (var item in _queue)
so.OnNext(item.Value);
if (_error != null)
{
n++;
so.OnError(_error);
}
else if (_isStopped)
{
n++;
so.OnCompleted();
}
}
so.EnsureActive(n);
return subscription;
}
void Unsubscribe(ScheduledObserver observer)
{
lock (_gate)
{
if (!_isDisposed)
_observers = _observers.Remove(observer);
}
}
sealed class RemovableDisposable : IDisposable
{
private readonly ReplaySubject _subject;
private readonly ScheduledObserver _observer;
public RemovableDisposable(ReplaySubject subject, ScheduledObserver observer)
{
_subject = subject;
_observer = observer;
}
public void Dispose()
{
_observer.Dispose();
_subject.Unsubscribe(_observer);
}
}
void CheckDisposed()
{
if (_isDisposed)
throw new ObjectDisposedException(string.Empty);
}
///
/// Releases all resources used by the current instance of the class and unsubscribe all observers.
///
public void Dispose()
{
lock (_gate)
{
_isDisposed = true;
_observers = null;
}
}
}
}