// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System; using System.Collections.Generic; using System.Linq; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using Microsoft.Reactive.Testing; #if NUNIT using NUnit.Framework; using TestClassAttribute = NUnit.Framework.TestFixtureAttribute; using TestMethodAttribute = NUnit.Framework.TestAttribute; using TestInitializeAttribute = NUnit.Framework.SetUpAttribute; #else using Microsoft.VisualStudio.TestTools.UnitTesting; #endif using ReactiveTests.Dummies; namespace ReactiveTests.Tests { [TestClass] public class ObservableTimeTest : ReactiveTest { #region + Buffer + [TestMethod] public void Buffer_Time_ArgumentChecking() { var scheduler = new TestScheduler(); var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.Zero)); ReactiveAssert.Throws(() => Observable.Buffer(someObservable, TimeSpan.Zero, null)); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.Zero, scheduler)); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.Zero, TimeSpan.Zero)); ReactiveAssert.Throws(() => Observable.Buffer(someObservable, TimeSpan.Zero, TimeSpan.Zero, null)); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.Zero, TimeSpan.Zero, scheduler)); } [TestMethod] public void BufferWithTime_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1), DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, 
TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1), DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), null)); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1))); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1))); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1))); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.FromTicks(1), DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(-1), DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(1), null)); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.FromTicks(1))); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(-1))); } [TestMethod] public void BufferWithTime_Basic1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(300, "2,3,4"), OnNext(370, "4,5,6"), OnNext(440, "6,7,8"), OnNext(510, "8,9"), OnNext(580, ""), OnNext(600, ""), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void BufferWithTime_Basic2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), 
OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(70), TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(270, "2,3"), OnNext(370, "5,6"), OnNext(470, "8,9"), OnNext(570, ""), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void BufferWithTime_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnError(600, ex) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(300, "2,3,4"), OnNext(370, "4,5,6"), OnNext(440, "6,7,8"), OnNext(510, "8,9"), OnNext(580, ""), OnError(600, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void BufferWithTime_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())), 370 ); res.Messages.AssertEqual( OnNext(300, "2,3,4") ); xs.Subscriptions.AssertEqual( Subscribe(200, 370) ); } [TestMethod] public void BufferWithTime_Basic_Same() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), 
OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(300, "2,3,4"), OnNext(400, "5,6,7"), OnNext(500, "8,9"), OnNext(600, ""), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void BufferWithTime_Basic_Same_Periodic() { var scheduler = new PeriodicTestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(300, "2,3,4"), OnNext(400, "5,6,7"), OnNext(500, "8,9"), OnNext(600, ""), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); scheduler.Timers.AssertEqual( new TimerRun(200, 600) { 300, 400, 500 } ); } [TestMethod] public void BufferWithTime_Basic_Same_Periodic_Error() { var ex = new Exception(); var scheduler = new PeriodicTestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(380, 7), OnNext(420, 8), OnNext(470, 9), OnError(480, ex) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(300, "2,3,4"), OnNext(400, "5,6,7"), OnError(480, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 480) ); scheduler.Timers.AssertEqual( new TimerRun(200, 480) { 300, 400 } ); } [TestMethod] public void BufferWithTime_Default() { Observable.Range(0, 10).Buffer(TimeSpan.FromDays(1), TimeSpan.FromDays(1)).First().AssertEqual(Enumerable.Range(0, 10)); Observable.Range(0, 
10).Buffer(TimeSpan.FromDays(1)).First().AssertEqual(Enumerable.Range(0, 10)); } [TestMethod] public void BufferWithTimeOrCount_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.FromTicks(1), 1, DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(-1), 1, DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(1), 0, DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(1), 1, null)); ReactiveAssert.Throws(() => Observable.Buffer(default(IObservable), TimeSpan.FromTicks(1), 1)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(-1), 1)); ReactiveAssert.Throws(() => Observable.Buffer(DummyObservable.Instance, TimeSpan.FromTicks(1), 0)); } [TestMethod] public void BufferWithTimeOrCount_Basic() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(205, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(370, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(240, "1,2,3"), OnNext(310, "4"), OnNext(370, "5,6,7"), OnNext(440, "8"), OnNext(510, "9"), OnNext(580, ""), OnNext(600, ""), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void BufferWithTimeOrCount_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(205, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(370, 7), OnNext(420, 8), OnNext(470, 9), OnError(600, ex) ); var res = scheduler.Start(() => 
xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())) ); res.Messages.AssertEqual( OnNext(240, "1,2,3"), OnNext(310, "4"), OnNext(370, "5,6,7"), OnNext(440, "8"), OnNext(510, "9"), OnNext(580, ""), OnError(600, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void BufferWithTimeOrCount_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(205, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(370, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())), 370 ); res.Messages.AssertEqual( OnNext(240, "1,2,3"), OnNext(310, "4"), OnNext(370, "5,6,7") ); xs.Subscriptions.AssertEqual( Subscribe(200, 370) ); } [TestMethod] public void BufferWithTimeOrCount_Default() { Observable.Range(1, 10, DefaultScheduler.Instance).Buffer(TimeSpan.FromDays(1), 3).Skip(1).First().AssertEqual(4, 5, 6); } #endregion #region + Delay + [TestMethod] public void Delay_ArgumentChecking() { var scheduler = new TestScheduler(); var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.Delay(default(IObservable), DateTimeOffset.Now)); ReactiveAssert.Throws(() => Observable.Delay(default(IObservable), TimeSpan.Zero)); ReactiveAssert.Throws(() => Observable.Delay(someObservable, TimeSpan.FromSeconds(-1))); ReactiveAssert.Throws(() => Observable.Delay(default(IObservable), DateTimeOffset.Now, scheduler)); ReactiveAssert.Throws(() => Observable.Delay(default(IObservable), TimeSpan.Zero, scheduler)); ReactiveAssert.Throws(() => Observable.Delay(someObservable, DateTimeOffset.Now, null)); ReactiveAssert.Throws(() => Observable.Delay(someObservable, TimeSpan.Zero, null)); ReactiveAssert.Throws(() => Observable.Delay(someObservable, 
TimeSpan.FromSeconds(-1), scheduler)); } [TestMethod] public void Delay_TimeSpan_Simple1() { Delay_TimeSpan_Simple1_Impl(false); } [TestMethod] public void Delay_TimeSpan_Simple1_Stopwatch() { Delay_TimeSpan_Simple1_Impl(true); } private void Delay_TimeSpan_Simple1_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(100), useStopwatch ? scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(350, 2), OnNext(450, 3), OnNext(550, 4), OnCompleted(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_DateTimeOffset_Simple1() { Delay_DateTimeOffset_Simple1_Impl(false); } [TestMethod] public void Delay_DateTimeOffset_Simple1_Stopwatch() { Delay_DateTimeOffset_Simple1_Impl(true); } private void Delay_DateTimeOffset_Simple1_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var res = scheduler.Start(() => xs.Delay(new DateTimeOffset(300, TimeSpan.Zero), useStopwatch ? scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(350, 2), OnNext(450, 3), OnNext(550, 4), OnCompleted(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_TimeSpan_Simple2() { Delay_TimeSpan_Simple2_Impl(false); } [TestMethod] public void Delay_TimeSpan_Simple2_Stopwatch() { Delay_TimeSpan_Simple2_Impl(true); } private void Delay_TimeSpan_Simple2_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(50), useStopwatch ? 
scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(300, 2), OnNext(400, 3), OnNext(500, 4), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_DateTimeOffset_Simple2() { Delay_DateTimeOffset_Simple2_Impl(false); } [TestMethod] public void Delay_DateTimeOffset_Simple2_Stopwatch() { Delay_DateTimeOffset_Simple2_Impl(true); } private void Delay_DateTimeOffset_Simple2_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var res = scheduler.Start(() => xs.Delay(new DateTimeOffset(250, TimeSpan.Zero), useStopwatch ? scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(300, 2), OnNext(400, 3), OnNext(500, 4), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_TimeSpan_Simple3() { Delay_TimeSpan_Simple3_Impl(false); } [TestMethod] public void Delay_TimeSpan_Simple3_Stopwatch() { Delay_TimeSpan_Simple3_Impl(true); } private void Delay_TimeSpan_Simple3_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(150), useStopwatch ? 
scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(400, 2), OnNext(500, 3), OnNext(600, 4), OnCompleted(700) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_DateTimeOffset_Simple3() { Delay_DateTimeOffset_Simple3_Impl(false); } [TestMethod] public void Delay_DateTimeOffset_Simple3_Stopwatch() { Delay_DateTimeOffset_Simple3_Impl(true); } private void Delay_DateTimeOffset_Simple3_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var res = scheduler.Start(() => xs.Delay(new DateTimeOffset(350, TimeSpan.Zero), useStopwatch ? scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(400, 2), OnNext(500, 3), OnNext(600, 4), OnCompleted(700) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_TimeSpan_Error1() { Delay_TimeSpan_Error1_Impl(false); } [TestMethod] public void Delay_TimeSpan_Error1_Stopwatch() { Delay_TimeSpan_Error1_Impl(true); } private void Delay_TimeSpan_Error1_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnError(550, ex) ); var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(50), useStopwatch ? 
scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(300, 2), OnNext(400, 3), OnNext(500, 4), OnError(550, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_DateTimeOffset_Error1() { Delay_DateTimeOffset_Error1_Impl(false); } [TestMethod] public void Delay_DateTimeOffset_Error1_Stopwatch() { Delay_DateTimeOffset_Error1_Impl(true); } private void Delay_DateTimeOffset_Error1_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnError(550, ex) ); var res = scheduler.Start(() => xs.Delay(new DateTimeOffset(250, TimeSpan.Zero), useStopwatch ? scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(300, 2), OnNext(400, 3), OnNext(500, 4), OnError(550, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_TimeSpan_Error2() { Delay_TimeSpan_Error2_Impl(false); } [TestMethod] public void Delay_TimeSpan_Error2_Stopwatch() { Delay_TimeSpan_Error2_Impl(true); } private void Delay_TimeSpan_Error2_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnError(550, ex) ); var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(150), useStopwatch ? 
scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(400, 2), OnNext(500, 3), OnError(550, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_DateTimeOffset_Error2() { Delay_DateTimeOffset_Error2_Impl(false); } [TestMethod] public void Delay_DateTimeOffset_Error2_Stopwatch() { Delay_DateTimeOffset_Error2_Impl(true); } private void Delay_DateTimeOffset_Error2_Impl(bool useStopwatch) { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnError(550, ex) ); var res = scheduler.Start(() => xs.Delay(new DateTimeOffset(350, TimeSpan.Zero), useStopwatch ? scheduler : scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(400, 2), OnNext(500, 3), OnError(550, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_TimeSpan_Real_Simple1() { Delay_TimeSpan_Real_Simple1_Impl(ThreadPoolScheduler.Instance.DisableOptimizations()); } [TestMethod] public void Delay_TimeSpan_Real_Simple1_Stopwatch() { Delay_TimeSpan_Real_Simple1_Impl(ThreadPoolScheduler.Instance); } private void Delay_TimeSpan_Real_Simple1_Impl(IScheduler scheduler) { var s = new Subject(); var res = s.Delay(TimeSpan.FromMilliseconds(10), scheduler); var lst = new List(); var e = new ManualResetEvent(false); res.Subscribe(lst.Add, () => e.Set()); new Thread(() => { s.OnNext(1); s.OnNext(2); s.OnNext(3); s.OnCompleted(); }).Start(); e.WaitOne(); Assert.IsTrue(new[] { 1, 2, 3 }.SequenceEqual(lst)); } [TestMethod] public void Delay_TimeSpan_Real_Error1() { Delay_TimeSpan_Real_Error1_Impl(ThreadPoolScheduler.Instance.DisableOptimizations()); } [TestMethod] public void Delay_TimeSpan_Real_Error1_Stopwatch() { Delay_TimeSpan_Real_Error1_Impl(ThreadPoolScheduler.Instance); } private void Delay_TimeSpan_Real_Error1_Impl(IScheduler scheduler) { var ex = new Exception(); var s = new 
Subject(); var res = s.Delay(TimeSpan.FromMilliseconds(10), scheduler); var e = new ManualResetEvent(false); var err = default(Exception); res.Subscribe(_ => { }, ex_ => { err = ex_; e.Set(); }); new Thread(() => { s.OnNext(1); s.OnNext(2); s.OnNext(3); s.OnError(ex); }).Start(); e.WaitOne(); Assert.AreSame(ex, err); } [TestMethod] public void Delay_TimeSpan_Real_Error2() { Delay_TimeSpan_Real_Error2_Impl(ThreadPoolScheduler.Instance.DisableOptimizations()); } [TestMethod] public void Delay_TimeSpan_Real_Error2_Stopwatch() { Delay_TimeSpan_Real_Error2_Impl(ThreadPoolScheduler.Instance); } private void Delay_TimeSpan_Real_Error2_Impl(IScheduler scheduler) { var ex = new Exception(); var s = new Subject(); var res = s.Delay(TimeSpan.FromMilliseconds(10), scheduler); var next = new ManualResetEvent(false); var e = new ManualResetEvent(false); var err = default(Exception); res.Subscribe(_ => { next.Set(); }, ex_ => { err = ex_; e.Set(); }); new Thread(() => { s.OnNext(1); next.WaitOne(); s.OnError(ex); }).Start(); e.WaitOne(); Assert.AreSame(ex, err); } [TestMethod] public void Delay_TimeSpan_Real_Error3() { Delay_TimeSpan_Real_Error3_Impl(ThreadPoolScheduler.Instance.DisableOptimizations()); } [TestMethod] public void Delay_TimeSpan_Real_Error3_Stopwatch() { Delay_TimeSpan_Real_Error3_Impl(ThreadPoolScheduler.Instance); } private void Delay_TimeSpan_Real_Error3_Impl(IScheduler scheduler) { var ex = new Exception(); var s = new Subject(); var res = s.Delay(TimeSpan.FromMilliseconds(10), scheduler); var next = new ManualResetEvent(false); var ack = new ManualResetEvent(false); var e = new ManualResetEvent(false); var err = default(Exception); res.Subscribe(_ => { next.Set(); ack.WaitOne(); }, ex_ => { err = ex_; e.Set(); }); new Thread(() => { s.OnNext(1); next.WaitOne(); s.OnError(ex); ack.Set(); }).Start(); e.WaitOne(); Assert.AreSame(ex, err); } [TestMethod] public void Delay_TimeSpan_Positive() { var scheduler = new TestScheduler(); var msgs = new[] { OnNext(150, 
1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) }; var xs = scheduler.CreateHotObservable(msgs); const ushort delay = 42; var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(delay), scheduler) ); var expected = from n in msgs where n.Time > ObservableTest.Subscribed select new Recorded>((ushort)(n.Time + delay), n.Value); res.Messages.AssertEqual(expected); } [TestMethod] public void Delay_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(550) ); var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(10), scheduler) ); res.Messages.AssertEqual( OnCompleted(560) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(550, ex) ); var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(10), scheduler) ); res.Messages.AssertEqual( OnError(550, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Delay_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1) ); var res = scheduler.Start(() => xs.Delay(TimeSpan.FromTicks(10), scheduler) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Delay_TimeSpan_DefaultScheduler() { Assert.IsTrue(Observable.Return(1).Delay(TimeSpan.FromMilliseconds(1)).ToEnumerable().SequenceEqual(new[] { 1 })); } [TestMethod] #if MONODROID [Ignore ("Timer not working?")] #endif public void Delay_DateTimeOffset_DefaultScheduler() { Assert.IsTrue(Observable.Return(1).Delay(DateTimeOffset.UtcNow + TimeSpan.FromMilliseconds(1)).ToEnumerable().SequenceEqual(new[] { 1 })); } [TestMethod] public void Delay_CrossingMessages() { var lst = new List(); var evt = new ManualResetEvent(false); var s = new Subject(); 
s.Delay(TimeSpan.FromSeconds(0.01)).Subscribe(x => { lst.Add(x); if (x < 9) s.OnNext(x + 1); else s.OnCompleted(); }, () => { evt.Set(); }); s.OnNext(0); evt.WaitOne(); Assert.IsTrue(Enumerable.Range(0, 10).SequenceEqual(lst)); } [TestMethod] public void Delay_Duration_ArgumentChecking() { var someObservable = DummyObservable.Instance; ReactiveAssert.Throws(() => Observable.Delay(default(IObservable), x => someObservable)); ReactiveAssert.Throws(() => Observable.Delay(someObservable, default(Func>))); ReactiveAssert.Throws(() => Observable.Delay(default(IObservable), someObservable, x => someObservable)); ReactiveAssert.Throws(() => Observable.Delay(someObservable, default(IObservable), x => someObservable)); ReactiveAssert.Throws(() => Observable.Delay(someObservable, someObservable, default(Func>))); } [TestMethod] public void Delay_Duration_Simple1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 10), OnNext(220, 30), OnNext(230, 50), OnNext(240, 35), OnNext(250, 20), OnCompleted(260) ); var res = scheduler.Start(() => xs.Delay(x => scheduler.CreateColdObservable(new[] { OnNext(x, "!") })) ); res.Messages.AssertEqual( OnNext(210 + 10, 10), OnNext(220 + 30, 30), OnNext(250 + 20, 20), OnNext(240 + 35, 35), OnNext(230 + 50, 50), OnCompleted(280) ); xs.Subscriptions.AssertEqual( Subscribe(200, 260) ); } [TestMethod] public void Delay_Duration_Simple2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnNext(250, 6), OnCompleted(300) ); var ys = scheduler.CreateColdObservable( OnNext(10, "!") ); var res = scheduler.Start(() => xs.Delay(_ => ys) ); res.Messages.AssertEqual( OnNext(210 + 10, 2), OnNext(220 + 10, 3), OnNext(230 + 10, 4), OnNext(240 + 10, 5), OnNext(250 + 10, 6), OnCompleted(300) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(210, 220), 
Subscribe(220, 230), Subscribe(230, 240), Subscribe(240, 250), Subscribe(250, 260) ); } [TestMethod] public void Delay_Duration_Simple3() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnNext(250, 6), OnCompleted(300) ); var ys = scheduler.CreateColdObservable( OnNext(100, "!") ); var res = scheduler.Start(() => xs.Delay(_ => ys) ); res.Messages.AssertEqual( OnNext(210 + 100, 2), OnNext(220 + 100, 3), OnNext(230 + 100, 4), OnNext(240 + 100, 5), OnNext(250 + 100, 6), OnCompleted(350) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(210, 310), Subscribe(220, 320), Subscribe(230, 330), Subscribe(240, 340), Subscribe(250, 350) ); } [TestMethod] public void Delay_Duration_Simple4_InnerEmpty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnNext(250, 6), OnCompleted(300) ); var ys = scheduler.CreateColdObservable( OnCompleted(100) ); var res = scheduler.Start(() => xs.Delay(_ => ys) ); res.Messages.AssertEqual( OnNext(210 + 100, 2), OnNext(220 + 100, 3), OnNext(230 + 100, 4), OnNext(240 + 100, 5), OnNext(250 + 100, 6), OnCompleted(350) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(210, 310), Subscribe(220, 320), Subscribe(230, 330), Subscribe(240, 340), Subscribe(250, 350) ); } [TestMethod] public void Delay_Duration_Dispose1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnNext(250, 6), OnCompleted(300) ); var ys = scheduler.CreateColdObservable( OnNext(200, "!") ); var res = scheduler.Start(() => xs.Delay(_ => ys), 425 ); res.Messages.AssertEqual( OnNext(210 + 200, 2), OnNext(220 + 200, 3) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) 
); ys.Subscriptions.AssertEqual( Subscribe(210, 410), Subscribe(220, 420), Subscribe(230, 425), Subscribe(240, 425), Subscribe(250, 425) ); } [TestMethod] public void Delay_Duration_Dispose2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(400, 3), OnCompleted(500) ); var ys = scheduler.CreateColdObservable( OnNext(50, "!") ); var res = scheduler.Start(() => xs.Delay(_ => ys), 300 ); res.Messages.AssertEqual( OnNext(210 + 50, 2) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(210, 260) ); } [TestMethod] public void Delay_Duration_OuterError1() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnNext(250, 6), OnError(300, ex) ); var ys = scheduler.CreateColdObservable( OnNext(100, "!") ); var res = scheduler.Start(() => xs.Delay(_ => ys) ); res.Messages.AssertEqual( OnError(300, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(210, 300), Subscribe(220, 300), Subscribe(230, 300), Subscribe(240, 300), Subscribe(250, 300) ); } [TestMethod] public void Delay_Duration_OuterError2() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnNext(250, 6), OnError(300, ex) ); var ys = scheduler.CreateColdObservable( OnNext(10, "!") ); var res = scheduler.Start(() => xs.Delay(_ => ys) ); res.Messages.AssertEqual( OnNext(210 + 10, 2), OnNext(220 + 10, 3), OnNext(230 + 10, 4), OnNext(240 + 10, 5), OnNext(250 + 10, 6), OnError(300, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(210, 220), Subscribe(220, 230), Subscribe(230, 240), Subscribe(240, 250), Subscribe(250, 260) ); } [TestMethod] public void 
Delay_Duration_InnerError1() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnNext(250, 6), OnCompleted(300) ); var ys = scheduler.CreateColdObservable( OnNext(30, "!") ); var zs = scheduler.CreateColdObservable( OnError(25, ex) ); var res = scheduler.Start(() => xs.Delay(x => x != 5 ? ys : zs) ); res.Messages.AssertEqual( OnNext(210 + 30, 2), OnNext(220 + 30, 3), OnNext(230 + 30, 4), OnError(240 + 25, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 265) ); ys.Subscriptions.AssertEqual( Subscribe(210, 240), Subscribe(220, 250), Subscribe(230, 260), Subscribe(250, 265) ); } [TestMethod] public void Delay_Duration_InnerError2() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(220, 3), OnNext(230, 4), OnNext(240, 5), OnNext(250, 6), OnCompleted(300) ); var ys = scheduler.CreateColdObservable( OnError(100, ex) ); var res = scheduler.Start(() => xs.Delay(_ => ys) ); res.Messages.AssertEqual( OnError(210 + 100, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(210, 310), Subscribe(220, 310), Subscribe(230, 310), Subscribe(240, 310), Subscribe(250, 310) ); } [TestMethod] public void Delay_Duration_SelectorThrows1() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(300, 3), OnNext(350, 4), OnNext(400, 5), OnNext(450, 6), OnCompleted(500) ); var ys = scheduler.CreateColdObservable( OnNext(80, "") ); var res = scheduler.Start(() => xs.Delay(x => { if (x == 4) throw ex; return ys; }) ); res.Messages.AssertEqual( OnNext(330, 2), OnError(350, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 350) ); ys.Subscriptions.AssertEqual( Subscribe(250, 330), Subscribe(300, 350) ); } [TestMethod] public void 
Delay_Duration_Simple()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(150, 1),
                OnNext(250, 2),
                OnNext(350, 3),
                OnNext(450, 4),
                OnCompleted(550)
            );

            // Each value x is delayed by x * 10 ticks; the expected output shows that only the first
            // notification of the duration sequence counts.
            var res = scheduler.Start(() =>
                xs.Delay(x =>
                    scheduler.CreateColdObservable(
                        OnNext(x * 10, "Ignore"),
                        OnNext(x * 10 + 5, "Aargh!")
                    )
                )
            );

            res.Messages.AssertEqual(
                OnNext(250 + 2 * 10, 2),
                OnNext(350 + 3 * 10, 3),
                OnNext(450 + 4 * 10, 4),
                OnCompleted(550)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 550)
            );
        }

        // The source completes at 451 while the delay for value 4 is still pending; completion is
        // expected only once that value has been emitted (450 + 4 * 10).
        [TestMethod]
        public void Delay_Duration_DeferOnCompleted()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(150, 1),
                OnNext(250, 2),
                OnNext(350, 3),
                OnNext(450, 4),
                OnCompleted(451)
            );

            var res = scheduler.Start(() =>
                xs.Delay(x =>
                    scheduler.CreateColdObservable(
                        OnNext(x * 10, "Ignore"),
                        OnNext(x * 10 + 5, "Aargh!")
                    )
                )
            );

            res.Messages.AssertEqual(
                OnNext(250 + 2 * 10, 2),
                OnNext(350 + 3 * 10, 3),
                OnNext(450 + 4 * 10, 4),
                OnCompleted(450 + 4 * 10)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 451)
            );
        }

        // The duration sequence for value 4 fails instead of firing; that error is expected at
        // 450 + 4 * 10 in place of the value.
        [TestMethod]
        public void Delay_Duration_InnerError()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(150, 1),
                OnNext(250, 2),
                OnNext(350, 3),
                OnNext(450, 4),
                OnCompleted(451)
            );

            var ex = new Exception();

            var res = scheduler.Start(() =>
                xs.Delay(x =>
                    x < 4
                    ? scheduler.CreateColdObservable(
                            OnNext(x * 10, "Ignore"),
                            OnNext(x * 10 + 5, "Aargh!")
                        )
                    : scheduler.CreateColdObservable(
                            OnError(x * 10, ex)
                        )
                )
            );

            res.Messages.AssertEqual(
                OnNext(250 + 2 * 10, 2),
                OnNext(350 + 3 * 10, 3),
                OnError(450 + 4 * 10, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 451)
            );
        }

        // The source fails at 460 while value 4 is still being delayed; the error is expected
        // immediately and value 4 is never emitted.
        [TestMethod]
        public void Delay_Duration_OuterError()
        {
            var scheduler = new TestScheduler();

            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnNext(150, 1),
                OnNext(250, 2),
                OnNext(350, 3),
                OnNext(450, 4),
                OnError(460, ex)
            );

            var res = scheduler.Start(() =>
                xs.Delay(x =>
                    scheduler.CreateColdObservable(
                        OnNext(x * 10, "Ignore"),
                        OnNext(x * 10 + 5, "Aargh!")
                    )
                )
            );

            res.Messages.AssertEqual(
                OnNext(250 + 2 * 10, 2),
                OnNext(350 + 3 * 10, 3),
                OnError(460, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 460)
            );
        }

        // The duration selector throws for value 4; the error is expected at 450, the time the
        // source pushed that value.
        [TestMethod]
        public void Delay_Duration_SelectorThrows2()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(150, 1),
                OnNext(250, 2),
                OnNext(350, 3),
                OnNext(450, 4),
                OnCompleted(550)
            );

            var ex = new Exception();

            var res = scheduler.Start(() =>
                xs.Delay(x =>
                {
                    if (x < 4)
                    {
                        return scheduler.CreateColdObservable(
                            OnNext(x * 10, "Ignore"),
                            OnNext(x * 10 + 5, "Aargh!")
                        );
                    }
                    else
                        throw ex;
                })
            );

            res.Messages.AssertEqual(
                OnNext(250 + 2 * 10, 2),
                OnNext(350 + 3 * 10, 3),
                OnError(450, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 450)
            );
        }

        // The duration sequences complete without producing a value; the expected output shows each
        // source value released at that completion time.
        [TestMethod]
        public void Delay_Duration_InnerDone()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(150, 1),
                OnNext(250, 2),
                OnNext(350, 3),
                OnNext(450, 4),
                OnCompleted(550)
            );

            // No exception instance is needed here: nothing in this scenario fails.
            var res = scheduler.Start(() =>
                xs.Delay(x =>
                    scheduler.CreateColdObservable(
                        OnCompleted(x * 10)
                    )
                )
            );

            res.Messages.AssertEqual(
                OnNext(250 + 2 * 10, 2),
                OnNext(350 + 3 * 10, 3),
                OnNext(450 + 4 * 10, 4),
                OnCompleted(550)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 550)
            );
        }

        [TestMethod]
        public void
