// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;

// The directives and declarations below previously shared one physical line with the license comment
// above, which made the `//` comment swallow all of them; each now sits on its own line.
//
// NOTE(review): generic type-argument lists appear to be missing throughout this text (for example
// `default(Action)` is later invoked with an argument). Restore them from the upstream source before
// compiling; they are left untouched here rather than guessed.
namespace System.Linq
{
    public static partial class AsyncEnumerable
    {
        /// <summary>
        /// Folds the sequence into an accumulator starting at <paramref name="seed"/>, then maps the final
        /// accumulator through <paramref name="resultSelector"/> to produce the task's result.
        /// </summary>
        /// <param name="source">Sequence to enumerate.</param>
        /// <param name="seed">Initial accumulator value.</param>
        /// <param name="accumulator">Called as <c>accumulator(acc, e.Current)</c> for every element.</param>
        /// <param name="resultSelector">Called once with the final accumulator when MoveNext yields false.</param>
        /// <param name="cancellationToken">Passed to every <c>MoveNext</c> call.</param>
        /// <returns>The task of the internal TaskCompletionSource, wrapped by <c>Finally(e.Dispose)</c>.</returns>
        /// <exception cref="ArgumentNullException">A reference argument is null (thrown synchronously).</exception>
        public static Task Aggregate(this IAsyncEnumerable source, TAccumulate seed, Func accumulator, Func resultSelector, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (accumulator == null)
                throw new ArgumentNullException("accumulator");
            if (resultSelector == null)
                throw new ArgumentNullException("resultSelector");

            var tcs = new TaskCompletionSource();

            var acc = seed;

            var e = source.GetEnumerator();

            // f requests one element and re-invokes itself from the continuation, so the loop is
            // driven by MoveNext completions rather than by a blocking iteration.
            var f = default(Action);
            f = ct => e.MoveNext(ct).ContinueWith(t =>
            {
                // NOTE(review): Handle is defined outside this section; presumably it forwards a faulted or
                // cancelled MoveNext to tcs and otherwise passes the bool result to the callback — confirm.
                t.Handle(tcs, res =>
                {
                    if (res)
                    {
                        try
                        {
                            acc = accumulator(acc, e.Current);
                            f(ct);
                        }
                        catch (Exception exception)
                        {
                            // User code (or the next MoveNext call) threw: surface it through the task.
                            tcs.TrySetException(exception);
                        }
                    }
                    else
                    {
                        var result = default(TResult);
                        try
                        {
                            result = resultSelector(acc);
                        }
                        catch (Exception exception)
                        {
                            tcs.TrySetException(exception);
                            return;
                        }

                        tcs.TrySetResult(result);
                    }
                });
            });

            f(cancellationToken);

            // NOTE(review): Finally is defined elsewhere; presumably it runs e.Dispose once the task completes — confirm.
            return tcs.Task.Finally(e.Dispose);
        }

        /// <summary>
        /// Folds the sequence starting at <paramref name="seed"/> and returns the final accumulator,
        /// by delegating to the overload above with the identity function <c>x => x</c> as result selector.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
        public static Task Aggregate(this IAsyncEnumerable source, TAccumulate seed, Func accumulator, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (accumulator == null)
                throw new ArgumentNullException("accumulator");

            return source.Aggregate(seed, accumulator, x => x, cancellationToken);
        }

        /// <summary>
        /// Folds the sequence without a seed: the first element becomes the initial accumulator and
        /// <paramref name="accumulator"/> is applied to each later element. When the sequence ends before
        /// any element was seen, the task is given an <see cref="InvalidOperationException"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="accumulator"/> is null.</exception>
        public static Task Aggregate(this IAsyncEnumerable source, Func accumulator, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (accumulator == null)
                throw new ArgumentNullException("accumulator");

            var tcs = new TaskCompletionSource();

            // True until the first element has been consumed.
            var first = true;
            var acc = default(TSource);

            var e = source.GetEnumerator();

            var f = default(Action);
            f = ct =>
e.MoveNext(ct).ContinueWith(t =>
            {
                t.Handle(tcs, res =>
                {
                    if (res)
                    {
                        try
                        {
                            if (first)
                                acc = e.Current;
                            else
                                acc = accumulator(acc, e.Current);

                            // Clear the flag BEFORE requesting the next element. f(ct) attaches a
                            // continuation that may run on another thread before this callback returns;
                            // if it still observed first == true it would overwrite the accumulator
                            // (or report an empty sequence for a one-element source).
                            first = false;

                            f(ct);
                        }
                        catch (Exception ex)
                        {
                            tcs.TrySetException(ex);
                        }
                    }
                    else
                    {
                        // End of sequence: no element seen means there is nothing to return.
                        if (first)
                            tcs.TrySetException(new InvalidOperationException());
                        else
                            tcs.TrySetResult(acc);
                    }
                });
            });

            f(cancellationToken);

            return tcs.Task.Finally(e.Dispose);
        }

        /// <summary>
        /// Counts the elements by folding with <c>Aggregate</c> from a seed of <c>0</c>.
        /// The increment is <c>checked</c>, so passing <c>int.MaxValue</c> raises
        /// <see cref="OverflowException"/> inside the accumulator (which Aggregate routes to the task)
        /// instead of silently wrapping to a negative count.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static Task Count(this IAsyncEnumerable source, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");

            return source.Aggregate(0, (c, _) => checked(c + 1), cancellationToken);
        }

        /// <summary>
        /// Counts the elements that pass <paramref name="predicate"/>: filters with <c>Where</c> and
        /// folds from <c>0</c> with a <c>checked</c> increment (see the overload without a predicate).
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static Task Count(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (predicate == null)
                throw new ArgumentNullException("predicate");

            return source.Where(predicate).Aggregate(0, (c, _) => checked(c + 1), cancellationToken);
        }

        /// <summary>Counts the elements as a 64-bit value by folding with <c>Aggregate</c> from a seed of <c>0L</c>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static Task LongCount(this IAsyncEnumerable source, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");

            return source.Aggregate(0L, (c, _) => c + 1, cancellationToken);
        }

        /// <summary>Counts, as a 64-bit value, the elements that pass <paramref name="predicate"/> (<c>Where</c> then fold from <c>0L</c>).</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static Task LongCount(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (predicate == null)
                throw new ArgumentNullException("predicate");

            return source.Where(predicate).Aggregate(0L, (c, _) => c + 1, cancellationToken);
        }

        /// <summary>
        /// Completes with <c>false</c> as soon as an element fails <paramref name="predicate"/>, or with
        /// <c>true</c> when the sequence ends without a failure. An exception from the predicate is set on the task.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static Task All(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (predicate == null)
                throw new ArgumentNullException("predicate");

            var tcs = new TaskCompletionSource();

            var e = source.GetEnumerator();

            var f = default(Action);
            f = ct => e.MoveNext(ct).ContinueWith(t =>
            {
                t.Handle(tcs, res =>
                {
                    if (res)
                    {
                        try
                        {
                            if (!predicate(e.Current))
                                tcs.TrySetResult(false);
                            else
                                f(ct);
                        }
                        catch (Exception
ex) { tcs.TrySetException(ex); } } else { tcs.TrySetResult(true); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>
        /// Completes with <c>true</c> as soon as an element passes <paramref name="predicate"/>, or with
        /// <c>false</c> when the sequence ends without a match. An exception from the predicate is set on the task.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="predicate"/> is null.</exception>
        public static Task Any(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (predicate == null)
                throw new ArgumentNullException("predicate");

            var tcs = new TaskCompletionSource();

            var e = source.GetEnumerator();

            // f requests one element and re-invokes itself until a match is found or the sequence ends.
            var f = default(Action);
            f = ct => e.MoveNext(ct).ContinueWith(t =>
            {
                t.Handle(tcs, res =>
                {
                    if (res)
                    {
                        try
                        {
                            if (predicate(e.Current))
                                tcs.TrySetResult(true);
                            else
                                f(ct);
                        }
                        catch (Exception ex)
                        {
                            tcs.TrySetException(ex);
                        }
                    }
                    else
                    {
                        tcs.TrySetResult(false);
                    }
                });
            });

            f(cancellationToken);

            return tcs.Task.Finally(e.Dispose);
        }

        /// <summary>
        /// Reports whether the sequence has at least one element: the result is that of a single
        /// <c>MoveNext</c> call on a fresh enumerator.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static Task Any(this IAsyncEnumerable source, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");

            var e = source.GetEnumerator();

            // Previously the enumerator was returned from without ever being disposed. Attach the same
            // Finally(e.Dispose) step the other operators in this class use so it is released once
            // MoveNext has completed.
            return e.MoveNext(cancellationToken).Finally(e.Dispose);
        }

        /// <summary>
        /// Reports whether any element equals <paramref name="value"/> according to
        /// <paramref name="comparer"/>, by delegating to <c>Any</c> with <c>comparer.Equals(x, value)</c>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="comparer"/> is null.</exception>
        public static Task Contains(this IAsyncEnumerable source, TSource value, IEqualityComparer comparer, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (comparer == null)
                throw new ArgumentNullException("comparer");

            return source.Any(x => comparer.Equals(x, value), cancellationToken);
        }

        /// <summary>Reports whether any element equals <paramref name="value"/> using <c>EqualityComparer.Default</c>.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static Task Contains(this IAsyncEnumerable source, TSource value, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");

            return source.Contains(value, EqualityComparer.Default, cancellationToken);
        }

        /// <summary>
        /// Advances the enumerator once and completes with <c>e.Current</c>; when MoveNext yields false
        /// the task is given an <see cref="InvalidOperationException"/>.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static Task First(this IAsyncEnumerable source, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");

            var tcs = new TaskCompletionSource();

            var e = source.GetEnumerator();

            e.MoveNext(cancellationToken).ContinueWith(t =>
            {
                t.Handle(tcs, res =>
                {
                    if (res)
                        tcs.TrySetResult(e.Current);
                    else
                        tcs.TrySetException(new InvalidOperationException());
                });
}); return tcs.Task.Finally(e.Dispose); }

        // NOTE(review): Handle(...) and Finally(...) used below are defined outside this section. Presumably
        // Handle forwards a faulted/cancelled MoveNext task to tcs and otherwise passes the bool result to the
        // callback, and Finally runs e.Dispose once the task completes — confirm against their definitions.

        /// <summary>Returns the first element passing <paramref name="predicate"/> by delegating to <c>source.Where(predicate).First(cancellationToken)</c>. Null <paramref name="source"/> or <paramref name="predicate"/> throws ArgumentNullException synchronously.</summary>
        public static Task First(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (predicate == null) throw new ArgumentNullException("predicate"); return source.Where(predicate).First(cancellationToken); }

        /// <summary>Advances the enumerator once; completes with <c>e.Current</c> when MoveNext yields true, otherwise with <c>default(TSource)</c>. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task FirstOrDefault(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var e = source.GetEnumerator(); e.MoveNext(cancellationToken).ContinueWith(t => { t.Handle(tcs, res => { if (res) tcs.TrySetResult(e.Current); else tcs.TrySetResult(default(TSource)); }); }); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Returns the first element passing <paramref name="predicate"/>, or the default value, by delegating to <c>source.Where(predicate).FirstOrDefault(cancellationToken)</c>. Null arguments throw ArgumentNullException synchronously.</summary>
        public static Task FirstOrDefault(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (predicate == null) throw new ArgumentNullException("predicate"); return source.Where(predicate).FirstOrDefault(cancellationToken); }

        /// <summary>Enumerates to the end through the self-re-invoking delegate <c>f</c>, remembering the most recent element in <c>last</c>. At the end completes with <c>last</c>, or sets an InvalidOperationException on the task when no element was seen (<c>hasLast</c> still false).</summary>
        public static Task Last(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var e = source.GetEnumerator(); var last = default(TSource); var hasLast = false; var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { hasLast = true; last = e.Current; f(ct); } else { if (!hasLast) tcs.TrySetException(new InvalidOperationException()); else tcs.TrySetResult(last); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Returns the last element passing <paramref name="predicate"/> by delegating to <c>source.Where(predicate).Last(cancellationToken)</c>. Null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Last(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (predicate == null) throw new ArgumentNullException("predicate"); return source.Where(predicate).Last(cancellationToken); }

        /// <summary>Enumerates to the end, remembering the most recent element; completes with it, or with <c>default(TSource)</c> when the sequence had no elements. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public
static Task LastOrDefault(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var e = source.GetEnumerator(); var last = default(TSource); var hasLast = false; var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { hasLast = true; last = e.Current; f(ct); } else { if (!hasLast) tcs.TrySetResult(default(TSource)); else tcs.TrySetResult(last); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        // NOTE(review): Handle(...) and Finally(...) are defined outside this section; presumably they route
        // MoveNext faults/cancellation into tcs and dispose the enumerator on completion — confirm.

        /// <summary>Returns the last element passing <paramref name="predicate"/>, or the default value, by delegating to <c>source.Where(predicate).LastOrDefault(cancellationToken)</c>. Null arguments throw ArgumentNullException synchronously.</summary>
        public static Task LastOrDefault(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (predicate == null) throw new ArgumentNullException("predicate"); return source.Where(predicate).LastOrDefault(cancellationToken); }

        /// <summary>Reads the first element, then calls MoveNext a second time. Completes with the first element when the second MoveNext yields false; sets an InvalidOperationException on the task when the sequence is empty or has a second element.</summary>
        public static Task Single(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var e = source.GetEnumerator(); e.MoveNext(cancellationToken).ContinueWith(t => { t.Handle(tcs, res => { if (res) { var result = e.Current; e.MoveNext(cancellationToken).ContinueWith(t1 => { t1.Handle(tcs, res1 => { if (res1) tcs.TrySetException(new InvalidOperationException()); else tcs.TrySetResult(result); }); }); } else tcs.TrySetException(new InvalidOperationException()); }); }); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Returns the only element passing <paramref name="predicate"/> by delegating to <c>source.Where(predicate).Single(cancellationToken)</c>. Null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Single(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (predicate == null) throw new ArgumentNullException("predicate"); return source.Where(predicate).Single(cancellationToken); }

        /// <summary>Like <c>Single</c>, except that an empty sequence completes with <c>default(TSource)</c>; a second element still sets an InvalidOperationException on the task. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task SingleOrDefault(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs =
new TaskCompletionSource(); var e = source.GetEnumerator(); e.MoveNext(cancellationToken).ContinueWith(t => { t.Handle(tcs, res => { if (res) { var result = e.Current; e.MoveNext(cancellationToken).ContinueWith(t1 => { t1.Handle(tcs, res1 => { if (res1) tcs.TrySetException(new InvalidOperationException()); else tcs.TrySetResult(result); }); }); } else tcs.TrySetResult(default(TSource)); }); }); return tcs.Task.Finally(e.Dispose); }

        // NOTE(review): Handle(...) and Finally(...) are defined outside this section; presumably they route
        // MoveNext faults/cancellation into tcs and dispose the enumerator on completion — confirm.

        /// <summary>Returns the only element passing <paramref name="predicate"/>, or the default value, by delegating to <c>source.Where(predicate).SingleOrDefault(cancellationToken)</c>. Null arguments throw ArgumentNullException synchronously.</summary>
        public static Task SingleOrDefault(this IAsyncEnumerable source, Func predicate, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (predicate == null) throw new ArgumentNullException("predicate"); return source.Where(predicate).SingleOrDefault(cancellationToken); }

        /// <summary>Returns the element at zero-based <paramref name="index"/>. The captured <paramref name="index"/> is decremented for each element skipped; when it reaches 0 the current element is the result. If the sequence ends first, an ArgumentOutOfRangeException is set on the task. A negative index (or null source) throws synchronously.</summary>
        public static Task ElementAt(this IAsyncEnumerable source, int index, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (index < 0) throw new ArgumentOutOfRangeException("index"); var tcs = new TaskCompletionSource(); var e = source.GetEnumerator(); var next = default(Action); next = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { if (index == 0) { tcs.TrySetResult(e.Current); } else { index--; next(ct); } } else { tcs.TrySetException(new ArgumentOutOfRangeException()); } }); }); next(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Like <c>ElementAt</c>, except that a sequence shorter than <paramref name="index"/> + 1 completes with <c>default(TSource)</c>. A negative index still throws ArgumentOutOfRangeException synchronously.</summary>
        public static Task ElementAtOrDefault(this IAsyncEnumerable source, int index, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (index < 0) throw new ArgumentOutOfRangeException("index"); var tcs = new TaskCompletionSource(); var e = source.GetEnumerator(); var next = default(Action); next = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { if (index == 0) { tcs.TrySetResult(e.Current); } else { index--; next(ct); } } else { tcs.TrySetResult(default(TSource)); } }); }); next(cancellationToken); return
tcs.Task.Finally(e.Dispose); }

        // NOTE(review): signatures such as `Task> ToList` show that generic type-argument lists are missing
        // from this text; restore them from the upstream source before compiling.

        /// <summary>Collects every element into a List via <c>Aggregate</c> and converts it with <c>list.ToArray()</c> in the result selector. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task ToArray(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(new List(), (list, x) => { list.Add(x); return list; }, list => list.ToArray(), cancellationToken); }

        /// <summary>Collects every element into a List via <c>Aggregate</c> and completes with that list. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task> ToList(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(new List(), (list, x) => { list.Add(x); return list; }, cancellationToken); }

        /// <summary>Builds a Dictionary constructed with <paramref name="comparer"/>, calling <c>d.Add(keySelector(x), elementSelector(x))</c> for each element through <c>Aggregate</c>. All four reference arguments are null-checked synchronously.</summary>
        public static Task> ToDictionary(this IAsyncEnumerable source, Func keySelector, Func elementSelector, IEqualityComparer comparer, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); if (elementSelector == null) throw new ArgumentNullException("elementSelector"); if (comparer == null) throw new ArgumentNullException("comparer"); return source.Aggregate(new Dictionary(comparer), (d, x) => { d.Add(keySelector(x), elementSelector(x)); return d; }, cancellationToken); }

        /// <summary>Same as the comparer overload, passing <c>EqualityComparer.Default</c> as the key comparer.</summary>
        public static Task> ToDictionary(this IAsyncEnumerable source, Func keySelector, Func elementSelector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); if (elementSelector == null) throw new ArgumentNullException("elementSelector"); return source.ToDictionary(keySelector, elementSelector, EqualityComparer.Default, cancellationToken); }

        /// <summary>Builds a dictionary keyed by <paramref name="keySelector"/> whose values are the elements themselves (identity element selector <c>x => x</c>), using <paramref name="comparer"/>.</summary>
        public static Task> ToDictionary(this IAsyncEnumerable source, Func keySelector, IEqualityComparer comparer, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); if (comparer == null) throw new ArgumentNullException("comparer"); return
