Bug 1083101 - Add a task scheduler to Moz2D. r=jrmuizel

This commit is contained in:
Nicolas Silva 2015-09-24 17:35:10 +02:00
parent b40f0e1d22
commit 690f0044e7
9 changed files with 1195 additions and 0 deletions

236
gfx/2d/JobScheduler.cpp Normal file
View File

@ -0,0 +1,236 @@
/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "JobScheduler.h"
namespace mozilla {
namespace gfx {
JobScheduler* JobScheduler::sSingleton = nullptr;
bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues)
{
MOZ_ASSERT(!sSingleton);
MOZ_ASSERT(aNumThreads >= aNumQueues);
sSingleton = new JobScheduler();
sSingleton->mNextQueue = 0;
for (uint32_t i = 0; i < aNumQueues; ++i) {
sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
}
for (uint32_t i = 0; i < aNumThreads; ++i) {
sSingleton->mWorkerThreads.push_back(new WorkerThread(sSingleton->mDrawingQueues[i%aNumQueues]));
}
return true;
}
void JobScheduler::ShutDown()
{
MOZ_ASSERT(IsEnabled());
if (!IsEnabled()) {
return;
}
for (auto queue : sSingleton->mDrawingQueues) {
queue->ShutDown();
delete queue;
}
for (WorkerThread* thread : sSingleton->mWorkerThreads) {
// this will block until the thread is joined.
delete thread;
}
sSingleton->mWorkerThreads.clear();
delete sSingleton;
sSingleton = nullptr;
}
JobStatus
JobScheduler::ProcessJob(Job* aJob)
{
MOZ_ASSERT(aJob);
auto status = aJob->Run();
if (status == JobStatus::Error || status == JobStatus::Complete) {
delete aJob;
}
return status;
}
void
JobScheduler::SubmitJob(Job* aJob)
{
MOZ_ASSERT(aJob);
RefPtr<SyncObject> start = aJob->GetStartSync();
if (start && start->Register(aJob)) {
// The Job buffer starts with a non-signaled sync object, it
// is now registered in the list of task buffers waiting on the
// sync object, so we should not place it in the queue.
return;
}
GetQueueForJob(aJob)->SubmitJob(aJob);
}
MultiThreadedJobQueue*
JobScheduler::GetQueueForJob(Job* aJob)
{
return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue()
: GetDrawingQueue();
}
Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
: mStartSync(aStart)
, mCompletionSync(aCompletion)
, mPinToThread(aThread)
{
if (mStartSync) {
mStartSync->AddSubsequent(this);
}
if (mCompletionSync) {
mCompletionSync->AddPrerequisite(this);
}
}
Job::~Job()
{
if (mCompletionSync) {
//printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
mCompletionSync->Signal();
mCompletionSync = nullptr;
}
}
JobStatus
SetEventJob::Run()
{
mEvent->Set();
return JobStatus::Complete;
}
SetEventJob::SetEventJob(EventObject* aEvent,
SyncObject* aStart, SyncObject* aCompletion,
WorkerThread* aWorker)
: Job(aStart, aCompletion, aWorker)
, mEvent(aEvent)
{}
SetEventJob::~SetEventJob()
{}
SyncObject::SyncObject(uint32_t aNumPrerequisites)
: mSignals(aNumPrerequisites)
#ifdef DEBUG
, mNumPrerequisites(aNumPrerequisites)
, mAddedPrerequisites(0)
#endif
{}
SyncObject::~SyncObject()
{
MOZ_ASSERT(mWaitingJobs.size() == 0);
}
bool
SyncObject::Register(Job* aJob)
{
MOZ_ASSERT(aJob);
// For now, ensure that when we schedule the first subsequent, we have already
// created all of the prerequisites. This is an arbitrary restriction because
// we specify the number of prerequisites in the constructor, but in the typical
// scenario, if the assertion FreezePrerequisite blows up here it probably means
// we got the initial nmber of prerequisites wrong. We can decide to remove
// this restriction if needed.
FreezePrerequisites();
int32_t signals = mSignals;
if (signals > 0) {
AddWaitingJob(aJob);
// Since Register and Signal can be called concurrently, it can happen that
// reading mSignals in Register happens before decrementing mSignals in Signal,
// but SubmitWaitingJobs happens before AddWaitingJob. This ordering means
// the SyncObject ends up in the signaled state with a task sitting in the
// waiting list. To prevent that we check mSignals a second time and submit
// again if signals reached zero in the mean time.
// We do this instead of holding a mutex around mSignals+mJobs to reduce
// lock contention.
int32_t signals2 = mSignals;
if (signals2 == 0) {
SubmitWaitingJobs();
}
return true;
}
return false;
}
void
SyncObject::Signal()
{
int32_t signals = --mSignals;
MOZ_ASSERT(signals >= 0);
if (signals == 0) {
SubmitWaitingJobs();
}
}
void
SyncObject::AddWaitingJob(Job* aJob)
{
MutexAutoLock lock(&mMutex);
mWaitingJobs.push_back(aJob);
}
void SyncObject::SubmitWaitingJobs()
{
std::vector<Job*> tasksToSubmit;
{
// Scheduling the tasks can cause code that modifies <this>'s reference
// count to run concurrently, and cause the caller of this function to
// be owned by another thread. We need to make sure the reference count
// does not reach 0 on another thread before mWaitingJobs.clear(), so
// hold a strong ref to prevent that!
RefPtr<SyncObject> kungFuDeathGrip(this);
MutexAutoLock lock(&mMutex);
tasksToSubmit = Move(mWaitingJobs);
mWaitingJobs.clear();
}
for (Job* task : tasksToSubmit) {
JobScheduler::GetQueueForJob(task)->SubmitJob(task);
}
}
bool
SyncObject::IsSignaled()
{
return mSignals == 0;
}
void
SyncObject::FreezePrerequisites()
{
MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
}
void
SyncObject::AddPrerequisite(Job* aJob)
{
MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
}
void
SyncObject::AddSubsequent(Job* aJob)
{
}
} //namespace
} //namespace