Delay_Duration_InnerSubscriptionTimes()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, -1),
                OnNext(250, 0),
                OnNext(350, 1),
                OnNext(450, 2),
                OnCompleted(550)
            );

            // One duration sequence per source value; the value itself is the index into this array.
            var ys = new[]
            {
                sched.CreateColdObservable(
                    OnNext(20, 42),
                    OnNext(25, 99)
                ),
                sched.CreateColdObservable(
                    OnNext(10, 43),
                    OnNext(15, 99)
                ),
                sched.CreateColdObservable(
                    OnNext(30, 44),
                    OnNext(35, 99)
                ),
            };

            var observed = sched.Start(
                () => xs.Delay(x => ys[x])
            );

            observed.Messages.AssertEqual(
                OnNext(250 + 20, 0),
                OnNext(350 + 10, 1),
                OnNext(450 + 30, 2),
                OnCompleted(550)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 550)
            );

            // Each duration sequence is expected to be subscribed when its value arrives and
            // released at its first notification.
            ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20));
            ys[1].Subscriptions.AssertEqual(Subscribe(350, 350 + 10));
            ys[2].Subscriptions.AssertEqual(Subscribe(450, 450 + 30));
        }

        // Subscription to xs is expected only once ys fires at 210; each value x is then delayed by
        // x ticks, so the output is ordered by emission time rather than arrival order.
        [TestMethod]
        public void Delay_DurationAndSubscription_Simple1()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 10),
                OnNext(220, 30),
                OnNext(230, 50),
                OnNext(240, 35),
                OnNext(250, 20),
                OnCompleted(260)
            );

            var ys = sched.CreateColdObservable(
                OnNext(10, "!")
            );

            var observed = sched.Start(
                () => xs.Delay(ys, x => sched.CreateColdObservable(new[] { OnNext(x, "!") }))
            );

            observed.Messages.AssertEqual(
                OnNext(220 + 30, 30),
                OnNext(250 + 20, 20),
                OnNext(240 + 35, 35),
                OnNext(230 + 50, 50),
                OnCompleted(280)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(210, 260)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 210)
            );
        }

        // Same as Simple1, except the subscription-delay sequence completes instead of producing a
        // value; the expected results are identical.
        [TestMethod]
        public void Delay_DurationAndSubscription_Simple2()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 10),
                OnNext(220, 30),
                OnNext(230, 50),
                OnNext(240, 35),
                OnNext(250, 20),
                OnCompleted(260)
            );

            var ys = sched.CreateColdObservable(
                OnCompleted(10)
            );

            var observed = sched.Start(
                () => xs.Delay(ys, x => sched.CreateColdObservable(new[] { OnNext(x, "!") }))
            );

            observed.Messages.AssertEqual(
                OnNext(220 + 30, 30),
                OnNext(250 + 20, 20),
                OnNext(240 + 35, 35),
                OnNext(230 + 50, 50),
                OnCompleted(280)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(210, 260)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 210)
            );
        }

        // Passes 255 as the second argument to Start; only the first delayed value (220 + 30) is
        // expected and the source subscription is expected to end at 255.
        [TestMethod]
        public void Delay_DurationAndSubscription_Dispose1()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 10),
                OnNext(220, 30),
                OnNext(230, 50),
                OnNext(240, 35),
                OnNext(250, 20),
                OnCompleted(260)
            );

            var ys = sched.CreateColdObservable(
                OnNext(10, "!")
            );

            var observed = sched.Start(
                () => xs.Delay(ys, x => sched.CreateColdObservable(new[] { OnNext(x, "!") })),
                255
            );

            observed.Messages.AssertEqual(
                OnNext(220 + 30, 30)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(210, 255)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 210)
            );
        }

        // The subscription delay (100 ticks) has not elapsed by 255: no messages and no
        // subscription to xs are expected.
        [TestMethod]
        public void Delay_DurationAndSubscription_Dispose2()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 10),
                OnNext(220, 30),
                OnNext(230, 50),
                OnNext(240, 35),
                OnNext(250, 20),
                OnCompleted(260)
            );

            var ys = sched.CreateColdObservable(
                OnNext(100, "!")
            );

            var observed = sched.Start(
                () => xs.Delay(ys, x => sched.CreateColdObservable(new[] { OnNext(x, "!") })),
                255
            );

            observed.Messages.AssertEqual(
            );

            xs.Subscriptions.AssertEqual(
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 255)
            );
        }

        // The subscription-delay sequence itself fails; its error is expected at 200 + 10 and xs is
        // never subscribed to.
        [TestMethod]
        public void Delay_DurationAndSubscription_Error()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 10),
                OnNext(220, 30),
                OnNext(230, 50),
                OnNext(240, 35),
                OnNext(250, 20),
                OnCompleted(260)
            );

            var ys = sched.CreateColdObservable(
                OnError(10, ex)
            );

            var observed = sched.Start(
                () => xs.Delay(ys, x => sched.CreateColdObservable(new[] { OnNext(x, "!") }))
            );

            observed.Messages.AssertEqual(
                OnError(200 + 10, ex)
            );

            xs.Subscriptions.AssertEqual(
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 210)
            );
        }

        [TestMethod]
        public void Delay_ErrorHandling1()
        {
            //
            // Checks
// for race condition between OnNext and OnError where the latter has a chance to
            // send out the OnError message before the former gets a chance to run in the delayed
            // queue. In that case, the OnNext message should not come out.
            //
            // See DrainQueue's first _hasFailed check.
            //

            var xs = Observable.Create(observer =>
            {
                observer.OnNext(42);
                observer.OnError(new Exception());
                return () => { };
            });

            var s = new ImpulseScheduler();

            var called = false;
            var failed = new ManualResetEvent(false);
            xs.Delay(TimeSpan.FromDays(1), s).Subscribe(_ => { called = true; }, ex => { failed.Set(); });

            // Wait for the error to reach the observer, then release the parked delayed work and
            // wait for it to finish before checking that OnNext never ran.
            failed.WaitOne();
            s.Event.Set();
            s.Done.WaitOne();

            Assert.IsFalse(called);
        }

        // Scheduler used by Delay_ErrorHandling1. Only the relative-time Schedule overload is
        // implemented: it ignores dueTime and runs the action on Scheduler.Default once Event has
        // been set, then sets Done.
        class ImpulseScheduler : IScheduler
        {
            public DateTimeOffset Now
            {
                get { return DateTimeOffset.UtcNow; }
            }

            public IDisposable Schedule(TState state, Func action)
            {
                throw new NotImplementedException();
            }

            // Assigned once by the initializers below, hence readonly.
            private readonly ManualResetEvent _event = new ManualResetEvent(false);
            private readonly ManualResetEvent _done = new ManualResetEvent(false);

            // Gate the scheduled action waits on before running.
            public ManualResetEvent Event { get { return _event; } }

            // Set once the scheduled action has returned.
            public ManualResetEvent Done { get { return _done; } }

            public IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
            {
                Scheduler.Default.Schedule(() =>
                {
                    _event.WaitOne();
                    action(this, state);
                    _done.Set();
                });

                return Disposable.Empty;
            }

            public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action)
            {
                throw new NotImplementedException();
            }
        }

        // Disposes the subscription after the long-running work has signalled its start, then waits
        // for that work to signal that it has returned.
        [TestMethod]
        public void Delay_LongRunning_CancelEarly()
        {
            var xs = Observable.Create(observer =>
            {
                return Scheduler.Default.Schedule(TimeSpan.FromHours(1), () =>
                {
                    observer.OnNext(42);
                });
            });

            var s = new ManualResetEvent(false);
            var e = new ManualResetEvent(false);

            var ys = xs.Delay(TimeSpan.Zero, new MyLongRunning1(s, e));

            var d = ys.Subscribe(_ => { });

            s.WaitOne();
            d.Dispose();
            e.WaitOne();
        }

        // Long-running scheduler for Delay_LongRunning_CancelEarly: sets the start event just before
        // invoking the action on a new thread and the stop event once the action returns.
        class MyLongRunning1 : LocalScheduler, ISchedulerLongRunning
        {
            private readonly ManualResetEvent _start;
            private readonly ManualResetEvent _stop;

            public MyLongRunning1(ManualResetEvent start, ManualResetEvent stop)
            {
                _start = start;
                _stop = stop;
            }

            public IDisposable ScheduleLongRunning(TState state, Action action)
            {
                var b = new BooleanDisposable();

                new Thread(() =>
                {
                    _start.Set();
                    action(state, b);
                    _stop.Set();
                }).Start();

                return b;
            }

            public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
            {
                throw new NotImplementedException();
            }
        }

        // Disposes the subscription after the scheduler's timed Schedule overload has been called,
        // then waits for the long-running work to signal that it has returned.
        [TestMethod]
        public void Delay_LongRunning_CancelLate()
        {
            var xs = Observable.Return(42);

            var s = new ManualResetEvent(false);
            var e = new ManualResetEvent(false);

            var ys = xs.Delay(TimeSpan.FromHours(1), new MyLongRunning2(s, e));

            var d = ys.Subscribe(_ => { });

            s.WaitOne();
            d.Dispose();
            e.WaitOne();
        }

        // Long-running scheduler for Delay_LongRunning_CancelLate: the timed Schedule overload only
        // sets the start event (the action is never run); the stop event is set once the
        // long-running action returns.
        class MyLongRunning2 : LocalScheduler, ISchedulerLongRunning
        {
            private readonly ManualResetEvent _start;
            private readonly ManualResetEvent _stop;

            public MyLongRunning2(ManualResetEvent start, ManualResetEvent stop)
            {
                _start = start;
                _stop = stop;
            }

            public IDisposable ScheduleLongRunning(TState state, Action action)
            {
                var b = new BooleanDisposable();

                new Thread(() =>
                {
                    action(state, b);
                    _stop.Set();
                }).Start();

                return b;
            }

            public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
            {
                _start.Set();
                return Disposable.Empty;
            }
        }

        #endregion

        #region + DelaySubscription +

        [TestMethod]
        public void DelaySubscription_ArgumentChecking()
        {
            ReactiveAssert.Throws(() => Observable.DelaySubscription(default(IObservable), DateTimeOffset.Now));
            ReactiveAssert.Throws(() => Observable.DelaySubscription(default(IObservable), DateTimeOffset.Now, Scheduler.Immediate));
            ReactiveAssert.Throws(() => Observable.DelaySubscription(DummyObservable.Instance, DateTimeOffset.Now, default(IScheduler)));

            ReactiveAssert.Throws(() => Observable.DelaySubscription(default(IObservable), TimeSpan.Zero));
            ReactiveAssert.Throws(() => Observable.DelaySubscription(default(IObservable), TimeSpan.Zero, Scheduler.Immediate));
            ReactiveAssert.Throws(() => Observable.DelaySubscription(DummyObservable.Instance, TimeSpan.Zero,
default(IScheduler)));

            ReactiveAssert.Throws(() => Observable.DelaySubscription(DummyObservable.Instance, TimeSpan.FromSeconds(-1)));
            ReactiveAssert.Throws(() => Observable.DelaySubscription(DummyObservable.Instance, TimeSpan.FromSeconds(-1), Scheduler.Immediate));
        }

        // Overload without a scheduler argument: the delayed range must still yield 0..9 in order.
        [TestMethod]
        public void DelaySubscription_TimeSpan_Default()
        {
            var items = new List();

            Observable.Range(0, 10)
                .DelaySubscription(TimeSpan.FromMilliseconds(1))
                .ForEach(items.Add);

            Assert.IsTrue(Enumerable.Range(0, 10).SequenceEqual(items));
        }

        // A 30-tick delay moves the subscription to the cold source from 200 to 230, shifting every
        // expected message by the same amount.
        [TestMethod]
        public void DelaySubscription_TimeSpan_Simple()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateColdObservable(
                OnNext(50, 42),
                OnNext(60, 43),
                OnCompleted(70)
            );

            var observed = sched.Start(
                () => xs.DelaySubscription(TimeSpan.FromTicks(30), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(280, 42),
                OnNext(290, 43),
                OnCompleted(300)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(230, 300)
            );
        }

        // Same timing as the Simple case, with the source terminating in an error.
        [TestMethod]
        public void DelaySubscription_TimeSpan_Error()
        {
            var ex = new Exception();

            var sched = new TestScheduler();

            var xs = sched.CreateColdObservable(
                OnNext(50, 42),
                OnNext(60, 43),
                OnError(70, ex)
            );

            var observed = sched.Start(
                () => xs.DelaySubscription(TimeSpan.FromTicks(30), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(280, 42),
                OnNext(290, 43),
                OnError(300, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(230, 300)
            );
        }

        // Absolute-time overload without a scheduler argument: the range must still yield 0..9.
        [TestMethod]
        public void DelaySubscription_DateTimeOffset_Default()
        {
            var items = new List();

            Observable.Range(0, 10)
                .DelaySubscription(DateTimeOffset.UtcNow.AddMilliseconds(1))
                .ForEach(items.Add);

            Assert.IsTrue(Enumerable.Range(0, 10).SequenceEqual(items));
        }

        // Absolute due time of tick 230: expected timings match the TimeSpan variant.
        [TestMethod]
        public void DelaySubscription_DateTimeOffset_Simple()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateColdObservable(
                OnNext(50, 42),
                OnNext(60, 43),
                OnCompleted(70)
            );

            var observed = sched.Start(
                () => xs.DelaySubscription(new DateTimeOffset(230, TimeSpan.Zero), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(280, 42),
                OnNext(290, 43),
                OnCompleted(300)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(230, 300)
            );
        }

        // Absolute due time of tick 230 with a failing source.
        [TestMethod]
        public void DelaySubscription_DateTimeOffset_Error()
        {
            var ex = new Exception();

            var sched = new TestScheduler();

            var xs = sched.CreateColdObservable(
                OnNext(50, 42),
                OnNext(60, 43),
                OnError(70, ex)
            );

            var observed = sched.Start(
                () => xs.DelaySubscription(new DateTimeOffset(230, TimeSpan.Zero), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(280, 42),
                OnNext(290, 43),
                OnError(300, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(230, 300)
            );
        }

        #endregion

        #region + Generate +

        // Each call passes null for exactly one argument (scheduler, a selector, or the observer).
        [TestMethod]
        public void Generate_TimeSpan_ArgumentChecking()
        {
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, (IScheduler)null));
            ReactiveAssert.Throws(() => Observable.Generate(0, (Func)null, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyScheduler.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, (Func)null, DummyFunc.Instance, DummyScheduler.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, (Func)null, DummyFunc.Instance, DummyFunc.Instance, DummyScheduler.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, (Func)null, DummyScheduler.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyScheduler.Instance).Subscribe(null));
        }

        // State x is emitted x + 1 ticks after the previous step; the sequence ends once x exceeds 3.
        [TestMethod]
        public void Generate_TimeSpan_Finite()
        {
            var sched = new TestScheduler();

            var observed = sched.Start(
                () => Observable.Generate(0, x => x <= 3, x => x + 1, x => x, x => TimeSpan.FromTicks(x + 1), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(202, 0),
                OnNext(204, 1),
                OnNext(207, 2),
                OnNext(211, 3),
                OnCompleted(211)
            );
        }

        [TestMethod]
        public void Generate_TimeSpan_Throw_Condition()
        {
            var scheduler = new TestScheduler();
var ex = new Exception();

            var res = scheduler.Start(() =>
                Observable.Generate(0, new Func(x => { throw ex; }), x => x + 1, x => x, x => TimeSpan.FromTicks(x + 1), scheduler)
            );

            res.Messages.AssertEqual(
                OnError(201, ex)
            );
        }

        // A throwing result selector is expected to surface as an error at 201, with no values.
        [TestMethod]
        public void Generate_TimeSpan_Throw_ResultSelector()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var observed = sched.Start(
                () => Observable.Generate(0, x => true, x => x + 1, new Func(x => { throw ex; }), x => TimeSpan.FromTicks(x + 1), sched)
            );

            observed.Messages.AssertEqual(
                OnError(201, ex)
            );
        }

        // A throwing iterate function: the first value is expected, followed by the error at the
        // same time.
        [TestMethod]
        public void Generate_TimeSpan_Throw_Iterate()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var observed = sched.Start(
                () => Observable.Generate(0, x => true, new Func(x => { throw ex; }), x => x, x => TimeSpan.FromTicks(x + 1), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(202, 0),
                OnError(202, ex)
            );
        }

        // A throwing time selector is expected to surface as an error at 201, with no values.
        [TestMethod]
        public void Generate_TimeSpan_Throw_TimeSelector()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var observed = sched.Start(
                () => Observable.Generate(0, x => true, x => x + 1, x => x, new Func(x => { throw ex; }), sched)
            );

            observed.Messages.AssertEqual(
                OnError(201, ex)
            );
        }

        // Unbounded generator started with 210 as the second argument to Start: only the values
        // due before that time are expected.
        [TestMethod]
        public void Generate_TimeSpan_Dispose()
        {
            var sched = new TestScheduler();

            var observed = sched.Start(
                () => Observable.Generate(0, x => true, x => x + 1, x => x, x => TimeSpan.FromTicks(x + 1), sched),
                210
            );

            observed.Messages.AssertEqual(
                OnNext(202, 0),
                OnNext(204, 1),
                OnNext(207, 2)
            );
        }

        // Null checks for the overload that takes no scheduler argument.
        [TestMethod]
        public void Generate_TimeSpan_DefaultScheduler_ArgumentChecking()
        {
            ReactiveAssert.Throws(() => Observable.Generate(0, (Func)null, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, (Func)null, DummyFunc.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, (Func)null, DummyFunc.Instance, DummyFunc.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, (Func)null));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance).Subscribe(null));
        }

        // The scheduler-less overload must produce the same sequence as passing DefaultScheduler.Instance.
        [TestMethod]
        public void Generate_TimeSpan_DefaultScheduler()
        {
            Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => TimeSpan.FromMilliseconds(x))
                .AssertEqual(Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => TimeSpan.FromMilliseconds(x), DefaultScheduler.Instance));
        }

        // Each call passes null for exactly one argument (scheduler, a selector, or the observer).
        [TestMethod]
        public void Generate_DateTimeOffset_ArgumentChecking()
        {
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, (IScheduler)null));
            ReactiveAssert.Throws(() => Observable.Generate(0, (Func)null, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyScheduler.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, (Func)null, DummyFunc.Instance, DummyScheduler.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, (Func)null, DummyFunc.Instance, DummyFunc.Instance, DummyScheduler.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, (Func)null, DummyScheduler.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyScheduler.Instance).Subscribe(null));
        }

        // Absolute-time variant: each due time is the scheduler's current time plus x + 1 ticks.
        [TestMethod]
        public void Generate_DateTimeOffset_Finite()
        {
            var sched = new TestScheduler();

            var observed = sched.Start(
                () => Observable.Generate(0, x => x <= 3, x => x + 1, x => x, x => sched.Now.AddTicks(x + 1), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(202, 0),
                OnNext(204, 1),
                OnNext(207, 2),
                OnNext(211, 3),
                OnCompleted(211)
            );
        }

        [TestMethod]
        public void Generate_DateTimeOffset_Throw_Condition()
        {
            var scheduler = new TestScheduler();
            var ex = new
Exception();

            var res = scheduler.Start(() =>
                Observable.Generate(0, new Func(x => { throw ex; }), x => x + 1, x => x, x => scheduler.Now.AddTicks(x + 1), scheduler)
            );

            res.Messages.AssertEqual(
                OnError(201, ex)
            );
        }

        // A throwing result selector is expected to surface as an error at 201, with no values.
        [TestMethod]
        public void Generate_DateTimeOffset_Throw_ResultSelector()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var observed = sched.Start(
                () => Observable.Generate(0, x => true, x => x + 1, new Func(x => { throw ex; }), x => sched.Now.AddTicks(x + 1), sched)
            );

            observed.Messages.AssertEqual(
                OnError(201, ex)
            );
        }

        // A throwing iterate function: the first value is expected, followed by the error at the
        // same time.
        [TestMethod]
        public void Generate_DateTimeOffset_Throw_Iterate()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var observed = sched.Start(
                () => Observable.Generate(0, x => true, new Func(x => { throw ex; }), x => x, x => sched.Now.AddTicks(x + 1), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(202, 0),
                OnError(202, ex)
            );
        }

        // A throwing time selector is expected to surface as an error at 201, with no values.
        [TestMethod]
        public void Generate_DateTimeOffset_Throw_TimeSelector()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var observed = sched.Start(
                () => Observable.Generate(0, x => true, x => x + 1, x => x, new Func(x => { throw ex; }), sched)
            );

            observed.Messages.AssertEqual(
                OnError(201, ex)
            );
        }

        // Unbounded generator started with 210 as the second argument to Start: only the values
        // due before that time are expected.
        [TestMethod]
        public void Generate_DateTimeOffset_Dispose()
        {
            var sched = new TestScheduler();

            var observed = sched.Start(
                () => Observable.Generate(0, x => true, x => x + 1, x => x, x => sched.Now.AddTicks(x + 1), sched),
                210
            );

            observed.Messages.AssertEqual(
                OnNext(202, 0),
                OnNext(204, 1),
                OnNext(207, 2)
            );
        }

        // Null checks for the overload that takes no scheduler argument.
        [TestMethod]
        public void Generate_DateTimeOffset_DefaultScheduler_ArgumentChecking()
        {
            ReactiveAssert.Throws(() => Observable.Generate(0, (Func)null, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, (Func)null, DummyFunc.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, (Func)null, DummyFunc.Instance, DummyFunc.Instance));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, (Func)null));
            ReactiveAssert.Throws(() => Observable.Generate(0, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance, DummyFunc.Instance).Subscribe(null));
        }

        // The scheduler-less overload must produce the same sequence as passing DefaultScheduler.Instance.
        [TestMethod]
        public void Generate_DateTimeOffset_DefaultScheduler()
        {
            Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x))
                .AssertEqual(Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x), DefaultScheduler.Instance));
        }

        #endregion

        #region + Interval +

        // Null scheduler, null observer, and negative periods (with and without a scheduler).
        [TestMethod]
        public void Interval_TimeSpan_ArgumentChecking()
        {
            ReactiveAssert.Throws(() => Observable.Interval(TimeSpan.Zero, null));
            ReactiveAssert.Throws(() => Observable.Interval(TimeSpan.Zero, DummyScheduler.Instance).Subscribe(null));
            ReactiveAssert.Throws(() => Observable.Interval(TimeSpan.FromSeconds(-1)));
            ReactiveAssert.Throws(() => Observable.Interval(TimeSpan.FromSeconds(-1), DummyScheduler.Instance));
        }

        // 100-tick period: ticks 0..6 are expected at 300, 400, ..., 900.
        [TestMethod]
        public void Interval_TimeSpan_Basic()
        {
            var sched = new TestScheduler();

            var observed = sched.Start(
                () => Observable.Interval(TimeSpan.FromTicks(100), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(300, 0L),
                OnNext(400, 1L),
                OnNext(500, 2L),
                OnNext(600, 3L),
                OnNext(700, 4L),
                OnNext(800, 5L),
                OnNext(900, 6L)
            );
        }

        // A zero period is expected to advance one tick per value on the test scheduler.
        [TestMethod]
        public void Interval_TimeSpan_Zero()
        {
            var sched = new TestScheduler();

            var observed = sched.Start(
                () => Observable.Interval(TimeSpan.FromTicks(0), sched),
                210
            );

            observed.Messages.AssertEqual(
                OnNext(201, 0L),
                OnNext(202, 1L),
                OnNext(203, 2L),
                OnNext(204, 3L),
                OnNext(205, 4L),
                OnNext(206, 5L),
                OnNext(207, 6L),
                OnNext(208, 7L),
                OnNext(209, 8L)
            );
        }

        [TestMethod]
        public void Interval_TimeSpan_Zero_DefaultScheduler()
        {
            var scheduler = new TestScheduler();
            var observer = scheduler.CreateObserver();
            var completed = new ManualResetEventSlim();

            Observable.Interval(TimeSpan.Zero).TakeWhile(i => i <
10).Subscribe(observer.OnNext, completed.Set);

            completed.Wait();

            Assert.AreEqual(10, observer.Messages.Count);
        }

        // A 1000-tick period: no messages are expected from the run.
        [TestMethod]
        public void Interval_TimeSpan_Disposed()
        {
            var sched = new TestScheduler();

            var observed = sched.Start(
                () => Observable.Interval(TimeSpan.FromTicks(1000), sched)
            );

            observed.Messages.AssertEqual(
            );
        }

        // An exception thrown by the observer is expected to propagate out of scheduler.Start().
        [TestMethod]
        public void Interval_TimeSpan_ObserverThrows()
        {
            var sched = new TestScheduler();

            var ticks = Observable.Interval(TimeSpan.FromTicks(1), sched);

            ticks.Subscribe(x => { throw new InvalidOperationException(); });

            ReactiveAssert.Throws(() => sched.Start());
        }

        // Overload without a scheduler argument: the first three ticks must be 0, 1, 2.
        [TestMethod]
        public void Interval_TimeSpan_DefaultScheduler()
        {
            var firstThree = Observable.Interval(TimeSpan.FromMilliseconds(1)).ToEnumerable().Take(3);

            Assert.IsTrue(firstThree.SequenceEqual(new[] { 0L, 1L, 2L }));
        }

        #endregion

        #region + Sample +

        // Null source / scheduler / sampler and negative intervals.
        [TestMethod]
        public void Sample_ArgumentChecking()
        {
            var sched = new TestScheduler();
            var someObservable = Observable.Empty();

            ReactiveAssert.Throws(() => Observable.Sample(default(IObservable), TimeSpan.Zero));
            ReactiveAssert.Throws(() => Observable.Sample(default(IObservable), TimeSpan.Zero, sched));
            ReactiveAssert.Throws(() => Observable.Sample(someObservable, TimeSpan.Zero, null));
            ReactiveAssert.Throws(() => Observable.Sample(someObservable, TimeSpan.FromSeconds(-1)));
            ReactiveAssert.Throws(() => Observable.Sample(someObservable, TimeSpan.FromSeconds(-1), sched));

            ReactiveAssert.Throws(() => Observable.Sample(default(IObservable), someObservable));
            ReactiveAssert.Throws(() => Observable.Sample(someObservable, default(IObservable)));
        }

        // 50-tick sampling: the latest value is expected at each sampling point, and completion is
        // deferred to the next sampling point (400).
        [TestMethod]
        public void Sample_Regular()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 2),
                OnNext(230, 3),
                OnNext(260, 4),
                OnNext(300, 5),
                OnNext(350, 6),
                OnNext(380, 7),
                OnCompleted(390)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(250, 3),
                OnNext(300, 5), /* CHECK: boundary of sampling */
                OnNext(350, 6),
                OnNext(400, 7), /* Sample in last bucket */
                OnCompleted(400)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 390)
            );
        }

        // Same scenario on a PeriodicTestScheduler, additionally asserting the recorded timer run.
        [TestMethod]
        public void Sample_Periodic_Regular()
        {
            var sched = new PeriodicTestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 2),
                OnNext(230, 3),
                OnNext(260, 4),
                OnNext(300, 5),
                OnNext(350, 6),
                OnNext(380, 7),
                OnCompleted(390)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(250, 3),
                OnNext(300, 5), /* CHECK: boundary of sampling */
                OnNext(350, 6),
                OnNext(400, 7), /* Sample in last bucket */
                OnCompleted(400)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 390)
            );

            sched.Timers.AssertEqual(
                new TimerRun(200, 400) { 250, 300, 350, 400 }
            );
        }

        // The source error at 330 is expected immediately; the value 6 pushed at 310 is not expected.
        [TestMethod]
        public void Sample_ErrorInFlight()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 2),
                OnNext(230, 3),
                OnNext(260, 4),
                OnNext(300, 5),
                OnNext(310, 6),
                OnError(330, ex)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(250, 3),
                OnNext(300, 5), /* CHECK: boundary of sampling */
                OnError(330, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 330)
            );
        }

        // Periodic-scheduler variant of Sample_ErrorInFlight; the timer run is expected to end at 330.
        [TestMethod]
        public void Sample_Periodic_ErrorInFlight()
        {
            var sched = new PeriodicTestScheduler();
            var ex = new Exception();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(210, 2),
                OnNext(230, 3),
                OnNext(260, 4),
                OnNext(300, 5),
                OnNext(310, 6),
                OnError(330, ex)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(250, 3),
                OnNext(300, 5), /* CHECK: boundary of sampling */
                OnError(330, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 330)
            );

            sched.Timers.AssertEqual(
                new TimerRun(200, 330) { 250, 300 }
            );
        }

        [TestMethod]
        public void Sample_Empty()
        {
            var scheduler = new
TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(150, 1),
                OnCompleted(300)
            );

            var res = scheduler.Start(() =>
                xs.Sample(TimeSpan.FromTicks(50), scheduler)
            );

            res.Messages.AssertEqual(
                OnCompleted(300)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 300)
            );
        }

        // Periodic-scheduler variant of the empty-source case; timer ticks at 250 and 300 are expected.
        [TestMethod]
        public void Sample_Periodic_Empty()
        {
            var sched = new PeriodicTestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnCompleted(300)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
                OnCompleted(300)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 300)
            );

            sched.Timers.AssertEqual(
                new TimerRun(200, 300) { 250, 300 }
            );
        }

        // A source that only fails: the error is expected at 300.
        [TestMethod]
        public void Sample_Error()
        {
            var sched = new TestScheduler();
            var ex = new Exception();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnError(300, ex)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
                OnError(300, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 300)
            );
        }

        // Periodic-scheduler variant of Sample_Error; only the tick at 250 is expected in the timer run.
        [TestMethod]
        public void Sample_Periodic_Error()
        {
            var sched = new PeriodicTestScheduler();
            var ex = new Exception();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnError(300, ex)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
                OnError(300, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 300)
            );

            sched.Timers.AssertEqual(
                new TimerRun(200, 300) { 250 }
            );
        }

        // A source that never terminates: no messages, subscription expected to last until 1000.
        [TestMethod]
        public void Sample_Never()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 1000)
            );
        }

        // Periodic-scheduler variant of Sample_Never; a timer tick is expected every 50 ticks from
        // 250 through 950.
        [TestMethod]
        public void Sample_Periodic_Never()
        {
            var sched = new PeriodicTestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1)
            );

            var observed = sched.Start(
                () => xs.Sample(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 1000)
            );

            sched.Timers.AssertEqual(
                new TimerRun(200, 1000) { 250, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950 }
            );
        }

        // Overload without a scheduler argument: the single value 42 must come through.
        [TestMethod]
        public void Sample_DefaultScheduler_Periodic()
        {
            var sampled = Observable.Return(42)
                .Sample(TimeSpan.FromMilliseconds(1))
                .ToEnumerable()
                .Single();

            Assert.AreEqual(42, sampled);
        }

        // Same check on Scheduler.Default with DisableOptimizations() applied.
        [TestMethod]
        public void Sample_DefaultScheduler_PeriodicDisabled()
        {
            var sampled = Observable.Return(42)
                .Sample(TimeSpan.FromMilliseconds(1), Scheduler.Default.DisableOptimizations())
                .ToEnumerable()
                .Single();

            Assert.AreEqual(42, sampled);
        }

        // Sampling driven by ys: a value is expected only at sampler ticks where xs produced
        // something new, and completion is expected when the sampler completes (500).
        [TestMethod]
        public void Sample_Sampler_Simple1()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(220, 2),
                OnNext(240, 3),
                OnNext(290, 4),
                OnNext(300, 5),
                OnNext(310, 6),
                OnCompleted(400)
            );

            var ys = sched.CreateHotObservable(
                OnNext(150, ""),
                OnNext(210, "bar"),
                OnNext(250, "foo"),
                OnNext(260, "qux"),
                OnNext(320, "baz"),
                OnCompleted(500)
            );

            var observed = sched.Start(
                () => xs.Sample(ys)
            );

            observed.Messages.AssertEqual(
                OnNext(250, 3),
                OnNext(320, 6),
                OnCompleted(500 /* on sampling boundaries only */)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 400)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 500)
            );
        }

        [TestMethod]
        public void Sample_Sampler_Simple2()
        {
            var scheduler = new TestScheduler();

            var xs = scheduler.CreateHotObservable(
                OnNext(150, 1),
                OnNext(220, 2),
                OnNext(240, 3),
                OnNext(290, 4),
                OnNext(300, 5),
                OnNext(310, 6),
                OnNext(360, 7),
                OnCompleted(400)
            );

            var ys = scheduler.CreateHotObservable(
                OnNext(150, ""),
                OnNext(210, "bar"),
                OnNext(250, "foo"),
                OnNext(260, "qux"),
                OnNext(320, "baz"),
                OnCompleted(500)
            );

            var res = scheduler.Start(() =>
                xs.Sample(ys)
            );

            res.Messages.AssertEqual(
                OnNext(250, 3),
                OnNext(320, 6),
                OnNext(500, 7),
                OnCompleted(500 /* on sampling boundaries only */)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200,
400)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 500)
            );
        }

        // xs completes at 300 holding an unsampled value (4); the next sampler tick at 320 is
        // expected to emit it and complete the result.
        [TestMethod]
        public void Sample_Sampler_Simple3()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(220, 2),
                OnNext(240, 3),
                OnNext(290, 4),
                OnCompleted(300)
            );

            var ys = sched.CreateHotObservable(
                OnNext(150, ""),
                OnNext(210, "bar"),
                OnNext(250, "foo"),
                OnNext(260, "qux"),
                OnNext(320, "baz"),
                OnCompleted(500)
            );

            var observed = sched.Start(
                () => xs.Sample(ys)
            );

            observed.Messages.AssertEqual(
                OnNext(250, 3),
                OnNext(320, 4),
                OnCompleted(320 /* on sampling boundaries only */)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 300)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 320)
            );
        }

        // The source fails at 320: the error is expected immediately and both subscriptions are
        // expected to end there.
        [TestMethod]
        public void Sample_Sampler_SourceThrows()
        {
            var ex = new Exception();

            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(220, 2),
                OnNext(240, 3),
                OnNext(290, 4),
                OnNext(300, 5),
                OnNext(310, 6),
                OnError(320, ex)
            );

            var ys = sched.CreateHotObservable(
                OnNext(150, ""),
                OnNext(210, "bar"),
                OnNext(250, "foo"),
                OnNext(260, "qux"),
                OnNext(330, "baz"),
                OnCompleted(400)
            );

            var observed = sched.Start(
                () => xs.Sample(ys)
            );

            observed.Messages.AssertEqual(
                OnNext(250, 3),
                OnError(320, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 320)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 320)
            );
        }

