Imported Upstream version 3.12.0

Former-commit-id: cf92446697332992ec36726e78eb8703e1f259d7
This commit is contained in:
Jo Shields
2015-01-13 10:44:36 +00:00
parent 8b9b85e7f5
commit 181b81b4a4
659 changed files with 12743 additions and 16300 deletions

View File

@ -124,8 +124,12 @@ namespace System.Collections.Concurrent
int cachedRemoveId = removeId;
int itemsIn = cachedAddId - cachedRemoveId;
// Check our transaction id against completed stored one
if (isComplete.Value && cachedAddId >= completeId)
ThrowCompleteException ();
// If needed, we check and wait that the collection isn't full
if (upperBound != -1 && itemsIn > upperBound) {
if (upperBound != -1 && itemsIn >= upperBound) {
if (millisecondsTimeout == 0)
return false;
@ -144,10 +148,6 @@ namespace System.Collections.Concurrent
continue;
}
// Check our transaction id against completed stored one
if (isComplete.Value && cachedAddId >= completeId)
ThrowCompleteException ();
// Validate the steps we have been doing until now
if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
continue;
@ -291,30 +291,28 @@ namespace System.Collections.Concurrent
public static int AddToAny (BlockingCollection<T>[] collections, T item)
{
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
try {
coll.Add (item);
return index;
} catch {}
index++;
}
return -1;
return AddToAny (collections, item, CancellationToken.None);
}
public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
{
CheckArray (collections);
int index = 0;
foreach (var coll in collections) {
try {
coll.Add (item, cancellationToken);
return index;
} catch {}
index++;
WaitHandle[] wait_table = null;
while (true) {
for (int i = 0; i < collections.Length; ++i) {
if (collections [i].TryAdd (item))
return i;
}
cancellationToken.ThrowIfCancellationRequested ();
if (wait_table == null) {
wait_table = new WaitHandle [collections.Length + 1];
for (int i = 0; i < collections.Length; ++i)
wait_table [i] = collections [i].mreAdd.WaitHandle;
wait_table [collections.Length] = cancellationToken.WaitHandle;
}
WaitHandle.WaitAny (wait_table);
cancellationToken.ThrowIfCancellationRequested ();
}
return -1;
}
public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
@ -368,21 +366,7 @@ namespace System.Collections.Concurrent
public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
{
item = default (T);
CheckArray (collections);
WaitHandle[] wait_table = null;
while (true) {
for (int i = 0; i < collections.Length; ++i) {
if (collections [i].TryTake (out item))
return i;
}
if (wait_table == null) {
wait_table = new WaitHandle [collections.Length];
for (int i = 0; i < collections.Length; ++i)
wait_table [i] = collections [i].mreRemove.WaitHandle;
}
WaitHandle.WaitAny (wait_table);
}
return TakeFromAny (collections, out item, CancellationToken.None);
}
public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)