Bug 969722 - Part 1: Remove ThreadPoolWorker subclasses for ease of inlining. (r=nmatsakis)

Shu-yu Guo 2014-02-14 13:59:13 -08:00
parent 6f93fad4e2
commit 72398e57b9
6 changed files with 200 additions and 309 deletions

View File

@@ -1768,6 +1768,5 @@ CodeGeneratorX86Shared::visitNegF(LNegF *ins)
return true;
}
} // namespace jit
} // namespace js

View File

@@ -349,14 +349,13 @@ class ForkJoinShared : public ParallelJob, public Monitor
/////////////////////////////////////////////////////////////////////////
// Asynchronous Flags
//
// These can be read without the lock (hence the |volatile| declaration).
// All fields should be *written with the lock*, however.
// These can be accessed without the lock and are thus atomic.
// Set to true when parallel execution should abort.
volatile bool abort_;
mozilla::Atomic<bool, mozilla::ReleaseAcquire> abort_;
// Set to true when a worker bails for a fatal reason.
volatile bool fatal_;
mozilla::Atomic<bool, mozilla::ReleaseAcquire> fatal_;
public:
ForkJoinShared(JSContext *cx,
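The move from |volatile bool| to mozilla::Atomic above is what makes the lock-free reads well-defined. A minimal sketch of the same flag pattern, using std::atomic as a stand-in for mozilla::Atomic<bool, mozilla::ReleaseAcquire> (class and method names here are illustrative, not from the patch):

    #include <atomic>

    // An abort flag that workers poll without holding a lock. Release stores
    // paired with acquire loads make the write (and everything sequenced
    // before it) visible to pollers -- a guarantee |volatile| never gave.
    class AbortFlag
    {
        std::atomic<bool> flag_;

      public:
        AbortFlag() : flag_(false) {}

        void set() { flag_.store(true, std::memory_order_release); }         // any thread
        bool isSet() const { return flag_.load(std::memory_order_acquire); } // pollers
    };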
@@ -372,27 +371,26 @@ class ForkJoinShared : public ParallelJob, public Monitor
ParallelResult execute();
// Invoked from parallel worker threads:
virtual bool executeFromWorker(uint32_t workerId, uintptr_t stackLimit) MOZ_OVERRIDE;
virtual bool executeFromWorker(ThreadPoolWorker *worker, uintptr_t stackLimit) MOZ_OVERRIDE;
// Invoked only from the main thread:
virtual bool executeFromMainThread() MOZ_OVERRIDE;
virtual bool executeFromMainThread(ThreadPoolWorker *worker) MOZ_OVERRIDE;
// Executes the user-supplied function on a worker or the main thread.
void executePortion(PerThreadData *perThread, uint32_t workerId);
void executePortion(PerThreadData *perThread, ThreadPoolWorker *worker);
// Moves all the per-thread arenas into the main compartment and processes
// any pending requests for a GC. This can only safely be invoked on the
// main thread after the workers have completed.
void transferArenasToCompartmentAndProcessGCRequests();
// Invoked during processing by worker threads to "check in".
bool check(ForkJoinContext &cx);
// Requests a GC, either full or specific to a zone.
void requestGC(JS::gcreason::Reason reason);
void requestZoneGC(JS::Zone *zone, JS::gcreason::Reason reason);
// Requests that computation abort.
void setAbortFlagDueToInterrupt(ForkJoinContext &cx);
void setAbortFlagAndTriggerOperationCallback(bool fatal);
// Set the fatal flag for the next abort.
@@ -598,12 +596,12 @@ ForkJoinOperation::apply()
SpewBeginOp(cx_, "ForkJoinOperation");
// How many workers do we have, counting the main thread.
unsigned numWorkersWithMain = cx_->runtime()->threadPool.numWorkers() + 1;
unsigned numWorkers = cx_->runtime()->threadPool.numWorkers();
if (!bailoutRecords_.resize(numWorkersWithMain))
if (!bailoutRecords_.resize(numWorkers))
return SpewEndOp(ExecutionFatal);
for (uint32_t i = 0; i < numWorkersWithMain; i++)
for (uint32_t i = 0; i < numWorkers; i++)
bailoutRecords_[i].init(cx_);
if (enqueueInitialScript(&status) == RedLight)
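Most of the mechanical churn in this file follows from the new counting convention: the main thread is now a regular pool worker, so the various |numWorkers() + 1| adjustments disappear. An illustration with assumed values (a 4-core machine):

    // Old convention: numWorkers() == 3 (helpers only); the main thread was
    // tracked separately and used an extra slot with id == numWorkers().
    // New convention: numWorkers() == 4; the main thread is workers_[0],
    // helpers are workers_[1..3], and per-thread arrays need no extra slot.
    unsigned numWorkers = cx_->runtime()->threadPool.numWorkers();  // 4, not 3
    if (!bailoutRecords_.resize(numWorkers))  // one record per thread, main included
        return SpewEndOp(ExecutionFatal);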
@@ -633,7 +631,7 @@ ForkJoinOperation::apply()
}
while (bailouts < MAX_BAILOUTS) {
for (uint32_t i = 0; i < numWorkersWithMain; i++)
for (uint32_t i = 0; i < numWorkers; i++)
bailoutRecords_[i].reset(cx_);
if (compileForParallelExecution(&status) == RedLight)
@@ -1371,7 +1369,7 @@ ForkJoinShared::init()
if (!cxLock_)
return false;
for (unsigned i = 0; i < (threadPool_->numWorkers() + 1); i++) {
for (unsigned i = 0; i < threadPool_->numWorkers(); i++) {
Allocator *allocator = cx_->new_<Allocator>(cx_->zone());
if (!allocator)
return false;
@@ -1439,7 +1437,7 @@ void
ForkJoinShared::transferArenasToCompartmentAndProcessGCRequests()
{
JSCompartment *comp = cx_->compartment();
for (unsigned i = 0; i < (threadPool_->numWorkers() + 1); i++)
for (unsigned i = 0; i < threadPool_->numWorkers(); i++)
comp->adoptWorkerAllocator(allocators_[i]);
if (gcRequested_) {
@@ -1453,7 +1451,7 @@ ForkJoinShared::transferArenasToCompartmentAndProcessGCRequests()
}
bool
ForkJoinShared::executeFromWorker(uint32_t workerId, uintptr_t stackLimit)
ForkJoinShared::executeFromWorker(ThreadPoolWorker *worker, uintptr_t stackLimit)
{
PerThreadData thisThread(cx_->runtime());
if (!thisThread.init()) {
@@ -1469,21 +1467,21 @@ ForkJoinShared::executeFromWorker(uint32_t workerId, uintptr_t stackLimit)
// Don't use setIonStackLimit() because that acquires the ionStackLimitLock, and the
// lock has not been initialized in these cases.
thisThread.jitStackLimit = stackLimit;
executePortion(&thisThread, workerId);
executePortion(&thisThread, worker);
TlsPerThreadData.set(nullptr);
return !abort_;
}
bool
ForkJoinShared::executeFromMainThread()
ForkJoinShared::executeFromMainThread(ThreadPoolWorker *worker)
{
executePortion(&cx_->mainThread(), threadPool_->numWorkers());
executePortion(&cx_->mainThread(), worker);
return !abort_;
}
void
ForkJoinShared::executePortion(PerThreadData *perThread, uint32_t workerId)
ForkJoinShared::executePortion(PerThreadData *perThread, ThreadPoolWorker *worker)
{
// WARNING: This code runs ON THE PARALLEL WORKER THREAD.
// Be careful when accessing cx_.
@@ -1493,8 +1491,8 @@ ForkJoinShared::executePortion(PerThreadData *perThread, uint32_t workerId)
// here for maximum clarity.
JS::AutoAssertNoGC nogc(runtime());
Allocator *allocator = allocators_[workerId];
ForkJoinContext cx(perThread, workerId, allocator, this, &records_[workerId]);
Allocator *allocator = allocators_[worker->id()];
ForkJoinContext cx(perThread, worker, allocator, this, &records_[worker->id()]);
AutoSetForkJoinContext autoContext(&cx);
#ifdef DEBUG
@@ -1523,7 +1521,7 @@ ForkJoinShared::executePortion(PerThreadData *perThread, uint32_t workerId)
} else {
ParallelIonInvoke<2> fii(cx_->runtime(), fun_, 2);
fii.args[0] = Int32Value(workerId);
fii.args[0] = Int32Value(worker->id());
fii.args[1] = BooleanValue(false);
bool ok = fii.invoke(perThread);
@@ -1535,35 +1533,21 @@ ForkJoinShared::executePortion(PerThreadData *perThread, uint32_t workerId)
Spew(SpewOps, "Down");
}
bool
ForkJoinShared::check(ForkJoinContext &cx)
void
ForkJoinShared::setAbortFlagDueToInterrupt(ForkJoinContext &cx)
{
JS_ASSERT(cx_->runtime()->interruptPar);
if (abort_)
return false;
// Note: We must check if the main thread has exited successfully here, as
// without a main thread the worker threads which are tripping on the
// interrupt flag would never exit.
if (cx.isMainThread() || !threadPool_->isMainThreadActive()) {
JS_ASSERT(!cx_->runtime()->gcIsNeeded);
if (cx_->runtime()->interruptPar) {
// The GC Needed flag should not be set during parallel
// execution. Instead, one of the requestGC() or
// requestZoneGC() methods should be invoked.
JS_ASSERT(!cx_->runtime()->gcIsNeeded);
if (!abort_) {
cx.bailoutRecord->setCause(ParallelBailoutInterrupt);
setAbortFlagAndTriggerOperationCallback(false);
return false;
}
}
return true;
}
void
ForkJoinShared::setAbortFlagAndTriggerOperationCallback(bool fatal)
{
@@ -1610,15 +1594,15 @@ ForkJoinShared::requestZoneGC(JS::Zone *zone, JS::gcreason::Reason reason)
// ForkJoinContext
//
ForkJoinContext::ForkJoinContext(PerThreadData *perThreadData, uint32_t workerId,
ForkJoinContext::ForkJoinContext(PerThreadData *perThreadData, ThreadPoolWorker *worker,
Allocator *allocator, ForkJoinShared *shared,
ParallelBailoutRecord *bailoutRecord)
: ThreadSafeContext(shared->runtime(), perThreadData, Context_ForkJoin),
workerId(workerId),
bailoutRecord(bailoutRecord),
targetRegionStart(nullptr),
targetRegionEnd(nullptr),
shared(shared),
shared_(shared),
worker_(worker),
acquiredJSContext_(false),
nogc_(shared->runtime())
{
@@ -1640,19 +1624,19 @@ ForkJoinContext::ForkJoinContext(PerThreadData *perThreadData, uint32_t workerId
bool
ForkJoinContext::isMainThread() const
{
return perThreadData == &shared->runtime()->mainThread;
return perThreadData == &shared_->runtime()->mainThread;
}
JSRuntime *
ForkJoinContext::runtime()
{
return shared->runtime();
return shared_->runtime();
}
JSContext *
ForkJoinContext::acquireJSContext()
{
JSContext *cx = shared->acquireJSContext();
JSContext *cx = shared_->acquireJSContext();
JS_ASSERT(!acquiredJSContext_);
acquiredJSContext_ = true;
return cx;
@@ -1663,7 +1647,7 @@ ForkJoinContext::releaseJSContext()
{
JS_ASSERT(acquiredJSContext_);
acquiredJSContext_ = false;
return shared->releaseJSContext();
return shared_->releaseJSContext();
}
bool
@@ -1675,32 +1659,33 @@ ForkJoinContext::hasAcquiredJSContext() const
bool
ForkJoinContext::check()
{
if (runtime()->interruptPar)
return shared->check(*this);
else
if (runtime()->interruptPar) {
shared_->setAbortFlagDueToInterrupt(*this);
return false;
}
return true;
}
void
ForkJoinContext::requestGC(JS::gcreason::Reason reason)
{
shared->requestGC(reason);
shared_->requestGC(reason);
bailoutRecord->setCause(ParallelBailoutRequestedGC);
shared->setAbortFlagAndTriggerOperationCallback(false);
shared_->setAbortFlagAndTriggerOperationCallback(false);
}
void
ForkJoinContext::requestZoneGC(JS::Zone *zone, JS::gcreason::Reason reason)
{
shared->requestZoneGC(zone, reason);
shared_->requestZoneGC(zone, reason);
bailoutRecord->setCause(ParallelBailoutRequestedZoneGC);
shared->setAbortFlagAndTriggerOperationCallback(false);
shared_->setAbortFlagAndTriggerOperationCallback(false);
}
bool
ForkJoinContext::setPendingAbortFatal(ParallelBailoutCause cause)
{
shared->setPendingAbortFatal();
shared_->setPendingAbortFatal();
bailoutRecord->setCause(cause);
return false;
}
@@ -1901,8 +1886,8 @@ class ParallelSpewer
char bufbuf[BufferSize];
JS_snprintf(bufbuf, BufferSize, "[%%sParallel:%%0%du%%s] ",
NumberOfDigits(cx->maxWorkerId));
JS_snprintf(buf, BufferSize, bufbuf, workerColor(cx->workerId),
cx->workerId, reset());
JS_snprintf(buf, BufferSize, bufbuf, workerColor(cx->workerId()),
cx->workerId(), reset());
} else {
JS_snprintf(buf, BufferSize, "[Parallel:M] ");
}

View File

@@ -312,9 +312,6 @@ struct ForkJoinShared;
class ForkJoinContext : public ThreadSafeContext
{
public:
// The worker that is doing the work.
const uint32_t workerId;
// Bailout record used to record the reason this thread stopped executing
ParallelBailoutRecord *const bailoutRecord;
@@ -343,17 +340,16 @@ class ForkJoinContext : public ThreadSafeContext
uint8_t *targetRegionStart;
uint8_t *targetRegionEnd;
ForkJoinContext(PerThreadData *perThreadData, uint32_t workerId,
ForkJoinContext(PerThreadData *perThreadData, ThreadPoolWorker *worker,
Allocator *allocator, ForkJoinShared *shared,
ParallelBailoutRecord *bailoutRecord);
// Get the worker id. By convention, the worker id of the main thread is 0.
uint32_t workerId() const { return worker_->id(); }
// Get a slice of work for the worker associated with the context.
bool getSlice(uint16_t *sliceId) {
ThreadPool &pool = runtime()->threadPool;
return (isMainThread()
? pool.getSliceForMainThread(sliceId)
: pool.getSliceForWorker(workerId, sliceId));
}
bool getSlice(uint16_t *sliceId) { return worker_->getSlice(this, sliceId); }
// True if this is the main thread, false if it is one of the parallel workers.
bool isMainThread() const;
@@ -388,7 +384,7 @@ class ForkJoinContext : public ThreadSafeContext
// also rendezvous to perform GC or do other similar things.
//
// This function is guaranteed to have no effect if
// runtime()->interrupt is zero. Ion-generated code takes
// runtime()->interruptPar is zero. Ion-generated code takes
// advantage of this by inlining the checks on those flags before
// actually calling this function. If this function ends up
// getting called a lot from outside ion code, we can refactor
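The guarantee in this comment (no effect while runtime()->interruptPar is clear) is what lets Ion inline the flag test and skip the call entirely on the fast path. A hypothetical sketch of that split; CheckInterruptPar is an invented name:

    // Fast path inlined by the JIT; slow path is the out-of-line call.
    static inline bool
    CheckInterruptPar(JSRuntime *rt, ForkJoinContext *cx)
    {
        if (!rt->interruptPar)   // flag clear: guaranteed no effect
            return true;
        return cx->check();      // flag set: may record a bailout cause and abort
    }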
@@ -416,7 +412,9 @@ class ForkJoinContext : public ThreadSafeContext
// Initialized by initialize()
static mozilla::ThreadLocal<ForkJoinContext*> tlsForkJoinContext;
ForkJoinShared *const shared;
ForkJoinShared *const shared_;
ThreadPoolWorker *worker_;
bool acquiredJSContext_;

View File

@@ -299,7 +299,7 @@ static bool
intrinsic_ForkJoinNumWorkers(JSContext *cx, unsigned argc, Value *vp)
{
CallArgs args = CallArgsFromVp(argc, vp);
args.rval().setInt32(cx->runtime()->threadPool.numWorkers() + 1);
args.rval().setInt32(cx->runtime()->threadPool.numWorkers());
return true;
}

View File

@@ -18,128 +18,31 @@ using namespace js;
const size_t WORKER_THREAD_STACK_SIZE = 1*1024*1024;
/////////////////////////////////////////////////////////////////////////////
// ThreadPoolBaseWorker
//
// Base class for worker threads in the pool.
class js::ThreadPoolBaseWorker
static inline uint32_t
ComposeSliceBounds(uint16_t from, uint16_t to)
{
protected:
const uint32_t workerId_;
ThreadPool *pool_;
private:
// Slices this thread is responsible for.
//
// This is a uint32 composed of two uint16s (the lower and upper bounds) so
// that we may do a single CAS. See {Compose,Decompose}SliceBounds
// functions below.
mozilla::Atomic<uint32_t, mozilla::ReleaseAcquire> sliceBounds_;
protected:
static uint32_t ComposeSliceBounds(uint16_t from, uint16_t to) {
MOZ_ASSERT(from <= to);
return (uint32_t(from) << 16) | to;
}
static void DecomposeSliceBounds(uint32_t bounds, uint16_t *from, uint16_t *to) {
static inline void
DecomposeSliceBounds(uint32_t bounds, uint16_t *from, uint16_t *to)
{
*from = bounds >> 16;
*to = bounds & uint16_t(~0);
MOZ_ASSERT(*from <= *to);
}
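Packing both bounds into one uint32 is what lets a single compare-and-swap claim a slice even when the owning worker and a thief race. A self-contained sketch of the scheme, with std::atomic standing in for mozilla::Atomic (SliceBounds is an invented name; popBack is the operation stealFrom performs on its victim):

    #include <atomic>
    #include <cassert>
    #include <cstdint>

    struct SliceBounds
    {
        // (from << 16) | to, denoting the half-open slice range [from, to).
        std::atomic<uint32_t> bounds_;

        SliceBounds() : bounds_(0) {}

        void submit(uint16_t from, uint16_t to) {
            assert(from <= to);
            bounds_.store((uint32_t(from) << 16) | to, std::memory_order_release);
        }

        // Owner takes from the front by advancing |from|.
        bool popFront(uint16_t *sliceId) {
            uint32_t b = bounds_.load(std::memory_order_acquire);
            for (;;) {
                uint16_t from = b >> 16, to = b & 0xffff;
                if (from == to)
                    return false;  // no work left
                uint32_t next = (uint32_t(from + 1) << 16) | to;
                if (bounds_.compare_exchange_weak(b, next)) {  // reloads b on failure
                    *sliceId = from;
                    return true;
                }
            }
        }

        // Thief takes from the back by retreating |to|, leaving the front --
        // and its cache locality -- to the owner.
        bool popBack(uint16_t *sliceId) {
            uint32_t b = bounds_.load(std::memory_order_acquire);
            for (;;) {
                uint16_t from = b >> 16, to = b & 0xffff;
                if (from == to)
                    return false;
                uint32_t next = (uint32_t(from) << 16) | uint16_t(to - 1);
                if (bounds_.compare_exchange_weak(b, next)) {
                    *sliceId = to - 1;
                    return true;
                }
            }
        }
    };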
bool hasWork() const {
bool
ThreadPoolWorker::hasWork() const
{
uint16_t from, to;
DecomposeSliceBounds(sliceBounds_, &from, &to);
return from != to;
}
bool popSliceFront(uint16_t *sliceId);
bool popSliceBack(uint16_t *sliceId);
bool stealFrom(ThreadPoolBaseWorker *victim, uint16_t *sliceId);
public:
ThreadPoolBaseWorker(uint32_t workerId, ThreadPool *pool)
: workerId_(workerId),
pool_(pool),
sliceBounds_(0)
{ }
void submitSlices(uint16_t sliceFrom, uint16_t sliceTo) {
MOZ_ASSERT(!hasWork());
sliceBounds_ = ComposeSliceBounds(sliceFrom, sliceTo);
}
void abort();
};
/////////////////////////////////////////////////////////////////////////////
// ThreadPoolWorker
//
// Each |ThreadPoolWorker| just hangs around waiting for slices to be added to
// its worklist. Whenever something is added, it gets executed. Once the
// worker's state is set to |TERMINATED|, the worker will exit as soon as its
// queue is empty.
class js::ThreadPoolWorker : public ThreadPoolBaseWorker
{
friend class ThreadPoolMainWorker;
// Current point in the worker's lifecycle.
//
// Modified only while holding the ThreadPoolWorker's lock.
volatile enum WorkerState {
CREATED, ACTIVE, TERMINATED
} state_;
// The thread's main function
static void ThreadMain(void *arg);
void run();
public:
ThreadPoolWorker(uint32_t workerId, ThreadPool *pool)
: ThreadPoolBaseWorker(workerId, pool),
state_(CREATED)
{ }
// Get a slice of work, from ourself or steal work from other workers
// (or from the main thread).
bool getSlice(uint16_t *sliceId);
// Invoked from main thread; signals worker to start.
bool start();
// Invoked from main thread; signals the worker loop to return.
void terminate(AutoLockMonitor &lock);
};
// ThreadPoolMainWorker
//
// This class abstracts the main thread as a worker thread with a private
// queue to allow for work stealing.
class js::ThreadPoolMainWorker : public ThreadPoolBaseWorker
{
friend class ThreadPoolWorker;
public:
bool isActive;
ThreadPoolMainWorker(ThreadPool *pool)
: ThreadPoolBaseWorker(0, pool),
isActive(false)
{ }
// Get a slice of work, from ourself or steal work from other workers.
bool getSlice(uint16_t *sliceId);
// Execute a job on the main thread.
void executeJob();
};
bool
ThreadPoolBaseWorker::popSliceFront(uint16_t *sliceId)
ThreadPoolWorker::popSliceFront(uint16_t *sliceId)
{
uint32_t bounds;
uint16_t from, to;
@@ -156,7 +59,7 @@ ThreadPoolBaseWorker::popSliceFront(uint16_t *sliceId)
}
bool
ThreadPoolBaseWorker::popSliceBack(uint16_t *sliceId)
ThreadPoolWorker::popSliceBack(uint16_t *sliceId)
{
uint32_t bounds;
uint16_t from, to;
@@ -173,7 +76,7 @@ ThreadPoolBaseWorker::popSliceBack(uint16_t *sliceId)
}
void
ThreadPoolBaseWorker::abort()
ThreadPoolWorker::discardSlices()
{
uint32_t bounds;
uint16_t from, to;
@@ -186,7 +89,7 @@ ThreadPoolBaseWorker::abort()
}
bool
ThreadPoolBaseWorker::stealFrom(ThreadPoolBaseWorker *victim, uint16_t *sliceId)
ThreadPoolWorker::stealFrom(ThreadPoolWorker *victim, uint16_t *sliceId)
{
// Instead of popping the slice from the front by incrementing sliceFrom_,
// decrement sliceTo_. Usually this gives us better locality.
@@ -204,13 +107,16 @@ ThreadPoolWorker::start()
#ifndef JS_THREADSAFE
return false;
#else
if (isMainThread())
return true;
MOZ_ASSERT(state_ == CREATED);
// Set state to active now, *before* the thread starts:
state_ = ACTIVE;
if (!PR_CreateThread(PR_USER_THREAD,
ThreadMain, this,
HelperThreadMain, this,
PR_PRIORITY_NORMAL, PR_LOCAL_THREAD,
PR_UNJOINABLE_THREAD,
WORKER_THREAD_STACK_SIZE))
@@ -225,44 +131,17 @@ ThreadPoolWorker::start()
}
void
ThreadPoolWorker::ThreadMain(void *arg)
ThreadPoolWorker::HelperThreadMain(void *arg)
{
ThreadPoolWorker *worker = (ThreadPoolWorker*) arg;
worker->run();
}
bool
ThreadPoolWorker::getSlice(uint16_t *sliceId)
{
// First see whether we have any work ourself.
if (popSliceFront(sliceId))
return true;
// Try to steal work.
if (!pool_->workStealing())
return false;
ThreadPoolBaseWorker *victim;
do {
if (!pool_->hasWork())
return false;
// Add one to add the main thread into the mix.
uint32_t victimId = rand() % (pool_->numWorkers() + 1);
// By convention consider worker id 0 the main thread.
if (victimId == 0)
victim = pool_->mainWorker_;
else
victim = pool_->workers_[victimId - 1];
} while (!stealFrom(victim, sliceId));
return true;
worker->helperLoop();
}
void
ThreadPoolWorker::run()
ThreadPoolWorker::helperLoop()
{
MOZ_ASSERT(!isMainThread());
// This is hokey in the extreme. To compute the stack limit,
// subtract the size of the stack from the address of a local
// variable and give a 100k buffer. Is there a better way?
@@ -271,6 +150,7 @@ ThreadPoolWorker::run()
uintptr_t stackLimit = (((uintptr_t)&stackLimitOffset) +
stackLimitOffset * JS_STACK_GROWTH_DIRECTION);
for (;;) {
// Wait for work to arrive or for us to terminate.
{
@@ -286,7 +166,7 @@ ThreadPoolWorker::run()
pool_->activeWorkers_++;
}
if (!pool_->job()->executeFromWorker(workerId_, stackLimit))
if (!pool_->job()->executeFromWorker(this, stackLimit))
pool_->abortJob();
// Join the pool.
@@ -298,22 +178,14 @@ ThreadPoolWorker::run()
}
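The stack-limit computation near the top of helperLoop (the "hokey" one) anchors on a local variable's address and offsets by the stack size minus a 100k buffer, in whichever direction the stack grows. A standalone sketch under those assumptions (constants copied from this file; EstimateStackLimit is an invented name):

    #include <cstdint>

    static uintptr_t
    EstimateStackLimit()
    {
        const uintptr_t stackSize = 1 * 1024 * 1024;  // WORKER_THREAD_STACK_SIZE
        const uintptr_t slack = 100 * 1024;           // safety buffer
        uintptr_t anchor = reinterpret_cast<uintptr_t>(&anchor);  // ~current stack position
    #if JS_STACK_GROWTH_DIRECTION > 0
        return anchor + (stackSize - slack);          // stack grows upward
    #else
        return anchor - (stackSize - slack);          // stack grows downward
    #endif
    }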
void
ThreadPoolWorker::terminate(AutoLockMonitor &lock)
ThreadPoolWorker::submitSlices(uint16_t sliceFrom, uint16_t sliceTo)
{
MOZ_ASSERT(lock.isFor(*pool_));
MOZ_ASSERT(state_ != TERMINATED);
state_ = TERMINATED;
}
void
ThreadPoolMainWorker::executeJob()
{
if (!pool_->job()->executeFromMainThread())
pool_->abortJob();
MOZ_ASSERT(!hasWork());
sliceBounds_ = ComposeSliceBounds(sliceFrom, sliceTo);
}
bool
ThreadPoolMainWorker::getSlice(uint16_t *sliceId)
ThreadPoolWorker::getSlice(ForkJoinContext *cx, uint16_t *sliceId)
{
// First see whether we have any work ourself.
if (popSliceFront(sliceId))
@@ -323,18 +195,26 @@ ThreadPoolMainWorker::getSlice(uint16_t *sliceId)
if (!pool_->workStealing())
return false;
// Pick a random target with work left over.
ThreadPoolWorker *victim;
do {
if (!pool_->hasWork())
return false;
// Add one to add the main thread into the mix.
victim = pool_->workers_[rand() % pool_->numWorkers()];
} while (!stealFrom(victim, sliceId));
return true;
}
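Disentangled from the interleaved old/new lines above, the unified ThreadPoolWorker::getSlice appears to read as follows (a reconstruction from this hunk, not authoritative):

    bool
    ThreadPoolWorker::getSlice(ForkJoinContext *cx, uint16_t *sliceId)
    {
        // First see whether we have any work ourself.
        if (popSliceFront(sliceId))
            return true;

        // Try to steal work.
        if (!pool_->workStealing())
            return false;

        // Pick a random victim until a steal succeeds or the pool runs dry.
        ThreadPoolWorker *victim;
        do {
            if (!pool_->hasWork())
                return false;
            victim = pool_->workers_[rand() % pool_->numWorkers()];
        } while (!stealFrom(victim, sliceId));
        return true;
    }

Note that the main thread no longer needs special-casing in the lottery: it is workers_[0], a victim like any other.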
void
ThreadPoolWorker::terminate(AutoLockMonitor &lock)
{
MOZ_ASSERT(lock.isFor(*pool_));
MOZ_ASSERT(state_ != TERMINATED);
state_ = TERMINATED;
}
/////////////////////////////////////////////////////////////////////////////
// ThreadPool
//
@@ -343,14 +223,14 @@ ThreadPoolMainWorker::getSlice(uint16_t *sliceId)
ThreadPool::ThreadPool(JSRuntime *rt)
: runtime_(rt),
mainWorker_(nullptr),
activeWorkers_(0),
joinBarrier_(nullptr),
job_(nullptr),
#ifdef DEBUG
stolenSlices_(0),
#endif
pendingSlices_(0)
pendingSlices_(0),
isMainThreadActive_(false)
{ }
ThreadPool::~ThreadPool()
@@ -379,10 +259,9 @@ uint32_t
ThreadPool::numWorkers() const
{
#ifdef JS_THREADSAFE
// Subtract one for the main thread, which always exists.
return WorkerThreadState().cpuCount - 1;
return WorkerThreadState().cpuCount;
#else
return 0;
return 1;
#endif
}
@@ -397,12 +276,6 @@ ThreadPool::workStealing() const
return true;
}
bool
ThreadPool::isMainThreadActive() const
{
return mainWorker_ && mainWorker_->isActive;
}
bool
ThreadPool::lazyStartWorkers(JSContext *cx)
{
@@ -464,7 +337,7 @@ ThreadPool::terminateWorkers()
// Wake up all the workers. Set the number of active workers to the
// current number of workers so we can make sure they all join.
activeWorkers_ = workers_.length();
activeWorkers_ = workers_.length() - 1;
lock.notifyAll();
// Wait for all workers to join.
@@ -473,8 +346,6 @@ ThreadPool::terminateWorkers()
while (workers_.length() > 0)
js_delete(workers_.popCopy());
}
js_delete(mainWorker_);
}
void
@@ -508,22 +379,13 @@ ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceFrom, uint
MOZ_ASSERT(activeWorkers_ == 0);
MOZ_ASSERT(!hasWork());
// Create the main thread worker and off-main-thread workers if necessary.
if (!mainWorker_) {
mainWorker_ = cx->new_<ThreadPoolMainWorker>(this);
if (!mainWorker_) {
terminateWorkersAndReportOOM(cx);
return TP_FATAL;
}
}
if (!lazyStartWorkers(cx))
return TP_FATAL;
// Evenly distribute slices to the workers.
uint16_t numSlices = sliceMax - sliceFrom;
uint16_t slicesPerWorker = numSlices / (numWorkers() + 1);
uint16_t leftover = numSlices % (numWorkers() + 1);
uint16_t slicesPerWorker = numSlices / numWorkers();
uint16_t leftover = numSlices % numWorkers();
uint16_t sliceTo = sliceFrom;
for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
if (leftover > 0) {
@@ -536,7 +398,6 @@ ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceFrom, uint
sliceFrom = sliceTo;
}
MOZ_ASSERT(leftover == 0);
mainWorker_->submitSlices(sliceFrom, sliceFrom + slicesPerWorker);
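The distribution loop above hands every worker numSlices / numWorkers() slices, with the first numSlices % numWorkers() workers absorbing the remainder, one extra slice each; the assert confirms nothing is left over. The same scheme as a standalone sketch (DistributeSlices and the vector-of-pairs return type are illustrative):

    #include <cstdint>
    #include <utility>
    #include <vector>

    // Split [sliceFrom, sliceMax) across nworkers as evenly as possible.
    static std::vector<std::pair<uint16_t, uint16_t>>
    DistributeSlices(uint16_t sliceFrom, uint16_t sliceMax, uint32_t nworkers)
    {
        std::vector<std::pair<uint16_t, uint16_t>> ranges;
        uint16_t numSlices = sliceMax - sliceFrom;
        uint16_t perWorker = uint16_t(numSlices / nworkers);
        uint16_t leftover = uint16_t(numSlices % nworkers);
        uint16_t from = sliceFrom;
        for (uint32_t w = 0; w < nworkers; w++) {
            uint16_t to = uint16_t(from + perWorker + (w < leftover ? 1 : 0));
            ranges.push_back(std::make_pair(from, to));  // worker w owns [from, to)
            from = to;
        }
        return ranges;  // e.g. 10 slices over 4 workers -> sizes 3, 3, 2, 2
    }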
// Notify the worker threads that there's work now.
{
@@ -550,9 +411,10 @@ ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceFrom, uint
}
// Do work on the main thread.
mainWorker_->isActive = true;
mainWorker_->executeJob();
mainWorker_->isActive = false;
isMainThreadActive_ = true;
if (!job->executeFromMainThread(mainThreadWorker()))
abortJob();
isMainThreadActive_ = false;
// Wait for all threads to join. While there are no pending slices at this
// point, the slices themselves may not be finished processing.
@@ -569,31 +431,16 @@ ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceFrom, uint
return TP_SUCCESS;
}
bool
ThreadPool::getSliceForWorker(uint32_t workerId, uint16_t *sliceId)
{
MOZ_ASSERT(workers_[workerId]);
return workers_[workerId]->getSlice(sliceId);
}
bool
ThreadPool::getSliceForMainThread(uint16_t *sliceId)
{
MOZ_ASSERT(mainWorker_);
return mainWorker_->getSlice(sliceId);
}
void
ThreadPool::abortJob()
{
mainWorker_->abort();
for (uint32_t workerId = 0; workerId < numWorkers(); workerId++)
workers_[workerId]->abort();
workers_[workerId]->discardSlices();
// Spin until pendingSlices_ reaches 0.
//
// The reason for this is that while calling abort() clears all workers'
// bounds, the pendingSlices_ cache might still be > 0 due to
// The reason for this is that while calling discardSlices() clears all
// workers' bounds, the pendingSlices_ cache might still be > 0 due to
// still-executing calls to popSliceBack or popSliceFront in other
// threads. When those finish, we will be sure that !hasWork(), which is
// important to ensure that an aborted worker does not start again due to
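The spin itself falls outside this hunk; the shape the comment describes is roughly the following (a sketch, with the busy-wait bounded by pops already in flight):

    // After discarding every worker's bounds, wait for in-flight
    // popSliceFront/popSliceBack calls to finish decrementing
    // pendingSlices_, so that hasWork() is reliably false afterwards.
    while (hasWork())
        ;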

View File

@@ -11,6 +11,7 @@
#include "jsalloc.h"
#include "jslock.h"
#include "jsmath.h"
#include "jspubtd.h"
#include "js/Vector.h"
@@ -21,22 +22,88 @@ struct JSCompartment;
namespace js {
class ThreadPoolBaseWorker;
class ThreadPoolWorker;
class ThreadPoolMainWorker;
class ThreadPool;
/////////////////////////////////////////////////////////////////////////////
// ThreadPoolWorker
//
// Class for worker threads in the pool. All threads (i.e. helpers and main
// thread) have a worker associated with them. By convention, the worker id of
// the main thread is 0.
class ThreadPoolWorker
{
const uint32_t workerId_;
ThreadPool *pool_;
// Slices this thread is responsible for.
//
// This is a uint32 composed of two uint16s (the lower and upper bounds) so
// that we may do a single CAS. See {Compose,Decompose}SliceBounds
// functions below.
mozilla::Atomic<uint32_t, mozilla::ReleaseAcquire> sliceBounds_;
// Current point in the worker's lifecycle.
volatile enum WorkerState {
CREATED, ACTIVE, TERMINATED
} state_;
// The thread's main function.
static void HelperThreadMain(void *arg);
void helperLoop();
bool hasWork() const;
bool popSliceFront(uint16_t *sliceId);
bool popSliceBack(uint16_t *sliceId);
bool stealFrom(ThreadPoolWorker *victim, uint16_t *sliceId);
public:
ThreadPoolWorker(uint32_t workerId, ThreadPool *pool)
: workerId_(workerId),
pool_(pool),
sliceBounds_(0),
state_(CREATED)
{ }
uint32_t id() const { return workerId_; }
bool isMainThread() const { return id() == 0; }
// Submits a new set of slices. Assumes !hasWork().
void submitSlices(uint16_t sliceFrom, uint16_t sliceTo);
// Get the next slice; work stealing happens here if work stealing is
// on. Returns false if there are no more slices to hand out.
bool getSlice(ForkJoinContext *cx, uint16_t *sliceId);
// Discard remaining slices. Used for aborting jobs.
void discardSlices();
// Invoked from the main thread; signals worker to start.
bool start();
// Invoked from the main thread; signals the worker loop to return.
void terminate(AutoLockMonitor &lock);
static size_t offsetOfSliceBounds() {
return offsetof(ThreadPoolWorker, sliceBounds_);
}
};
/////////////////////////////////////////////////////////////////////////////
// A ParallelJob is the main runnable abstraction in the ThreadPool.
//
// The unit of work here is in terms of threads, *not* slices. The
// user-provided function has the responsibility of getting slices of work via
// the |ForkJoinGetSlice| intrinsic.
class ParallelJob
{
public:
virtual bool executeFromWorker(uint32_t workerId, uintptr_t stackLimit) = 0;
virtual bool executeFromMainThread() = 0;
virtual bool executeFromWorker(ThreadPoolWorker *worker, uintptr_t stackLimit) = 0;
virtual bool executeFromMainThread(ThreadPoolWorker *mainWorker) = 0;
};
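A hypothetical ParallelJob illustrating the contract: the pool invokes one entry point per thread, and the job body is responsible for pulling slice ids itself. CountingJob and its use of a null ForkJoinContext are invented for illustration (real jobs run through ThreadPool::executeJob and the ForkJoin machinery):

    // Counts how many slices all threads processed, in aggregate.
    class CountingJob : public js::ParallelJob
    {
        mozilla::Atomic<uint32_t> processed_;

      public:
        CountingJob() : processed_(0) {}

        virtual bool executeFromWorker(js::ThreadPoolWorker *worker,
                                       uintptr_t stackLimit) MOZ_OVERRIDE
        {
            uint16_t sliceId;
            // Assumes getSlice() tolerates a null context in this sketch.
            while (worker->getSlice(nullptr, &sliceId))
                processed_++;
            return true;  // returning false aborts the whole job
        }

        virtual bool executeFromMainThread(js::ThreadPoolWorker *mainWorker) MOZ_OVERRIDE
        {
            return executeFromWorker(mainWorker, /* stackLimit = */ 0);
        }
    };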
/////////////////////////////////////////////////////////////////////////////
// ThreadPool used for parallel JavaScript execution. Unless you are building
// a new kind of parallel service, it is very likely that you do not wish to
// interact with the threadpool directly. In particular, if you wish to
@@ -79,17 +146,13 @@ class ParallelJob
class ThreadPool : public Monitor
{
private:
friend class ThreadPoolBaseWorker;
friend class ThreadPoolWorker;
friend class ThreadPoolMainWorker;
// Initialized at startup only.
JSRuntime *const runtime_;
// Worker threads and the main thread worker have different
// logic. Initialized lazily.
// Initialized lazily.
js::Vector<ThreadPoolWorker *, 8, SystemAllocPolicy> workers_;
ThreadPoolMainWorker *mainWorker_;
// The number of active workers. Should only access under lock.
uint32_t activeWorkers_;
@@ -106,11 +169,15 @@ class ThreadPool : public Monitor
// Number of pending slices in the current job.
mozilla::Atomic<uint32_t, mozilla::ReleaseAcquire> pendingSlices_;
// Whether the main thread is currently processing slices.
bool isMainThreadActive_;
bool lazyStartWorkers(JSContext *cx);
void terminateWorkers();
void terminateWorkersAndReportOOM(JSContext *cx);
void join(AutoLockMonitor &lock);
void waitForWorkers(AutoLockMonitor &lock);
ThreadPoolWorker *mainThreadWorker() { return workers_[0]; }
public:
ThreadPool(JSRuntime *rt);
@@ -118,7 +185,7 @@ class ThreadPool : public Monitor
bool init();
// Return number of worker threads in the pool, not counting the main thread.
// Return number of worker threads in the pool, counting the main thread.
uint32_t numWorkers() const;
// Returns whether we have any pending slices.
@@ -134,7 +201,7 @@ class ThreadPool : public Monitor
bool workStealing() const;
// Returns whether or not the main thread is working.
bool isMainThreadActive() const;
bool isMainThreadActive() const { return isMainThreadActive_; }
#ifdef DEBUG
// Return the number of stolen slices in the last parallel job.
@@ -151,11 +218,6 @@ class ThreadPool : public Monitor
ParallelResult executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceStart,
uint16_t numSlices);
// Get the next slice; work stealing happens here if work stealing is
// on. Returns false if there are no more slices to hand out.
bool getSliceForWorker(uint32_t workerId, uint16_t *sliceId);
bool getSliceForMainThread(uint16_t *sliceId);
// Abort the current job.
void abortJob();
};