Bug 776800: Keep track of transaction dependencies explicitly to improve performance. r=bent

This commit is contained in:
Kyle Huey 2013-02-05 17:01:07 +00:00
parent 8be677bc40
commit f48dd5d0a9
2 changed files with 246 additions and 271 deletions

View File

@ -27,32 +27,6 @@ const uint32_t kIdleThreadTimeoutMs = 30000;
TransactionThreadPool* gInstance = nullptr;
bool gShutdown = false;
inline
nsresult
CheckOverlapAndMergeObjectStores(nsTArray<nsString>& aLockedStores,
const nsTArray<nsString>& aObjectStores,
bool aShouldMerge,
bool* aStoresOverlap)
{
uint32_t length = aObjectStores.Length();
bool overlap = false;
for (uint32_t index = 0; index < length; index++) {
const nsString& storeName = aObjectStores[index];
if (aLockedStores.Contains(storeName)) {
overlap = true;
}
else if (aShouldMerge && !aLockedStores.AppendElement(storeName)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
}
*aStoresOverlap = overlap;
return NS_OK;
}
} // anonymous namespace
BEGIN_INDEXEDDB_NAMESPACE
@ -180,6 +154,28 @@ TransactionThreadPool::Cleanup()
return NS_OK;
}
// static
PLDHashOperator
TransactionThreadPool::MaybeUnblockTransaction(nsPtrHashKey<TransactionInfo>* aKey,
void* aUserArg)
{
NS_ASSERTION(NS_IsMainThread(), "Wrong thread!");
TransactionInfo* maybeUnblockedInfo = aKey->GetKey();
TransactionInfo* finishedInfo = static_cast<TransactionInfo*>(aUserArg);
NS_ASSERTION(maybeUnblockedInfo->blockedOn.Contains(finishedInfo),
"Huh?");
maybeUnblockedInfo->blockedOn.RemoveEntry(finishedInfo);
if (!maybeUnblockedInfo->blockedOn.Count() &&
!maybeUnblockedInfo->transaction->IsAborted()) {
// Let this transaction run.
maybeUnblockedInfo->queue->Unblock();
}
return PL_DHASH_NEXT;
}
void
TransactionThreadPool::FinishTransaction(IDBTransaction* aTransaction)
{
@ -197,10 +193,10 @@ TransactionThreadPool::FinishTransaction(IDBTransaction* aTransaction)
return;
}
nsTArray<TransactionInfo>& transactionsInProgress =
DatabaseTransactionInfo::TransactionHashtable& transactionsInProgress =
dbTransactionInfo->transactions;
uint32_t transactionCount = transactionsInProgress.Length();
uint32_t transactionCount = transactionsInProgress.Count();
#ifdef DEBUG
if (aTransaction->mMode == IDBTransaction::VERSION_CHANGE) {
@ -212,8 +208,8 @@ TransactionThreadPool::FinishTransaction(IDBTransaction* aTransaction)
if (transactionCount == 1) {
#ifdef DEBUG
{
TransactionInfo& info = transactionsInProgress[0];
NS_ASSERTION(info.transaction == aTransaction, "Transaction mismatch!");
const TransactionInfo* info = transactionsInProgress.Get(aTransaction);
NS_ASSERTION(info->transaction == aTransaction, "Transaction mismatch!");
}
#endif
mTransactionsInProgress.Remove(databaseId);
@ -228,73 +224,40 @@ TransactionThreadPool::FinishTransaction(IDBTransaction* aTransaction)
index++;
}
}
return;
}
else {
// We need to rebuild the locked object store list.
nsTArray<nsString> storesWriting, storesReading;
TransactionInfo* info = transactionsInProgress.Get(aTransaction);
NS_ASSERTION(info, "We've never heard of this transaction?!?");
for (uint32_t index = 0, count = transactionCount; index < count; index++) {
IDBTransaction* transaction = transactionsInProgress[index].transaction;
if (transaction == aTransaction) {
NS_ASSERTION(count == transactionCount, "More than one match?!");
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
for (uint32_t index = 0, count = objectStoreNames.Length(); index < count;
index++) {
TransactionInfoPair* blockInfo =
dbTransactionInfo->blockingTransactions.Get(objectStoreNames[index]);
NS_ASSERTION(blockInfo, "Huh?");
transactionsInProgress.RemoveElementAt(index);
index--;
count--;
continue;
}
const nsTArray<nsString>& objectStores = transaction->mObjectStoreNames;
bool dummy;
if (transaction->mMode == IDBTransaction::READ_WRITE) {
if (NS_FAILED(CheckOverlapAndMergeObjectStores(storesWriting,
objectStores,
true, &dummy))) {
NS_WARNING("Out of memory!");
}
}
else if (transaction->mMode == IDBTransaction::READ_ONLY) {
if (NS_FAILED(CheckOverlapAndMergeObjectStores(storesReading,
objectStores,
true, &dummy))) {
NS_WARNING("Out of memory!");
}
}
else {
NS_NOTREACHED("Unknown mode!");
}
if (aTransaction->mMode == IDBTransaction::READ_WRITE &&
blockInfo->lastBlockingReads == info) {
blockInfo->lastBlockingReads = nullptr;
}
NS_ASSERTION(transactionsInProgress.Length() == transactionCount - 1,
"Didn't find the transaction we were looking for!");
dbTransactionInfo->storesWriting.SwapElements(storesWriting);
dbTransactionInfo->storesReading.SwapElements(storesReading);
}
// Try to dispatch all the queued transactions again.
nsTArray<QueuedDispatchInfo> queuedDispatch;
queuedDispatch.SwapElements(mDelayedDispatchQueue);
transactionCount = queuedDispatch.Length();
for (uint32_t index = 0; index < transactionCount; index++) {
if (NS_FAILED(Dispatch(queuedDispatch[index]))) {
NS_WARNING("Dispatch failed!");
uint32_t i = blockInfo->lastBlockingWrites.IndexOf(info);
if (i != blockInfo->lastBlockingWrites.NoIndex) {
blockInfo->lastBlockingWrites.RemoveElementAt(i);
}
}
info->blocking.EnumerateEntries(MaybeUnblockTransaction, info);
transactionsInProgress.Remove(aTransaction);
}
nsresult
TransactionThreadPool::TransactionCanRun(IDBTransaction* aTransaction,
bool* aCanRun,
TransactionQueue** aExistingQueue)
TransactionThreadPool::TransactionQueue&
TransactionThreadPool::GetQueueForTransaction(IDBTransaction* aTransaction)
{
NS_ASSERTION(NS_IsMainThread(), "Wrong thread!");
NS_ASSERTION(aTransaction, "Null pointer!");
NS_ASSERTION(aCanRun, "Null pointer!");
NS_ASSERTION(aExistingQueue, "Null pointer!");
nsIAtom* databaseId = aTransaction->mDatabase->Id();
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
@ -303,55 +266,67 @@ TransactionThreadPool::TransactionCanRun(IDBTransaction* aTransaction,
// See if we can run this transaction now.
DatabaseTransactionInfo* dbTransactionInfo;
if (!mTransactionsInProgress.Get(databaseId, &dbTransactionInfo)) {
// First transaction for this database, fine to run.
*aCanRun = true;
*aExistingQueue = nullptr;
return NS_OK;
// First transaction for this database.
dbTransactionInfo = new DatabaseTransactionInfo();
mTransactionsInProgress.Put(databaseId, dbTransactionInfo);
}
nsTArray<TransactionInfo>& transactionsInProgress =
DatabaseTransactionInfo::TransactionHashtable& transactionsInProgress =
dbTransactionInfo->transactions;
TransactionInfo* info = transactionsInProgress.Get(aTransaction);
if (info) {
// We recognize this one.
return *info->queue;
}
uint32_t transactionCount = transactionsInProgress.Length();
NS_ASSERTION(transactionCount, "Should never be 0!");
TransactionInfo* transactionInfo = new TransactionInfo(aTransaction,
objectStoreNames);
for (uint32_t index = 0; index < transactionCount; index++) {
// See if this transaction is in out list of current transactions.
const TransactionInfo& info = transactionsInProgress[index];
if (info.transaction == aTransaction) {
*aCanRun = true;
*aExistingQueue = info.queue;
return NS_OK;
dbTransactionInfo->transactions.Put(aTransaction, transactionInfo);;
for (uint32_t index = 0, count = objectStoreNames.Length(); index < count;
index++) {
TransactionInfoPair* blockInfo =
dbTransactionInfo->blockingTransactions.Get(objectStoreNames[index]);
if (!blockInfo) {
blockInfo = new TransactionInfoPair();
blockInfo->lastBlockingReads = nullptr;
dbTransactionInfo->blockingTransactions.Put(objectStoreNames[index],
blockInfo);
}
// Mark what we are blocking on.
if (blockInfo->lastBlockingReads) {
TransactionInfo* blockingInfo = blockInfo->lastBlockingReads;
transactionInfo->blockedOn.PutEntry(blockingInfo);
blockingInfo->blocking.PutEntry(transactionInfo);
}
if (mode == IDBTransaction::READ_WRITE &&
blockInfo->lastBlockingWrites.Length()) {
for (uint32_t index = 0,
count = blockInfo->lastBlockingWrites.Length(); index < count;
index++) {
TransactionInfo* blockingInfo = blockInfo->lastBlockingWrites[index];
transactionInfo->blockedOn.PutEntry(blockingInfo);
blockingInfo->blocking.PutEntry(transactionInfo);
}
}
if (mode == IDBTransaction::READ_WRITE) {
blockInfo->lastBlockingReads = transactionInfo;
blockInfo->lastBlockingWrites.Clear();
}
else {
blockInfo->lastBlockingWrites.AppendElement(transactionInfo);
}
}
NS_ASSERTION(mode != IDBTransaction::VERSION_CHANGE, "How did we get here?");
bool writeOverlap;
nsresult rv =
CheckOverlapAndMergeObjectStores(dbTransactionInfo->storesWriting,
objectStoreNames,
mode == IDBTransaction::READ_WRITE,
&writeOverlap);
NS_ENSURE_SUCCESS(rv, rv);
bool readOverlap;
rv = CheckOverlapAndMergeObjectStores(dbTransactionInfo->storesReading,
objectStoreNames,
mode == IDBTransaction::READ_ONLY,
&readOverlap);
NS_ENSURE_SUCCESS(rv, rv);
if (writeOverlap ||
(readOverlap && mode == IDBTransaction::READ_WRITE)) {
*aCanRun = false;
*aExistingQueue = nullptr;
return NS_OK;
if (!transactionInfo->blockedOn.Count()) {
transactionInfo->queue->Unblock();
}
*aCanRun = true;
*aExistingQueue = nullptr;
return NS_OK;
return *transactionInfo->queue;
}
nsresult
@ -368,84 +343,13 @@ TransactionThreadPool::Dispatch(IDBTransaction* aTransaction,
return NS_ERROR_NOT_AVAILABLE;
}
bool canRun;
TransactionQueue* existingQueue;
nsresult rv = TransactionCanRun(aTransaction, &canRun, &existingQueue);
NS_ENSURE_SUCCESS(rv, rv);
TransactionQueue& queue = GetQueueForTransaction(aTransaction);
if (!canRun) {
QueuedDispatchInfo* info = mDelayedDispatchQueue.AppendElement();
NS_ENSURE_TRUE(info, NS_ERROR_OUT_OF_MEMORY);
info->transaction = aTransaction;
info->runnable = aRunnable;
info->finish = aFinish;
info->finishRunnable = aFinishRunnable;
return NS_OK;
}
if (existingQueue) {
existingQueue->Dispatch(aRunnable);
if (aFinish) {
existingQueue->Finish(aFinishRunnable);
}
return NS_OK;
}
nsIAtom* databaseId = aTransaction->mDatabase->Id();
#ifdef DEBUG
if (aTransaction->mMode == IDBTransaction::VERSION_CHANGE) {
NS_ASSERTION(!mTransactionsInProgress.Get(databaseId, nullptr),
"Shouldn't have anything in progress!");
}
#endif
DatabaseTransactionInfo* dbTransactionInfo;
nsAutoPtr<DatabaseTransactionInfo> autoDBTransactionInfo;
if (!mTransactionsInProgress.Get(databaseId, &dbTransactionInfo)) {
// Make a new struct for this transaction.
autoDBTransactionInfo = new DatabaseTransactionInfo();
dbTransactionInfo = autoDBTransactionInfo;
}
const nsTArray<nsString>& objectStoreNames = aTransaction->mObjectStoreNames;
nsTArray<nsString>& storesInUse =
aTransaction->mMode == IDBTransaction::READ_WRITE ?
dbTransactionInfo->storesWriting :
dbTransactionInfo->storesReading;
if (!storesInUse.AppendElements(objectStoreNames)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
nsTArray<TransactionInfo>& transactionInfoArray =
dbTransactionInfo->transactions;
TransactionInfo* transactionInfo = transactionInfoArray.AppendElement();
NS_ENSURE_TRUE(transactionInfo, NS_ERROR_OUT_OF_MEMORY);
transactionInfo->transaction = aTransaction;
transactionInfo->queue = new TransactionQueue(aTransaction, aRunnable);
queue.Dispatch(aRunnable);
if (aFinish) {
transactionInfo->queue->Finish(aFinishRunnable);
queue.Finish(aFinishRunnable);
}
if (!transactionInfo->objectStoreNames.AppendElements(objectStoreNames)) {
NS_WARNING("Out of memory!");
return NS_ERROR_OUT_OF_MEMORY;
}
if (autoDBTransactionInfo) {
mTransactionsInProgress.Put(databaseId, autoDBTransactionInfo);
autoDBTransactionInfo.forget();
}
return mThreadPool->Dispatch(transactionInfo->queue, NS_DISPATCH_NORMAL);
return NS_OK;
}
bool
@ -475,6 +379,19 @@ TransactionThreadPool::WaitForAllDatabasesToComplete(
return true;
}
// static
PLDHashOperator
TransactionThreadPool::CollectTransactions(IDBTransaction* aKey,
TransactionInfo* aValue,
void* aUserArg)
{
nsAutoTArray<nsRefPtr<IDBTransaction>, 50>* transactionArray =
static_cast<nsAutoTArray<nsRefPtr<IDBTransaction>, 50>*>(aUserArg);
transactionArray->AppendElement(aKey);
return PL_DHASH_NEXT;
}
void
TransactionThreadPool::AbortTransactionsForDatabase(IDBDatabase* aDatabase)
{
@ -484,39 +401,26 @@ TransactionThreadPool::AbortTransactionsForDatabase(IDBDatabase* aDatabase)
// Get list of transactions for this database id
DatabaseTransactionInfo* dbTransactionInfo;
if (!mTransactionsInProgress.Get(aDatabase->Id(), &dbTransactionInfo)) {
// If there are no running transactions, there can't be any pending ones
// If there are no transactions, we're done.
return;
}
nsAutoTArray<nsRefPtr<IDBTransaction>, 50> transactions;
// Collect any running transactions
nsTArray<TransactionInfo>& transactionsInProgress =
DatabaseTransactionInfo::TransactionHashtable& transactionsInProgress =
dbTransactionInfo->transactions;
uint32_t transactionCount = transactionsInProgress.Length();
NS_ASSERTION(transactionCount, "Should never be 0!");
NS_ASSERTION(transactionsInProgress.Count(), "Should never be 0!");
for (uint32_t index = 0; index < transactionCount; index++) {
// See if any transaction belongs to this IDBDatabase instance
IDBTransaction* transaction = transactionsInProgress[index].transaction;
if (transaction->Database() == aDatabase) {
transactions.AppendElement(transaction);
}
}
// Collect any pending transactions.
for (uint32_t index = 0; index < mDelayedDispatchQueue.Length(); index++) {
// See if any transaction belongs to this IDBDatabase instance
IDBTransaction* transaction = mDelayedDispatchQueue[index].transaction;
if (transaction->Database() == aDatabase) {
transactions.AppendElement(transaction);
}
}
nsAutoTArray<nsRefPtr<IDBTransaction>, 50> transactions;
transactionsInProgress.EnumerateRead(CollectTransactions, &transactions);
// Abort transactions. Do this after collecting the transactions in case
// calling Abort() modifies the data structures we're iterating above.
for (uint32_t index = 0; index < transactions.Length(); index++) {
if (transactions[index]->Database() != aDatabase) {
continue;
}
// This can fail, for example if the transaction is in the process of
// being comitted. That is expected and fine, so we ignore any returned
// errors.
@ -524,32 +428,48 @@ TransactionThreadPool::AbortTransactionsForDatabase(IDBDatabase* aDatabase)
}
}
struct NS_STACK_CLASS TransactionSearchInfo
{
TransactionSearchInfo(IDBDatabase* aDatabase)
: db(aDatabase), found(false)
{
}
IDBDatabase* db;
bool found;
};
// static
PLDHashOperator
TransactionThreadPool::FindTransaction(IDBTransaction* aKey,
TransactionInfo* aValue,
void* aUserArg)
{
TransactionSearchInfo* info = static_cast<TransactionSearchInfo*>(aUserArg);
if (aKey->Database() == info->db) {
info->found = true;
return PL_DHASH_STOP;
}
return PL_DHASH_NEXT;
}
bool
TransactionThreadPool::HasTransactionsForDatabase(IDBDatabase* aDatabase)
{
NS_ASSERTION(NS_IsMainThread(), "Wrong thread!");
NS_ASSERTION(aDatabase, "Null pointer!");
// Get list of transactions for this database id
DatabaseTransactionInfo* dbTransactionInfo;
if (!mTransactionsInProgress.Get(aDatabase->Id(), &dbTransactionInfo)) {
DatabaseTransactionInfo* dbTransactionInfo = nullptr;
dbTransactionInfo = mTransactionsInProgress.Get(aDatabase->Id());
if (!dbTransactionInfo) {
return false;
}
nsTArray<TransactionInfo>& transactionsInProgress =
dbTransactionInfo->transactions;
TransactionSearchInfo info(aDatabase);
dbTransactionInfo->transactions.EnumerateRead(FindTransaction, &info);
uint32_t transactionCount = transactionsInProgress.Length();
NS_ASSERTION(transactionCount, "Should never be 0!");
for (uint32_t index = 0; index < transactionCount; index++) {
// See if any transaction belongs to this IDBDatabase instance
if (transactionsInProgress[index].transaction->Database() == aDatabase) {
return true;
}
}
return false;
return info.found;
}
bool
@ -569,15 +489,23 @@ TransactionThreadPool::MaybeFireCallback(DatabasesCompleteCallback& aCallback)
}
TransactionThreadPool::
TransactionQueue::TransactionQueue(IDBTransaction* aTransaction,
nsIRunnable* aRunnable)
TransactionQueue::TransactionQueue(IDBTransaction* aTransaction)
: mMonitor("TransactionQueue::mMonitor"),
mTransaction(aTransaction),
mShouldFinish(false)
{
NS_ASSERTION(aTransaction, "Null pointer!");
NS_ASSERTION(aRunnable, "Null pointer!");
mQueue.AppendElement(aRunnable);
}
void
TransactionThreadPool::TransactionQueue::Unblock()
{
MonitorAutoLock lock(mMonitor);
// NB: Finish may be called before Unblock.
TransactionThreadPool::Get()->mThreadPool->
Dispatch(this, NS_DISPATCH_NORMAL);
}
void
@ -615,7 +543,7 @@ TransactionThreadPool::TransactionQueue::Run()
nsCOMPtr<nsIRunnable> finishRunnable;
bool shouldFinish = false;
while(!shouldFinish) {
do {
NS_ASSERTION(queue.IsEmpty(), "Should have cleared this!");
{
@ -643,7 +571,7 @@ TransactionThreadPool::TransactionQueue::Run()
if (count) {
queue.Clear();
}
}
} while (!shouldFinish);
nsCOMPtr<nsIRunnable> finishTransactionRunnable =
new FinishTransactionRunnable(mTransaction, finishRunnable);

View File

@ -63,12 +63,13 @@ protected:
NS_DECL_ISUPPORTS
NS_DECL_NSIRUNNABLE
inline TransactionQueue(IDBTransaction* aTransaction,
nsIRunnable* aRunnable);
TransactionQueue(IDBTransaction* aTransaction);
inline void Dispatch(nsIRunnable* aRunnable);
void Unblock();
inline void Finish(nsIRunnable* aFinishRunnable);
void Dispatch(nsIRunnable* aRunnable);
void Finish(nsIRunnable* aFinishRunnable);
private:
mozilla::Monitor mMonitor;
@ -78,31 +79,87 @@ protected:
bool mShouldFinish;
};
friend class TransactionQueue;
struct TransactionInfo
{
TransactionInfo(IDBTransaction* aTransaction,
const nsTArray<nsString>& aObjectStoreNames)
{
MOZ_COUNT_CTOR(TransactionInfo);
blockedOn.Init();
blocking.Init();
transaction = aTransaction;
queue = new TransactionQueue(aTransaction);
objectStoreNames.AppendElements(aObjectStoreNames);
}
~TransactionInfo()
{
MOZ_COUNT_DTOR(TransactionInfo);
}
nsRefPtr<IDBTransaction> transaction;
nsRefPtr<TransactionQueue> queue;
nsTArray<nsString> objectStoreNames;
nsTHashtable<nsPtrHashKey<TransactionInfo> > blockedOn;
nsTHashtable<nsPtrHashKey<TransactionInfo> > blocking;
};
struct TransactionInfoPair
{
TransactionInfoPair()
: lastBlockingReads(nullptr)
{
MOZ_COUNT_CTOR(TransactionInfoPair);
}
~TransactionInfoPair()
{
MOZ_COUNT_DTOR(TransactionInfoPair);
}
// Multiple reading transactions can block future writes.
nsTArray<TransactionInfo*> lastBlockingWrites;
// But only a single writing transaction can block future reads.
TransactionInfo* lastBlockingReads;
};
struct DatabaseTransactionInfo
{
nsTArray<TransactionInfo> transactions;
nsTArray<nsString> storesReading;
nsTArray<nsString> storesWriting;
DatabaseTransactionInfo()
{
MOZ_COUNT_CTOR(DatabaseTransactionInfo);
transactions.Init();
blockingTransactions.Init();
}
~DatabaseTransactionInfo()
{
MOZ_COUNT_DTOR(DatabaseTransactionInfo);
}
typedef nsClassHashtable<nsPtrHashKey<IDBTransaction>, TransactionInfo >
TransactionHashtable;
TransactionHashtable transactions;
nsClassHashtable<nsStringHashKey, TransactionInfoPair> blockingTransactions;
};
struct QueuedDispatchInfo
{
QueuedDispatchInfo()
: finish(false)
{ }
static PLDHashOperator
CollectTransactions(IDBTransaction* aKey,
TransactionInfo* aValue,
void* aUserArg);
nsRefPtr<IDBTransaction> transaction;
nsCOMPtr<nsIRunnable> runnable;
nsCOMPtr<nsIRunnable> finishRunnable;
bool finish;
};
static PLDHashOperator
FindTransaction(IDBTransaction* aKey,
TransactionInfo* aValue,
void* aUserArg);
static PLDHashOperator
MaybeUnblockTransaction(nsPtrHashKey<TransactionInfo>* aKey,
void* aUserArg);
struct DatabasesCompleteCallback
{
@ -118,15 +175,7 @@ protected:
void FinishTransaction(IDBTransaction* aTransaction);
nsresult TransactionCanRun(IDBTransaction* aTransaction,
bool* aCanRun,
TransactionQueue** aExistingQueue);
nsresult Dispatch(const QueuedDispatchInfo& aInfo)
{
return Dispatch(aInfo.transaction, aInfo.runnable, aInfo.finish,
aInfo.finishRunnable);
}
TransactionQueue& GetQueueForTransaction(IDBTransaction* aTransaction);
bool MaybeFireCallback(DatabasesCompleteCallback& aCallback);
@ -135,8 +184,6 @@ protected:
nsClassHashtable<nsISupportsHashKey, DatabaseTransactionInfo>
mTransactionsInProgress;
nsTArray<QueuedDispatchInfo> mDelayedDispatchQueue;
nsTArray<DatabasesCompleteCallback> mCompleteCallbacks;
};