231
gfx/2d/JobScheduler.h Normal file
View File

@ -0,0 +1,231 @@
/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef MOZILLA_GFX_TASKSCHEDULER_H_
#define MOZILLA_GFX_TASKSCHEDULER_H_
#include "mozilla/RefPtr.h"
#include "mozilla/gfx/Types.h"
#ifdef WIN32
#include "mozilla/gfx/JobScheduler_win32.h"
#else
#include "mozilla/gfx/JobScheduler_posix.h"
#endif
namespace mozilla {
namespace gfx {
class MultiThreadedJobQueue;
class SyncObject;
class JobScheduler {
public:
/// Return one of the queues that the drawing worker threads pull from, chosen
/// pseudo-randomly.
static MultiThreadedJobQueue* GetDrawingQueue()
{
return sSingleton->mDrawingQueues[
sSingleton->mNextQueue++ % sSingleton->mDrawingQueues.size()
];
}
/// Return one of the queues that the drawing worker threads pull from with a
/// hash to choose the queue.
///
/// Calling this function several times with the same hash will yield the same queue.
static MultiThreadedJobQueue* GetDrawingQueue(uint32_t aHash)
{
return sSingleton->mDrawingQueues[
aHash % sSingleton->mDrawingQueues.size()
];
}
/// Return the task queue associated to the worker the task is pinned to if
/// the task is pinned to a worker, or a random queue.
static MultiThreadedJobQueue* GetQueueForJob(Job* aJob);
/// Initialize the task scheduler with aNumThreads worker threads for drawing
/// and aNumQueues task queues.
///
/// The number of threads must be superior or equal to the number of queues
/// (since for now a worker thread only pulls from one queue).
static bool Init(uint32_t aNumThreads, uint32_t aNumQueues);
/// Shut the scheduler down.
///
/// This will block until worker threads are joined and deleted.
static void ShutDown();
/// Returns true if there is a successfully initialized JobScheduler singleton.
static bool IsEnabled() { return !!sSingleton; }
/// Submit a task buffer to its associated queue.
///
/// The caller looses ownership of the task buffer.
static void SubmitJob(Job* aJobs);
/// Process commands until the command buffer needs to block on a sync object,
/// completes, yields, or encounters an error.
///
/// Can be used on any thread. Worker threads basically loop over this, but the
/// main thread can also dequeue pending task buffers and process them alongside
/// the worker threads if it is about to block until completion anyway.
///
/// The caller looses ownership of the task buffer.
static JobStatus ProcessJob(Job* aJobs);
protected:
static JobScheduler* sSingleton;
// queues of Job that are ready to be processed
std::vector<MultiThreadedJobQueue*> mDrawingQueues;
std::vector<WorkerThread*> mWorkerThreads;
Atomic<uint32_t> mNextQueue;
};
/// Jobs are not reference-counted because they don't have shared ownership.
/// The ownership of tasks can change when they are passed to certain methods
/// of JobScheduler and SyncObject. See the docuumentaion of these classes.
class Job {
public:
Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread = nullptr);
virtual ~Job();
virtual JobStatus Run() = 0;
/// For use in JobScheduler::SubmitJob. Don't use it anywhere else.
//already_AddRefed<SyncObject> GetAndResetStartSync();
SyncObject* GetStartSync() { return mStartSync; }
bool IsPinnedToAThread() const { return !!mPinToThread; }
WorkerThread* GetWorkerThread() { return mPinToThread; }
protected:
RefPtr<SyncObject> mStartSync;
RefPtr<SyncObject> mCompletionSync;
WorkerThread* mPinToThread;
};
class EventObject;
/// This task will set an EventObject.
///
/// Typically used as the final task, so that the main thread can block on the
/// corresponfing EventObject until all of the tasks are processed.
class SetEventJob : public Job
{
public:
explicit SetEventJob(EventObject* aEvent,
SyncObject* aStart, SyncObject* aCompletion = nullptr,
WorkerThread* aPinToWorker = nullptr);
~SetEventJob();
JobStatus Run() override;
EventObject* GetEvent() { return mEvent; }
protected:
RefPtr<EventObject> mEvent;
};
/// A synchronization object that can be used to express dependencies and ordering between
/// tasks.
///
/// Jobs can register to SyncObjects in order to asynchronously wait for a signal.
/// In practice, Job objects usually start with a sync object (startSyc) and end
/// with another one (completionSync).
/// a Job never gets processed before its startSync is in the signaled state, and
/// signals its completionSync as soon as it finishes. This is how dependencies
/// between tasks is expressed.
class SyncObject final : public external::AtomicRefCounted<SyncObject> {
public:
MOZ_DECLARE_REFCOUNTED_TYPENAME(SyncObject)
/// Create a synchronization object.
///
/// aNumPrerequisites represents the number of times the object must be signaled
/// before actually entering the signaled state (in other words, it means the
/// number of dependencies of this sync object).
///
/// Explicitly specifying the number of prerequisites when creating sync objects
/// makes it easy to start scheduling some of the prerequisite tasks while
/// creating the others, which is how we typically use the task scheduler.
/// Automatically determining the number of prerequisites using Job's constructor
/// brings the risk that the sync object enters the signaled state while we
/// are still adding prerequisites which is hard to fix without using muteces.
explicit SyncObject(uint32_t aNumPrerequisites = 1);
~SyncObject();
/// Attempt to register a task.
///
/// If the sync object is already in the signaled state, the buffer is *not*
/// registered and the sync object does not take ownership of the task.
/// If the object is not yet in the signaled state, it takes ownership of
/// the task and places it in a list of pending tasks.
/// Pending tasks will not be processed by the worker thread.
/// When the SyncObject reaches the signaled state, it places the pending
/// tasks back in the available buffer queue, so that they can be
/// scheduled again.
///
/// Returns true if the SyncOject is not already in the signaled state.
/// This means that if this method returns true, the SyncObject has taken
/// ownership of the Job.
bool Register(Job* aJob);
/// Signal the SyncObject.
///
/// This decrements an internal counter. The sync object reaches the signaled
/// state when the counter gets to zero.
void Signal();
/// Returns true if mSignals is equal to zero. In other words, returns true
/// if all prerequisite tasks have already signaled the sync object.
bool IsSignaled();
/// Asserts that the number of added prerequisites is equal to the number
/// specified in the constructor (does nothin in release builds).
void FreezePrerequisites();
private:
// Called by Job's constructor
void AddSubsequent(Job* aJob);
void AddPrerequisite(Job* aJob);
void AddWaitingJob(Job* aJob);
void SubmitWaitingJobs();
std::vector<Job*> mWaitingJobs;
Mutex mMutex; // for concurrent access to mWaintingJobs
Atomic<int32_t> mSignals;
#ifdef DEBUG
uint32_t mNumPrerequisites;
Atomic<uint32_t> mAddedPrerequisites;
#endif
friend class Job;
friend class JobScheduler;
};
/// RAII helper.
struct MutexAutoLock {
MutexAutoLock(Mutex* aMutex) : mMutex(aMutex) { mMutex->Lock(); }
~MutexAutoLock() { mMutex->Unlock(); }
protected:
Mutex* mMutex;
};
} // namespace
} // namespace
#endif

View File

@ -0,0 +1,202 @@
/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "JobScheduler.h"
#include "mozilla/gfx/Logging.h"
using namespace std;
namespace mozilla {
namespace gfx {
MultiThreadedJobQueue::MultiThreadedJobQueue()
: mThreadsCount(0)
, mShuttingDown(false)
{}
MultiThreadedJobQueue::~MultiThreadedJobQueue()
{
MOZ_ASSERT(mJobs.empty());
}
bool
MultiThreadedJobQueue::WaitForJob(Job*& aOutJob)
{
return PopJob(aOutJob, BLOCKING);
}
bool
MultiThreadedJobQueue::PopJob(Job*& aOutJobs, AccessType aAccess)
{
for (;;) {
MutexAutoLock lock(&mMutex);
while (aAccess == BLOCKING && !mShuttingDown && mJobs.empty()) {
mAvailableCondvar.Wait(&mMutex);
}
if (mShuttingDown) {
return false;
}
if (mJobs.empty()) {
if (aAccess == NON_BLOCKING) {
return false;
}
continue;
}
Job* task = mJobs.front();
MOZ_ASSERT(task);
mJobs.pop_front();
aOutJobs = task;
return true;
}
}
void
MultiThreadedJobQueue::SubmitJob(Job* aJobs)
{
MOZ_ASSERT(aJobs);
MutexAutoLock lock(&mMutex);
mJobs.push_back(aJobs);
mAvailableCondvar.Broadcast();
}
size_t
MultiThreadedJobQueue::NumJobs()
{
MutexAutoLock lock(&mMutex);
return mJobs.size();
}
bool
MultiThreadedJobQueue::IsEmpty()
{
MutexAutoLock lock(&mMutex);
return mJobs.empty();
}
void
MultiThreadedJobQueue::ShutDown()
{
MutexAutoLock lock(&mMutex);
mShuttingDown = true;
while (mThreadsCount) {
mAvailableCondvar.Broadcast();
mShutdownCondvar.Wait(&mMutex);
}
}
void
MultiThreadedJobQueue::RegisterThread()
{
mThreadsCount += 1;
}
void
MultiThreadedJobQueue::UnregisterThread()
{
MutexAutoLock lock(&mMutex);
mThreadsCount -= 1;
if (mThreadsCount == 0) {
mShutdownCondvar.Broadcast();
}
}
void* ThreadCallback(void* threadData)
{
WorkerThread* thread = (WorkerThread*)threadData;
thread->Run();
return nullptr;
}
WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
: mQueue(aJobQueue)
{
aJobQueue->RegisterThread();
pthread_create(&mThread, nullptr, ThreadCallback, this);
}
WorkerThread::~WorkerThread()
{
pthread_join(mThread, nullptr);
}
void
WorkerThread::SetName(const char* aName)
{
// Call this from the thread itself because of Mac.
#ifdef XP_MACOSX
pthread_setname_np(aName);
#elif defined(__DragonFly__) || defined(__FreeBSD__) || defined(__OpenBSD__)
pthread_set_name_np(mThread, aName);
#elif defined(__NetBSD__)
pthread_setname_np(mThread, "%s", (void*)aName);
#else
pthread_setname_np(mThread, aName);
#endif
}
void
WorkerThread::Run()
{
SetName("gfx worker");
for (;;) {
Job* commands = nullptr;
if (!mQueue->WaitForJob(commands)) {
mQueue->UnregisterThread();
return;
}
JobStatus status = JobScheduler::ProcessJob(commands);
if (status == JobStatus::Error) {
// Don't try to handle errors for now, but that's open to discussions.
// I expect errors to be mostly OOM issues.
MOZ_CRASH();
}
}
}
EventObject::EventObject()
: mIsSet(false)
{}
EventObject::~EventObject()
{}
bool
EventObject::Peak()
{
MutexAutoLock lock(&mMutex);
return mIsSet;
}
void
EventObject::Set()
{
MutexAutoLock lock(&mMutex);
if (!mIsSet) {
mIsSet = true;
mCond.Broadcast();
}
}
void
EventObject::Wait()
{
MutexAutoLock lock(&mMutex);
if (mIsSet) {
return;
}
mCond.Wait(&mMutex);
}
} // namespce
} // namespce