#if !NO_PERF // BREAKING CHANGE v2 > v1.x - behavior when sampler throws
        // The sampler fails at 320: the error is expected immediately and both subscriptions are
        // expected to end there.
        [TestMethod]
        public void Sample_Sampler_SamplerThrows()
        {
            var ex = new Exception();

            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(150, 1),
                OnNext(220, 2),
                OnNext(240, 3),
                OnNext(290, 4),
                OnNext(300, 5),
                OnNext(310, 6),
                OnCompleted(400)
            );

            var ys = sched.CreateHotObservable(
                OnNext(150, ""),
                OnNext(210, "bar"),
                OnNext(250, "foo"),
                OnNext(260, "qux"),
                OnError(320, ex)
            );

            var observed = sched.Start(
                () => xs.Sample(ys)
            );

            observed.Messages.AssertEqual(
                OnNext(250, 3),
                OnError(320, ex)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 320)
            );

            ys.Subscriptions.AssertEqual(
                Subscribe(200, 320)
            );
        }
#endif

        #endregion

        #region + Skip +

        // Null source, null scheduler, and negative durations.
        [TestMethod]
        public void Skip_ArgumentChecking()
        {
            var source = Observable.Return(42);

            ReactiveAssert.Throws(() => Observable.Skip(default(IObservable), TimeSpan.FromSeconds(1)));
            ReactiveAssert.Throws(() => Observable.Skip(source, TimeSpan.FromSeconds(-1)));

            ReactiveAssert.Throws(() => Observable.Skip(default(IObservable), TimeSpan.FromSeconds(1), Scheduler.Default));
            ReactiveAssert.Throws(() => Observable.Skip(source, TimeSpan.FromSeconds(1), default(IScheduler)));
            ReactiveAssert.Throws(() => Observable.Skip(source, TimeSpan.FromSeconds(-1), Scheduler.Default));
        }

        // A zero skip duration is expected to let every value through.
        [TestMethod]
        public void Skip_Zero()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnCompleted(230)
            );

            var observed = sched.Start(
                () => xs.Skip(TimeSpan.Zero, sched)
            );

            observed.Messages.AssertEqual(
                OnNext(210, 1),
                OnNext(220, 2),
                OnCompleted(230)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 230)
            );
        }

        // Skipping 15 ticks is expected to drop the value at 210 and keep the one at 220.
        [TestMethod]
        public void Skip_Some()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnCompleted(230)
            );

            var observed = sched.Start(
                () => xs.Skip(TimeSpan.FromTicks(15), sched)
            );

            observed.Messages.AssertEqual(
                OnNext(220, 2),
                OnCompleted(230)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 230)
            );
        }

        // The skip window (50 ticks) outlasts the source: only completion is expected.
        [TestMethod]
        public void Skip_Late()
        {
            var sched = new TestScheduler();

            var xs = sched.CreateHotObservable(
                OnNext(210, 1),
                OnNext(220, 2),
                OnCompleted(230)
            );

            var observed = sched.Start(
                () => xs.Skip(TimeSpan.FromTicks(50), sched)
            );

            observed.Messages.AssertEqual(
                OnCompleted(230)
            );

            xs.Subscriptions.AssertEqual(
                Subscribe(200, 230)
            );
        }

        [TestMethod]
        public void Skip_Error()
        {
            var scheduler = new TestScheduler();

            var ex = new Exception();

            var xs = scheduler.CreateHotObservable(
                OnError(210, ex)
            );

            var res = scheduler.Start(() =>
                xs.Skip(TimeSpan.FromTicks(50), scheduler)
            );
res.Messages.AssertEqual( OnError(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void Skip_Never() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( ); var res = scheduler.Start(() => xs.Skip(TimeSpan.FromTicks(50), scheduler) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Skip_Twice1() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnCompleted(270) ); var res = scheduler.Start(() => xs.Skip(TimeSpan.FromTicks(15), scheduler).Skip(TimeSpan.FromTicks(30), scheduler) ); res.Messages.AssertEqual( OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnCompleted(270) ); xs.Subscriptions.AssertEqual( Subscribe(200, 270) ); } [TestMethod] public void Skip_Twice2() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnCompleted(270) ); var res = scheduler.Start(() => xs.Skip(TimeSpan.FromTicks(30), scheduler).Skip(TimeSpan.FromTicks(15), scheduler) ); res.Messages.AssertEqual( OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnCompleted(270) ); xs.Subscriptions.AssertEqual( Subscribe(200, 270) ); } [TestMethod] public void Skip_Default() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.Skip(TimeSpan.FromSeconds(60)); var e = new ManualResetEvent(false); var lst = new List(); res.Subscribe( lst.Add, () => e.Set() ); e.WaitOne(); Assert.IsTrue(lst.Count == 0); } #endregion #region + SkipLast + [TestMethod] public void SkipLast_ArgumentChecking() { var xs = Observable.Return(42); ReactiveAssert.Throws(() => Observable.SkipLast(default(IObservable), TimeSpan.FromSeconds(1))); ReactiveAssert.Throws(() => 
Observable.SkipLast(xs, TimeSpan.FromSeconds(-1))); ReactiveAssert.Throws(() => Observable.SkipLast(default(IObservable), TimeSpan.FromSeconds(1), Scheduler.Default)); ReactiveAssert.Throws(() => Observable.SkipLast(xs, TimeSpan.FromSeconds(1), default(IScheduler))); ReactiveAssert.Throws(() => Observable.SkipLast(xs, TimeSpan.FromSeconds(-1), Scheduler.Default)); } [TestMethod] public void SkipLast_Zero1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); var res = scheduler.Start(() => xs.SkipLast(TimeSpan.Zero, scheduler) ); res.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void SkipLast_Zero2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(230) ); var res = scheduler.Start(() => xs.SkipLast(TimeSpan.Zero, scheduler) ); res.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void SkipLast_Some1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); var res = scheduler.Start(() => xs.SkipLast(TimeSpan.FromTicks(15), scheduler) ); res.Messages.AssertEqual( OnNext(230, 1), OnCompleted(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void SkipLast_Some2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnNext(270, 7), OnNext(280, 8), OnNext(290, 9), OnCompleted(300) ); var res = scheduler.Start(() => xs.SkipLast(TimeSpan.FromTicks(45), scheduler) ); res.Messages.AssertEqual( OnNext(260, 1), OnNext(270, 2), OnNext(280, 3), OnNext(290, 4), OnNext(300, 
5), OnCompleted(300) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void SkipLast_All() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); var res = scheduler.Start(() => xs.SkipLast(TimeSpan.FromTicks(50), scheduler) ); res.Messages.AssertEqual( OnCompleted(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void SkipLast_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnError(210, ex) ); var res = scheduler.Start(() => xs.SkipLast(TimeSpan.FromTicks(50), scheduler) ); res.Messages.AssertEqual( OnError(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [TestMethod] public void SkipLast_Never() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( ); var res = scheduler.Start(() => xs.SkipLast(TimeSpan.FromTicks(50), scheduler) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void SkipLast_Default1() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.SkipLast(TimeSpan.FromSeconds(60)); var e = new ManualResetEvent(false); var lst = new List(); res.Subscribe( lst.Add, () => e.Set() ); e.WaitOne(); Assert.IsTrue(lst.Count == 0); } [TestMethod] public void SkipLast_Default2() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.SkipLast(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations()); var e = new ManualResetEvent(false); var lst = new List(); res.Subscribe( lst.Add, () => e.Set() ); e.WaitOne(); Assert.IsTrue(lst.Count == 0); } [TestMethod] public void SkipLast_Default3() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.SkipLast(TimeSpan.Zero); var e = new ManualResetEvent(false); var lst = new List(); res.Subscribe( lst.Add, () => e.Set() ); e.WaitOne(); 
Assert.IsTrue(lst.SequenceEqual(Enumerable.Range(0, 10))); } [TestMethod] public void SkipLast_Default4() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.SkipLast(TimeSpan.Zero, Scheduler.Default.DisableOptimizations()); var e = new ManualResetEvent(false); var lst = new List(); res.Subscribe( lst.Add, () => e.Set() ); e.WaitOne(); Assert.IsTrue(lst.SequenceEqual(Enumerable.Range(0, 10))); } #endregion #region + SkipUntil + [TestMethod] public void SkipUntil_ArgumentChecking() { var xs = Observable.Return(42); ReactiveAssert.Throws(() => Observable.SkipUntil(default(IObservable), DateTimeOffset.Now)); ReactiveAssert.Throws(() => Observable.SkipUntil(default(IObservable), DateTimeOffset.Now, Scheduler.Default)); ReactiveAssert.Throws(() => Observable.SkipUntil(xs, DateTimeOffset.Now, default(IScheduler))); } [TestMethod] public void SkipUntil_Zero() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); var res = scheduler.Start(() => xs.SkipUntil(new DateTimeOffset(), scheduler) ); res.Messages.AssertEqual( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void SkipUntil_Some() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); var res = scheduler.Start(() => xs.SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler) ); res.Messages.AssertEqual( OnNext(220, 2), OnCompleted(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [TestMethod] public void SkipUntil_Late() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); var res = scheduler.Start(() => xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler) ); res.Messages.AssertEqual( OnCompleted(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } 
// An error before the start time is still forwarded at the time it occurs.
[TestMethod]
public void SkipUntil_Error()
{
    var scheduler = new TestScheduler();

    var ex = new Exception();

    var xs = scheduler.CreateHotObservable(OnError<int>(210, ex));

    var res = scheduler.Start(() => xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnError<int>(210, ex));

    xs.Subscriptions.AssertEqual(Subscribe(200, 210));
}

// A silent source produces nothing; the subscription lasts until disposal at 1000.
[TestMethod]
public void SkipUntil_Never()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable<int>();

    var res = scheduler.Start(() => xs.SkipUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual();

    xs.Subscriptions.AssertEqual(Subscribe(200, 1000));
}

// Chained SkipUntil(215).SkipUntil(230): elements up to 230 are dropped, the source is subscribed once.
[TestMethod]
public void SkipUntil_Twice1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6),
        OnCompleted<int>(270));

    var res = scheduler.Start(() => xs.SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler).SkipUntil(new DateTimeOffset(230, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnCompleted<int>(270));

    xs.Subscriptions.AssertEqual(Subscribe(200, 270));
}