source.ToDictionary(keySelector, x => x, comparer, cancellationToken); }

        // NOTE(review): signatures such as `Task> ToLookup` show that generic type-argument lists are missing
        // from this text; restore them from the upstream source before compiling.

        /// <summary>Builds a dictionary keyed by <paramref name="keySelector"/> whose values are the elements themselves, using <c>EqualityComparer.Default</c>. Null <paramref name="source"/> or <paramref name="keySelector"/> throws ArgumentNullException synchronously.</summary>
        public static Task> ToDictionary(this IAsyncEnumerable source, Func keySelector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); return source.ToDictionary(keySelector, x => x, EqualityComparer.Default, cancellationToken); }

        /// <summary>Builds a lookup: a private <c>Lookup</c> constructed with <paramref name="comparer"/> receives <c>Add(keySelector(x), elementSelector(x))</c> for each element through <c>Aggregate</c>, and the result selector casts it to ILookup. All four reference arguments are null-checked synchronously.</summary>
        public static Task> ToLookup(this IAsyncEnumerable source, Func keySelector, Func elementSelector, IEqualityComparer comparer, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); if (elementSelector == null) throw new ArgumentNullException("elementSelector"); if (comparer == null) throw new ArgumentNullException("comparer"); return source.Aggregate(new Lookup(comparer), (lookup, x) => { lookup.Add(keySelector(x), elementSelector(x)); return lookup; }, lookup => (ILookup)lookup, cancellationToken); }

        /// <summary>Same as the comparer overload, passing <c>EqualityComparer.Default</c> as the key comparer.</summary>
        public static Task> ToLookup(this IAsyncEnumerable source, Func keySelector, Func elementSelector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); if (elementSelector == null) throw new ArgumentNullException("elementSelector"); return source.ToLookup(keySelector, elementSelector, EqualityComparer.Default, cancellationToken); }

        /// <summary>Builds a lookup keyed by <paramref name="keySelector"/> whose grouped values are the elements themselves (identity element selector <c>x => x</c>), using <paramref name="comparer"/>.</summary>
        public static Task> ToLookup(this IAsyncEnumerable source, Func keySelector, IEqualityComparer comparer, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); if (comparer == null) throw new ArgumentNullException("comparer"); return source.ToLookup(keySelector, x => x, comparer, cancellationToken); }

        /// <summary>Builds a lookup keyed by <paramref name="keySelector"/> whose grouped values are the elements themselves, using <c>EqualityComparer.Default</c>.</summary>
        public static Task> ToLookup(this IAsyncEnumerable source, Func
keySelector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (keySelector == null) throw new ArgumentNullException("keySelector"); return source.ToLookup(keySelector, x => x, EqualityComparer.Default, cancellationToken); }

        // Private ILookup implementation backed by a Dictionary from key to EnumerableGrouping.
        //   Add          - appends the element to the key's grouping, creating and registering the grouping on first use.
        //   Contains     - map.ContainsKey(key).
        //   Count        - number of distinct keys (map.Keys.Count).
        //   this[key]    - returns the grouping stored for the key via the dictionary indexer.
        //   GetEnumerator- enumerates the stored groupings (map.Values, cast).
        // NOTE(review): the indexer reads map[key], and Dictionary's indexer throws KeyNotFoundException for a
        // missing key, whereas System.Linq.Lookup returns an empty sequence in that case — confirm which
        // behaviour callers expect.
        // NOTE(review): generic type arguments are missing in this text (`Dictionary>`, `Cast>()`), and
        // EnumerableGrouping is declared outside this section.
        class Lookup : ILookup { private readonly Dictionary> map; public Lookup(IEqualityComparer comparer) { map = new Dictionary>(comparer); } public void Add(TKey key, TElement element) { var g = default(EnumerableGrouping); if (!map.TryGetValue(key, out g)) { g = new EnumerableGrouping(key); map.Add(key, g); } g.Add(element); } public bool Contains(TKey key) { return map.ContainsKey(key); } public int Count { get { return map.Keys.Count; } } public IEnumerable this[TKey key] { get { return map[key]; } } public IEnumerator> GetEnumerator() { return map.Values.Cast>().GetEnumerator(); } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return GetEnumerator(); } }

        // NOTE(review): Handle(...) and Finally(...) used by the Average overloads are defined outside this
        // section; presumably they route MoveNext faults/cancellation into tcs and dispose the enumerator on
        // completion — confirm. The element type of each Average overload is not visible in this text.

        /// <summary>Averages the sequence: keeps a running <c>sum</c> (seeded with the literal <c>0.0</c>) and a <c>long</c> element count, completing with <c>sum / count</c>. When no element was seen, an InvalidOperationException is set on the task. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0.0; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { count++; sum += e.Current; f(ct); } else { if (count == 0) tcs.TrySetException(new InvalidOperationException()); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Averages only the elements whose <c>HasValue</c> is true (sum seeded with <c>0.0</c>); completes with <c>null</c> when none were counted, otherwise with <c>sum / count</c>.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0.0; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { if (e.Current.HasValue) { count++; sum += e.Current.Value; } f(ct); } else
{ if (count == 0) tcs.TrySetResult(null); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        // NOTE(review): the Average overloads below are distinguishable here only by their accumulator literal
        // and by whether they test HasValue; their element types are presumably carried by generic type
        // arguments that are missing from this text — confirm against the upstream source. Handle(...) and
        // Finally(...) are defined outside this section.

        /// <summary>Averages the sequence: running <c>sum</c> seeded with <c>0.0</c> plus a <c>long</c> count, completing with <c>sum / count</c>; when no element was seen an InvalidOperationException is set on the task. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0.0; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { count++; sum += e.Current; f(ct); } else { if (count == 0) tcs.TrySetException(new InvalidOperationException()); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Averages only the elements whose <c>HasValue</c> is true (sum seeded with <c>0.0</c>); completes with <c>null</c> when none were counted, otherwise with <c>sum / count</c>.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0.0; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { if (e.Current.HasValue) { count++; sum += e.Current.Value; } f(ct); } else { if (count == 0) tcs.TrySetResult(null); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Averages the sequence: running <c>sum</c> seeded with <c>0.0</c> plus a <c>long</c> count, completing with <c>sum / count</c>; when no element was seen an InvalidOperationException is set on the task.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0.0; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { count++; sum += e.Current; f(ct); } else { if (count == 0) tcs.TrySetException(new InvalidOperationException()); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Averages only the elements whose <c>HasValue</c> is true (sum seeded with <c>0.0</c>); completes with <c>null</c> when none were counted, otherwise with <c>sum / count</c>.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if
(source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0.0; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { if (e.Current.HasValue) { count++; sum += e.Current.Value; } f(ct); } else { if (count == 0) tcs.TrySetResult(null); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        // NOTE(review): the element type of each overload is presumably carried by generic type arguments that
        // are missing from this text — confirm against the upstream source. Handle(...) and Finally(...) are
        // defined outside this section.

        /// <summary>Averages the sequence with a running <c>sum</c> seeded with the float literal <c>0f</c> and a <c>long</c> count, completing with <c>sum / count</c>; when no element was seen an InvalidOperationException is set on the task. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0f; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { count++; sum += e.Current; f(ct); } else { if (count == 0) tcs.TrySetException(new InvalidOperationException()); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Averages only the elements whose <c>HasValue</c> is true, with a running <c>sum</c> seeded with <c>0f</c>; completes with <c>null</c> when none were counted, otherwise with <c>sum / count</c>.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0f; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { if (e.Current.HasValue) { count++; sum += e.Current.Value; } f(ct); } else { if (count == 0) tcs.TrySetResult(null); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Averages the sequence with a running <c>sum</c> seeded with the decimal literal <c>0m</c> and a <c>long</c> count, completing with <c>sum / count</c>; when no element was seen an InvalidOperationException is set on the task.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0m; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs,
res => { if (res) { count++; sum += e.Current; f(ct); } else { if (count == 0) tcs.TrySetException(new InvalidOperationException()); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        // NOTE(review): the selector overloads below are textually identical here; they presumably differ only
        // in generic type arguments that are missing from this text — confirm against the upstream source.

        /// <summary>Averages only the elements whose <c>HasValue</c> is true, with a running <c>sum</c> seeded with the decimal literal <c>0m</c>; completes with <c>null</c> when none were counted, otherwise with <c>sum / count</c>. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var tcs = new TaskCompletionSource(); var count = 0L; var sum = 0m; var e = source.GetEnumerator(); var f = default(Action); f = ct => e.MoveNext(ct).ContinueWith(t => { t.Handle(tcs, res => { if (res) { if (e.Current.HasValue) { count++; sum += e.Current.Value; } f(ct); } else { if (count == 0) tcs.TrySetResult(null); else tcs.TrySetResult(sum / count); } }); }); f(cancellationToken); return tcs.Task.Finally(e.Dispose); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>. Null <paramref name="source"/> or <paramref name="selector"/> throws ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return
source.Select(selector).Average(cancellationToken); }

        // NOTE(review): the six selector overloads below are textually identical here; they presumably differ
        // only in generic type arguments that are missing from this text — confirm against the upstream source.

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>. Null <paramref name="source"/> or <paramref name="selector"/> throws ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and averages the projection via <c>source.Select(selector).Average(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Average(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Average(cancellationToken); }

        /// <summary>Reduces the sequence with <c>Math.Max</c> through the seedless <c>Aggregate</c> overload. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new
ArgumentNullException("source"); return source.Aggregate(Math.Max, cancellationToken); }

        // NOTE(review): the four Math.Max overloads below are textually identical here; they presumably differ
        // only in generic type arguments that are missing from this text — confirm against the upstream source.

        /// <summary>Reduces the sequence with <c>Math.Max</c> through the seedless <c>Aggregate</c> overload. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(Math.Max, cancellationToken); }

        /// <summary>Reduces the sequence with <c>Math.Max</c> through the seedless <c>Aggregate</c> overload.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(Math.Max, cancellationToken); }

        /// <summary>Reduces the sequence with <c>Math.Max</c> through the seedless <c>Aggregate</c> overload.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(Math.Max, cancellationToken); }

        /// <summary>Reduces the sequence with <c>Math.Max</c> through the seedless <c>Aggregate</c> overload.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(Math.Max, cancellationToken); }

        // Maximum of two nullable values: a side without a value yields the other side (so two empty sides
        // yield an empty result); otherwise x wins when x.Value.CompareTo(y.Value) >= 0, else y.
        static T? NullableMax(T? x, T? y) where T : struct, IComparable { if (!x.HasValue) return y; if (!y.HasValue) return x; if (x.Value.CompareTo(y.Value) >= 0) return x; return y; }

        /// <summary>Folds the sequence with <c>NullableMax</c> starting from the seed <c>default(int?)</c>. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(int?), NullableMax, cancellationToken); }

        /// <summary>Folds the sequence with <c>NullableMax</c> starting from the seed <c>default(long?)</c>.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(long?), NullableMax, cancellationToken); }

        /// <summary>Folds the sequence with <c>NullableMax</c> starting from the seed <c>default(double?)</c>.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(double?), NullableMax, cancellationToken); }

        /// <summary>Folds the sequence with <c>NullableMax</c> starting from the seed <c>default(float?)</c>.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(float?),
NullableMax, cancellationToken); }

        // NOTE(review): the selector overloads below are textually identical here; they presumably differ only
        // in generic type arguments that are missing from this text — confirm against the upstream source.

        /// <summary>Folds the sequence with <c>NullableMax</c> starting from the seed <c>default(decimal?)</c>. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(decimal?), NullableMax, cancellationToken); }

        /// <summary>Reduces the sequence through the seedless <c>Aggregate</c> overload, keeping <c>x</c> when <c>Comparer.Default.Compare(x, y) >= 0</c> and <c>y</c> otherwise.</summary>
        public static Task Max(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); var comparer = Comparer.Default; return source.Aggregate((x, y) => comparer.Compare(x, y) >= 0 ? x : y, cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>. Null <paramref name="source"/> or <paramref name="selector"/> throws ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return
source.Select(selector).Max(cancellationToken); }

        // NOTE(review): the six selector overloads below are textually identical here; they presumably differ
        // only in generic type arguments that are missing from this text — confirm against the upstream source.

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>. Null <paramref name="source"/> or <paramref name="selector"/> throws ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Projects each element with <paramref name="selector"/> and takes the maximum of the projection via <c>source.Select(selector).Max(cancellationToken)</c>; null arguments throw ArgumentNullException synchronously.</summary>
        public static Task Max(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); if (selector == null) throw new ArgumentNullException("selector"); return source.Select(selector).Max(cancellationToken); }

        /// <summary>Reduces the sequence with <c>Math.Min</c> through the seedless <c>Aggregate</c> overload. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return
source.Aggregate(Math.Min, cancellationToken); }

        // NOTE(review): the four Math.Min overloads below are textually identical here; they presumably differ
        // only in generic type arguments that are missing from this text — confirm against the upstream source.

        /// <summary>Reduces the sequence with <c>Math.Min</c> through the seedless <c>Aggregate</c> overload. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(Math.Min, cancellationToken); }

        /// <summary>Reduces the sequence with <c>Math.Min</c> through the seedless <c>Aggregate</c> overload.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(Math.Min, cancellationToken); }

        /// <summary>Reduces the sequence with <c>Math.Min</c> through the seedless <c>Aggregate</c> overload.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(Math.Min, cancellationToken); }

        /// <summary>Reduces the sequence with <c>Math.Min</c> through the seedless <c>Aggregate</c> overload.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(Math.Min, cancellationToken); }

        // Minimum of two nullable values: a side without a value yields the other side (so two empty sides
        // yield an empty result); otherwise x wins when x.Value.CompareTo(y.Value) <= 0, else y.
        static T? NullableMin(T? x, T? y) where T : struct, IComparable { if (!x.HasValue) return y; if (!y.HasValue) return x; if (x.Value.CompareTo(y.Value) <= 0) return x; return y; }

        /// <summary>Folds the sequence with <c>NullableMin</c> starting from the seed <c>default(int?)</c>. Null <paramref name="source"/> throws ArgumentNullException synchronously.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(int?), NullableMin, cancellationToken); }

        /// <summary>Folds the sequence with <c>NullableMin</c> starting from the seed <c>default(long?)</c>.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(long?), NullableMin, cancellationToken); }

        /// <summary>Folds the sequence with <c>NullableMin</c> starting from the seed <c>default(double?)</c>.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(double?), NullableMin, cancellationToken); }

        /// <summary>Folds the sequence with <c>NullableMin</c> starting from the seed <c>default(float?)</c>.</summary>
        public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken) { if (source == null) throw new ArgumentNullException("source"); return source.Aggregate(default(float?), NullableMin, cancellationToken); }