187
gfx/2d/JobScheduler_posix.h Normal file
View File

@ -0,0 +1,187 @@
/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef WIN32
#ifndef MOZILLA_GFX_TASKSCHEDULER_POSIX_H_
#define MOZILLA_GFX_TASKSCHEDULER_POSIX_H_
#include <string>
#include <vector>
#include <list>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include "mozilla/RefPtr.h"
#include "mozilla/DebugOnly.h"
namespace mozilla {
namespace gfx {
class Job;
class PosixCondVar;
class Mutex {
public:
Mutex() {
DebugOnly<int> err = pthread_mutex_init(&mMutex, nullptr);
MOZ_ASSERT(!err);
}
~Mutex() {
DebugOnly<int> err = pthread_mutex_destroy(&mMutex);
MOZ_ASSERT(!err);
}
void Lock() {
DebugOnly<int> err = pthread_mutex_lock(&mMutex);
MOZ_ASSERT(!err);
}
void Unlock() {
DebugOnly<int> err = pthread_mutex_unlock(&mMutex);
MOZ_ASSERT(!err);
}
protected:
pthread_mutex_t mMutex;
friend class PosixCondVar;
};
// posix platforms only!
class PosixCondVar {
public:
PosixCondVar() {
DebugOnly<int> err = pthread_cond_init(&mCond, nullptr);
MOZ_ASSERT(!err);
}
~PosixCondVar() {
DebugOnly<int> err = pthread_cond_destroy(&mCond);
MOZ_ASSERT(!err);
}
void Wait(Mutex* aMutex) {
DebugOnly<int> err = pthread_cond_wait(&mCond, &aMutex->mMutex);
MOZ_ASSERT(!err);
}
void Broadcast() {
DebugOnly<int> err = pthread_cond_broadcast(&mCond);
MOZ_ASSERT(!err);
}
protected:
pthread_cond_t mCond;
};
/// A simple and naive multithreaded task queue
///
/// The public interface of this class must remain identical to its equivalent
/// in JobScheduler_win32.h
class MultiThreadedJobQueue {
public:
enum AccessType {
BLOCKING,
NON_BLOCKING
};
// Producer thread
MultiThreadedJobQueue();
// Producer thread
~MultiThreadedJobQueue();
// Worker threads
bool WaitForJob(Job*& aOutJob);
// Any thread
bool PopJob(Job*& aOutJob, AccessType aAccess);
// Any threads
void SubmitJob(Job* aJob);
// Producer thread
void ShutDown();
// Any thread
size_t NumJobs();
// Any thread
bool IsEmpty();
// Producer thread
void RegisterThread();
// Worker threads
void UnregisterThread();
protected:
std::list<Job*> mJobs;
Mutex mMutex;
PosixCondVar mAvailableCondvar;
PosixCondVar mShutdownCondvar;
int32_t mThreadsCount;
bool mShuttingDown;
friend class WorkerThread;
};
/// Worker thread that continuously dequeues Jobs from a MultiThreadedJobQueue
/// and process them.
///
/// The public interface of this class must remain identical to its equivalent
/// in JobScheduler_win32.h
class WorkerThread {
public:
explicit WorkerThread(MultiThreadedJobQueue* aJobQueue);
~WorkerThread();
void Run();
MultiThreadedJobQueue* GetJobQueue() { return mQueue; }
protected:
void SetName(const char* name);
MultiThreadedJobQueue* mQueue;
pthread_t mThread;
};
/// An object that a thread can synchronously wait on.
/// Usually set by a SetEventJob.
class EventObject : public external::AtomicRefCounted<EventObject>
{
public:
MOZ_DECLARE_REFCOUNTED_TYPENAME(EventObject)
EventObject();
~EventObject();
/// Synchronously wait until the event is set.
void Wait();
/// Return true if the event is set, without blocking.
bool Peak();
/// Set the event.
void Set();
protected:
Mutex mMutex;
PosixCondVar mCond;
bool mIsSet;
};
} // namespace
} // namespace
#include "JobScheduler.h"
#endif
#endif