// Chained SkipUntil(230).SkipUntil(215) gives the same result as SkipUntil_Twice1.
[TestMethod]
public void SkipUntil_Twice2()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6),
        OnCompleted<int>(270));

    var res = scheduler.Start(() => xs.SkipUntil(new DateTimeOffset(230, TimeSpan.Zero), scheduler).SkipUntil(new DateTimeOffset(215, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnCompleted<int>(270));

    xs.Subscriptions.AssertEqual(Subscribe(200, 270));
}

// Scheduler-less overload: a start time one minute ahead is expected to drop all ten Range elements.
[TestMethod]
public void SkipUntil_Default()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.SkipUntil(DateTimeOffset.UtcNow.AddMinutes(1));

    var e = new ManualResetEvent(false);

    var lst = new List<int>();
    res.Subscribe(lst.Add, () => e.Set());

    e.WaitOne();

    Assert.AreEqual(0, lst.Count);
}

#endregion

#region + Take +

// Null source, null scheduler and negative durations are rejected by every Take(TimeSpan) overload.
[TestMethod]
public void Take_ArgumentChecking()
{
    var xs = Observable.Return(42);

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(default(IObservable<int>), TimeSpan.FromSeconds(1)));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Take(xs, TimeSpan.FromSeconds(-1)));

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(default(IObservable<int>), TimeSpan.FromSeconds(1), Scheduler.Default));
    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Take(xs, TimeSpan.FromSeconds(1), default(IScheduler)));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Take(xs, TimeSpan.FromSeconds(-1), Scheduler.Default));
}

// Take(Zero) completes one tick after subscription (201) without emitting anything.
[TestMethod]
public void Take_Zero()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.Take(TimeSpan.Zero, scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(201));

    xs.Subscriptions.AssertEqual(Subscribe(200, 201));
}

// Take(25 ticks) forwards the elements at 210 and 220, then completes and unsubscribes at 225.
[TestMethod]
public void Take_Some()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(240));

    var res = scheduler.Start(() => xs.Take(TimeSpan.FromTicks(25), scheduler));

    res.Messages.AssertEqual(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(225));

    xs.Subscriptions.AssertEqual(Subscribe(200, 225));
}

// A take window that outlasts the source forwards everything, including the source's own completion.
[TestMethod]
public void Take_Late()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.Take(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// A source error inside the window is forwarded at the time it occurs.
[TestMethod]
public void Take_Error()
{
    var scheduler = new TestScheduler();

    var ex = new Exception();

    var xs = scheduler.CreateHotObservable(OnError<int>(210, ex));

    var res = scheduler.Start(() => xs.Take(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual(OnError<int>(210, ex));

    xs.Subscriptions.AssertEqual(Subscribe(200, 210));
}

// A silent source is completed by the timer when the 50-tick window ends at 250.
[TestMethod]
public void Take_Never()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable<int>();

    var res = scheduler.Start(() => xs.Take(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(250));

    xs.Subscriptions.AssertEqual(Subscribe(200, 250));
}

// Chained Take(55).Take(35): the shorter window wins, completing at 235.
[TestMethod]
public void Take_Twice1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6),
        OnCompleted<int>(270));

    var res = scheduler.Start(() => xs.Take(TimeSpan.FromTicks(55), scheduler).Take(TimeSpan.FromTicks(35), scheduler));

    res.Messages.AssertEqual(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(235));

    xs.Subscriptions.AssertEqual(Subscribe(200, 235));
}

// Chained Take(35).Take(55) gives the same result as Take_Twice1.
[TestMethod]
public void Take_Twice2()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6),
        OnCompleted<int>(270));

    var res = scheduler.Start(() => xs.Take(TimeSpan.FromTicks(35), scheduler).Take(TimeSpan.FromTicks(55), scheduler));

    res.Messages.AssertEqual(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(235));

    xs.Subscriptions.AssertEqual(Subscribe(200, 235));
}

// Scheduler-less overload: a 60 s window is expected to keep all ten Range elements.
[TestMethod]
public void Take_Default()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.Take(TimeSpan.FromSeconds(60));

    var e = new ManualResetEvent(false);

    var lst = new List<int>();
    res.Subscribe(lst.Add, () => e.Set());

    e.WaitOne();

    Assert.IsTrue(lst.SequenceEqual(Enumerable.Range(0, 10)));
}

#endregion

#region + TakeLast +

// Null source, null timer/loop scheduler and negative durations are rejected by every TakeLast(TimeSpan) overload.
[TestMethod]
public void TakeLast_ArgumentChecking()
{
    var xs = Observable.Return(42);

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(default(IObservable<int>), TimeSpan.FromSeconds(1)));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(xs, TimeSpan.FromSeconds(-1)));

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(default(IObservable<int>), TimeSpan.FromSeconds(1), Scheduler.Default));
    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(xs, TimeSpan.FromSeconds(1), default(IScheduler)));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(xs, TimeSpan.FromSeconds(-1), Scheduler.Default));

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(default(IObservable<int>), TimeSpan.FromSeconds(1), Scheduler.Default, Scheduler.Default));
    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(xs, TimeSpan.FromSeconds(1), default(IScheduler), Scheduler.Default));
    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLast(xs, TimeSpan.FromSeconds(1), Scheduler.Default, default(IScheduler)));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLast(xs, TimeSpan.FromSeconds(-1), Scheduler.Default, Scheduler.Default));
}

// TakeLast(Zero) emits no elements and completes when the source completes.
[TestMethod]
public void TakeLast_Zero1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.Zero, scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(230));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// With an explicit loop scheduler the completion is delivered one tick later (231).
[TestMethod]
public void TakeLast_Zero1_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.Zero, scheduler, scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(231));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// TakeLast(Zero) drops even an element produced at the same tick as completion.
[TestMethod]
public void TakeLast_Zero2()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.Zero, scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(230));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// Loop-scheduler variant of TakeLast_Zero2: completion arrives at 231.
[TestMethod]
public void TakeLast_Zero2_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.Zero, scheduler, scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(231));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// Elements within the last 25 ticks before completion (2 and 3) are emitted at completion time (240).
[TestMethod]
public void TakeLast_Some1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(240));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(25), scheduler));

    res.Messages.AssertEqual(OnNext(240, 2), OnNext(240, 3), OnCompleted<int>(240));

    xs.Subscriptions.AssertEqual(Subscribe(200, 240));
}

// Loop-scheduler variant of TakeLast_Some1: notifications are delivered on consecutive ticks 241..243.
[TestMethod]
public void TakeLast_Some1_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(240));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(25), scheduler, scheduler));

    res.Messages.AssertEqual(OnNext(241, 2), OnNext(242, 3), OnCompleted<int>(243));

    xs.Subscriptions.AssertEqual(Subscribe(200, 240));
}

// Every element is older than 25 ticks when the source completes at 300, so only completion is emitted.
[TestMethod]
public void TakeLast_Some2()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(300));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(25), scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(300));

    xs.Subscriptions.AssertEqual(Subscribe(200, 300));
}

// Loop-scheduler variant of TakeLast_Some2: completion arrives at 301.
[TestMethod]
public void TakeLast_Some2_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(300));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(25), scheduler, scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(301));

    xs.Subscriptions.AssertEqual(Subscribe(200, 300));
}

// With a 45-tick window, elements 6..9 are emitted at completion time (300).
[TestMethod]
public void TakeLast_Some3()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5),
        OnNext(260, 6), OnNext(270, 7), OnNext(280, 8), OnNext(290, 9),
        OnCompleted<int>(300));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(45), scheduler));

    res.Messages.AssertEqual(OnNext(300, 6), OnNext(300, 7), OnNext(300, 8), OnNext(300, 9), OnCompleted<int>(300));

    xs.Subscriptions.AssertEqual(Subscribe(200, 300));
}

// Loop-scheduler variant of TakeLast_Some3: notifications are delivered on consecutive ticks 301..305.
[TestMethod]
public void TakeLast_Some3_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5),
        OnNext(260, 6), OnNext(270, 7), OnNext(280, 8), OnNext(290, 9),
        OnCompleted<int>(300));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(45), scheduler, scheduler));

    res.Messages.AssertEqual(OnNext(301, 6), OnNext(302, 7), OnNext(303, 8), OnNext(304, 9), OnCompleted<int>(305));

    xs.Subscriptions.AssertEqual(Subscribe(200, 300));
}

// The last element (300) is 50 ticks old at completion (350), outside the 25-tick window: nothing is emitted.
[TestMethod]
public void TakeLast_Some4()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(240, 2), OnNext(250, 3), OnNext(280, 4), OnNext(290, 5), OnNext(300, 6),
        OnCompleted<int>(350));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(25), scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(350));

    xs.Subscriptions.AssertEqual(Subscribe(200, 350));
}