/// <summary>
/// Returns a task that yields the smallest non-null <c>decimal?</c> in <paramref name="source"/>.
/// The fold starts from a null seed, so a sequence without any non-null value yields null.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate(default(decimal?), NullableMin, cancellationToken);
}

/// <summary>
/// Returns a task that yields the smallest element of <paramref name="source"/> according to the
/// default comparer. When two elements compare equal, the one seen earlier is kept.
/// </summary>
/// <remarks>Uses the seedless <c>Aggregate</c>, which reports <see cref="InvalidOperationException"/> for an empty sequence.</remarks>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    var comparer = Comparer.Default;
    return source.Aggregate((x, y) => comparer.Compare(x, y) <= 0 ? x : y, cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Min</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Min(cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from an <c>int</c> seed of 0.
/// An empty sequence yields 0.
/// </summary>
/// <remarks>
/// The addition is performed in a checked context, so an overflow surfaces as an
/// <see cref="OverflowException"/> from the accumulator instead of silently wrapping around.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate(0, (x, y) => checked(x + y), cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from a <c>long</c> seed of 0.
/// An empty sequence yields 0.
/// </summary>
/// <remarks>
/// The addition is performed in a checked context, so an overflow surfaces as an
/// <see cref="OverflowException"/> from the accumulator instead of silently wrapping around.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate(0L, (x, y) => checked(x + y), cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from a <c>double</c> seed of 0.
/// An empty sequence yields 0.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate(0.0, (x, y) => x + y, cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from a <c>float</c> seed of 0.
/// An empty sequence yields 0.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate(0f, (x, y) => x + y, cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from a <c>decimal</c> seed of 0.
/// An empty sequence yields 0.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate(0m, (x, y) => x + y, cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from an <c>int?</c> seed of 0.
/// Null elements contribute 0 (via <c>GetValueOrDefault</c>), so the result is never null.
/// </summary>
/// <remarks>
/// The addition is performed in a checked context, so an overflow surfaces as an
/// <see cref="OverflowException"/> from the accumulator instead of silently wrapping around.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate((int?)0, (x, y) => checked(x + y.GetValueOrDefault()), cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from a <c>long?</c> seed of 0.
/// Null elements contribute 0 (via <c>GetValueOrDefault</c>), so the result is never null.
/// </summary>
/// <remarks>
/// The addition is performed in a checked context, so an overflow surfaces as an
/// <see cref="OverflowException"/> from the accumulator instead of silently wrapping around.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate((long?)0, (x, y) => checked(x + y.GetValueOrDefault()), cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from a <c>double?</c> seed of 0.
/// Null elements contribute 0 (via <c>GetValueOrDefault</c>), so the result is never null.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate((double?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from a <c>float?</c> seed of 0.
/// Null elements contribute 0 (via <c>GetValueOrDefault</c>), so the result is never null.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate((float?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
}

/// <summary>
/// Returns a task that yields the sum of the sequence, folded from a <c>decimal?</c> seed of 0.
/// Null elements contribute 0 (via <c>GetValueOrDefault</c>), so the result is never null.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return source.Aggregate((decimal?)0, (x, y) => x + y.GetValueOrDefault(), cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/> and
/// returns the task produced by the <c>Sum</c> overload that matches the projected sequence.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
public static Task Sum(this IAsyncEnumerable source, Func selector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return source.Select(selector).Sum(cancellationToken);
}

/// <summary>
/// Returns a task that yields the negation of what <c>Any(cancellationToken)</c> reports for
/// <paramref name="source"/>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static Task IsEmpty(this IAsyncEnumerable source, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    // NOTE(review): reading t.Result inside ContinueWith rethrows a failed or cancelled antecedent
    // wrapped in an AggregateException, so cancellation is observed as a fault here. Left as-is
    // because callers may depend on the current exception shape.
    return source.Any(cancellationToken).ContinueWith(t => !t.Result);
}