View File

@ -0,0 +1,76 @@
/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifdef WIN32
#ifndef MOZILLA_GFX_TASKSCHEDULER_WIN32_H_
#define MOZILLA_GFX_TASKSCHEDULER_WIN32_H_
#define NOT_IMPLEMENTED MOZ_CRASH("Not implemented")
#include "mozilla/RefPtr.h"
namespace mozilla {
namespace gfx {
class WorkerThread;
class Job;
class Mutex {
public:
Mutex() { NOT_IMPLEMENTED; }
~Mutex() { NOT_IMPLEMENTED; }
void Lock() { NOT_IMPLEMENTED; }
void Unlock() { NOT_IMPLEMENTED; }
};
// The public interface of this class must remain identical to its equivalent
// in JobScheduler_posix.h
class MultiThreadedJobQueue {
public:
enum AccessType {
BLOCKING,
NON_BLOCKING
};
bool WaitForJob(Job*& aOutCommands) { NOT_IMPLEMENTED; }
bool PopJob(Job*& aOutCommands, AccessType aAccess) { NOT_IMPLEMENTED; }
void SubmitJob(Job* aCommands) { NOT_IMPLEMENTED; }
void ShutDown() { NOT_IMPLEMENTED; }
size_t NumJobs() { NOT_IMPLEMENTED; }
bool IsEmpty() { NOT_IMPLEMENTED; }
void RegisterThread() { NOT_IMPLEMENTED; }
void UnregisterThread() { NOT_IMPLEMENTED; }
friend class WorkerThread;
};
// The public interface of this class must remain identical to its equivalent
// in JobScheduler_posix.h
class EventObject : public external::AtomicRefCounted<EventObject>
{
public:
MOZ_DECLARE_REFCOUNTED_TYPENAME(EventObject)
EventObject() { NOT_IMPLEMENTED; }
~EventObject() { NOT_IMPLEMENTED; }
void Wait() { NOT_IMPLEMENTED; }
bool Peak() { NOT_IMPLEMENTED; }
void Set() { NOT_IMPLEMENTED; }
};
// The public interface of this class must remain identical to its equivalent
// in JobScheduler_posix.h
class WorkerThread {
public:
explicit WorkerThread(MultiThreadedJobQueue* aJobQueue) { NOT_IMPLEMENTED; }
void Run();
};
} // namespace
} // namespace
#endif
#endif

