Files
acceptance-tests
data
debian
docs
eglib
external
Lucene.Net.Light
Newtonsoft.Json
aspnetwebstack
binary-reference-assemblies
cecil
debian-snapshot
ikdasm
ikvm
referencesource
rx
Ix
Rx
NET
Resources
Samples
Source
.nuget
Microsoft.Reactive.Testing
Playground
References
Rx_Xamarin
System.Reactive.Core
System.Reactive.Debugger
System.Reactive.Experimental
System.Reactive.Interfaces
System.Reactive.Linq
Properties
Reactive
Concurrency
Internal
Joins
Linq
Observable
GroupedObservable.cs
IQueryLanguage.cs
LocalQueryMethodImplementationTypeAttribute.cs
Observable.Aggregates.cs.REMOVED.git-id
Observable.Async.cs.REMOVED.git-id
Observable.Awaiter.cs
Observable.Binding.cs
Observable.Blocking.cs
Observable.Concurrency.cs
Observable.Conversions.cs
Observable.Creation.cs
Observable.Events.cs
Observable.Imperative.cs
Observable.Joins.cs
Observable.Multiple.cs.REMOVED.git-id
Observable.Single.cs
Observable.StandardSequenceOperators.cs.REMOVED.git-id
Observable.Time.cs.REMOVED.git-id
Observable_.cs
QueryLanguage.Aggregates.cs
QueryLanguage.Async.cs
QueryLanguage.Awaiter.cs
QueryLanguage.Binding.cs
QueryLanguage.Blocking.cs
QueryLanguage.Concurrency.cs
QueryLanguage.Conversions.cs
QueryLanguage.Creation.cs
QueryLanguage.Events.cs
QueryLanguage.Imperative.cs
QueryLanguage.Joins.cs
QueryLanguage.Multiple.cs
QueryLanguage.Single.cs
QueryLanguage.StandardSequenceOperators.cs
QueryLanguage.Time.cs
QueryLanguage_.cs
Subjects
Threading
EventPattern.cs
EventPatternSource.cs
EventPatternSourceBase.cs
EventSource.cs
Observer.Extensions.cs
TimeInterval.cs
Timestamped.cs
GlobalSuppressions.cs
InternalsVisibleTo.cs
NamespaceDocs.cs
Strings_Linq.Generated.cs
Strings_Linq.resx
System.Reactive.Linq.csproj
System.Reactive.Observable.Aliases
System.Reactive.PlatformServices
System.Reactive.Providers
System.Reactive.Runtime.Remoting
System.Reactive.Windows.Forms
System.Reactive.Windows.Threading
System.Reactive.WindowsRuntime
Tests.System.Reactive
packages
.gitattributes
.gitignore
35MSSharedLib1024.snk
Build.bat
BuildAll.proj
BuildSetup.bat
Clean.bat
Common.targets
Import.targets
Local.testsettings
README.txt
Rx.ruleset
Rx.sln.REMOVED.git-id
Test.ruleset
TraceAndTestImpact.testsettings
license.txt
packages.config
Test
tools
component
xpkg
.gitignore
README-microsoft-original.md
README.md
Rakefile
mono.patch
replacer.sh
ikvm-native
libgc
m4
man
mcs
mono
msvc
po
runtime
samples
scripts
support
tools
COPYING.LIB
ChangeLog.REMOVED.git-id
LICENSE
Makefile.am
Makefile.in
NEWS
README.md
acinclude.m4
aclocal.m4
autogen.sh
code_of_conduct.md
compile
config.guess
config.h.in
config.rpath
config.sub
configure.REMOVED.git-id
configure.ac.REMOVED.git-id
depcomp
install-sh
ltmain.sh.REMOVED.git-id
missing
mkinstalldirs
mono-uninstalled.pc.in
test-driver
winconfig.h
linux-packaging-mono/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Joins.cs
Jo Shields a575963da9 Imported Upstream version 3.6.0
Former-commit-id: da6be194a6b1221998fc28233f2503bd61dd9d14
2014-08-13 10:39:27 +01:00

85 lines
3.0 KiB
C#

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Joins;
namespace System.Reactive.Linq
{
internal partial class QueryLanguage
{
#region And
public virtual Pattern<TLeft, TRight> And<TLeft, TRight>(IObservable<TLeft> left, IObservable<TRight> right)
{
return new Pattern<TLeft, TRight>(left, right);
}
#endregion
#region Then
public virtual Plan<TResult> Then<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
{
return new Pattern<TSource>(source).Then(selector);
}
#endregion
#region When
public virtual IObservable<TResult> When<TResult>(params Plan<TResult>[] plans)
{
return When((IEnumerable<Plan<TResult>>)plans);
}
public virtual IObservable<TResult> When<TResult>(IEnumerable<Plan<TResult>> plans)
{
return new AnonymousObservable<TResult>(observer =>
{
var externalSubscriptions = new Dictionary<object, IJoinObserver>();
var gate = new object();
var activePlans = new List<ActivePlan>();
var outObserver = Observer.Create<TResult>(observer.OnNext,
exception =>
{
foreach (var po in externalSubscriptions.Values)
{
po.Dispose();
}
observer.OnError(exception);
},
observer.OnCompleted);
try
{
foreach (var plan in plans)
activePlans.Add(plan.Activate(externalSubscriptions, outObserver,
activePlan =>
{
activePlans.Remove(activePlan);
if (activePlans.Count == 0)
outObserver.OnCompleted();
}));
}
catch (Exception e)
{
//
// [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
//
return Throw<TResult>(e).Subscribe/*Unsafe*/(observer);
}
var group = new CompositeDisposable(externalSubscriptions.Values.Count);
foreach (var joinObserver in externalSubscriptions.Values)
{
joinObserver.Subscribe(gate);
group.Add(joinObserver);
}
return group;
});
}
#endregion
}
}