/// <summary>
/// Returns a task that yields the minimum element of <paramref name="source"/> according to
/// <paramref name="comparer"/>: the first entry of the list produced by <c>MinBy</c> with an
/// identity key selector.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="comparer"/> is null.</exception>
public static Task Min(this IAsyncEnumerable source, IComparer comparer, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    // NOTE(review): t.Result rethrows antecedent failures wrapped in an AggregateException
    // (for example the InvalidOperationException ExtremaBy reports for an empty sequence).
    return MinBy(source, x => x, comparer, cancellationToken).ContinueWith(t => t.Result.First());
}

/// <summary>
/// Returns a task that yields every element of <paramref name="source"/> whose key, as computed by
/// <paramref name="keySelector"/>, is minimal under the default comparer.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static Task> MinBy(this IAsyncEnumerable source, Func keySelector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    return MinBy(source, keySelector, Comparer.Default, cancellationToken);
}

/// <summary>
/// Returns a task that yields every element of <paramref name="source"/> whose key, as computed by
/// <paramref name="keySelector"/>, is minimal under <paramref name="comparer"/>. Elements with
/// equal minimal keys are returned in the order they were encountered.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
/// </exception>
public static Task> MinBy(this IAsyncEnumerable source, Func keySelector, IComparer comparer, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    return ExtremaBy(source, keySelector, (key, minValue) =>
    {
        // ExtremaBy keeps the key for which this returns a positive number, so the comparer's
        // sign has to be inverted. Plain negation is wrong for int.MinValue (-int.MinValue is
        // still int.MinValue in unchecked arithmetic), so map the sign explicitly instead.
        var order = comparer.Compare(key, minValue);
        return order < 0 ? 1 : (order > 0 ? -1 : 0);
    }, cancellationToken);
}