View File

@ -289,6 +289,13 @@ struct GradientStop
Color color;
};
enum class JobStatus {
Complete,
Wait,
Yield,
Error
};
} // namespace gfx
} // namespace mozilla

View File

@ -26,6 +26,9 @@ EXPORTS.mozilla.gfx += [
'Helpers.h',
'HelpersCairo.h',
'IterableArena.h',
'JobScheduler.h',
'JobScheduler_posix.h',
'JobScheduler_win32.h',
'Logging.h',
'Matrix.h',
'NumericTools.h',
@ -71,6 +74,11 @@ elif CONFIG['MOZ_WIDGET_TOOLKIT'] == 'windows':
]
DEFINES['WIN32'] = True
if CONFIG['MOZ_WIDGET_TOOLKIT'] != 'windows':
SOURCES += [
'JobScheduler_posix.cpp',
]
if CONFIG['MOZ_ENABLE_SKIA']:
UNIFIED_SOURCES += [
'convolver.cpp',
@ -130,6 +138,7 @@ UNIFIED_SOURCES += [
'FilterProcessing.cpp',
'FilterProcessingScalar.cpp',
'ImageScaling.cpp',
'JobScheduler.cpp',
'Matrix.cpp',
'Path.cpp',
'PathCairo.cpp',

View File

@ -0,0 +1,246 @@
/* vim:set ts=2 sw=2 sts=2 et: */
/* Any copyright is dedicated to the Public Domain.
* http://creativecommons.org/publicdomain/zero/1.0/
*/
#ifndef WIN32
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "mozilla/gfx/JobScheduler.h"
#include <pthread.h>
#include <sched.h>
#include <stdlib.h>
#include <time.h>
namespace test_scheduler {
using namespace mozilla::gfx;
using namespace mozilla;
// Artificially cause threads to yield randomly in an attempt to make racy
// things more apparent (if any).
void MaybeYieldThread()
{
if (rand() % 5 == 0) {
sched_yield();
}
}
/// Used by the TestCommand to check that tasks are processed in the right order.
struct SanityChecker {
std::vector<uint64_t> mAdvancements;
mozilla::gfx::Mutex mMutex;
explicit SanityChecker(uint64_t aNumCmdBuffers)
{
for (uint32_t i = 0; i < aNumCmdBuffers; ++i) {
mAdvancements.push_back(0);
}
}
virtual void Check(uint64_t aJobId, uint64_t aCmdId)
{
MaybeYieldThread();
MutexAutoLock lock(&mMutex);
ASSERT_EQ(mAdvancements[aJobId], aCmdId-1);
mAdvancements[aJobId] = aCmdId;
}
};
/// Run checks that are specific to TestSchulerJoin.
struct JoinTestSanityCheck : public SanityChecker {
bool mSpecialJobHasRun;
explicit JoinTestSanityCheck(uint64_t aNumCmdBuffers)
: SanityChecker(aNumCmdBuffers)
, mSpecialJobHasRun(false)
{}
virtual void Check(uint64_t aJobId, uint64_t aCmdId) override
{
// Job 0 is the special task executed when everything is joined after task 1
if (aCmdId == 0) {
ASSERT_FALSE(mSpecialJobHasRun);
mSpecialJobHasRun = true;
for (auto advancement : mAdvancements) {
// Because of the synchronization point (beforeFilter), all
// task buffers should have run task 1 when task 0 is run.
ASSERT_EQ(advancement, (uint32_t)1);
}
} else {
// This check does not apply to task 0.
SanityChecker::Check(aJobId, aCmdId);
}
if (aCmdId == 2) {
ASSERT_TRUE(mSpecialJobHasRun);
}
}
};
class TestJob : public Job
{
public:
TestJob(uint64_t aCmdId, uint64_t aJobId, SanityChecker* aChecker,
SyncObject* aStart, SyncObject* aCompletion)
: Job(aStart, aCompletion, nullptr)
, mCmdId(aCmdId)
, mCmdBufferId(aJobId)
, mSanityChecker(aChecker)
{}
JobStatus Run()
{
MaybeYieldThread();
mSanityChecker->Check(mCmdBufferId, mCmdId);
MaybeYieldThread();
return JobStatus::Complete;
}
uint64_t mCmdId;
uint64_t mCmdBufferId;
SanityChecker* mSanityChecker;
};
/// This test creates aNumCmdBuffers task buffers with sync objects set up
/// so that all tasks will join after command 5 before a task buffer runs
/// a special task (task 0) after which all task buffers fork again.
/// This simulates the kind of scenario where all tiles must join at
/// a certain point to execute, say, a filter, and fork again after the filter
/// has been processed.
/// The main thread is only blocked when waiting for the completion of the entire
/// task stream (it doesn't have to wait at the filter's sync points to orchestrate it).
void TestSchedulerJoin(uint32_t aNumThreads, uint32_t aNumCmdBuffers)
{
JoinTestSanityCheck check(aNumCmdBuffers);
RefPtr<SyncObject> beforeFilter = new SyncObject(aNumCmdBuffers);
RefPtr<SyncObject> afterFilter = new SyncObject();
RefPtr<SyncObject> completion = new SyncObject(aNumCmdBuffers);
for (uint32_t i = 0; i < aNumCmdBuffers; ++i) {
Job* t1 = new TestJob(1, i, &check, nullptr, beforeFilter);
JobScheduler::SubmitJob(t1);
MaybeYieldThread();
}
beforeFilter->FreezePrerequisites();
// This task buffer is executed when all other tasks have joined after task 1
JobScheduler::SubmitJob(
new TestJob(0, 0, &check, beforeFilter, afterFilter)
);
afterFilter->FreezePrerequisites();
for (uint32_t i = 0; i < aNumCmdBuffers; ++i) {
Job* t2 = new TestJob(2, i, &check, afterFilter, completion);
JobScheduler::SubmitJob(t2);
MaybeYieldThread();
}
completion->FreezePrerequisites();
RefPtr<EventObject> waitForCompletion = new EventObject();
auto evtJob = new SetEventJob(waitForCompletion, completion);
JobScheduler::SubmitJob(evtJob);
MaybeYieldThread();
waitForCompletion->Wait();
MaybeYieldThread();
for (auto advancement : check.mAdvancements) {
ASSERT_TRUE(advancement == 2);
}
}
/// This test creates several chains of 10 task, tasks of a given chain are executed
/// sequentially, and chains are exectuted in parallel.
/// This simulates the typical scenario where we want to process sequences of drawing
/// commands for several tiles in parallel.
void TestSchedulerChain(uint32_t aNumThreads, uint32_t aNumCmdBuffers)
{
SanityChecker check(aNumCmdBuffers);
RefPtr<SyncObject> completion = new SyncObject(aNumCmdBuffers);
uint32_t numJobs = 10;
for (uint32_t i = 0; i < aNumCmdBuffers; ++i) {
std::vector<RefPtr<SyncObject>> syncs;
std::vector<Job*> tasks;
syncs.reserve(numJobs);
tasks.reserve(numJobs);
for (uint32_t t = 0; t < numJobs-1; ++t) {
syncs.push_back(new SyncObject());
tasks.push_back(new TestJob(t+1, i, &check, t == 0 ? nullptr
: syncs[t-1].get(),
syncs[t]));
syncs.back()->FreezePrerequisites();
}
tasks.push_back(new TestJob(numJobs, i, &check, syncs.back(), completion));
if (i % 2 == 0) {
// submit half of the tasks in order
for (Job* task : tasks) {
JobScheduler::SubmitJob(task);
MaybeYieldThread();
}
} else {
// ... and submit the other half in reverse order
for (int32_t reverse = numJobs-1; reverse >= 0; --reverse) {
JobScheduler::SubmitJob(tasks[reverse]);
MaybeYieldThread();
}
}
}
completion->FreezePrerequisites();
RefPtr<EventObject> waitForCompletion = new EventObject();
auto evtJob = new SetEventJob(waitForCompletion, completion);
JobScheduler::SubmitJob(evtJob);
MaybeYieldThread();
waitForCompletion->Wait();
for (auto advancement : check.mAdvancements) {
ASSERT_TRUE(advancement == numJobs);
}
}
} // namespace test_scheduler
TEST(Moz2D, JobScheduler_Join) {
srand(time(nullptr));
for (uint32_t threads = 1; threads < 8; ++threads) {
for (uint32_t queues = 1; queues < threads; ++queues) {
for (uint32_t buffers = 1; buffers < 100; buffers += 3) {
mozilla::gfx::JobScheduler::Init(threads, queues);
test_scheduler::TestSchedulerJoin(threads, buffers);
mozilla::gfx::JobScheduler::ShutDown();
}
}
}
}
TEST(Moz2D, JobScheduler_Chain) {
srand(time(nullptr));
for (uint32_t threads = 1; threads < 8; ++threads) {
for (uint32_t queues = 1; queues < threads; ++queues) {
for (uint32_t buffers = 1; buffers < 100; buffers += 3) {
mozilla::gfx::JobScheduler::Init(threads, queues);
test_scheduler::TestSchedulerChain(threads, buffers);
mozilla::gfx::JobScheduler::ShutDown();
}
}
}
}
#endif

View File

@ -14,6 +14,7 @@ UNIFIED_SOURCES += [
'TestCompositor.cpp',
'TestGfxPrefs.cpp',
'TestGfxWidgets.cpp',
'TestJobScheduler.cpp',
'TestLayers.cpp',
'TestMoz2D.cpp',
'TestQcms.cpp',