You've already forked linux-packaging-mono
acceptance-tests
data
debian
docs
external
Newtonsoft.Json
api-doc-tools
api-snapshot
aspnetwebstack
binary-reference-assemblies
bockbuild
boringssl
cecil
cecil-legacy
corefx
corert
ikdasm
ikvm
linker
nuget-buildtasks
nunit-lite
roslyn-binaries
rx
Ix
Rx
NET
Resources
Samples
Source
.nuget
Microsoft.Reactive.Testing
Playground
References
Rx_Xamarin
System.Reactive.Core
System.Reactive.Debugger
System.Reactive.Experimental
System.Reactive.Interfaces
System.Reactive.Linq
Properties
Reactive
Concurrency
Internal
Joins
Linq
Observable
GroupedObservable.cs
IQueryLanguage.cs
LocalQueryMethodImplementationTypeAttribute.cs
Observable.Aggregates.cs.REMOVED.git-id
Observable.Async.cs.REMOVED.git-id
Observable.Awaiter.cs
Observable.Binding.cs
Observable.Blocking.cs
Observable.Concurrency.cs
Observable.Conversions.cs
Observable.Creation.cs
Observable.Events.cs
Observable.Imperative.cs
Observable.Joins.cs
Observable.Multiple.cs.REMOVED.git-id
Observable.Single.cs
Observable.StandardSequenceOperators.cs.REMOVED.git-id
Observable.Time.cs.REMOVED.git-id
Observable_.cs
QueryLanguage.Aggregates.cs
QueryLanguage.Async.cs
QueryLanguage.Awaiter.cs
QueryLanguage.Binding.cs
QueryLanguage.Blocking.cs
QueryLanguage.Concurrency.cs
QueryLanguage.Conversions.cs
QueryLanguage.Creation.cs
QueryLanguage.Events.cs
QueryLanguage.Imperative.cs
QueryLanguage.Joins.cs
QueryLanguage.Multiple.cs
QueryLanguage.Single.cs
QueryLanguage.StandardSequenceOperators.cs
QueryLanguage.Time.cs
QueryLanguage_.cs
Subjects
Threading
EventPattern.cs
EventPatternSource.cs
EventPatternSourceBase.cs
EventSource.cs
Observer.Extensions.cs
TimeInterval.cs
Timestamped.cs
GlobalSuppressions.cs
InternalsVisibleTo.cs
NamespaceDocs.cs
Strings_Linq.Generated.cs
Strings_Linq.resx
System.Reactive.Linq.csproj
System.Reactive.Observable.Aliases
System.Reactive.PlatformServices
System.Reactive.Providers
System.Reactive.Runtime.Remoting
System.Reactive.Windows.Forms
System.Reactive.Windows.Threading
System.Reactive.WindowsRuntime
Tests.System.Reactive
packages
.gitattributes
.gitignore
35MSSharedLib1024.snk
Build.bat
BuildAll.proj
BuildSetup.bat
Clean.bat
Common.targets
Import.targets
Local.testsettings
README.txt
Rx.ruleset
Rx.sln.REMOVED.git-id
Test.ruleset
TraceAndTestImpact.testsettings
license.txt
packages.config
Test
tools
component
xpkg
.gitignore
README-microsoft-original.md
README.md
Rakefile
mono.patch
replacer.sh
xunit-binaries
ikvm-native
libgc
llvm
m4
man
mcs
mono
msvc
po
runtime
samples
scripts
support
tools
COPYING.LIB
LICENSE
Makefile.am
Makefile.in
NEWS
README.md
acinclude.m4
aclocal.m4
autogen.sh
code_of_conduct.md
compile
config.guess
config.h.in
config.rpath
config.sub
configure.REMOVED.git-id
configure.ac.REMOVED.git-id
depcomp
install-sh
ltmain.sh.REMOVED.git-id
missing
mkinstalldirs
mono-uninstalled.pc.in
test-driver
winconfig.h
160 lines
4.5 KiB
C#
160 lines
4.5 KiB
C#
![]() |
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
|||
|
|
|||
|
using System.Collections.Generic;
|
|||
|
using System.Reactive.Concurrency;
|
|||
|
using System.Reactive.Disposables;
|
|||
|
|
|||
|
namespace System.Reactive.Linq
|
|||
|
{
|
|||
|
#if !NO_PERF
|
|||
|
using ObservableImpl;
|
|||
|
#endif
|
|||
|
|
|||
|
internal partial class QueryLanguage
|
|||
|
{
|
|||
|
#region + Subscribe +
|
|||
|
|
|||
|
public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer)
|
|||
|
{
|
|||
|
return Subscribe_<TSource>(source, observer, SchedulerDefaults.Iteration);
|
|||
|
}
|
|||
|
|
|||
|
public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
|
|||
|
{
|
|||
|
return Subscribe_<TSource>(source, observer, scheduler);
|
|||
|
}
|
|||
|
|
|||
|
private static IDisposable Subscribe_<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
|
|||
|
{
|
|||
|
#if !NO_PERF
|
|||
|
//
|
|||
|
// [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
|
|||
|
//
|
|||
|
return new ToObservable<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
|
|||
|
#else
|
|||
|
var e = source.GetEnumerator();
|
|||
|
var flag = new BooleanDisposable();
|
|||
|
|
|||
|
scheduler.Schedule(self =>
|
|||
|
{
|
|||
|
var hasNext = false;
|
|||
|
var ex = default(Exception);
|
|||
|
var current = default(TSource);
|
|||
|
|
|||
|
if (flag.IsDisposed)
|
|||
|
{
|
|||
|
e.Dispose();
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
try
|
|||
|
{
|
|||
|
hasNext = e.MoveNext();
|
|||
|
if (hasNext)
|
|||
|
current = e.Current;
|
|||
|
}
|
|||
|
catch (Exception exception)
|
|||
|
{
|
|||
|
ex = exception;
|
|||
|
}
|
|||
|
|
|||
|
if (!hasNext || ex != null)
|
|||
|
{
|
|||
|
e.Dispose();
|
|||
|
}
|
|||
|
|
|||
|
if (ex != null)
|
|||
|
{
|
|||
|
observer.OnError(ex);
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
if (!hasNext)
|
|||
|
{
|
|||
|
observer.OnCompleted();
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
observer.OnNext(current);
|
|||
|
self();
|
|||
|
});
|
|||
|
|
|||
|
return flag;
|
|||
|
#endif
|
|||
|
}
|
|||
|
|
|||
|
#endregion
|
|||
|
|
|||
|
#region + ToEnumerable +
|
|||
|
|
|||
|
public virtual IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source)
|
|||
|
{
|
|||
|
return new AnonymousEnumerable<TSource>(() => source.GetEnumerator());
|
|||
|
}
|
|||
|
|
|||
|
#endregion
|
|||
|
|
|||
|
#region ToEvent
|
|||
|
|
|||
|
public virtual IEventSource<Unit> ToEvent(IObservable<Unit> source)
|
|||
|
{
|
|||
|
return new EventSource<Unit>(source, (h, _) => h(Unit.Default));
|
|||
|
}
|
|||
|
|
|||
|
public virtual IEventSource<TSource> ToEvent<TSource>(IObservable<TSource> source)
|
|||
|
{
|
|||
|
return new EventSource<TSource>(source, (h, value) => h(value));
|
|||
|
}
|
|||
|
|
|||
|
#endregion
|
|||
|
|
|||
|
#region ToEventPattern
|
|||
|
|
|||
|
public virtual IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(IObservable<EventPattern<TEventArgs>> source)
|
|||
|
#if !NO_EVENTARGS_CONSTRAINT
|
|||
|
where TEventArgs : EventArgs
|
|||
|
#endif
|
|||
|
{
|
|||
|
return new EventPatternSource<TEventArgs>(
|
|||
|
#if !NO_VARIANCE
|
|||
|
source,
|
|||
|
#else
|
|||
|
source.Select(x => (EventPattern<object, TEventArgs>)x),
|
|||
|
#endif
|
|||
|
(h, evt) => h(evt.Sender, evt.EventArgs)
|
|||
|
);
|
|||
|
}
|
|||
|
|
|||
|
#endregion
|
|||
|
|
|||
|
#region + ToObservable +
|
|||
|
|
|||
|
public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source)
|
|||
|
{
|
|||
|
#if !NO_PERF
|
|||
|
return new ToObservable<TSource>(source, SchedulerDefaults.Iteration);
|
|||
|
#else
|
|||
|
return ToObservable_(source, SchedulerDefaults.Iteration);
|
|||
|
#endif
|
|||
|
}
|
|||
|
|
|||
|
public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
|
|||
|
{
|
|||
|
#if !NO_PERF
|
|||
|
return new ToObservable<TSource>(source, scheduler);
|
|||
|
#else
|
|||
|
return ToObservable_(source, scheduler);
|
|||
|
#endif
|
|||
|
}
|
|||
|
|
|||
|
#if NO_PERF
|
|||
|
private static IObservable<TSource> ToObservable_<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
|
|||
|
{
|
|||
|
return new AnonymousObservable<TSource>(observer => source.Subscribe(observer, scheduler));
|
|||
|
}
|
|||
|
#endif
|
|||
|
|
|||
|
#endregion
|
|||
|
}
|
|||
|
}
|