465 lines
14 KiB
C#
465 lines
14 KiB
C#
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
|
|
|
using System.Collections;
|
|
using System.Collections.Generic;
|
|
using System.Threading;
|
|
using System.Reactive.Disposables;
|
|
|
|
#if NO_SEMAPHORE
|
|
using System.Reactive.Threading;
|
|
#endif
|
|
|
|
namespace System.Reactive.Linq
|
|
{
|
|
#if !NO_PERF
|
|
using ObservableImpl;
|
|
#endif
|
|
|
|
internal partial class QueryLanguage
|
|
{
|
|
#region - Chunkify -
|
|
|
|
public virtual IEnumerable<IList<TSource>> Chunkify<TSource>(IObservable<TSource> source)
|
|
{
|
|
return source.Collect<TSource, IList<TSource>>(() => new List<TSource>(), (lst, x) => { lst.Add(x); return lst; }, _ => new List<TSource>());
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region + Collect +
|
|
|
|
public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> newCollector, Func<TResult, TSource, TResult> merge)
|
|
{
|
|
return Collect_<TSource, TResult>(source, newCollector, merge, _ => newCollector());
|
|
}
|
|
|
|
public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
|
|
{
|
|
return Collect_<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
|
|
}
|
|
|
|
private static IEnumerable<TResult> Collect_<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
|
|
{
|
|
#if !NO_PERF
|
|
return new Collect<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
|
|
#else
|
|
return new AnonymousEnumerable<TResult>(() =>
|
|
{
|
|
var c = getInitialCollector();
|
|
var f = default(Notification<TSource>);
|
|
var o = new object();
|
|
var done = false;
|
|
return PushToPull<TSource, TResult>(
|
|
source,
|
|
x =>
|
|
{
|
|
lock (o)
|
|
{
|
|
if (x.HasValue)
|
|
{
|
|
try
|
|
{
|
|
c = merge(c, x.Value);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
f = Notification.CreateOnError<TSource>(ex);
|
|
}
|
|
}
|
|
else
|
|
f = x;
|
|
}
|
|
},
|
|
() =>
|
|
{
|
|
if (f != null)
|
|
{
|
|
if (f.Kind == NotificationKind.OnError)
|
|
{
|
|
return Notification.CreateOnError<TResult>(f.Exception);
|
|
}
|
|
else
|
|
{
|
|
if (done)
|
|
return Notification.CreateOnCompleted<TResult>();
|
|
else
|
|
done = true;
|
|
}
|
|
}
|
|
|
|
var l = default(TResult);
|
|
lock (o)
|
|
{
|
|
l = c;
|
|
c = getNewCollector(c);
|
|
}
|
|
|
|
return Notification.CreateOnNext(l);
|
|
}
|
|
);
|
|
});
|
|
#endif
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region First
|
|
|
|
public virtual TSource First<TSource>(IObservable<TSource> source)
|
|
{
|
|
return FirstOrDefaultInternal(source, true);
|
|
}
|
|
|
|
public virtual TSource First<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
|
|
{
|
|
return First(Where(source, predicate));
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region FirstOrDefault
|
|
|
|
public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source)
|
|
{
|
|
return FirstOrDefaultInternal(source, false);
|
|
}
|
|
|
|
public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
|
|
{
|
|
return FirstOrDefault(Where(source, predicate));
|
|
}
|
|
|
|
private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
|
|
{
|
|
var value = default(TSource);
|
|
var seenValue = false;
|
|
var ex = default(Exception);
|
|
var evt = new ManualResetEvent(false);
|
|
|
|
//
|
|
// [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
|
|
//
|
|
using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
|
|
v =>
|
|
{
|
|
if (!seenValue)
|
|
{
|
|
value = v;
|
|
}
|
|
seenValue = true;
|
|
evt.Set();
|
|
},
|
|
e =>
|
|
{
|
|
ex = e;
|
|
evt.Set();
|
|
},
|
|
() =>
|
|
{
|
|
evt.Set();
|
|
})))
|
|
{
|
|
evt.WaitOne();
|
|
}
|
|
|
|
ex.ThrowIfNotNull();
|
|
|
|
if (throwOnEmpty && !seenValue)
|
|
throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
|
|
|
|
return value;
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region + ForEach +
|
|
|
|
public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource> onNext)
|
|
{
|
|
#if !NO_PERF
|
|
var evt = new ManualResetEvent(false);
|
|
var sink = new ForEach<TSource>._(onNext, () => evt.Set());
|
|
|
|
using (source.SubscribeSafe(sink))
|
|
{
|
|
evt.WaitOne();
|
|
}
|
|
|
|
sink.Error.ThrowIfNotNull();
|
|
#else
|
|
ForEach_(source, onNext);
|
|
#endif
|
|
}
|
|
|
|
public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource, int> onNext)
|
|
{
|
|
#if !NO_PERF
|
|
var evt = new ManualResetEvent(false);
|
|
var sink = new ForEach<TSource>.ForEachImpl(onNext, () => evt.Set());
|
|
|
|
using (source.SubscribeSafe(sink))
|
|
{
|
|
evt.WaitOne();
|
|
}
|
|
|
|
sink.Error.ThrowIfNotNull();
|
|
#else
|
|
var i = 0;
|
|
ForEach_(source, x => onNext(x, checked(i++)));
|
|
#endif
|
|
}
|
|
|
|
#if NO_PERF
|
|
private static void ForEach_<TSource>(IObservable<TSource> source, Action<TSource> onNext)
|
|
{
|
|
var exception = default(Exception);
|
|
|
|
var evt = new ManualResetEvent(false);
|
|
using (source.Subscribe(
|
|
x =>
|
|
{
|
|
try
|
|
{
|
|
onNext(x);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
exception = ex;
|
|
evt.Set();
|
|
}
|
|
},
|
|
ex =>
|
|
{
|
|
exception = ex;
|
|
evt.Set();
|
|
},
|
|
() => evt.Set()
|
|
))
|
|
{
|
|
evt.WaitOne();
|
|
}
|
|
|
|
if (exception != null)
|
|
exception.Throw();
|
|
}
|
|
#endif
|
|
|
|
#endregion
|
|
|
|
#region + GetEnumerator +
|
|
|
|
public virtual IEnumerator<TSource> GetEnumerator<TSource>(IObservable<TSource> source)
|
|
{
|
|
#if !NO_PERF && !NO_CDS
|
|
var e = new GetEnumerator<TSource>();
|
|
return e.Run(source);
|
|
#else
|
|
var q = new Queue<Notification<TSource>>();
|
|
var s = new Semaphore(0, int.MaxValue);
|
|
return PushToPull(
|
|
source,
|
|
x =>
|
|
{
|
|
lock (q)
|
|
q.Enqueue(x);
|
|
s.Release();
|
|
},
|
|
() =>
|
|
{
|
|
s.WaitOne();
|
|
lock (q)
|
|
return q.Dequeue();
|
|
});
|
|
#endif
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Last
|
|
|
|
public virtual TSource Last<TSource>(IObservable<TSource> source)
|
|
{
|
|
return LastOrDefaultInternal(source, true);
|
|
}
|
|
|
|
public virtual TSource Last<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
|
|
{
|
|
return Last(Where(source, predicate));
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region LastOrDefault
|
|
|
|
public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source)
|
|
{
|
|
return LastOrDefaultInternal(source, false);
|
|
}
|
|
|
|
public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
|
|
{
|
|
return LastOrDefault(Where(source, predicate));
|
|
}
|
|
|
|
private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
|
|
{
|
|
var value = default(TSource);
|
|
var seenValue = false;
|
|
var ex = default(Exception);
|
|
var evt = new ManualResetEvent(false);
|
|
|
|
//
|
|
// [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
|
|
//
|
|
using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
|
|
v =>
|
|
{
|
|
seenValue = true;
|
|
value = v;
|
|
},
|
|
e =>
|
|
{
|
|
ex = e;
|
|
evt.Set();
|
|
},
|
|
() =>
|
|
{
|
|
evt.Set();
|
|
})))
|
|
{
|
|
evt.WaitOne();
|
|
}
|
|
|
|
ex.ThrowIfNotNull();
|
|
|
|
if (throwOnEmpty && !seenValue)
|
|
throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
|
|
|
|
return value;
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region + Latest +
|
|
|
|
public virtual IEnumerable<TSource> Latest<TSource>(IObservable<TSource> source)
|
|
{
|
|
return new Latest<TSource>(source);
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region + MostRecent +
|
|
|
|
public virtual IEnumerable<TSource> MostRecent<TSource>(IObservable<TSource> source, TSource initialValue)
|
|
{
|
|
return new MostRecent<TSource>(source, initialValue);
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region + Next +
|
|
|
|
public virtual IEnumerable<TSource> Next<TSource>(IObservable<TSource> source)
|
|
{
|
|
return new Next<TSource>(source);
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Single
|
|
|
|
public virtual TSource Single<TSource>(IObservable<TSource> source)
|
|
{
|
|
return SingleOrDefaultInternal(source, true);
|
|
}
|
|
|
|
public virtual TSource Single<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
|
|
{
|
|
return Single(Where(source, predicate));
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region SingleOrDefault
|
|
|
|
public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source)
|
|
{
|
|
return SingleOrDefaultInternal(source, false);
|
|
}
|
|
|
|
public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
|
|
{
|
|
return SingleOrDefault(Where(source, predicate));
|
|
}
|
|
|
|
private static TSource SingleOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
|
|
{
|
|
var value = default(TSource);
|
|
var seenValue = false;
|
|
var ex = default(Exception);
|
|
var evt = new ManualResetEvent(false);
|
|
|
|
//
|
|
// [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
|
|
//
|
|
using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
|
|
v =>
|
|
{
|
|
if (seenValue)
|
|
{
|
|
ex = new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT);
|
|
evt.Set();
|
|
}
|
|
|
|
value = v;
|
|
seenValue = true;
|
|
},
|
|
e =>
|
|
{
|
|
ex = e;
|
|
evt.Set();
|
|
},
|
|
() =>
|
|
{
|
|
evt.Set();
|
|
})))
|
|
{
|
|
evt.WaitOne();
|
|
}
|
|
|
|
ex.ThrowIfNotNull();
|
|
|
|
if (throwOnEmpty && !seenValue)
|
|
throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
|
|
|
|
return value;
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Wait
|
|
|
|
public virtual TSource Wait<TSource>(IObservable<TSource> source)
|
|
{
|
|
return LastOrDefaultInternal(source, true);
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region |> Helpers <|
|
|
|
|
#if NO_CDS || NO_PERF
|
|
private static IEnumerator<TResult> PushToPull<TSource, TResult>(IObservable<TSource> source, Action<Notification<TSource>> push, Func<Notification<TResult>> pull)
|
|
{
|
|
var subscription = new SingleAssignmentDisposable();
|
|
var adapter = new PushPullAdapter<TSource, TResult>(push, pull, subscription.Dispose);
|
|
subscription.Disposable = source.SubscribeSafe(adapter);
|
|
return adapter;
|
|
}
|
|
#endif
|
|
|
|
#endregion
|
|
}
|
|
}
|