// Loop-scheduler variant of TakeLast_Some4: completion arrives at 351.
[TestMethod]
public void TakeLast_Some4_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(240, 2), OnNext(250, 3), OnNext(280, 4), OnNext(290, 5), OnNext(300, 6),
        OnCompleted<int>(350));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(25), scheduler, scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(351));

    xs.Subscriptions.AssertEqual(Subscribe(200, 350));
}

// A window longer than the whole sequence emits every element at completion time (230).
[TestMethod]
public void TakeLast_All()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual(OnNext(230, 1), OnNext(230, 2), OnCompleted<int>(230));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// Loop-scheduler variant of TakeLast_All: notifications are delivered on consecutive ticks 231..233.
[TestMethod]
public void TakeLast_All_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(50), scheduler, scheduler));

    res.Messages.AssertEqual(OnNext(231, 1), OnNext(232, 2), OnCompleted<int>(233));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// A source error is forwarded at the time it occurs.
[TestMethod]
public void TakeLast_Error()
{
    var scheduler = new TestScheduler();

    var ex = new Exception();

    var xs = scheduler.CreateHotObservable(OnError<int>(210, ex));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual(OnError<int>(210, ex));

    xs.Subscriptions.AssertEqual(Subscribe(200, 210));
}

// With an explicit loop scheduler the error is still forwarded at 210, without the extra tick.
[TestMethod]
public void TakeLast_Error_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var ex = new Exception();

    var xs = scheduler.CreateHotObservable(OnError<int>(210, ex));

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(50), scheduler, scheduler));

    res.Messages.AssertEqual(OnError<int>(210, ex));

    xs.Subscriptions.AssertEqual(Subscribe(200, 210));
}

// A silent source produces nothing; the subscription lasts until disposal at 1000.
[TestMethod]
public void TakeLast_Never()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable<int>();

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual();

    xs.Subscriptions.AssertEqual(Subscribe(200, 1000));
}

// Loop-scheduler variant of TakeLast_Never.
[TestMethod]
public void TakeLast_Never_WithLoopScheduler()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable<int>();

    var res = scheduler.Start(() => xs.TakeLast(TimeSpan.FromTicks(50), scheduler, scheduler));

    res.Messages.AssertEqual();

    xs.Subscriptions.AssertEqual(Subscribe(200, 1000));
}

// Scheduler-less overload: a 60 s window is expected to keep all ten Range elements.
[TestMethod]
public void TakeLast_Default1()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.TakeLast(TimeSpan.FromSeconds(60));

    var e = new ManualResetEvent(false);

    var lst = new List<int>();
    res.Subscribe(lst.Add, () => e.Set());

    e.WaitOne();

    Assert.IsTrue(lst.SequenceEqual(Enumerable.Range(0, 10)));
}

// Same as TakeLast_Default1 on a default scheduler wrapped with DisableOptimizations().
[TestMethod]
public void TakeLast_Default2()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.TakeLast(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations());

    var e = new ManualResetEvent(false);

    var lst = new List<int>();
    res.Subscribe(lst.Add, () => e.Set());

    e.WaitOne();

    Assert.IsTrue(lst.SequenceEqual(Enumerable.Range(0, 10)));
}

// Scheduler-less overload with a zero window is expected to emit no elements.
[TestMethod]
public void TakeLast_Default3()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.TakeLast(TimeSpan.Zero);

    var e = new ManualResetEvent(false);

    var lst = new List<int>();
    res.Subscribe(lst.Add, () => e.Set());

    e.WaitOne();

    Assert.AreEqual(0, lst.Count);
}

// Same as TakeLast_Default3 on a default scheduler wrapped with DisableOptimizations().
[TestMethod]
public void TakeLast_Default4()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.TakeLast(TimeSpan.Zero, Scheduler.Default.DisableOptimizations());

    var e = new ManualResetEvent(false);

    var lst = new List<int>();
    res.Subscribe(lst.Add, () => e.Set());

    e.WaitOne();

    Assert.AreEqual(0, lst.Count);
}

// Uses NewThreadScheduler.Default as the loop scheduler; all ten Range elements are expected.
[TestMethod]
public void TakeLast_LongRunning_Regular()
{
    var res = Observable.Range(0, 10, Scheduler.Default).TakeLast(TimeSpan.FromSeconds(60), Scheduler.Default, NewThreadScheduler.Default);

    var lst = new List<int>();
    res.ForEach(lst.Add);

    Assert.IsTrue(Enumerable.Range(0, 10).SequenceEqual(lst));
}

#endregion

#region + TakeLastBuffer +

// Null source, null scheduler and negative durations are rejected by every TakeLastBuffer(TimeSpan) overload.
[TestMethod]
public void TakeLastBuffer_ArgumentChecking()
{
    var xs = Observable.Return(42);

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(default(IObservable<int>), TimeSpan.FromSeconds(1)));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1)));

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(default(IObservable<int>), TimeSpan.FromSeconds(1), Scheduler.Default));
    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(1), default(IScheduler)));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1), Scheduler.Default));
}

// TakeLastBuffer(Zero) emits a single empty list when the source completes.
[TestMethod]
public void TakeLastBuffer_Zero1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.Zero, scheduler));

    res.Messages.AssertEqual(
        OnNext<IList<int>>(230, lst => lst.Count == 0),
        OnCompleted<IList<int>>(230));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// The list stays empty even when an element arrives at the same tick as completion.
[TestMethod]
public void TakeLastBuffer_Zero2()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.Zero, scheduler));

    res.Messages.AssertEqual(
        OnNext<IList<int>>(230, lst => lst.Count == 0),
        OnCompleted<IList<int>>(230));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// Elements within the last 25 ticks before completion (2 and 3) are returned as one list at 240.
[TestMethod]
public void TakeLastBuffer_Some1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(240));

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler));

    res.Messages.AssertEqual(
        OnNext<IList<int>>(240, lst => lst.SequenceEqual(new[] { 2, 3 })),
        OnCompleted<IList<int>>(240));

    xs.Subscriptions.AssertEqual(Subscribe(200, 240));
}

// Every element is older than 25 ticks at completion (300), so the list is empty.
[TestMethod]
public void TakeLastBuffer_Some2()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(300));

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler));

    res.Messages.AssertEqual(
        OnNext<IList<int>>(300, lst => lst.Count == 0),
        OnCompleted<IList<int>>(300));

    xs.Subscriptions.AssertEqual(Subscribe(200, 300));
}

// With a 45-tick window the list contains elements 6..9.
[TestMethod]
public void TakeLastBuffer_Some3()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5),
        OnNext(260, 6), OnNext(270, 7), OnNext(280, 8), OnNext(290, 9),
        OnCompleted<int>(300));

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(45), scheduler));

    res.Messages.AssertEqual(
        OnNext<IList<int>>(300, lst => lst.SequenceEqual(new[] { 6, 7, 8, 9 })),
        OnCompleted<IList<int>>(300));

    xs.Subscriptions.AssertEqual(Subscribe(200, 300));
}

// The last element (300) is 50 ticks old at completion (350), outside the 25-tick window: the list is empty.
[TestMethod]
public void TakeLastBuffer_Some4()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(240, 2), OnNext(250, 3), OnNext(280, 4), OnNext(290, 5), OnNext(300, 6),
        OnCompleted<int>(350));

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler));

    res.Messages.AssertEqual(
        OnNext<IList<int>>(350, lst => lst.Count == 0),
        OnCompleted<IList<int>>(350));

    xs.Subscriptions.AssertEqual(Subscribe(200, 350));
}

// A window longer than the whole sequence returns every element.
[TestMethod]
public void TakeLastBuffer_All()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual(
        OnNext<IList<int>>(230, lst => lst.SequenceEqual(new[] { 1, 2 })),
        OnCompleted<IList<int>>(230));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// A source error is forwarded at the time it occurs, with no list emitted.
[TestMethod]
public void TakeLastBuffer_Error()
{
    var scheduler = new TestScheduler();

    var ex = new Exception();

    var xs = scheduler.CreateHotObservable(OnError<int>(210, ex));

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual(OnError<IList<int>>(210, ex));

    xs.Subscriptions.AssertEqual(Subscribe(200, 210));
}

// A silent source produces nothing; the subscription lasts until disposal at 1000.
[TestMethod]
public void TakeLastBuffer_Never()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable<int>();

    var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler));

    res.Messages.AssertEqual();

    xs.Subscriptions.AssertEqual(Subscribe(200, 1000));
}

// Scheduler-less overload: a 60 s window is expected to return all ten Range elements in one list.
[TestMethod]
public void TakeLastBuffer_Default1()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60)).SingleAsync();

    var e = new ManualResetEvent(false);

    var lst = default(IList<int>);
    res.Subscribe(x => lst = x, () => e.Set());

    e.WaitOne();

    Assert.IsTrue(lst.SequenceEqual(Enumerable.Range(0, 10)));
}

// Same as TakeLastBuffer_Default1 on a default scheduler wrapped with DisableOptimizations().
[TestMethod]
public void TakeLastBuffer_Default2()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations()).SingleAsync();

    var e = new ManualResetEvent(false);

    var lst = default(IList<int>);
    res.Subscribe(x => lst = x, () => e.Set());

    e.WaitOne();

    Assert.IsTrue(lst.SequenceEqual(Enumerable.Range(0, 10)));
}

// Scheduler-less overload with a zero window is expected to return an empty list.
[TestMethod]
public void TakeLastBuffer_Default3()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.TakeLastBuffer(TimeSpan.Zero).SingleAsync();

    var e = new ManualResetEvent(false);

    var lst = default(IList<int>);
    res.Subscribe(x => lst = x, () => e.Set());

    e.WaitOne();

    Assert.AreEqual(0, lst.Count);
}

// Same as TakeLastBuffer_Default3 on a default scheduler wrapped with DisableOptimizations().
[TestMethod]
public void TakeLastBuffer_Default4()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.TakeLastBuffer(TimeSpan.Zero, Scheduler.Default.DisableOptimizations()).SingleAsync();

    var e = new ManualResetEvent(false);

    var lst = default(IList<int>);
    res.Subscribe(x => lst = x, () => e.Set());

    e.WaitOne();

    Assert.AreEqual(0, lst.Count);
}

#endregion

#region + TakeUntil +

// Null source and null scheduler are rejected by the TakeUntil(DateTimeOffset) overloads.
[TestMethod]
public void TakeUntil_ArgumentChecking()
{
    var xs = Observable.Return(42);

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil(default(IObservable<int>), DateTimeOffset.Now));

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil(default(IObservable<int>), DateTimeOffset.Now, Scheduler.Default));
    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil(xs, DateTimeOffset.Now, default(IScheduler)));
}

// An end time of default(DateTimeOffset) completes one tick after subscription (201) without emitting.
[TestMethod]
public void TakeUntil_Zero()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeUntil(new DateTimeOffset(), scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(201));

    xs.Subscriptions.AssertEqual(Subscribe(200, 201));
}

// An end time of tick 225 forwards the elements at 210 and 220, then completes and unsubscribes at 225.
[TestMethod]
public void TakeUntil_Some()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(240));

    var res = scheduler.Start(() => xs.TakeUntil(new DateTimeOffset(225, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(225));

    xs.Subscriptions.AssertEqual(Subscribe(200, 225));
}

// An end time after the source completes forwards everything, including the source's own completion.
[TestMethod]
public void TakeUntil_Late()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    var res = scheduler.Start(() => xs.TakeUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(210, 1), OnNext(220, 2), OnCompleted<int>(230));

    xs.Subscriptions.AssertEqual(Subscribe(200, 230));
}

// A source error before the end time is forwarded at the time it occurs.
[TestMethod]
public void TakeUntil_Error()
{
    var scheduler = new TestScheduler();

    var ex = new Exception();

    var xs = scheduler.CreateHotObservable(OnError<int>(210, ex));

    var res = scheduler.Start(() => xs.TakeUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnError<int>(210, ex));

    xs.Subscriptions.AssertEqual(Subscribe(200, 210));
}

// A silent source is completed by the timer at the end time (250).
[TestMethod]
public void TakeUntil_Never()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable<int>();

    var res = scheduler.Start(() => xs.TakeUntil(new DateTimeOffset(250, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnCompleted<int>(250));

    xs.Subscriptions.AssertEqual(Subscribe(200, 250));
}

// Chained TakeUntil(255).TakeUntil(235): the earlier end time wins, completing at 235.
[TestMethod]
public void TakeUntil_Twice1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6),
        OnCompleted<int>(270));

    var res = scheduler.Start(() => xs.TakeUntil(new DateTimeOffset(255, TimeSpan.Zero), scheduler).TakeUntil(new DateTimeOffset(235, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(235));

    xs.Subscriptions.AssertEqual(Subscribe(200, 235));
}

// Chained TakeUntil(235).TakeUntil(255) gives the same result as TakeUntil_Twice1.
[TestMethod]
public void TakeUntil_Twice2()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6),
        OnCompleted<int>(270));

    var res = scheduler.Start(() => xs.TakeUntil(new DateTimeOffset(235, TimeSpan.Zero), scheduler).TakeUntil(new DateTimeOffset(255, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted<int>(235));

    xs.Subscriptions.AssertEqual(Subscribe(200, 235));
}

// Scheduler-less overload: an end time one minute ahead is expected to keep all ten Range elements.
[TestMethod]
public void TakeUntil_Default()
{
    var xs = Observable.Range(0, 10, Scheduler.Default);

    // UtcNow for consistency with SkipUntil_Default; it denotes the same instant as Now.
    var res = xs.TakeUntil(DateTimeOffset.UtcNow.AddMinutes(1));

    var e = new ManualResetEvent(false);

    var lst = new List<int>();
    res.Subscribe(lst.Add, () => e.Set());

    e.WaitOne();

    Assert.IsTrue(lst.SequenceEqual(Enumerable.Range(0, 10)));
}

#endregion

#region + Throttle +

// Null source, null scheduler and negative due times are rejected by the Throttle(TimeSpan) overloads.
[TestMethod]
public void Throttle_ArgumentChecking()
{
    var scheduler = new TestScheduler();
    var someObservable = Observable.Empty<int>();

    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(default(IObservable<int>), TimeSpan.Zero));
    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(someObservable, TimeSpan.Zero, null));
    ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Throttle(default(IObservable<int>), TimeSpan.Zero, scheduler));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(someObservable, TimeSpan.FromSeconds(-1)));
    ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Throttle(someObservable, TimeSpan.FromSeconds(-1), scheduler));
}

// Yields selector(s) for each state, starting at seed and advancing with iterate while condition(s)
// holds, then yields final(s) once for the first state that fails the condition.
// NOTE(review): the generic parameter lists were missing from this text; the <T, S> order and the
// Func signatures are reconstructed from how the parameters are used - confirm against callers.
private IEnumerable<Recorded<Notification<T>>> Generate<T, S>(S seed, Func<S, bool> condition, Func<S, S> iterate, Func<S, Recorded<Notification<T>>> selector, Func<S, Recorded<Notification<T>>> final)
{
    S s;
    for (s = seed; condition(s); s = iterate(s))
    {
        yield return selector(s);
    }

    yield return final(s);
}

// Elements are 30 ticks apart and the due time is 20, so each one is emitted 20 ticks after it arrived.
[TestMethod]
public void Throttle_TimeSpan_AllPass()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(150, 0), OnNext(210, 1), OnNext(240, 2), OnNext(270, 3), OnNext(300, 4),
        OnCompleted<int>(400));

    var res = scheduler.Start(() => xs.Throttle(TimeSpan.FromTicks(20), scheduler));

    res.Messages.AssertEqual(
        OnNext(230, 1), OnNext(260, 2), OnNext(290, 3), OnNext(320, 4),
        OnCompleted<int>(400));

    xs.Subscriptions.AssertEqual(Subscribe(200, 400));
}

// Same as Throttle_TimeSpan_AllPass, but the source ends with an error, which is forwarded at 400.
[TestMethod]
public void Throttle_TimeSpan_AllPass_ErrorEnd()
{
    var ex = new Exception();

    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(150, 0), OnNext(210, 1), OnNext(240, 2), OnNext(270, 3), OnNext(300, 4),
        OnError<int>(400, ex));

    var res = scheduler.Start(() => xs.Throttle(TimeSpan.FromTicks(20), scheduler));

    res.Messages.AssertEqual(
        OnNext(230, 1), OnNext(260, 2), OnNext(290, 3), OnNext(320, 4),
        OnError<int>(400, ex));

    xs.Subscriptions.AssertEqual(Subscribe(200, 400));
}

// Elements are 30 ticks apart and the due time is 40, so only the last one (7) is emitted, at completion (400).
[TestMethod]
public void Throttle_TimeSpan_AllDrop()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(150, 0), OnNext(210, 1), OnNext(240, 2), OnNext(270, 3), OnNext(300, 4),
        OnNext(330, 5), OnNext(360, 6), OnNext(390, 7),
        OnCompleted<int>(400));

    var res = scheduler.Start(() => xs.Throttle(TimeSpan.FromTicks(40), scheduler));

    res.Messages.AssertEqual(OnNext(400, 7), OnCompleted<int>(400));

    xs.Subscriptions.AssertEqual(Subscribe(200, 400));
}
[TestMethod] public void Throttle_TimeSpan_AllDrop_ErrorEnd() { var ex = new Exception(); var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 0), OnNext(210, 1), OnNext(240, 2), OnNext(270, 3), OnNext(300, 4), OnNext(330, 5), OnNext(360, 6), OnNext(390, 7), OnError(400, ex) ); var res = scheduler.Start(() => xs.Throttle(TimeSpan.FromTicks(40), scheduler) ); res.Messages.AssertEqual( OnError(400, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); } [TestMethod] public void Throttle_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 0), OnCompleted(300) ); var res = scheduler.Start(() => xs.Throttle(TimeSpan.FromTicks(10), scheduler) ); res.Messages.AssertEqual( OnCompleted(300) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void Throttle_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 0), OnError(300, ex) ); var res = scheduler.Start(() => xs.Throttle(TimeSpan.FromTicks(10), scheduler) ); res.Messages.AssertEqual( OnError(300, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void Throttle_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 0) ); var res = scheduler.Start(() => xs.Throttle(TimeSpan.FromTicks(10), scheduler) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void Throttle_Simple() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 0), OnNext(210, 1), OnNext(240, 2), OnNext(250, 3), OnNext(280, 4), OnCompleted(300) ); var res = scheduler.Start(() => xs.Throttle(TimeSpan.FromTicks(20), scheduler) ); res.Messages.AssertEqual( OnNext(230, 1), OnNext(270, 3), OnNext(300, 4), OnCompleted(300) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void 
Throttle_DefaultScheduler() { Assert.IsTrue(Observable.Return(1).Throttle(TimeSpan.FromMilliseconds(1)).ToEnumerable().SequenceEqual(new[] { 1 })); } [TestMethod] public void Throttle_Duration_ArgumentChecking() { var someObservable = DummyObservable.Instance; ReactiveAssert.Throws(() => Observable.Throttle(default(IObservable), x => someObservable)); ReactiveAssert.Throws(() => Observable.Throttle(someObservable, default(Func>))); } [TestMethod] public void Throttle_Duration_DelayBehavior() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, -1), OnNext(250, 0), OnNext(280, 1), OnNext(310, 2), OnNext(350, 3), OnNext(400, 4), OnCompleted(550) ); var ys = new[] { scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), }; var res = scheduler.Start(() => xs.Throttle(x => ys[x]) ); res.Messages.AssertEqual( OnNext(250 + 20, 0), OnNext(280 + 20, 1), OnNext(310 + 20, 2), OnNext(350 + 20, 3), OnNext(400 + 20, 4), OnCompleted(550) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20)); ys[1].Subscriptions.AssertEqual(Subscribe(280, 280 + 20)); ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20)); ys[3].Subscriptions.AssertEqual(Subscribe(350, 350 + 20)); ys[4].Subscriptions.AssertEqual(Subscribe(400, 400 + 20)); } [TestMethod] public void Throttle_Duration_ThrottleBehavior() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, -1), OnNext(250, 0), OnNext(280, 1), OnNext(310, 2), OnNext(350, 3), OnNext(400, 4), OnCompleted(550) ); var ys = new[] { scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), scheduler.CreateColdObservable( OnNext(40, 42), 
OnNext(45, 99) ), scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), scheduler.CreateColdObservable( OnNext(60, 42), OnNext(65, 99) ), scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), }; var res = scheduler.Start(() => xs.Throttle(x => ys[x]) ); res.Messages.AssertEqual( OnNext(250 + 20, 0), OnNext(310 + 20, 2), OnNext(400 + 20, 4), OnCompleted(550) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20)); ys[1].Subscriptions.AssertEqual(Subscribe(280, 310)); ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20)); ys[3].Subscriptions.AssertEqual(Subscribe(350, 400)); ys[4].Subscriptions.AssertEqual(Subscribe(400, 400 + 20)); } [TestMethod] public void Throttle_Duration_EarlyCompletion() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, -1), OnNext(250, 0), OnNext(280, 1), OnNext(310, 2), OnNext(350, 3), OnNext(400, 4), OnCompleted(410) ); var ys = new[] { scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), scheduler.CreateColdObservable( OnNext(40, 42), OnNext(45, 99) ), scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), scheduler.CreateColdObservable( OnNext(60, 42), OnNext(65, 99) ), scheduler.CreateColdObservable( OnNext(20, 42), OnNext(25, 99) ), }; var res = scheduler.Start(() => xs.Throttle(x => ys[x]) ); res.Messages.AssertEqual( OnNext(250 + 20, 0), OnNext(310 + 20, 2), OnNext(410, 4), OnCompleted(410) ); xs.Subscriptions.AssertEqual( Subscribe(200, 410) ); ys[0].Subscriptions.AssertEqual(Subscribe(250, 250 + 20)); ys[1].Subscriptions.AssertEqual(Subscribe(280, 310)); ys[2].Subscriptions.AssertEqual(Subscribe(310, 310 + 20)); ys[3].Subscriptions.AssertEqual(Subscribe(350, 400)); ys[4].Subscriptions.AssertEqual(Subscribe(400, 410)); } [TestMethod] public void Throttle_Duration_InnerError() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), 
OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var ex = new Exception(); var res = scheduler.Start(() => xs.Throttle(x => x < 4 ? scheduler.CreateColdObservable( OnNext(x * 10, "Ignore"), OnNext(x * 10 + 5, "Aargh!") ) : scheduler.CreateColdObservable( OnError(x * 10, ex) ) ) ); res.Messages.AssertEqual( OnNext(250 + 2 * 10, 2), OnNext(350 + 3 * 10, 3), OnError(450 + 4 * 10, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 490) ); } [TestMethod] public void Throttle_Duration_OuterError() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnError(460, ex) ); var res = scheduler.Start(() => xs.Throttle(x => scheduler.CreateColdObservable( OnNext(x * 10, "Ignore"), OnNext(x * 10 + 5, "Aargh!") ) ) ); res.Messages.AssertEqual( OnNext(250 + 2 * 10, 2), OnNext(350 + 3 * 10, 3), OnError(460, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 460) ); } [TestMethod] public void Throttle_Duration_SelectorThrows() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var ex = new Exception(); var res = scheduler.Start(() => xs.Throttle(x => { if (x < 4) { return scheduler.CreateColdObservable( OnNext(x * 10, "Ignore"), OnNext(x * 10 + 5, "Aargh!") ); } else throw ex; }) ); res.Messages.AssertEqual( OnNext(250 + 2 * 10, 2), OnNext(350 + 3 * 10, 3), OnError(450, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 450) ); } [TestMethod] public void Throttle_Duration_InnerDone_DelayBehavior() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4), OnCompleted(550) ); var ex = new Exception(); var res = scheduler.Start(() => xs.Throttle(x => scheduler.CreateColdObservable( OnCompleted(x * 10) ) ) ); res.Messages.AssertEqual( OnNext(250 + 2 * 10, 2), OnNext(350 + 3 
* 10, 3), OnNext(450 + 4 * 10, 4), OnCompleted(550) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } [TestMethod] public void Throttle_Duration_InnerDone_ThrottleBehavior() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(250, 2), OnNext(280, 3), OnNext(300, 4), OnNext(400, 5), OnNext(410, 6), OnCompleted(550) ); var ex = new Exception(); var res = scheduler.Start(() => xs.Throttle(x => scheduler.CreateColdObservable( OnCompleted(x * 10) ) ) ); res.Messages.AssertEqual( OnNext(250 + 2 * 10, 2), OnNext(300 + 4 * 10, 4), OnNext(410 + 6 * 10, 6), OnCompleted(550) ); xs.Subscriptions.AssertEqual( Subscribe(200, 550) ); } #endregion #region + TimeInterval + [TestMethod] public void TimeInterval_ArgumentChecking() { var scheduler = new TestScheduler(); var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.TimeInterval(default(IObservable))); ReactiveAssert.Throws(() => Observable.TimeInterval(default(IObservable), scheduler)); ReactiveAssert.Throws(() => Observable.TimeInterval(someObservable, null)); } [TestMethod] public void TimeInterval_Regular() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(230, 3), OnNext(260, 4), OnNext(300, 5), OnNext(350, 6), OnCompleted(400) ); var res = scheduler.Start(() => xs.TimeInterval(scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnNext(210, new TimeInterval(2, TimeSpan.FromTicks(10))), OnNext(230, new TimeInterval(3, TimeSpan.FromTicks(20))), OnNext(260, new TimeInterval(4, TimeSpan.FromTicks(30))), OnNext(300, new TimeInterval(5, TimeSpan.FromTicks(40))), OnNext(350, new TimeInterval(6, TimeSpan.FromTicks(50))), OnCompleted>(400) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); } [TestMethod] public void TimeInterval_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(300) ); var res = 
scheduler.Start(() => xs.TimeInterval(scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnCompleted>(300) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void TimeInterval_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(300, ex) ); var res = scheduler.Start(() => xs.TimeInterval(scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( OnError>(300, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void TimeInterval_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1) ); var res = scheduler.Start(() => xs.TimeInterval(scheduler.DisableOptimizations()) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [TestMethod] public void TimeInterval_DefaultScheduler() { Assert.IsTrue(Observable.Return(1).TimeInterval().Count().First() == 1); } [TestMethod] public void TimeInterval_WithStopwatch_Regular() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(230, 3), OnNext(260, 4), OnNext(300, 5), OnNext(350, 6), OnCompleted(400) ); var res = scheduler.Start(() => xs.TimeInterval(scheduler) ); res.Messages.AssertEqual( OnNext(210, new TimeInterval(2, TimeSpan.FromTicks(10))), OnNext(230, new TimeInterval(3, TimeSpan.FromTicks(20))), OnNext(260, new TimeInterval(4, TimeSpan.FromTicks(30))), OnNext(300, new TimeInterval(5, TimeSpan.FromTicks(40))), OnNext(350, new TimeInterval(6, TimeSpan.FromTicks(50))), OnCompleted>(400) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); } [TestMethod] public void TimeInterval_WithStopwatch_Empty() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnCompleted(300) ); var res = scheduler.Start(() => xs.TimeInterval(scheduler) ); res.Messages.AssertEqual( OnCompleted>(300) ); 
xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void TimeInterval_WithStopwatch_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnError(300, ex) ); var res = scheduler.Start(() => xs.TimeInterval(scheduler) ); res.Messages.AssertEqual( OnError>(300, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [TestMethod] public void TimeInterval_WithStopwatch_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1) ); var res = scheduler.Start(() => xs.TimeInterval(scheduler) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } #endregion #region + Timeout + [TestMethod] public void Timeout_ArgumentChecking() { var scheduler = new TestScheduler(); var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), TimeSpan.Zero)); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), TimeSpan.Zero, someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, TimeSpan.Zero, default(IObservable))); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), new DateTimeOffset())); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), new DateTimeOffset(), someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, new DateTimeOffset(), default(IObservable))); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), TimeSpan.Zero, scheduler)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, TimeSpan.Zero, default(IScheduler))); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), TimeSpan.Zero, someObservable, scheduler)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, TimeSpan.Zero, someObservable, null)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, TimeSpan.Zero, 
default(IObservable), scheduler)); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), new DateTimeOffset(), scheduler)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, new DateTimeOffset(), default(IScheduler))); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), new DateTimeOffset(), someObservable, scheduler)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, new DateTimeOffset(), someObservable, null)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, new DateTimeOffset(), default(IObservable), scheduler)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, TimeSpan.FromSeconds(-1))); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, TimeSpan.FromSeconds(-1), scheduler)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, TimeSpan.FromSeconds(-1), someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, TimeSpan.FromSeconds(-1), someObservable, scheduler)); } [TestMethod] public void Timeout_InTime() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(150, 1), OnNext(210, 2), OnNext(230, 3), OnNext(260, 4), OnNext(300, 5), OnNext(350, 6), OnCompleted(400) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(500), scheduler) ); res.Messages.AssertEqual( OnNext(210, 2), OnNext(230, 3), OnNext(260, 4), OnNext(300, 5), OnNext(350, 6), OnCompleted(400) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); } [TestMethod] public void Timeout_DateTimeOffset_TimeoutOccurs_WithDefaultException() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(410, 1) ); var res = scheduler.Start(() => xs.Timeout(new DateTimeOffset(new DateTime(400), TimeSpan.Zero), scheduler) ); res.Messages.AssertEqual( OnError(400, ex => ex is TimeoutException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); } [TestMethod] public void 
Timeout_TimeSpan_TimeoutOccurs_WithDefaultException() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(410, 1) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(200), scheduler) ); res.Messages.AssertEqual( OnError(400, ex => ex is TimeoutException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); } [TestMethod] public void Timeout_TimeSpan_DefaultScheduler() { Assert.IsTrue(Observable.Return(1).Timeout(TimeSpan.FromSeconds(10)).ToEnumerable().Single() == 1); } [TestMethod] public void Timeout_TimeSpan_Observable_DefaultScheduler() { Assert.IsTrue(Observable.Return(1).Timeout(TimeSpan.FromSeconds(10), Observable.Return(2)).ToEnumerable().Single() == 1); } [TestMethod] public void Timeout_DateTimeOffset_DefaultScheduler() { Assert.IsTrue(Observable.Return(1).Timeout(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(10)).ToEnumerable().Single() == 1); } [TestMethod] public void Timeout_DateTimeOffset_Observable_DefaultScheduler() { Assert.IsTrue(Observable.Return(1).Timeout(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(10), Observable.Return(2)).ToEnumerable().Single() == 1); } [TestMethod] public void Timeout_TimeoutOccurs_1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(130, 2), OnNext(310, 3), OnNext(400, 4), OnCompleted(500) ); var ys = scheduler.CreateColdObservable( OnNext(50, -1), OnNext(200, -2), OnNext(310, -3), OnCompleted(320) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnNext(350, -1), OnNext(500, -2), OnNext(610, -3), OnCompleted(620) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(300, 620) ); } [TestMethod] public void Timeout_TimeoutOccurs_2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(130, 2), OnNext(240, 3), OnNext(310, 4), OnNext(430, 5), OnCompleted(500) ); var ys = 
scheduler.CreateColdObservable( OnNext(50, -1), OnNext(200, -2), OnNext(310, -3), OnCompleted(320) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnNext(240, 3), OnNext(310, 4), OnNext(460, -1), OnNext(610, -2), OnNext(720, -3), OnCompleted(730) ); xs.Subscriptions.AssertEqual( Subscribe(200, 410) ); ys.Subscriptions.AssertEqual( Subscribe(410, 730) ); } [TestMethod] public void Timeout_TimeoutOccurs_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(130, 2), OnNext(240, 3), OnNext(310, 4), OnNext(430, 5), OnCompleted(500) ); var ys = scheduler.CreateColdObservable( ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnNext(240, 3), OnNext(310, 4) ); xs.Subscriptions.AssertEqual( Subscribe(200, 410) ); ys.Subscriptions.AssertEqual( Subscribe(410, 1000) ); } [TestMethod] public void Timeout_TimeoutOccurs_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnCompleted(500) ); var ys = scheduler.CreateColdObservable( OnNext(100, -1) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnNext(400, -1) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(300, 1000) ); } [TestMethod] public void Timeout_TimeoutOccurs_Error() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnError(500, new Exception()) ); var ys = scheduler.CreateColdObservable( OnNext(100, -1) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnNext(400, -1) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(300, 1000) ); } [TestMethod] public void Timeout_TimeoutOccurs_NextIsError() { var ex = new Exception(); var scheduler = new 
TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(500, 42) ); var ys = scheduler.CreateColdObservable( OnError(100, ex) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnError(400, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(300, 400) ); } [TestMethod] public void Timeout_TimeoutNotOccurs_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnCompleted(250) ); var ys = scheduler.CreateColdObservable( OnNext(100, -1) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnCompleted(250) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); ys.Subscriptions.AssertEqual( ); } [TestMethod] public void Timeout_TimeoutNotOccurs_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnError(250, ex) ); var ys = scheduler.CreateColdObservable( OnNext(100, -1) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnError(250, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 250) ); ys.Subscriptions.AssertEqual( ); } [TestMethod] public void Timeout_TimeoutDoesNotOccur() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(130, 2), OnNext(240, 3), OnNext(320, 4), OnNext(410, 5), OnCompleted(500) ); var ys = scheduler.CreateColdObservable( OnNext(50, -1), OnNext(200, -2), OnNext(310, -3), OnCompleted(320) ); var res = scheduler.Start(() => xs.Timeout(TimeSpan.FromTicks(100), ys, scheduler) ); res.Messages.AssertEqual( OnNext(240, 3), OnNext(320, 4), OnNext(410, 5), OnCompleted(500) ); xs.Subscriptions.AssertEqual( Subscribe(200, 500) ); ys.Subscriptions.AssertEqual( ); } [TestMethod] public void Timeout_DateTimeOffset_TimeoutOccurs() { var scheduler = new TestScheduler(); var 
xs = scheduler.CreateHotObservable( OnNext(410, 1) ); var ys = scheduler.CreateColdObservable( OnNext(100, -1) ); var res = scheduler.Start(() => xs.Timeout(new DateTimeOffset(new DateTime(400), TimeSpan.Zero), ys, scheduler) ); res.Messages.AssertEqual( OnNext(500, -1) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); ys.Subscriptions.AssertEqual( Subscribe(400, 1000) ); } [TestMethod] public void Timeout_DateTimeOffset_TimeoutDoesNotOccur_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnCompleted(390) ); var ys = scheduler.CreateColdObservable( OnNext(100, -1) ); var res = scheduler.Start(() => xs.Timeout(new DateTimeOffset(new DateTime(400), TimeSpan.Zero), ys, scheduler) ); res.Messages.AssertEqual( OnNext(310, 1), OnCompleted(390) ); xs.Subscriptions.AssertEqual( Subscribe(200, 390) ); ys.Subscriptions.AssertEqual( ); } [TestMethod] public void Timeout_DateTimeOffset_TimeoutDoesNotOccur_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnError(390, ex) ); var ys = scheduler.CreateColdObservable( OnNext(100, -1) ); var res = scheduler.Start(() => xs.Timeout(new DateTimeOffset(new DateTime(400), TimeSpan.Zero), ys, scheduler) ); res.Messages.AssertEqual( OnNext(310, 1), OnError(390, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 390) ); ys.Subscriptions.AssertEqual( ); } [TestMethod] public void Timeout_DateTimeOffset_TimeoutOccur_2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( OnNext(100, -1) ); var res = scheduler.Start(() => xs.Timeout(new DateTimeOffset(new DateTime(400), TimeSpan.Zero), ys, scheduler) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2), OnNext(500, -1) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); ys.Subscriptions.AssertEqual( 
Subscribe(400, 1000) ); } [TestMethod] public void Timeout_DateTimeOffset_TimeoutOccur_3() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( ); var res = scheduler.Start(() => xs.Timeout(new DateTimeOffset(new DateTime(400), TimeSpan.Zero), ys, scheduler) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); ys.Subscriptions.AssertEqual( Subscribe(400, 1000) ); } [TestMethod] public void Timeout_Duration_ArgumentChecking() { var someObservable = Observable.Empty(); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), someObservable, x => someObservable, someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, default(IObservable), x => someObservable, someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, someObservable, default(Func>), someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, someObservable, x => someObservable, default(IObservable))); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), someObservable, x => someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, default(IObservable), x => someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, someObservable, default(Func>))); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), x => someObservable, someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, default(Func>), someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, x => someObservable, default(IObservable))); ReactiveAssert.Throws(() => Observable.Timeout(default(IObservable), x => someObservable)); ReactiveAssert.Throws(() => Observable.Timeout(someObservable, default(Func>))); } [TestMethod] public void 
Timeout_Duration_Simple_Never() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( ); var res = scheduler.Start(() => xs.Timeout(ys, _ => ys) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); xs.Subscriptions.AssertEqual( Subscribe(200, 450) ); ys.Subscriptions.AssertEqual( Subscribe(200, 310), Subscribe(310, 350), Subscribe(350, 420), Subscribe(420, 450) ); } [TestMethod] public void Timeout_Duration_Simple_TimeoutFirst() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( OnNext(100, "Boo!") ); var zs = scheduler.CreateColdObservable( ); var res = scheduler.Start(() => xs.Timeout(ys, _ => zs) ); res.Messages.AssertEqual( OnError(300, ex => ex is TimeoutException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(200, 300) ); zs.Subscriptions.AssertEqual( ); } [TestMethod] public void Timeout_Duration_Simple_TimeoutFirst_Other() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( OnNext(100, "Boo!") ); var zs = scheduler.CreateColdObservable( ); var ts = scheduler.CreateColdObservable( OnNext(50, 42), OnCompleted(70) ); var res = scheduler.Start(() => xs.Timeout(ys, _ => zs, ts) ); res.Messages.AssertEqual( OnNext(350, 42), OnCompleted(370) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); ys.Subscriptions.AssertEqual( Subscribe(200, 300) ); zs.Subscriptions.AssertEqual( ); ts.Subscriptions.AssertEqual( Subscribe(300, 370) ); } [TestMethod] public void Timeout_Duration_Simple_TimeoutLater() { var scheduler = new TestScheduler(); var xs = 
scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( ); var zs = scheduler.CreateColdObservable( OnNext(50, "Boo!") ); var res = scheduler.Start(() => xs.Timeout(ys, _ => zs) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2), OnError(400, ex => ex is TimeoutException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); ys.Subscriptions.AssertEqual( Subscribe(200, 310) ); zs.Subscriptions.AssertEqual( Subscribe(310, 350), Subscribe(350, 400) ); } [TestMethod] public void Timeout_Duration_Simple_TimeoutLater_Other() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( ); var zs = scheduler.CreateColdObservable( OnNext(50, "Boo!") ); var ts = scheduler.CreateColdObservable( OnNext(50, 42), OnCompleted(70) ); var res = scheduler.Start(() => xs.Timeout(ys, _ => zs, ts) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2), OnNext(450, 42), OnCompleted(470) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); ys.Subscriptions.AssertEqual( Subscribe(200, 310) ); zs.Subscriptions.AssertEqual( Subscribe(310, 350), Subscribe(350, 400) ); ts.Subscriptions.AssertEqual( Subscribe(400, 470) ); } [TestMethod] public void Timeout_Duration_Simple_TimeoutLater_NoFirst() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var zs = scheduler.CreateColdObservable( OnNext(50, "Boo!") ); var res = scheduler.Start(() => xs.Timeout(_ => zs) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2), OnError(400, ex => ex is TimeoutException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); zs.Subscriptions.AssertEqual( Subscribe(310, 350), Subscribe(350, 400) ); } [TestMethod] public void Timeout_Duration_Simple_TimeoutLater_Other_NoFirst() { var 
scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var zs = scheduler.CreateColdObservable( OnNext(50, "Boo!") ); var ts = scheduler.CreateColdObservable( OnNext(50, 42), OnCompleted(70) ); var res = scheduler.Start(() => xs.Timeout(_ => zs, ts) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2), OnNext(450, 42), OnCompleted(470) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); zs.Subscriptions.AssertEqual( Subscribe(310, 350), Subscribe(350, 400) ); ts.Subscriptions.AssertEqual( Subscribe(400, 470) ); } [TestMethod] public void Timeout_Duration_Simple_TimeoutByCompletion() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( ); var zs = scheduler.CreateColdObservable( OnCompleted(50) ); var res = scheduler.Start(() => xs.Timeout(ys, _ => zs) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2), OnError(400, ex => ex is TimeoutException) ); xs.Subscriptions.AssertEqual( Subscribe(200, 400) ); ys.Subscriptions.AssertEqual( Subscribe(200, 310) ); zs.Subscriptions.AssertEqual( Subscribe(310, 350), Subscribe(350, 400) ); } [TestMethod] public void Timeout_Duration_Simple_SelectorThrows() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450) ); var ys = scheduler.CreateColdObservable( ); var zs = scheduler.CreateColdObservable( ); var ex = new Exception(); var res = scheduler.Start(() => xs.Timeout(ys, x => { if (x < 3) return zs; else throw ex; }) ); res.Messages.AssertEqual( OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnError(420, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 420) ); ys.Subscriptions.AssertEqual( Subscribe(200, 310) ); zs.Subscriptions.AssertEqual( Subscribe(310, 350), Subscribe(350, 420) ); } [TestMethod] 
public void Timeout_Duration_Simple_InnerThrows()
{
    // The per-element timeout observable zs errors 50 ticks after subscription;
    // that error (not a TimeoutException) is expected to surface at 350 + 50 = 400.
    var ex = new Exception();
    var scheduler = new TestScheduler();
    var xs = scheduler.CreateHotObservable(OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450));
    var ys = scheduler.CreateColdObservable();
    var zs = scheduler.CreateColdObservable(OnError(50, ex));

    var res = scheduler.Start(() => xs.Timeout(ys, x => zs));

    res.Messages.AssertEqual(OnNext(310, 1), OnNext(350, 2), OnError(400, ex));
    xs.Subscriptions.AssertEqual(Subscribe(200, 400));
    ys.Subscriptions.AssertEqual(Subscribe(200, 310));
    zs.Subscriptions.AssertEqual(Subscribe(310, 350), Subscribe(350, 400));
}