/// <summary>
/// Returns a task that yields the maximum element of <paramref name="source"/> according to
/// <paramref name="comparer"/>: the first entry of the list produced by <c>MaxBy</c> with an
/// identity key selector.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="comparer"/> is null.</exception>
public static Task Max(this IAsyncEnumerable source, IComparer comparer, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    // NOTE(review): t.Result rethrows antecedent failures wrapped in an AggregateException
    // (for example the InvalidOperationException ExtremaBy reports for an empty sequence).
    return MaxBy(source, x => x, comparer, cancellationToken).ContinueWith(t => t.Result.First());
}

/// <summary>
/// Returns a task that yields every element of <paramref name="source"/> whose key, as computed by
/// <paramref name="keySelector"/>, is maximal under the default comparer.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
public static Task> MaxBy(this IAsyncEnumerable source, Func keySelector, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    return MaxBy(source, keySelector, Comparer.Default, cancellationToken);
}

/// <summary>
/// Returns a task that yields every element of <paramref name="source"/> whose key, as computed by
/// <paramref name="keySelector"/>, is maximal under <paramref name="comparer"/>. Elements with
/// equal maximal keys are returned in the order they were encountered.
/// </summary>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is null.
/// </exception>
public static Task> MaxBy(this IAsyncEnumerable source, Func keySelector, IComparer comparer, CancellationToken cancellationToken)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    return ExtremaBy(source, keySelector, (key, minValue) => comparer.Compare(key, minValue), cancellationToken);
}

