diff --git a/dom/media/AbstractThread.cpp b/dom/media/AbstractThread.cpp index 22741d45b16..51ffa4ea718 100644 --- a/dom/media/AbstractThread.cpp +++ b/dom/media/AbstractThread.cpp @@ -68,7 +68,12 @@ public: return in; } - void FireTailDispatcher() { MOZ_ASSERT(mTailDispatcher.isSome()); mTailDispatcher.reset(); } + void FireTailDispatcher() + { + MOZ_DIAGNOSTIC_ASSERT(mTailDispatcher.isSome()); + mTailDispatcher.ref().DrainDirectTasks(); + mTailDispatcher.reset(); + } virtual TaskDispatcher& TailDispatcher() override { diff --git a/dom/media/MediaStreamGraph.cpp b/dom/media/MediaStreamGraph.cpp index 080458a2ded..a48a1c0ce01 100644 --- a/dom/media/MediaStreamGraph.cpp +++ b/dom/media/MediaStreamGraph.cpp @@ -1817,8 +1817,19 @@ MediaStreamGraphImpl::RunInStableState(bool aSourceIsMSG) mLifecycleState >= LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN; #endif + TaskDispatcher& tailDispatcher = AbstractThread::MainThread()->TailDispatcher(); for (uint32_t i = 0; i < runnables.Length(); ++i) { runnables[i]->Run(); + // "Direct" tail dispatcher are supposed to run immediately following the + // execution of the current task. So the meta-tasking that we do here in + // RunInStableState() breaks that abstraction a bit unless we handle it here. + // + // This is particularly important because we can end up with a "stream + // ended" notification immediately following a "stream available" notification, + // and we need to make sure that the watcher responding to "stream available" + // has a chance to run before the second notification starts tearing things + // down. + tailDispatcher.DrainDirectTasks(); } } diff --git a/dom/media/MediaTaskQueue.h b/dom/media/MediaTaskQueue.h index c0c6c58abbf..cf90e5b88e9 100644 --- a/dom/media/MediaTaskQueue.h +++ b/dom/media/MediaTaskQueue.h @@ -133,6 +133,8 @@ protected: ~AutoTaskGuard() { + DrainDirectTasks(); + MOZ_ASSERT(mQueue->mRunningThread == NS_GetCurrentThread()); mQueue->mRunningThread = nullptr; diff --git a/dom/media/TaskDispatcher.h b/dom/media/TaskDispatcher.h index 9352d343c89..6151c64245e 100644 --- a/dom/media/TaskDispatcher.h +++ b/dom/media/TaskDispatcher.h @@ -28,7 +28,7 @@ namespace mozilla { * * TaskDispatcher is a general abstract class that accepts tasks and dispatches * them at some later point. These groups of tasks are per-target-thread, and - * contain separate queues for two kinds of tasks - "state change tasks" (which + * contain separate queues for several kinds of tasks (see comments below). - "state change tasks" (which * run first, and are intended to be used to update the value held by mirrors), * and regular tasks, which are other arbitrary operations that the are gated * to run after all the state changes have completed. @@ -39,13 +39,25 @@ public: TaskDispatcher() {} virtual ~TaskDispatcher() {} + // Direct tasks are run directly (rather than dispatched asynchronously) when + // the tail dispatcher fires. A direct task may cause other tasks to be added + // to the tail dispatcher. + virtual void AddDirectTask(already_AddRefed aRunnable) = 0; + + // State change tasks are dispatched asynchronously always run before regular + // tasks. They are intended to be used to update the value held by mirrors + // before any other dispatched tasks are run on the target thread. virtual void AddStateChangeTask(AbstractThread* aThread, already_AddRefed aRunnable) = 0; + + // Regular tasks are dispatched asynchronously, and run after state change + // tasks. virtual void AddTask(AbstractThread* aThread, already_AddRefed aRunnable, AbstractThread::DispatchFailureHandling aFailureHandling = AbstractThread::AssertDispatchSuccess) = 0; virtual bool HasTasksFor(AbstractThread* aThread) = 0; + virtual void DrainDirectTasks() = 0; }; /* @@ -58,6 +70,17 @@ public: explicit AutoTaskDispatcher(bool aIsTailDispatcher = false) : mIsTailDispatcher(aIsTailDispatcher) {} ~AutoTaskDispatcher() { + // Given that direct tasks may trigger other code that uses the tail + // dispatcher, it's better to avoid processing them in the tail dispatcher's + // destructor. So we require TailDispatchers to manually invoke + // DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth, + // this is only necessary in the case where this AutoTaskDispatcher can be + // accessed by the direct tasks it dispatches (true for TailDispatchers, but + // potentially not true for other hypothetical AutoTaskDispatchers). Feel + // free to loosen this restriction to apply only to mIsTailDispatcher if a + // use-case requires it. + MOZ_ASSERT(mDirectTasks.empty()); + for (size_t i = 0; i < mTaskGroups.Length(); ++i) { UniquePtr group(Move(mTaskGroups[i])); nsRefPtr thread = group->mThread; @@ -70,6 +93,20 @@ public: } } + void DrainDirectTasks() override + { + while (!mDirectTasks.empty()) { + nsCOMPtr r = mDirectTasks.front(); + mDirectTasks.pop(); + r->Run(); + } + } + + void AddDirectTask(already_AddRefed aRunnable) override + { + mDirectTasks.push(Move(aRunnable)); + } + void AddStateChangeTask(AbstractThread* aThread, already_AddRefed aRunnable) override { @@ -90,7 +127,10 @@ public: } } - bool HasTasksFor(AbstractThread* aThread) override { return !!GetTaskGroup(aThread); } + bool HasTasksFor(AbstractThread* aThread) override + { + return !!GetTaskGroup(aThread) || (aThread == AbstractThread::GetCurrent() && !mDirectTasks.empty()); + } private: @@ -118,18 +158,37 @@ private: NS_IMETHODIMP Run() { + // State change tasks get run all together before any code is run, so + // that all state changes are made in an atomic unit. for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) { mTasks->mStateChangeTasks[i]->Run(); } + // Once the state changes have completed, drain any direct tasks + // generated by those state changes (i.e. watcher notification tasks). + // This needs to be outside the loop because we don't want to run code + // that might observe intermediate states. + MaybeDrainDirectTasks(); + for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) { mTasks->mRegularTasks[i]->Run(); + + // Scope direct tasks tightly to the task that generated them. + MaybeDrainDirectTasks(); } return NS_OK; } private: + void MaybeDrainDirectTasks() + { + AbstractThread* currentThread = AbstractThread::GetCurrent(); + if (currentThread) { + currentThread->TailDispatcher().DrainDirectTasks(); + } + } + UniquePtr mTasks; }; @@ -156,6 +215,9 @@ private: return nullptr; } + // Direct tasks. + std::queue> mDirectTasks; + // Task groups, organized by thread. nsTArray> mTaskGroups;