// The initial timeout observable ys errors at +50, before any source element:
// the error is expected at 250 and zs is never subscribed.
[TestMethod]
public void Timeout_Duration_Simple_FirstThrows()
{
    var ex = new Exception();
    var scheduler = new TestScheduler();
    var xs = scheduler.CreateHotObservable(OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnCompleted(450));
    var ys = scheduler.CreateColdObservable(OnError(50, ex));
    var zs = scheduler.CreateColdObservable();

    var res = scheduler.Start(() => xs.Timeout(ys, x => zs));

    res.Messages.AssertEqual(OnError(250, ex));
    xs.Subscriptions.AssertEqual(Subscribe(200, 250));
    ys.Subscriptions.AssertEqual(Subscribe(200, 250));
    zs.Subscriptions.AssertEqual();
}

// Neither timeout observable ever fires; the source's own error at 450 is forwarded
// and the timeout observable for the last element is released at the same time.
[TestMethod]
public void Timeout_Duration_Simple_SourceThrows()
{
    var ex = new Exception();
    var scheduler = new TestScheduler();
    var xs = scheduler.CreateHotObservable(OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnError(450, ex));
    var ys = scheduler.CreateColdObservable();
    var zs = scheduler.CreateColdObservable();

    var res = scheduler.Start(() => xs.Timeout(ys, x => zs));

    res.Messages.AssertEqual(OnNext(310, 1), OnNext(350, 2), OnNext(420, 3), OnError(450, ex));
    xs.Subscriptions.AssertEqual(Subscribe(200, 450));
    ys.Subscriptions.AssertEqual(Subscribe(200, 310));
    zs.Subscriptions.AssertEqual(Subscribe(310, 350), Subscribe(350, 420), Subscribe(420, 450));
}

#endregion

#region + Timer +

[TestMethod]
public void
OneShotTimer_TimeSpan_ArgumentChecking()
{
    // Null scheduler / null observer must be rejected for every Timer overload.
    ReactiveAssert.Throws(() => Observable.Timer(TimeSpan.Zero, null));
    ReactiveAssert.Throws(() => Observable.Timer(TimeSpan.Zero, DummyScheduler.Instance).Subscribe(null));
    ReactiveAssert.Throws(() => Observable.Timer(DateTimeOffset.Now, null));
    ReactiveAssert.Throws(() => Observable.Timer(TimeSpan.Zero, TimeSpan.Zero, null));
    ReactiveAssert.Throws(() => Observable.Timer(DateTimeOffset.Now, TimeSpan.Zero, null));

    // A negative period must be rejected as well.
    ReactiveAssert.Throws(() => Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(-1)));
    ReactiveAssert.Throws(() => Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(-1), DummyScheduler.Instance));
    ReactiveAssert.Throws(() => Observable.Timer(DateTimeOffset.Now, TimeSpan.FromSeconds(-1)));
    ReactiveAssert.Throws(() => Observable.Timer(DateTimeOffset.Now, TimeSpan.FromSeconds(-1), DummyScheduler.Instance));
}

// A 300-tick one-shot timer is expected to emit 0L and complete at virtual time 500.
[TestMethod]
public void OneShotTimer_TimeSpan_Basic()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(TimeSpan.FromTicks(300), scheduler));

    res.Messages.AssertEqual(OnNext(500, 0L), OnCompleted(500));
}

// A zero due time is expected to fire at 201 rather than 200.
[TestMethod]
public void OneShotTimer_TimeSpan_Zero()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(TimeSpan.FromTicks(0), scheduler));

    res.Messages.AssertEqual(OnNext(201, 0L), OnCompleted(201));
}

// Runs against the default (real-time) scheduler: blocks until completion is signalled
// and then checks that exactly one OnNext was recorded.
// NOTE(review): completed.Wait() has no timeout, so a missing completion hangs the test run.
[TestMethod]
public void OneShotTimer_TimeSpan_Zero_DefaultScheduler()
{
    var scheduler = new TestScheduler();
    var observer = scheduler.CreateObserver();
    var completed = new ManualResetEventSlim();

    Observable.Timer(TimeSpan.Zero).Subscribe(observer.OnNext, completed.Set);
    completed.Wait();

    Assert.AreEqual(1, observer.Messages.Count);
}

// A negative due time is expected to behave like a zero due time (fires at 201).
[TestMethod]
public void OneShotTimer_TimeSpan_Negative()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(TimeSpan.FromTicks(-1), scheduler));

    res.Messages.AssertEqual(OnNext(201, 0L), OnCompleted(201));
}

[TestMethod]
public void
OneShotTimer_TimeSpan_Disposed()
{
    // The timer is due 1000 ticks after subscription; no message is expected to be
    // observed (presumably because Start() disposes the subscription first - confirm).
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(TimeSpan.FromTicks(1000), scheduler));

    res.Messages.AssertEqual();
}

// Exceptions thrown by the observer's OnNext (first half) or OnCompleted (second half)
// must propagate out of scheduler.Start().
[TestMethod]
public void OneShotTimer_TimeSpan_ObserverThrows()
{
    var scheduler1 = new TestScheduler();
    var xs = Observable.Timer(TimeSpan.FromTicks(1), scheduler1);
    xs.Subscribe(x => { throw new InvalidOperationException(); });
    ReactiveAssert.Throws(() => scheduler1.Start());

    var scheduler2 = new TestScheduler();
    var ys = Observable.Timer(TimeSpan.FromTicks(1), scheduler2);
    ys.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
    ReactiveAssert.Throws(() => scheduler2.Start());
}

// The four *_DefaultScheduler tests below run in real time on the default scheduler
// and only check the produced values (0L, or 0L followed by 1L for periodic timers).
[TestMethod]
public void OneShotTimer_TimeSpan_DefaultScheduler()
{
    Assert.IsTrue(Observable.Timer(TimeSpan.FromMilliseconds(1)).ToEnumerable().SequenceEqual(new[] { 0L }));
}

[TestMethod]
public void OneShotTimer_DateTimeOffset_DefaultScheduler()
{
    Assert.IsTrue(Observable.Timer(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1)).ToEnumerable().SequenceEqual(new[] { 0L }));
}

[TestMethod]
public void OneShotTimer_TimeSpan_TimeSpan_DefaultScheduler()
{
    Assert.IsTrue(Observable.Timer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(1)).ToEnumerable().Take(2).SequenceEqual(new[] { 0L, 1L }));
}

[TestMethod]
public void OneShotTimer_DateTimeOffset_TimeSpan_DefaultScheduler()
{
    Assert.IsTrue(Observable.Timer(DateTimeOffset.UtcNow + TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(1)).ToEnumerable().Take(2).SequenceEqual(new[] { 0L, 1L }));
}

// Absolute due time of 500 ticks: the timer is expected to fire exactly at 500.
[TestMethod]
public void OneShotTimer_DateTimeOffset_Basic()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(new DateTimeOffset(500, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(500, 0L), OnCompleted(500));
}

// Absolute due time equal to the subscription time (200): expected to fire at 201.
[TestMethod]
public void OneShotTimer_DateTimeOffset_Zero()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(new DateTimeOffset(200, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(201, 0L), OnCompleted(201));
}

// Absolute due time in the past (0): expected to fire at 201 as well.
[TestMethod]
public void OneShotTimer_DateTimeOffset_Past()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(new DateTimeOffset(0, TimeSpan.Zero), scheduler));

    res.Messages.AssertEqual(OnNext(201, 0L), OnCompleted(201));
}

// Real-time test: a zero-period repeating timer truncated by TakeWhile must deliver
// exactly the values 0..9 before completing.
// NOTE(review): completed.Wait() has no timeout, so a missing completion hangs the test run.
[TestMethod]
public void RepeatingTimer_TimeSpan_Zero_DefaultScheduler()
{
    var scheduler = new TestScheduler();
    var observer = scheduler.CreateObserver();
    var completed = new ManualResetEventSlim();

    Observable.Timer(TimeSpan.Zero, TimeSpan.Zero).TakeWhile(i => i < 10).Subscribe(observer.OnNext, completed.Set);
    completed.Wait();

    Assert.AreEqual(10, observer.Messages.Count);
}

// Absolute start at 300 with a period of 100; Start is given explicit times (0, 200, 750),
// so ticks are expected at 300..700 and nothing after 750.
[TestMethod]
public void RepeatingTimer_DateTimeOffset_TimeSpan_Simple()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(new DateTimeOffset(300, TimeSpan.Zero), TimeSpan.FromTicks(100), scheduler), 0, 200, 750);

    res.Messages.AssertEqual(OnNext(300, 0L), OnNext(400, 1L), OnNext(500, 2L), OnNext(600, 3L), OnNext(700, 4L));
}

// Relative start of 100 ticks with a period of 100: same expected timeline as above.
[TestMethod]
public void RepeatingTimer_TimeSpan_TimeSpan_Simple()
{
    var scheduler = new TestScheduler();

    var res = scheduler.Start(() => Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler), 0, 200, 750);

    res.Messages.AssertEqual(OnNext(300, 0L), OnNext(400, 1L), OnNext(500, 2L), OnNext(600, 3L), OnNext(700, 4L));
}

// Due time (50) differs from the period (100). Besides the messages, the timer runs
// recorded by PeriodicTestScheduler are checked: one run from 250 to 700 ticking at 350..650.
[TestMethod]
public void RepeatingTimer_Periodic1()
{
    var scheduler = new PeriodicTestScheduler();

    var res = scheduler.Start(() => Observable.Timer(TimeSpan.FromTicks(50), TimeSpan.FromTicks(100), scheduler), 0, 200, 700);

    res.Messages.AssertEqual(OnNext(250, 0L), OnNext(350, 1L), OnNext(450, 2L), OnNext(550, 3L), OnNext(650, 4L));
    scheduler.Timers.AssertEqual(new TimerRun(250, 700) { 350, 450, 550, 650 });
}

// Due time equal to the period: the recorded periodic run is expected to start at 200.
[TestMethod]
public void RepeatingTimer_Periodic2()
{
    var scheduler = new PeriodicTestScheduler();

    var res =
scheduler.Start(() => Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler), 0, 200, 750);

    res.Messages.AssertEqual(OnNext(300, 0L), OnNext(400, 1L), OnNext(500, 2L), OnNext(600, 3L), OnNext(700, 4L));
    scheduler.Timers.AssertEqual(new TimerRun(200, 750) { 300, 400, 500, 600, 700 });
}

// Zero due time, period 100. The observer records the scheduler clock on every tick and
// sleeps inside OnNext (50, then 120, then 50 ticks) to provoke slippage; the expected
// times show the timer drifting while the handler overruns the period and re-syncing after.
[TestMethod]
public void RepeatingTimer_UsingStopwatch_Slippage1()
{
    var scheduler = new TestScheduler();

    var xs = default(IObservable);
    scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

    var times = new List();

    var onNext = new Action(x =>
    {
        times.Add(scheduler.Clock);

        if (x == 0) { return; }
        if (x < 2) { scheduler.Sleep(50); return; }
        if (x < 4) { scheduler.Sleep(120); return; }
        if (x < 6) { scheduler.Sleep(50); return; }
        if (x < 8) { return; }
    });

    var d = default(IDisposable);
    scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
    scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

    scheduler.Start();

    times.AssertEqual(
        201, // 1 off because of initial scheduling jump (InvokeStart)
        301,
        401,
        522, // 2 off because of 401 + 120 + 1 scheduling tick
        643, // 3 off because of 522 + 120 + 1 scheduling tick
        701,
        801,
        901
    );
}

// Same slippage scenario as Slippage1, but with a due time equal to the period (100).
[TestMethod]
public void RepeatingTimer_UsingStopwatch_Slippage2()
{
    var scheduler = new TestScheduler();

    var xs = default(IObservable);
    scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler); });

    var times = new List();

    var onNext = new Action(x =>
    {
        times.Add(scheduler.Clock);

        if (x == 0) { return; }
        if (x < 2) { scheduler.Sleep(50); return; }
        if (x < 4) { scheduler.Sleep(120); return; }
        if (x < 6) { scheduler.Sleep(50); return; }
        if (x < 8) { return; }
    });

    var d = default(IDisposable);
    scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
    scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

    scheduler.Start();

    times.AssertEqual(
        300,
        400,
        500,
        621, // 1 off because of recursive scheduling beyond the target time
        742, // 2 off because of 621 + 120 + 1 scheduling tick
        800,
        900
    );
}

// The very first OnNext sleeps for 350 ticks; the missed ticks are expected to be
// delivered back-to-back (551, 552, 553) before the timer is back on its 100-tick grid.
[TestMethod]
public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart()
{
    var scheduler = new TestScheduler();

    var xs = default(IObservable);
    scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

    var times = new List();

    var onNext = new Action(x =>
    {
        times.Add(scheduler.Clock);

        if (x == 0) { scheduler.Sleep(350); return; }
    });

    var d = default(IDisposable);
    scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
    scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

    scheduler.Start();

    times.AssertEqual(
        201, // 1 off because of initial scheduling jump (InvokeStart)
        551, // catching up after excessive delay of 350 (target was 300)
        552, // catching up after excessive delay of 350 (target was 400)
        553, // catching up after excessive delay of 350 (target was 500)
        601, // back in sync
        701,
        801,
        901
    );
}

// The observer throws on the very first tick; the exception must surface from
// scheduler.Start() at clock 201.
[TestMethod]
public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart_ThrowsFirst()
{
    var ex = new Exception();

    var scheduler = new TestScheduler();

    var xs = default(IObservable);
    scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

    var onNext = new Action(x =>
    {
        if (x == 0)
            throw ex;
    });

    var d = default(IDisposable);
    scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
    scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

    // Capture the exception and assert outside the catch block: asserting only inside
    // the handler would let the test pass silently when nothing is thrown at all.
    var caught = default(Exception);
    try
    {
        scheduler.Start();
    }
    catch (Exception e)
    {
        caught = e;
    }

    Assert.AreSame(ex, caught);
    Assert.AreEqual(201, scheduler.Clock);
}

// The first OnNext sleeps for 350 ticks and the tick with value 5 throws; the exception
// must surface from scheduler.Start() at clock 701 and no later tick may be recorded.
[TestMethod]
public void RepeatingTimer_UsingStopwatch_Slippage3_CatchUpFromLongInvokeStart_ThrowsBeyondFirst()
{
    var ex = new Exception();

    var scheduler = new TestScheduler();

    var xs = default(IObservable);
    scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler); });

    var times = new List();

    var onNext = new Action(x =>
    {
        times.Add(scheduler.Clock);

        if (x == 0) { scheduler.Sleep(350); return; }

        if (x == 5)
            throw ex;
    });

    var d = default(IDisposable);
    scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
    scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

    // Same pattern as in the ThrowsFirst test: a missing exception must fail the test.
    var caught = default(Exception);
    try
    {
        scheduler.Start();
    }
    catch (Exception e)
    {
        caught = e;
    }

    Assert.AreSame(ex, caught);
    Assert.AreEqual(701, scheduler.Clock);

    times.AssertEqual(
        201, // 1 off because of initial scheduling jump (InvokeStart)
        551, // catching up after excessive delay of 350 (target was 300)
        552, // catching up after excessive delay of 350 (target was 400)
        553, // catching up after excessive delay of 350 (target was 500)
        601, // back in sync
        701
    );
}

// Same scenario as UsingStopwatch_Slippage1, but the scheduler is wrapped with
// DisableOptimizations(typeof(IStopwatchProvider)) so the timer cannot use a stopwatch.
[TestMethod]
public void RepeatingTimer_NoStopwatch_Slippage1()
{
    var scheduler = new TestScheduler();

    var xs = default(IObservable);
    scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromTicks(100), scheduler.DisableOptimizations(typeof(IStopwatchProvider))); });

    var times = new List();

    var onNext = new Action(x =>
    {
        times.Add(scheduler.Clock);

        if (x == 0) { return; }
        if (x < 2) { scheduler.Sleep(50); return; }
        if (x < 4) { scheduler.Sleep(120); return; }
        if (x < 6) { scheduler.Sleep(50); return; }
        if (x < 8) { return; }
    });

    var d = default(IDisposable);
    scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
    scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

    scheduler.Start();

    times.AssertEqual(
        201, // 1 off because of initial scheduling jump (InvokeStart)
        301,
        401,
        523, // 3 off because of 401 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
        645, // 5 off because of 523 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
        743, // \
        843, // +--> 43 off because this situation (no stopwatch or periodic scheduling interface) only gets best effort treatment (see SchedulePeriodic emulation code)
        943  // /
    );
}

[TestMethod]
// Same scenario as RepeatingTimer_UsingStopwatch_Slippage2, but the scheduler is wrapped with
// DisableOptimizations(typeof(IStopwatchProvider)) so the timer cannot use a stopwatch.
public void RepeatingTimer_NoStopwatch_Slippage2()
{
    var scheduler = new TestScheduler();

    var xs = default(IObservable);
    scheduler.ScheduleAbsolute(100, () => { xs = Observable.Timer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(100), scheduler.DisableOptimizations(typeof(IStopwatchProvider))); });

    var times = new List();

    var onNext = new Action(x =>
    {
        times.Add(scheduler.Clock);

        if (x == 0) { return; }
        if (x < 2) { scheduler.Sleep(50); return; }
        if (x < 4) { scheduler.Sleep(120); return; }
        if (x < 6) { scheduler.Sleep(50); return; }
        if (x < 8) { return; }
    });

    var d = default(IDisposable);
    scheduler.ScheduleAbsolute(200, () => { d = xs.Subscribe(onNext); });
    scheduler.ScheduleAbsolute(1000, () => { d.Dispose(); });

    scheduler.Start();

    times.AssertEqual(
        300,
        400,
        500,
        622, // 2 off because of 500 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
        744, // 4 off because of 622 + 120 + 2 scheduling ticks (one due to yield in SchedulePeriodic emulation code)
        842, // |
        942  // +--> 42 off because this situation (no stopwatch or periodic scheduling interface) only gets best effort treatment (see SchedulePeriodic emulation code)
    );
}

// Real-time smoke test on the default scheduler: the first OnNext blocks for 500 ms
// (period is 10 ms); the test finishes once a value greater than 10 has been observed.
// NOTE(review): e.WaitOne() has no timeout, so a stalled timer hangs the test run.
[TestMethod]
public void RepeatingTimer_Start_CatchUp()
{
    var e = new ManualResetEvent(false);

    var xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10));

    var d = new SingleAssignmentDisposable();
    d.Disposable = xs.Subscribe(x =>
    {
        if (x == 0)
            Thread.Sleep(500);

        if (x > 10)
        {
            e.Set();
            d.Dispose();
        }
    });

    e.WaitOne();
}