/// <summary>
/// Shared driver for <c>MinBy</c> and <c>MaxBy</c>: walks <paramref name="source"/> one element at
/// a time and collects every element whose key is extremal according to <paramref name="compare"/>.
/// </summary>
/// <param name="source">Sequence to enumerate.</param>
/// <param name="keySelector">Computes the key for each element.</param>
/// <param name="compare">
/// Called as <c>compare(candidateKey, currentBestKey)</c>. A positive result makes the candidate
/// the new sole best element, zero appends it to the current best list, and a negative result
/// discards it.
/// </param>
/// <param name="cancellationToken">Token handed to every <c>MoveNext</c> call.</param>
/// <returns>
/// A task that yields the list of extremal elements. It is failed with
/// <see cref="InvalidOperationException"/> when the sequence has no elements, and with whatever
/// exception the enumerator, <paramref name="keySelector"/> or <paramref name="compare"/> throws.
/// </returns>
private static Task> ExtremaBy(IAsyncEnumerable source, Func keySelector, Func compare, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource>();

    var result = new List();
    var hasFirst = false;
    var current = default(TSource);
    var resKey = default(TKey);

    var e = source.GetEnumerator();

    var f = default(Action);
    f = ct => e.MoveNext(ct).ContinueWith(t =>
    {
        t.Handle(tcs, res =>
        {
            if (!res)
            {
                // End of sequence: either publish what was collected or report the empty source.
                if (hasFirst)
                {
                    tcs.TrySetResult(result);
                }
                else
                {
                    tcs.TrySetException(new InvalidOperationException("Source sequence doesn't contain any elements."));
                }

                return;
            }

            try
            {
                // Everything that can throw synchronously, including reading e.Current and
                // scheduling the next MoveNext, stays inside the try so the failure reaches the
                // returned task instead of escaping the continuation and leaving it incomplete.
                current = e.Current;
                var key = keySelector(current);

                if (!hasFirst)
                {
                    // The first element is the initial extremum by definition.
                    resKey = key;
                    result.Add(current);
                    hasFirst = true;
                }
                else
                {
                    var cmp = compare(key, resKey);
                    if (cmp == 0)
                    {
                        result.Add(current);
                    }
                    else if (cmp > 0)
                    {
                        result = new List { current };
                        resKey = key;
                    }
                }

                f(ct);
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        });
    });

    f(cancellationToken);

    // Finally is a helper defined elsewhere; presumably it runs e.Dispose once the task completes.
    return tcs.Task.Finally(e.Dispose);
}
}
}