Bug 574507 - 'IndexedDB: Fix transaction queue logic to prevent starving transactions across multiple objectStores'. r=sicking

This commit is contained in:
Ben Turner 2010-07-12 10:05:01 -04:00
parent 46309e70df
commit 885a8faedd
3 changed files with 148 additions and 150 deletions

View File

@ -61,22 +61,36 @@ const PRUint32 kIdleThreadTimeoutMs = 30000;
TransactionThreadPool* gInstance = nsnull;
bool gShutdown = false;
inline
nsresult
CheckOverlapAndMergeObjectStores(nsTArray<nsString>& aLockedStores,
const nsTArray<nsString>& aObjectStores,
bool aShouldMerge,
bool* aStoresOverlap)
{
PRUint32 length = aObjectStores.Length();
bool overlap = false;
for (PRUint32 index = 0; index < length; index++) {
const nsString& storeName = aObjectStores[index];
if (aLockedStores.Contains(storeName)) {
overlap = true;
}
else if (aShouldMerge && !aLockedStores.AppendElement(storeName)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
}
*aStoresOverlap = overlap;
return NS_OK;
}
} // anonymous namespace
BEGIN_INDEXEDDB_NAMESPACE
struct QueuedDispatchInfo
{
QueuedDispatchInfo()
: finish(false)
{ }
nsRefPtr<IDBTransaction> transaction;
nsCOMPtr<nsIRunnable> runnable;
nsCOMPtr<nsIRunnable> finishRunnable;
bool finish;
};
class FinishTransactionRunnable : public nsIRunnable
{
public:
@ -240,27 +254,34 @@ TransactionThreadPool::FinishTransaction(IDBTransaction* aTransaction)
"Didn't find the transaction we were looking for!");
}
// We need to rebuild the locked object store list if dbTransactionInfo is
// still alive.
if (count > 1) {
dbTransactionInfo->storesWriting.Clear();
dbTransactionInfo->storesReading.Clear();
}
// Try to dispatch all the queued transactions again.
nsTArray<QueuedDispatchInfo> queuedDispatch;
queuedDispatch.SwapElements(mDelayedDispatchQueue);
count = queuedDispatch.Length();
for (PRUint32 index = 0; index < count; index++) {
QueuedDispatchInfo& info = queuedDispatch[index];
if (NS_FAILED(Dispatch(info.transaction, info.runnable, info.finish,
info.finishRunnable))) {
if (NS_FAILED(Dispatch(queuedDispatch[index]))) {
NS_WARNING("Dispatch failed!");
}
}
}
bool
nsresult
TransactionThreadPool::TransactionCanRun(IDBTransaction* aTransaction,
TransactionQueue** aQueue)
bool* aCanRun,
TransactionQueue** aExistingQueue)
{
NS_ASSERTION(NS_IsMainThread(), "Wrong thread!");
NS_ASSERTION(aTransaction, "Null pointer!");
NS_ASSERTION(aQueue, "Null pointer!");
NS_ASSERTION(aCanRun, "Null pointer!");
NS_ASSERTION(aExistingQueue, "Null pointer!");
const PRUint32 databaseId = aTransaction->mDatabase->Id();
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
@ -270,123 +291,62 @@ TransactionThreadPool::TransactionCanRun(IDBTransaction* aTransaction,
DatabaseTransactionInfo* dbTransactionInfo;
if (!mTransactionsInProgress.Get(databaseId, &dbTransactionInfo)) {
// First transaction for this database, fine to run.
*aQueue = nsnull;
return true;
*aCanRun = true;
*aExistingQueue = nsnull;
return NS_OK;
}
nsTArray<TransactionInfo>& transactionsInProgress =
dbTransactionInfo->transactions;
PRUint32 transactionCount = transactionsInProgress.Length();
NS_ASSERTION(transactionCount, "Should never be 0!");
if (mode == IDBTransaction::FULL_LOCK) {
switch (transactionCount) {
case 0: {
*aQueue = nsnull;
return true;
}
case 1: {
if (transactionsInProgress[0].transaction == aTransaction) {
*aQueue = transactionsInProgress[0].queue;
return true;
}
return false;
}
default: {
dbTransactionInfo->lockPending = true;
return false;
}
}
}
bool locked = dbTransactionInfo->locked || dbTransactionInfo->lockPending;
bool mayRun = true;
// Another transaction for this database is already running. See if we can
// run at the same time.
for (PRUint32 transactionIndex = 0;
transactionIndex < transactionCount;
transactionIndex++) {
TransactionInfo& transactionInfo = transactionsInProgress[transactionIndex];
if (transactionInfo.transaction == aTransaction) {
// Same transaction, already running.
*aQueue = transactionInfo.queue;
return true;
}
if (locked) {
// Full lock in place or requested, no need to check objectStores.
continue;
}
// Not our transaction. See if the objectStores overlap.
nsTArray<TransactionObjectStoreInfo>& objectStoreInfoArray =
transactionInfo.objectStoreInfo;
PRUint32 objectStoreCount = objectStoreInfoArray.Length();
for (PRUint32 objectStoreIndex = 0;
objectStoreIndex < objectStoreCount;
objectStoreIndex++) {
TransactionObjectStoreInfo& objectStoreInfo =
objectStoreInfoArray[objectStoreIndex];
if (objectStoreNames.Contains(objectStoreInfo.objectStoreName)) {
// Overlapping name, see if the modes are compatible.
switch (mode) {
case nsIIDBTransaction::READ_WRITE: {
// Someone else is reading or writing to this table, we can't
// run now. Mark that we're waiting for it though.
objectStoreInfo.writerWaiting = true;
// We need to mark all overlapping transactions, not just the first
// one we find. Set the retval to false here but don't return until
// we've gone through the rest of the open transactions.
mayRun = false;
} break;
case nsIIDBTransaction::READ_ONLY: {
if (objectStoreInfo.writing || objectStoreInfo.writerWaiting) {
// Someone else is writing to this table, we can't run now.
return false;
}
} break;
case nsIIDBTransaction::SNAPSHOT_READ: {
NS_NOTYETIMPLEMENTED("Not implemented!");
} break;
default: {
NS_NOTREACHED("Should never get here!");
for (PRUint32 index = 0; index < transactionCount; index++) {
// See if this transaction is in out list of current transactions.
const TransactionInfo& info = transactionsInProgress[index];
if (info.transaction == aTransaction) {
*aCanRun = true;
*aExistingQueue = info.queue;
return NS_OK;
}
}
if (dbTransactionInfo->locked || dbTransactionInfo->lockPending) {
*aCanRun = false;
*aExistingQueue = nsnull;
return NS_OK;
}
// Continue on to the next TransactionObjectStoreInfo.
bool writeOverlap;
nsresult rv =
CheckOverlapAndMergeObjectStores(dbTransactionInfo->storesWriting,
objectStoreNames,
mode == nsIIDBTransaction::READ_WRITE,
&writeOverlap);
NS_ENSURE_SUCCESS(rv, rv);
bool readOverlap;
rv = CheckOverlapAndMergeObjectStores(dbTransactionInfo->storesReading,
objectStoreNames,
mode == nsIIDBTransaction::READ_ONLY,
&readOverlap);
NS_ENSURE_SUCCESS(rv, rv);
if (writeOverlap ||
(readOverlap && mode == nsIIDBTransaction::READ_WRITE)) {
*aCanRun = false;
*aExistingQueue = nsnull;
return NS_OK;
}
// Continue on to the next TransactionInfo.
}
if (locked) {
// If we got here then this is a new transaction and we won't allow it to
// start.
return false;
}
if (!mayRun) {
// If we got here then there are conflicting transactions and we can't run
// yet.
return false;
}
// If we got here then there are no conflicting transactions and we should
// be fine to run.
*aQueue = nsnull;
return true;
*aCanRun = true;
*aExistingQueue = nsnull;
return NS_OK;
}
nsresult
@ -399,8 +359,12 @@ TransactionThreadPool::Dispatch(IDBTransaction* aTransaction,
NS_ASSERTION(aTransaction, "Null pointer!");
NS_ASSERTION(aRunnable, "Null pointer!");
bool canRun;
TransactionQueue* existingQueue;
if (!TransactionCanRun(aTransaction, &existingQueue)) {
nsresult rv = TransactionCanRun(aTransaction, &canRun, &existingQueue);
NS_ENSURE_SUCCESS(rv, rv);
if (!canRun) {
QueuedDispatchInfo* info = mDelayedDispatchQueue.AppendElement();
NS_ENSURE_TRUE(info, NS_ERROR_OUT_OF_MEMORY);
@ -443,6 +407,18 @@ TransactionThreadPool::Dispatch(IDBTransaction* aTransaction,
dbTransactionInfo->locked = true;
}
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
nsTArray<nsString>& storesInUse =
aTransaction->mMode == nsIIDBTransaction::READ_WRITE ?
dbTransactionInfo->storesWriting :
dbTransactionInfo->storesReading;
if (!storesInUse.AppendElements(objectStoreNames)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
nsTArray<TransactionInfo>& transactionInfoArray =
dbTransactionInfo->transactions;
@ -456,21 +432,15 @@ TransactionThreadPool::Dispatch(IDBTransaction* aTransaction,
}
transactionInfo->mode = aTransaction->mMode;
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
PRUint32 count = objectStoreNames.Length();
for (PRUint32 index = 0; index < count; index++) {
TransactionObjectStoreInfo* info =
transactionInfo->objectStoreInfo.AppendElement();
NS_ENSURE_TRUE(info, NS_ERROR_OUT_OF_MEMORY);
info->objectStoreName = objectStoreNames[index];
info->writing = transactionInfo->mode == nsIIDBTransaction::READ_WRITE;
if (!transactionInfo->objectStoreNames.AppendElements(objectStoreNames)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
if (autoDBTransactionInfo) {
if (!mTransactionsInProgress.Put(databaseId, autoDBTransactionInfo)) {
NS_ERROR("Failed to put!");
return NS_ERROR_FAILURE;
NS_WARNING("Failed to put!");
return NS_ERROR_OUT_OF_MEMORY;
}
autoDBTransactionInfo.forget();
}

View File

@ -101,17 +101,6 @@ protected:
bool mShouldFinish;
};
struct TransactionObjectStoreInfo
{
TransactionObjectStoreInfo()
: writing(false), writerWaiting(false)
{ }
nsString objectStoreName;
bool writing;
bool writerWaiting;
};
struct TransactionInfo
{
TransactionInfo()
@ -120,7 +109,7 @@ protected:
nsRefPtr<IDBTransaction> transaction;
nsRefPtr<TransactionQueue> queue;
nsTArray<TransactionObjectStoreInfo> objectStoreInfo;
nsTArray<nsString> objectStoreNames;
PRUint16 mode;
};
@ -133,6 +122,20 @@ protected:
bool locked;
bool lockPending;
nsTArray<TransactionInfo> transactions;
nsTArray<nsString> storesReading;
nsTArray<nsString> storesWriting;
};
struct QueuedDispatchInfo
{
QueuedDispatchInfo()
: finish(false)
{ }
nsRefPtr<IDBTransaction> transaction;
nsCOMPtr<nsIRunnable> runnable;
nsCOMPtr<nsIRunnable> finishRunnable;
bool finish;
};
TransactionThreadPool();
@ -143,8 +146,15 @@ protected:
void FinishTransaction(IDBTransaction* aTransaction);
bool TransactionCanRun(IDBTransaction* aTransaction,
TransactionQueue** aQueue);
nsresult TransactionCanRun(IDBTransaction* aTransaction,
bool* aCanRun,
TransactionQueue** aExistingQueue);
nsresult Dispatch(const QueuedDispatchInfo& aInfo)
{
return Dispatch(aInfo.transaction, aInfo.runnable, aInfo.finish,
aInfo.finishRunnable);
}
nsCOMPtr<nsIThreadPool> mThreadPool;

View File

@ -49,7 +49,16 @@
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
ok(stepNumber == 1 || stepNumber == 2, "This callback came before 3");
is(stepNumber, 1, "This callback came first");
stepNumber++;
}
request = db.transaction(["foo"], READ_WRITE)
.objectStore("foo")
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
is(stepNumber, 2, "This callback came second");
stepNumber++;
}
@ -58,9 +67,17 @@
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
is(stepNumber, 3, "This callback came in at 3");
is(stepNumber, 3, "This callback came third");
stepNumber++;
}
request = db.transaction(["foo", "bar"], READ_WRITE)
.objectStore("bar")
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
is(stepNumber, 4, "This callback came fourth");
stepNumber++;
event.transaction.oncomplete = grabEventAndContinueHandler;
}
request = db.transaction(["bar"], READ_WRITE)
@ -68,14 +85,15 @@
.add({});
request.onerror = errorHandler;
request.onsuccess = function(event) {
ok(stepNumber == 1 || stepNumber == 2, "This callback came before 3");
is(stepNumber, 5, "This callback came fifth");
stepNumber++;
event.transaction.oncomplete = grabEventAndContinueHandler;
}
stepNumber++;
yield;
is(stepNumber, 4, "All callbacks received");
is(stepNumber, 6, "All callbacks received");
}
finishTest();