// Real-time test: the observer throws on value 5 after an initial 500 ms stall. The
// scheduler is wrapped with Catch(...) so the exception is captured and handed back
// to the test thread, which then checks that it is the very instance that was thrown.
// NOTE(review): end.WaitOne() has no timeout, so a swallowed exception hangs the test run.
[TestMethod]
public void RepeatingTimer_Start_CatchUp_Throws()
{
    var end = new ManualResetEvent(false);

    var err = new Exception();
    var ex = default(Exception);

    var s = ThreadPoolScheduler.Instance.Catch(e =>
    {
        Interlocked.Exchange(ref ex, e);
        end.Set();
        return true;
    });

    var xs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10), s);

    xs.Subscribe(x =>
    {
        if (x == 0)
            Thread.Sleep(500);

        if (x == 5)
            throw err;
    });

    end.WaitOne();

    Assert.AreSame(err, ex);
}

class SchedulerWithCatch :
IServiceProvider, IScheduler
{
    // Scheduler decorator: forwards all work to the wrapped scheduler, but runs every
    // scheduled action inside a try/catch that reports exceptions to a callback.
    private readonly IScheduler _scheduler;
    private readonly Action _setException;

    public SchedulerWithCatch(IScheduler scheduler, Action setException)
    {
        _scheduler = scheduler;
        _setException = setException;
    }

    // Service discovery is delegated to the wrapped scheduler (which is cast to
    // IServiceProvider, so the wrapped scheduler must implement that interface).
    public object GetService(Type serviceType)
    {
        return ((IServiceProvider)_scheduler).GetService(serviceType);
    }

    public DateTimeOffset Now
    {
        get { return _scheduler.Now; }
    }

    // The three Schedule overloads differ only in the due time they forward.
    public IDisposable Schedule(TState state, Func action)
    {
        return _scheduler.Schedule(state, GetCatch(action));
    }

    public IDisposable Schedule(TState state, TimeSpan dueTime, Func action)
    {
        return _scheduler.Schedule(state, dueTime, GetCatch(action));
    }

    public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action)
    {
        return _scheduler.Schedule(state, dueTime, GetCatch(action));
    }

    // Wraps an action so that (a) the scheduler handed to it is itself a
    // SchedulerWithCatch, keeping recursive scheduling protected, and (b) any exception
    // is passed to _setException and replaced by an empty disposable.
    private Func GetCatch(Func action)
    {
        return (self, s) =>
        {
            try
            {
                return action(new SchedulerWithCatch(self, _setException), s);
            }
            catch (Exception ex)
            {
                _setException(ex);
                return Disposable.Empty;
            }
        };
    }
}

// TestScheduler that also offers ISchedulerPeriodic and records every periodic timer
// (start time, tick times, stop time) in Timers so tests can assert on them.
class PeriodicTestScheduler : TestScheduler, ISchedulerPeriodic, IServiceProvider
{
    private readonly List _timers;

    public PeriodicTestScheduler()
    {
        _timers = new List();
    }

    // Emulates a periodic timer with recursive scheduling: each tick is logged in the
    // TimerRun, the state is advanced through action, and the next tick is queued.
    // Disposing the returned object stamps the stop time and cancels the pending tick.
    public IDisposable SchedulePeriodic(TState state, TimeSpan period, Func action)
    {
        var run = new TimerRun(this.Clock);
        _timers.Add(run);

        var x = state;

        var d = this.Schedule(period, self =>
        {
            run.Add(this.Clock);

            x = action(x);
            self(period);
        });

        return new CompositeDisposable(
            Disposable.Create(() => { run.Stop(this.Clock); }),
            d
        );
    }

    public List Timers
    {
        get { return _timers; }
    }

    // Only ISchedulerPeriodic is advertised; every other service request yields null.
    protected override object GetService(Type serviceType)
    {
        if (serviceType == typeof(ISchedulerPeriodic))
            return this as ISchedulerPeriodic;

        return null;
    }
}

// Record of one periodic timer: when it started, when it ticked and when it stopped.
// Enumerating it yields the tick times; Add allows collection-initializer syntax.
class TimerRun : IEnumerable
{
    private readonly long _started;
    private long _stopped;
    private bool _hasStopped;
    private readonly List _ticks;

    // Creates a run that is still active (no stop time yet).
    public TimerRun(long started)
    {
        _started = started;
        _ticks = new List();
    }

    // Creates an already stopped run; used to spell out expected values in assertions.
    public TimerRun(long started, long stopped)
    {
        _started = started;
        _stopped = stopped;
        _hasStopped = true;
        _ticks = new List();
    }

    // Constant hash code: consistent with Equals, but every instance lands in one bucket.
    public override int GetHashCode()
    {
        return 0;
    }

    // Value equality on start time, stop time and tick sequence.
    // Note that _hasStopped is not part of the comparison.
    public override bool Equals(object obj)
    {
        var other = obj as TimerRun;
        if (other == null)
            return false;

        return _started == other._started && _stopped == other._stopped && _ticks.SequenceEqual(other._ticks);
    }

    public long Started
    {
        get { return _started; }
    }

    public IEnumerable Ticks
    {
        get { return _ticks; }
    }

    public long Stopped
    {
        get { return _stopped; }
    }

    internal void Stop(long clock)
    {
        _stopped = clock;
        _hasStopped = true;
    }

    // Renders "Start(s) Ticks(t1, t2, ...) " plus "Stop(e)" once the run has stopped.
    public override string ToString()
    {
        var sb = new StringBuilder();

        sb.Append("Start(" + _started + ") ");
        sb.Append("Ticks(" + string.Join(", ", _ticks.Select(t => t.ToString()).ToArray()) + ") ");
        if (_hasStopped)
            sb.Append("Stop(" + _stopped + ")");

        return sb.ToString();
    }

    public IEnumerator GetEnumerator()
    {
        return _ticks.GetEnumerator();
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return _ticks.GetEnumerator();
    }

    public void Add(long clock)
    {
        _ticks.Add(clock);
    }
}

#endregion

#region + Timestamp +

// Null source or null scheduler must be rejected.
[TestMethod]
public void Timestamp_ArgumentChecking()
{
    var scheduler = new TestScheduler();
    var someObservable = Observable.Empty();

    ReactiveAssert.Throws(() => Observable.Timestamp(default(IObservable)));
    ReactiveAssert.Throws(() => Observable.Timestamp(default(IObservable), scheduler));
    ReactiveAssert.Throws(() => Observable.Timestamp(someObservable, null));
}

// Every element observed after subscription (200) is expected to be paired with a
// DateTimeOffset whose tick count equals the virtual time at which it was produced.
[TestMethod]
public void Timestamp_Regular()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(150, 1), OnNext(210, 2), OnNext(230, 3), OnNext(260, 4), OnNext(300, 5), OnNext(350, 6),
        OnCompleted(400)
    );

    var res = scheduler.Start(() => xs.Timestamp(scheduler));

    res.Messages.AssertEqual(
        OnNext(210, new Timestamped(2, new DateTimeOffset(210, TimeSpan.Zero))),
        OnNext(230, new Timestamped(3, new DateTimeOffset(230, TimeSpan.Zero))),
        OnNext(260, new Timestamped(4, new DateTimeOffset(260,
TimeSpan.Zero))),
        OnNext(300, new Timestamped(5, new DateTimeOffset(300, TimeSpan.Zero))),
        OnNext(350, new Timestamped(6, new DateTimeOffset(350, TimeSpan.Zero))),
        OnCompleted>(400)
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 400));
}

// The only element (150) precedes the subscription at 200, so just completion is expected.
[TestMethod]
public void Timestamp_Empty()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(150, 1), OnCompleted(400));

    var res = scheduler.Start(() => xs.Timestamp(scheduler));

    res.Messages.AssertEqual(OnCompleted>(400));
    xs.Subscriptions.AssertEqual(Subscribe(200, 400));
}

// A source error is expected to pass through unchanged.
[TestMethod]
public void Timestamp_Error()
{
    var scheduler = new TestScheduler();

    var ex = new Exception();

    var xs = scheduler.CreateHotObservable(OnNext(150, 1), OnError(400, ex));

    var res = scheduler.Start(() => xs.Timestamp(scheduler));

    res.Messages.AssertEqual(OnError>(400, ex));
    xs.Subscriptions.AssertEqual(Subscribe(200, 400));
}

// A source that never terminates: no messages, subscription held from 200 to 1000.
[TestMethod]
public void Timestamp_Never()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(OnNext(150, 1));

    var res = scheduler.Start(() => xs.Timestamp(scheduler));

    res.Messages.AssertEqual();
    xs.Subscriptions.AssertEqual(Subscribe(200, 1000));
}

// Smoke test for the overload without scheduler: one input element yields one output element.
[TestMethod]
public void Timestamp_DefaultScheduler()
{
    Assert.IsTrue(Observable.Return(1).Timestamp().Count().First() == 1);
}

#endregion

#region + Window +

// Windows of 100 ticks. Each element is tagged "<window index> <value>" and every window
// appends "<index> end" when it closes, so window boundaries show up in the merged output.
[TestMethod]
public void Window_Time_Basic()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(150, 1), OnNext(210, 2), OnNext(240, 3), OnNext(270, 4), OnNext(320, 5), OnNext(360, 6),
        OnNext(390, 7), OnNext(410, 8), OnNext(460, 9), OnNext(470, 10),
        OnCompleted(490)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(100), scheduler).Select((ys, i) => ys.Select(y => i + " " + y).Concat(Observable.Return(i + " end"))).Merge()
    );

    res.Messages.AssertEqual(
        OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(270, "0 4"), OnNext(300, "0 end"),
        OnNext(320, "1 5"), OnNext(360, "1 6"), OnNext(390, "1 7"), OnNext(400, "1 end"),
        OnNext(410, "2 8"), OnNext(460, "2 9"), OnNext(470, "2 10"), OnNext(490, "2 end"),
        OnCompleted(490)
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 490));
}

// Same as Window_Time_Basic, but on PeriodicTestScheduler; additionally checks that one
// periodic timer ran from 200 to 490 with ticks at 300 and 400.
[TestMethod]
public void Window_Time_Basic_Periodic()
{
    var scheduler = new PeriodicTestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(150, 1), OnNext(210, 2), OnNext(240, 3), OnNext(270, 4), OnNext(320, 5), OnNext(360, 6),
        OnNext(390, 7), OnNext(410, 8), OnNext(460, 9), OnNext(470, 10),
        OnCompleted(490)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(100), scheduler).Select((ys, i) => ys.Select(y => i + " " + y).Concat(Observable.Return(i + " end"))).Merge()
    );

    res.Messages.AssertEqual(
        OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(270, "0 4"), OnNext(300, "0 end"),
        OnNext(320, "1 5"), OnNext(360, "1 6"), OnNext(390, "1 7"), OnNext(400, "1 end"),
        OnNext(410, "2 8"), OnNext(460, "2 9"), OnNext(470, "2 10"), OnNext(490, "2 end"),
        OnCompleted(490)
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 490));
    scheduler.Timers.AssertEqual(new TimerRun(200, 490) { 300, 400 });
}

// Periodic variant where the source errors at 460: the error is forwarded, no "2 end"
// marker is produced for the open window, and the timer run stops at 460.
[TestMethod]
public void Window_Time_Basic_Periodic_Error()
{
    var ex = new Exception();

    var scheduler = new PeriodicTestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(150, 1), OnNext(210, 2), OnNext(240, 3), OnNext(270, 4), OnNext(320, 5), OnNext(360, 6),
        OnNext(390, 7), OnNext(410, 8),
        OnError(460, ex)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(100), scheduler).Select((ys, i) => ys.Select(y => i + " " + y).Concat(Observable.Return(i + " end"))).Merge()
    );

    res.Messages.AssertEqual(
        OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(270, "0 4"), OnNext(300, "0 end"),
        OnNext(320, "1 5"), OnNext(360, "1 6"), OnNext(390, "1 7"), OnNext(400, "1 end"),
        OnNext(410, "2 8"),
        OnError(460, ex)
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 460));
    scheduler.Timers.AssertEqual(new TimerRun(200, 460) { 300, 400 });
}

// Window length 100 with a shift of 50: windows overlap, so most elements are expected
// to appear in two consecutive windows.
[TestMethod]
public void Window_Time_Basic_Both()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(150, 1), OnNext(210, 2), OnNext(240, 3), OnNext(270, 4), OnNext(320, 5), OnNext(360, 6),
        OnNext(390, 7), OnNext(410, 8), OnNext(460, 9), OnNext(470, 10),
        OnCompleted(490)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(50), scheduler).Select((ys, i) => ys.Select(y => i + " " + y).Concat(Observable.Return(i + " end"))).Merge()
    );

    res.Messages.AssertEqual(
        OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(270, "0 4"), OnNext(270, "1 4"), OnNext(300, "0 end"),
        OnNext(320, "1 5"), OnNext(320, "2 5"), OnNext(350, "1 end"),
        OnNext(360, "2 6"), OnNext(360, "3 6"), OnNext(390, "2 7"), OnNext(390, "3 7"), OnNext(400, "2 end"),
        OnNext(410, "3 8"), OnNext(410, "4 8"), OnNext(450, "3 end"),
        OnNext(460, "4 9"), OnNext(460, "5 9"), OnNext(470, "4 10"), OnNext(470, "5 10"),
        OnNext(490, "4 end"), OnNext(490, "5 end"),
        OnCompleted(490)
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 490));
}

// Null source, negative time span / time shift and null scheduler must be rejected
// by every time-based Window overload.
[TestMethod]
public void WindowWithTime_ArgumentChecking()
{
    ReactiveAssert.Throws(() => Observable.Window(default(IObservable), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), DummyScheduler.Instance));
    ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1), DummyScheduler.Instance));
    ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1), DummyScheduler.Instance));
    ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), null));
    ReactiveAssert.Throws(() => Observable.Window(default(IObservable), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1)));
    ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1)));
    ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1)));
    ReactiveAssert.Throws(() => Observable.Window(default(IObservable), TimeSpan.FromTicks(1), DummyScheduler.Instance));
    ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(-1), DummyScheduler.Instance));
    ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(1), null));
    ReactiveAssert.Throws(() => Observable.Window(default(IObservable), TimeSpan.FromTicks(1)));
    ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(-1)));
}

// Window length 100, shift 70: overlapping windows, elements tagged with their window index.
[TestMethod]
public void WindowWithTime_Basic1()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6),
        OnNext(380, 7), OnNext(420, 8), OnNext(470, 9),
        OnCompleted(600)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
    );

    res.Messages.AssertEqual(
        OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "0 4"), OnNext(280, "1 4"),
        OnNext(320, "1 5"), OnNext(350, "1 6"), OnNext(350, "2 6"), OnNext(380, "2 7"),
        OnNext(420, "2 8"), OnNext(420, "3 8"), OnNext(470, "3 9"),
        OnCompleted(600)
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 600));
}

// Window length 70, shift 100: there are gaps between windows, so the elements at
// 280 and 380 are expected to be dropped.
[TestMethod]
public void WindowWithTime_Basic2()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6),
        OnNext(380, 7), OnNext(420, 8), OnNext(470, 9),
        OnCompleted(600)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(70), TimeSpan.FromTicks(100), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
    );

    res.Messages.AssertEqual(
        OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(320, "1 5"), OnNext(350, "1 6"),
        OnNext(420, "2 8"), OnNext(470, "2 9"),
        OnCompleted(600)
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 600));
}

// Same timeline as WindowWithTime_Basic1, but the source ends with an error at 600.
[TestMethod]
public void WindowWithTime_Error()
{
    var scheduler = new TestScheduler();

    var ex = new Exception();

    var xs = scheduler.CreateHotObservable(
        OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6),
        OnNext(380, 7), OnNext(420, 8), OnNext(470, 9),
        OnError(600, ex)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
    );

    res.Messages.AssertEqual(
        OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "0 4"), OnNext(280, "1 4"),
        OnNext(320, "1 5"), OnNext(350, "1 6"), OnNext(350, "2 6"), OnNext(380, "2 7"),
        OnNext(420, "2 8"), OnNext(420, "3 8"), OnNext(470, "3 9"),
        OnError(600, ex)
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 600));
}

// Same timeline as WindowWithTime_Basic1, but Start is told to dispose at 370:
// nothing after 350 may be observed and the source subscription ends at 370.
[TestMethod]
public void WindowWithTime_Disposed()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6),
        OnNext(380, 7), OnNext(420, 8), OnNext(470, 9),
        OnCompleted(600)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(),
        370
    );

    res.Messages.AssertEqual(
        OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "0 4"), OnNext(280, "1 4"),
        OnNext(320, "1 5"), OnNext(350, "1 6"), OnNext(350, "2 6")
    );

    xs.Subscriptions.AssertEqual(Subscribe(200, 370));
}

// Single-TimeSpan overload (window length 100): every element is expected in exactly one window.
[TestMethod]
public void WindowWithTime_Basic_Same()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateHotObservable(
        OnNext(100, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6),
        OnNext(380, 7), OnNext(420, 8), OnNext(470, 9),
        OnCompleted(600)
    );

    var res = scheduler.Start(() =>
        xs.Window(TimeSpan.FromTicks(100), scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge()
    );
res.Messages.AssertEqual( OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "0 4"), OnNext(320, "1 5"), OnNext(350, "1 6"), OnNext(380, "1 7"), OnNext(420, "2 8"), OnNext(470, "2 9"), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void WindowWithTime_Default() { Observable.Range(0, 10).Window(TimeSpan.FromDays(1), TimeSpan.FromDays(1)).SelectMany(Observable.ToList).First().AssertEqual(Enumerable.Range(0, 10)); Observable.Range(0, 10).Window(TimeSpan.FromDays(1)).SelectMany(Observable.ToList).First().AssertEqual(Enumerable.Range(0, 10)); } [TestMethod] public void WindowWithTimeOrCount_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Window(default(IObservable), TimeSpan.FromTicks(1), 1, DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(-1), 1, DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(1), 0, DummyScheduler.Instance)); ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(1), 1, null)); ReactiveAssert.Throws(() => Observable.Window(default(IObservable), TimeSpan.FromTicks(1), 1)); ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(-1), 1)); ReactiveAssert.Throws(() => Observable.Window(DummyObservable.Instance, TimeSpan.FromTicks(1), 0)); } [TestMethod] public void WindowWithTimeOrCount_Basic() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(205, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(370, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Window(TimeSpan.FromTicks(70), 3, scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge() ); res.Messages.AssertEqual( OnNext(205, "0 1"), OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "1 
4"), OnNext(320, "2 5"), OnNext(350, "2 6"), OnNext(370, "2 7"), OnNext(420, "3 8"), OnNext(470, "4 9"), OnCompleted(600) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void WindowWithTimeOrCount_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(205, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(370, 7), OnNext(420, 8), OnNext(470, 9), OnError(600, ex) ); var res = scheduler.Start(() => xs.Window(TimeSpan.FromTicks(70), 3, scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge() ); res.Messages.AssertEqual( OnNext(205, "0 1"), OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "1 4"), OnNext(320, "2 5"), OnNext(350, "2 6"), OnNext(370, "2 7"), OnNext(420, "3 8"), OnNext(470, "4 9"), OnError(600, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 600) ); } [TestMethod] public void WindowWithTimeOrCount_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(205, 1), OnNext(210, 2), OnNext(240, 3), OnNext(280, 4), OnNext(320, 5), OnNext(350, 6), OnNext(370, 7), OnNext(420, 8), OnNext(470, 9), OnCompleted(600) ); var res = scheduler.Start(() => xs.Window(TimeSpan.FromTicks(70), 3, scheduler).Select((w, i) => w.Select(x => i.ToString() + " " + x.ToString())).Merge(), 370 ); res.Messages.AssertEqual( OnNext(205, "0 1"), OnNext(210, "0 2"), OnNext(240, "0 3"), OnNext(280, "1 4"), OnNext(320, "2 5"), OnNext(350, "2 6"), OnNext(370, "2 7") ); xs.Subscriptions.AssertEqual( Subscribe(200, 370) ); } [TestMethod] public void WindowWithTimeOrCount_Default() { Observable.Range(1, 10).Window(TimeSpan.FromDays(1), 3).Skip(1).First().SequenceEqual(Observable.Range(4, 3)); } #endregion } }