Bug 1144486 - Implement direct tasks. r=jww

This commit is contained in:
Bobby Holley 2015-04-16 12:24:54 -07:00
parent 13c48d68da
commit 45b32d35a5
4 changed files with 83 additions and 3 deletions

View File

@ -68,7 +68,12 @@ public:
return in;
}
void FireTailDispatcher() { MOZ_ASSERT(mTailDispatcher.isSome()); mTailDispatcher.reset(); }
void FireTailDispatcher()
{
MOZ_DIAGNOSTIC_ASSERT(mTailDispatcher.isSome());
mTailDispatcher.ref().DrainDirectTasks();
mTailDispatcher.reset();
}
virtual TaskDispatcher& TailDispatcher() override
{

View File

@ -1817,8 +1817,19 @@ MediaStreamGraphImpl::RunInStableState(bool aSourceIsMSG)
mLifecycleState >= LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN;
#endif
TaskDispatcher& tailDispatcher = AbstractThread::MainThread()->TailDispatcher();
for (uint32_t i = 0; i < runnables.Length(); ++i) {
runnables[i]->Run();
// "Direct" tail dispatcher are supposed to run immediately following the
// execution of the current task. So the meta-tasking that we do here in
// RunInStableState() breaks that abstraction a bit unless we handle it here.
//
// This is particularly important because we can end up with a "stream
// ended" notification immediately following a "stream available" notification,
// and we need to make sure that the watcher responding to "stream available"
// has a chance to run before the second notification starts tearing things
// down.
tailDispatcher.DrainDirectTasks();
}
}

View File

@ -133,6 +133,8 @@ protected:
~AutoTaskGuard()
{
DrainDirectTasks();
MOZ_ASSERT(mQueue->mRunningThread == NS_GetCurrentThread());
mQueue->mRunningThread = nullptr;

View File

@ -28,7 +28,7 @@ namespace mozilla {
*
* TaskDispatcher is a general abstract class that accepts tasks and dispatches
* them at some later point. These groups of tasks are per-target-thread, and
* contain separate queues for two kinds of tasks - "state change tasks" (which
* contain separate queues for several kinds of tasks (see comments below). - "state change tasks" (which
* run first, and are intended to be used to update the value held by mirrors),
* and regular tasks, which are other arbitrary operations that the are gated
* to run after all the state changes have completed.
@ -39,13 +39,25 @@ public:
TaskDispatcher() {}
virtual ~TaskDispatcher() {}
// Direct tasks are run directly (rather than dispatched asynchronously) when
// the tail dispatcher fires. A direct task may cause other tasks to be added
// to the tail dispatcher.
virtual void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) = 0;
// State change tasks are dispatched asynchronously always run before regular
// tasks. They are intended to be used to update the value held by mirrors
// before any other dispatched tasks are run on the target thread.
virtual void AddStateChangeTask(AbstractThread* aThread,
already_AddRefed<nsIRunnable> aRunnable) = 0;
// Regular tasks are dispatched asynchronously, and run after state change
// tasks.
virtual void AddTask(AbstractThread* aThread,
already_AddRefed<nsIRunnable> aRunnable,
AbstractThread::DispatchFailureHandling aFailureHandling = AbstractThread::AssertDispatchSuccess) = 0;
virtual bool HasTasksFor(AbstractThread* aThread) = 0;
virtual void DrainDirectTasks() = 0;
};
/*
@ -58,6 +70,17 @@ public:
explicit AutoTaskDispatcher(bool aIsTailDispatcher = false) : mIsTailDispatcher(aIsTailDispatcher) {}
~AutoTaskDispatcher()
{
// Given that direct tasks may trigger other code that uses the tail
// dispatcher, it's better to avoid processing them in the tail dispatcher's
// destructor. So we require TailDispatchers to manually invoke
// DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth,
// this is only necessary in the case where this AutoTaskDispatcher can be
// accessed by the direct tasks it dispatches (true for TailDispatchers, but
// potentially not true for other hypothetical AutoTaskDispatchers). Feel
// free to loosen this restriction to apply only to mIsTailDispatcher if a
// use-case requires it.
MOZ_ASSERT(mDirectTasks.empty());
for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
UniquePtr<PerThreadTaskGroup> group(Move(mTaskGroups[i]));
nsRefPtr<AbstractThread> thread = group->mThread;
@ -70,6 +93,20 @@ public:
}
}
void DrainDirectTasks() override
{
while (!mDirectTasks.empty()) {
nsCOMPtr<nsIRunnable> r = mDirectTasks.front();
mDirectTasks.pop();
r->Run();
}
}
void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) override
{
mDirectTasks.push(Move(aRunnable));
}
void AddStateChangeTask(AbstractThread* aThread,
already_AddRefed<nsIRunnable> aRunnable) override
{
@ -90,7 +127,10 @@ public:
}
}
bool HasTasksFor(AbstractThread* aThread) override { return !!GetTaskGroup(aThread); }
bool HasTasksFor(AbstractThread* aThread) override
{
return !!GetTaskGroup(aThread) || (aThread == AbstractThread::GetCurrent() && !mDirectTasks.empty());
}
private:
@ -118,18 +158,37 @@ private:
NS_IMETHODIMP Run()
{
// State change tasks get run all together before any code is run, so
// that all state changes are made in an atomic unit.
for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) {
mTasks->mStateChangeTasks[i]->Run();
}
// Once the state changes have completed, drain any direct tasks
// generated by those state changes (i.e. watcher notification tasks).
// This needs to be outside the loop because we don't want to run code
// that might observe intermediate states.
MaybeDrainDirectTasks();
for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) {
mTasks->mRegularTasks[i]->Run();
// Scope direct tasks tightly to the task that generated them.
MaybeDrainDirectTasks();
}
return NS_OK;
}
private:
void MaybeDrainDirectTasks()
{
AbstractThread* currentThread = AbstractThread::GetCurrent();
if (currentThread) {
currentThread->TailDispatcher().DrainDirectTasks();
}
}
UniquePtr<PerThreadTaskGroup> mTasks;
};
@ -156,6 +215,9 @@ private:
return nullptr;
}
// Direct tasks.
std::queue<nsCOMPtr<nsIRunnable>> mDirectTasks;
// Task groups, organized by thread.
nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups;