Bug 816805 - Style fixes for ThreadPool, ForkJoin, and Monitor (r=jwalden)

Shu-yu Guo 2012-11-30 15:15:20 -08:00
parent 77c6546972
commit 26cd833553
7 changed files with 379 additions and 387 deletions

View File

@ -1,7 +1,18 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: set ts=8 sw=4 et tw=78:
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef ForkJoin_inl_h__
#define ForkJoin_inl_h__
namespace js {
ForkJoinSlice *
ForkJoinSlice::current() {
inline ForkJoinSlice *
ForkJoinSlice::current()
{
#ifdef JS_THREADSAFE_ION
return (ForkJoinSlice*) PR_GetThreadPrivate(ThreadPrivateIndex);
#else
@ -9,4 +20,18 @@ ForkJoinSlice::current() {
#endif
}
// True if this thread is currently executing a parallel operation across
// multiple threads.
static inline bool
InParallelSection()
{
#ifdef JS_THREADSAFE
return ForkJoinSlice::current() != NULL;
#else
return false;
#endif
}
} // namespace js
#endif // ForkJoin_inl_h__
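
As a point of reference, a minimal sketch of how a caller might consume these accessors; the guard function below is hypothetical and not part of this patch:

#include "vm/ForkJoin-inl.h"

// Hypothetical guard: refuse to run thread-unsafe code while this thread
// is executing inside a parallel section.
static bool
SafeToTouchSharedState()
{
    // ForkJoinSlice::current() is non-NULL only on threads that are
    // currently running under ExecuteForkJoinOp().
    return !js::InParallelSection();
}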

View File

@ -5,25 +5,25 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "ForkJoin.h"
#include "Monitor.h"
#include "jscntxt.h"
#include "jscompartment.h"
#include "ForkJoin-inl.h"
#include "vm/ForkJoin.h"
#include "vm/Monitor.h"
#include "vm/ForkJoin-inl.h"
#ifdef JS_THREADSAFE
# include "prthread.h"
#endif
namespace js {
using namespace js;
#ifdef JS_THREADSAFE
class ForkJoinShared
: public TaskExecutor,
public Monitor
class js::ForkJoinShared : public TaskExecutor, public Monitor
{
////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////
// Constant fields
JSContext *const cx_; // Current context
@ -32,14 +32,14 @@ class ForkJoinShared
const size_t numThreads_; // Total number of threads.
PRCondVar *rendezvousEnd_; // Cond. var used to signal end of rendezvous.
////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////
// Per-thread arenas
//
// Each worker thread gets an arena to use when allocating.
Vector<gc::ArenaLists *, 16> arenaListss_;
////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////
// Locked Fields
//
// Only to be accessed while holding the lock.
@ -48,23 +48,23 @@ class ForkJoinShared
size_t blocked_; // Number of threads that have joined the rendezvous.
size_t rendezvousIndex_; // Number of rendezvous attempts
////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////
// Asynchronous Flags
//
// These can be read without the lock (hence the |volatile| declaration).
// A thread has bailed and others should follow suit. Set and
// read asynchronously. After setting abort, workers will acquire
// the lock, decrement uncompleted, and then notify if uncompleted
// has reached blocked.
// A thread has bailed and others should follow suit. Set and read
// asynchronously. After setting abort, workers will acquire the lock,
// decrement uncompleted, and then notify if uncompleted has reached
// blocked.
volatile bool abort_;
// Set to true when a worker bails for a fatal reason.
volatile bool fatal_;
// A thread has requested a rendezvous. Only *written* with the
// lock (in |initiateRendezvous()| and |endRendezvous()|) but may
// be *read* without the lock.
// A thread has requested a rendezvous. Only *written* with the lock (in
// |initiateRendezvous()| and |endRendezvous()|) but may be *read* without
// the lock.
volatile bool rendezvous_;
// Invoked only from the main thread:
@ -76,24 +76,24 @@ class ForkJoinShared
// Rendezvous protocol:
//
// Use AutoRendezvous rather than invoking initiateRendezvous()
// and endRendezvous() directly.
// Use AutoRendezvous rather than invoking initiateRendezvous() and
// endRendezvous() directly.
friend class AutoRendezvous;
// Requests that the other threads stop. Must be invoked from the
// main thread.
// Requests that the other threads stop. Must be invoked from the main
// thread.
void initiateRendezvous(ForkJoinSlice &threadCx);
// If a rendezvous has been requested, blocks until the main
// thread says we may continue.
// If a rendezvous has been requested, blocks until the main thread says
// we may continue.
void joinRendezvous(ForkJoinSlice &threadCx);
// Permits other threads to resume execution. Must be invoked
// from the main thread after a call to initiateRendezvous().
// Permits other threads to resume execution. Must be invoked from the
// main thread after a call to initiateRendezvous().
void endRendezvous(ForkJoinSlice &threadCx);
public:
public:
ForkJoinShared(JSContext *cx,
ThreadPool *threadPool,
ForkJoinOp &op,
@ -108,12 +108,12 @@ public:
// Invoked from parallel worker threads:
virtual void executeFromWorker(size_t threadId, uintptr_t stackLimit);
// Moves all the per-thread arenas into the main compartment.
// This can only safely be invoked on the main thread, either
// during a rendezvous or after the workers have completed.
// Moves all the per-thread arenas into the main compartment. This can
// only safely be invoked on the main thread, either during a rendezvous
// or after the workers have completed.
void transferArenasToCompartment();
// Invoked during processing by worker threads to "check in"
// Invoked during processing by worker threads to "check in".
bool check(ForkJoinSlice &threadCx);
// See comment on |ForkJoinSlice::setFatal()| in forkjoin.h
@ -122,42 +122,40 @@ public:
JSRuntime *runtime() { return cx_->runtime; }
};
class AutoRendezvous {
private:
class js::AutoRendezvous
{
private:
ForkJoinSlice &threadCx;
public:
public:
AutoRendezvous(ForkJoinSlice &threadCx)
: threadCx(threadCx)
{
threadCx.shared->initiateRendezvous(threadCx);
}
~AutoRendezvous()
{
~AutoRendezvous() {
threadCx.shared->endRendezvous(threadCx);
}
};
PRUintn ForkJoinSlice::ThreadPrivateIndex;
class AutoSetForkJoinSlice
class js::AutoSetForkJoinSlice
{
public:
AutoSetForkJoinSlice(ForkJoinSlice *threadCx)
{
public:
AutoSetForkJoinSlice(ForkJoinSlice *threadCx) {
PR_SetThreadPrivate(ForkJoinSlice::ThreadPrivateIndex, threadCx);
}
~AutoSetForkJoinSlice()
{
~AutoSetForkJoinSlice() {
PR_SetThreadPrivate(ForkJoinSlice::ThreadPrivateIndex, NULL);
}
};
/****************************************************************************
* ForkJoinShared
*/
/////////////////////////////////////////////////////////////////////////////
// ForkJoinShared
//
ForkJoinShared::ForkJoinShared(JSContext *cx,
ThreadPool *threadPool,
@ -175,7 +173,7 @@ ForkJoinShared::ForkJoinShared(JSContext *cx,
abort_(false),
fatal_(false),
rendezvous_(false)
{}
{ }
bool
ForkJoinShared::init()
@ -215,9 +213,8 @@ ForkJoinShared::~ForkJoinShared()
{
PR_DestroyCondVar(rendezvousEnd_);
while (arenaListss_.length() > 0) {
while (arenaListss_.length() > 0)
delete arenaListss_.popCopy();
}
}
ParallelResult
@ -225,22 +222,22 @@ ForkJoinShared::execute()
{
AutoLockMonitor lock(*this);
// give the task set a chance to prepare for parallel workload
// Give the task set a chance to prepare for parallel workload.
if (!op_.pre(numThreads_))
return TP_RETRY_SEQUENTIALLY;
// notify workers to start and execute one portion on this thread
// Notify workers to start and execute one portion on this thread.
{
AutoUnlockMonitor unlock(*this);
threadPool_->submitAll(this);
executeFromMainThread(cx_->runtime->ionStackLimit);
}
// wait for workers to complete
// Wait for workers to complete.
while (uncompleted_ > 0)
lock.wait();
// check if any of the workers failed
// Check if any of the workers failed.
if (abort_) {
if (fatal_)
return TP_FATAL;
@ -250,25 +247,24 @@ ForkJoinShared::execute()
transferArenasToCompartment();
// give task set a chance to cleanup after parallel execution
// Give task set a chance to cleanup after parallel execution.
if (!op_.post(numThreads_))
return TP_RETRY_SEQUENTIALLY;
return TP_SUCCESS; // everything went swimmingly. give yourself a pat on the back.
// Everything went swimmingly. Give yourself a pat on the back.
return TP_SUCCESS;
}
void
ForkJoinShared::transferArenasToCompartment()
{
#if 0
// This code will become relevant once other
// bugs are merged down.
// XXX: This code will become relevant once other bugs are merged down.
JSRuntime *rt = cx_->runtime;
JSCompartment *comp = cx_->compartment;
for (unsigned i = 0; i < numThreads_; i++) {
for (unsigned i = 0; i < numThreads_; i++)
comp->arenas.adoptArenas(rt, arenaListss_[i]);
}
#endif
}
@ -285,9 +281,9 @@ ForkJoinShared::executeFromWorker(size_t workerId, uintptr_t stackLimit)
AutoLockMonitor lock(*this);
uncompleted_ -= 1;
if (blocked_ == uncompleted_) {
// Signal the main thread that we have terminated. It will be
// either working, arranging a rendezvous, or waiting for
// workers to complete.
// Signal the main thread that we have terminated. It will be either
// working, arranging a rendezvous, or waiting for workers to
// complete.
lock.notify();
}
}
@ -315,8 +311,8 @@ ForkJoinShared::executePortion(PerThreadData *perThread,
bool
ForkJoinShared::setFatal()
{
// Might as well set the abort flag to true, it will make
// propagation faster:
// Might as well set the abort flag to true, as it will make propagation
// faster.
abort_ = true;
fatal_ = true;
return false;
@ -330,13 +326,11 @@ ForkJoinShared::check(ForkJoinSlice &slice)
if (slice.isMainThread()) {
if (cx_->runtime->interrupt) {
// If interrupt is requested, bring worker threads to a
// halt, service the interrupt, then let them start back
// up again.
// If interrupt is requested, bring worker threads to a halt,
// service the interrupt, then let them start back up again.
AutoRendezvous autoRendezvous(slice);
if (!js_HandleExecutionInterrupt(cx_)) {
if (!js_HandleExecutionInterrupt(cx_))
return setFatal();
}
}
} else if (rendezvous_) {
joinRendezvous(slice);
@ -346,59 +340,53 @@ ForkJoinShared::check(ForkJoinSlice &slice)
}
void
ForkJoinShared::initiateRendezvous(ForkJoinSlice &slice) {
/*
The rendezvous protocol is always initiated by the main thread.
The main thread sets the rendezvous flag to true. Seeing this
flag, other threads will invoke |joinRendezvous()|, which causes
them to (1) read |rendezvousIndex| and (2) increment the
|blocked| counter. Once the |blocked| counter is equal to
|uncompleted|, all parallel threads have joined the rendezvous,
and so the main thread is signaled. That will cause this
function to return.
Some subtle points:
- Worker threads may potentially terminate their work before
they see the rendezvous flag. In this case, they would
decrement |uncompleted| rather than incrementing |blocked|.
Either way, if the two variables become equal, the main thread
will be notified.
- The |rendezvousIndex| counter is used to detect the case where
the main thread signals the end of the rendezvous and then
starts another rendezvous before the workers have a chance to
exit. We circumvent this by having the workers read the
|rendezvousIndex| counter as they enter the rendezvous, and
then they only block until that counter is incremented.
Another alternative would be for the main thread to block in
|endRendezvous()| until all workers have exited, but that
would be slower and involve unnecessary synchronization.
Note that the main thread cannot ever get more than one
rendezvous ahead of the workers, because it must wait for all
of them to enter the rendezvous before it can end it, so the
solution of using a counter is perfectly general and we need
not fear rollover.
*/
ForkJoinShared::initiateRendezvous(ForkJoinSlice &slice)
{
// The rendezvous protocol is always initiated by the main thread. The
// main thread sets the rendezvous flag to true. Seeing this flag, other
// threads will invoke |joinRendezvous()|, which causes them to (1) read
// |rendezvousIndex| and (2) increment the |blocked| counter. Once the
// |blocked| counter is equal to |uncompleted|, all parallel threads have
// joined the rendezvous, and so the main thread is signaled. That will
// cause this function to return.
//
// Some subtle points:
//
// - Worker threads may potentially terminate their work before they see
// the rendezvous flag. In this case, they would decrement
// |uncompleted| rather than incrementing |blocked|. Either way, if the
// two variables become equal, the main thread will be notified.
//
// - The |rendezvousIndex| counter is used to detect the case where the
// main thread signals the end of the rendezvous and then starts another
// rendezvous before the workers have a chance to exit. We circumvent
// this by having the workers read the |rendezvousIndex| counter as they
// enter the rendezvous, and then they only block until that counter is
// incremented. Another alternative would be for the main thread to
// block in |endRendezvous()| until all workers have exited, but that
// would be slower and involve unnecessary synchronization.
//
// Note that the main thread cannot ever get more than one rendezvous
// ahead of the workers, because it must wait for all of them to enter
// the rendezvous before it can end it, so the solution of using a
// counter is perfectly general and we need not fear rollover.
JS_ASSERT(slice.isMainThread());
JS_ASSERT(!rendezvous_ && blocked_ == 0);
AutoLockMonitor lock(*this);
// signal other threads we want to start a rendezvous
// Signal other threads we want to start a rendezvous.
rendezvous_ = true;
// wait until all the other threads blocked themselves
while (blocked_ != uncompleted_) {
// Wait until all the other threads blocked themselves.
while (blocked_ != uncompleted_)
lock.wait();
}
}
void
ForkJoinShared::joinRendezvous(ForkJoinSlice &slice) {
ForkJoinShared::joinRendezvous(ForkJoinSlice &slice)
{
JS_ASSERT(!slice.isMainThread());
JS_ASSERT(rendezvous_);
@ -407,21 +395,20 @@ ForkJoinShared::joinRendezvous(ForkJoinSlice &slice) {
blocked_ += 1;
// If we're the last to arrive, let the main thread know about it.
if (blocked_ == uncompleted_) {
if (blocked_ == uncompleted_)
lock.notify();
}
// Wait until the main thread terminates the rendezvous. We use a
// separate condition variable here to distinguish between workers
// notifying the main thread that they have completed and the main
// thread notifying the workers to resume.
while (rendezvousIndex_ == index) {
while (rendezvousIndex_ == index)
PR_WaitCondVar(rendezvousEnd_, PR_INTERVAL_NO_TIMEOUT);
}
}
void
ForkJoinShared::endRendezvous(ForkJoinSlice &slice) {
ForkJoinShared::endRendezvous(ForkJoinSlice &slice)
{
JS_ASSERT(slice.isMainThread());
AutoLockMonitor lock(*this);
@ -429,15 +416,15 @@ ForkJoinShared::endRendezvous(ForkJoinSlice &slice) {
blocked_ = 0;
rendezvousIndex_ += 1;
// signal other threads that rendezvous is over
// Signal other threads that rendezvous is over.
PR_NotifyAllCondVar(rendezvousEnd_);
}
#endif
#endif // JS_THREADSAFE
/****************************************************************************
* ForkJoinSlice
*/
/////////////////////////////////////////////////////////////////////////////
// ForkJoinSlice
//
ForkJoinSlice::ForkJoinSlice(PerThreadData *perThreadData,
size_t sliceId, size_t numSlices,
@ -449,7 +436,7 @@ ForkJoinSlice::ForkJoinSlice(PerThreadData *perThreadData,
ionStackLimit(stackLimit),
arenaLists(arenaLists),
shared(shared)
{}
{ }
bool
ForkJoinSlice::isMainThread()
@ -502,24 +489,25 @@ ForkJoinSlice::Initialize()
#endif
}
/****************************************************************************/
/////////////////////////////////////////////////////////////////////////////
ParallelResult ExecuteForkJoinOp(JSContext *cx, ForkJoinOp &op)
ParallelResult
js::ExecuteForkJoinOp(JSContext *cx, ForkJoinOp &op)
{
# ifndef JS_THREADSAFE_ION
return TP_RETRY_SEQUENTIALLY;
# else
JS_ASSERT(!InParallelSection()); // Recursive use of the ThreadPool is not supported.
#ifdef JS_THREADSAFE
// Recursive use of the ThreadPool is not supported.
JS_ASSERT(!InParallelSection());
ThreadPool *threadPool = &cx->runtime->threadPool;
size_t numThreads = threadPool->numWorkers() + 1; // parallel workers plus this main thread
// Parallel workers plus this main thread.
size_t numThreads = threadPool->numWorkers() + 1;
ForkJoinShared shared(cx, threadPool, op, numThreads, numThreads - 1);
if (!shared.init())
return TP_RETRY_SEQUENTIALLY;
return shared.execute();
# endif
}
#else
return TP_RETRY_SEQUENTIALLY;
#endif
}
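
The generation-counter idea that the |initiateRendezvous()| comment describes can be distilled into a freestanding NSPR sketch; every name below is hypothetical, and initialization and error handling are elided:

#include "prlock.h"
#include "prcvar.h"

struct RendezvousSketch
{
    PRLock *lock;        // from PR_NewLock()
    PRCondVar *end;      // from PR_NewCondVar(lock); plays rendezvousEnd_
    size_t blocked;      // workers currently parked
    size_t uncompleted;  // workers still running
    size_t generation;   // plays rendezvousIndex_

    // Worker side: park until *this* rendezvous ends. Comparing against a
    // saved generation means a worker that wakes late cannot be captured
    // by a back-to-back rendezvous that started before it resumed.
    void join() {
        PR_Lock(lock);
        size_t index = generation;
        blocked++;
        // ...notify the main thread here once blocked == uncompleted...
        while (generation == index)
            PR_WaitCondVar(end, PR_INTERVAL_NO_TIMEOUT);
        PR_Unlock(lock);
    }

    // Main-thread side: release every parked worker at once.
    void endAll() {
        PR_Lock(lock);
        blocked = 0;
        generation++;
        PR_NotifyAllCondVar(end);
        PR_Unlock(lock);
    }
};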

View File

@ -5,103 +5,93 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef jstaskset_h___
#define jstaskset_h___
#ifndef ForkJoin_h__
#define ForkJoin_h__
#include "ThreadPool.h"
#include "vm/ThreadPool.h"
/*
* ForkJoin
*
* This is the building block for executing multi-threaded JavaScript
* with shared memory (as distinct from Web Workers). The idea is
* that you have some (typically data-parallel) operation which you
* wish to execute in parallel across as many threads as you have
* available. An example might be applying |map()| to a vector in
* parallel. To implement such a thing, you would define a subclass of
* |ForkJoinOp| to implement the operation and then invoke
* |ExecuteForkJoinOp()|, as follows:
*
* > class MyForkJoinOp {
* > ... define callbacks as appropriate for your operation ...
* > };
* > MyForkJoinOp op;
* > ExecuteForkJoinOp(cx, op);
*
* |ExecuteForkJoinOp()| will fire up the workers in the runtime's
* thread pool, have them execute the callbacks defined in the
* |ForkJoinOp| class, and then return once all the workers have
* completed.
*
* There are three callbacks defined in |ForkJoinOp|. The first,
* |pre()|, is invoked before the parallel section begins. It informs
* you how many slices your problem will be divided into (effectively,
* how many worker threads there will be). This is often useful for
* allocating an array for the workers to store their result or
* something like that.
*
* Next, you will receive |N| calls to the |parallel()| callback,
* where |N| is the number of slices that were specified in |pre()|.
* Each callback will be supplied with a |ForkJoinSlice| instance
* providing some context.
*
* Typically there will be one call to |parallel()| from each worker
* thread, but that is not something you should rely upon---if we
* implement work-stealing, for example, then it could be that a
* single worker thread winds up handling multiple slices.
*
* Finally, after the operation is complete the |post()| callback is
* invoked, giving you a chance to collect the various results.
*
* Operation callback:
*
* During parallel execution, you should periodically invoke
* |slice.check()|, which will handle the operation callback. If the
* operation callback is necessary, |slice.check()| will arrange a
* rendezvous---that is, as each active worker invokes |check()|, it
* will come to a halt until everyone is blocked (Stop The World). At
* this point, we perform the callback on the main thread, and then
* resume execution. If a worker thread terminates before calling
* |check()|, that's fine too. We assume that you do not do unbounded
* work without invoking |check()|.
*
* Sequential Fallback:
*
* It is assumed that anyone using this API must be prepared for a
* sequential fallback. Therefore, the |ExecuteForkJoinOp()| returns
* a status code indicating whether a fatal error occurred (in which
* case you should just stop) or whether you should retry the
* operation, but executing sequentially. An example of where the
* fallback would be useful is if the parallel code encountered an
* unexpected path that cannot safely be executed in parallel (writes
* to shared state, say).
*
* Current Limitations:
*
* - The API does not support recursive or nested use. That is, the
* |parallel()| callback of a |ForkJoinOp| may not itself invoke
* |ExecuteForkJoinOp()|. We may lift this limitation in the
* future.
*
* - No load balancing is performed between worker threads. That
* means that the fork-join system is best suited for problems that
* can be sliced into uniform bits.
*/
// ForkJoin
//
// This is the building block for executing multi-threaded JavaScript with
// shared memory (as distinct from Web Workers). The idea is that you have
// some (typically data-parallel) operation which you wish to execute in
// parallel across as many threads as you have available. An example might be
// applying |map()| to a vector in parallel. To implement such a thing, you
// would define a subclass of |ForkJoinOp| to implement the operation and then
// invoke |ExecuteForkJoinOp()|, as follows:
//
// class MyForkJoinOp {
// ... define callbacks as appropriate for your operation ...
// };
// MyForkJoinOp op;
// ExecuteForkJoinOp(cx, op);
//
// |ExecuteForkJoinOp()| will fire up the workers in the runtime's thread
// pool, have them execute the callbacks defined in the |ForkJoinOp| class,
// and then return once all the workers have completed.
//
// There are three callbacks defined in |ForkJoinOp|. The first, |pre()|, is
// invoked before the parallel section begins. It informs you how many slices
// your problem will be divided into (effectively, how many worker threads
// there will be). This is often useful for allocating an array for the
// workers to store their result or something like that.
//
// Next, you will receive |N| calls to the |parallel()| callback, where |N| is
// the number of slices that were specified in |pre()|. Each callback will be
// supplied with a |ForkJoinSlice| instance providing some context.
//
// Typically there will be one call to |parallel()| from each worker thread,
// but that is not something you should rely upon---if we implement
// work-stealing, for example, then it could be that a single worker thread
// winds up handling multiple slices.
//
// Finally, after the operation is complete the |post()| callback is invoked,
// giving you a chance to collect the various results.
//
// Operation callback:
//
// During parallel execution, you should periodically invoke |slice.check()|,
// which will handle the operation callback. If the operation callback is
// necessary, |slice.check()| will arrange a rendezvous---that is, as each
// active worker invokes |check()|, it will come to a halt until everyone is
// blocked (Stop The World). At this point, we perform the callback on the
// main thread, and then resume execution. If a worker thread terminates
// before calling |check()|, that's fine too. We assume that you do not do
// unbounded work without invoking |check()|.
//
// Sequential Fallback:
//
// It is assumed that anyone using this API must be prepared for a sequential
// fallback. Therefore, the |ExecuteForkJoinOp()| returns a status code
// indicating whether a fatal error occurred (in which case you should just
// stop) or whether you should retry the operation, but executing
// sequentially. An example of where the fallback would be useful is if the
// parallel code encountered an unexpected path that cannot safely be executed
// in parallel (writes to shared state, say).
//
// Current Limitations:
//
// - The API does not support recursive or nested use. That is, the
// |parallel()| callback of a |ForkJoinOp| may not itself invoke
// |ExecuteForkJoinOp()|. We may lift this limitation in the future.
//
// - No load balancing is performed between worker threads. That means that
// the fork-join system is best suited for problems that can be sliced into
// uniform bits.
namespace js {
// Parallel operations in general can have one of three states. They
// may succeed, fail, or "bail", where bail indicates that the code
// encountered an unexpected condition and should be re-run
// sequentially.
// Parallel operations in general can have one of three states. They may
// succeed, fail, or "bail", where bail indicates that the code encountered an
// unexpected condition and should be re-run sequentially.
enum ParallelResult { TP_SUCCESS, TP_RETRY_SEQUENTIALLY, TP_FATAL };
struct ForkJoinOp;
// Executes the given |TaskSet| in parallel using the runtime's
// |ThreadPool|, returning upon completion. In general, if there are
// |N| workers in the threadpool, the problem will be divided into
// |N+1| slices, as the main thread will also execute one slice.
// Executes the given |TaskSet| in parallel using the runtime's |ThreadPool|,
// returning upon completion. In general, if there are |N| workers in the
// threadpool, the problem will be divided into |N+1| slices, as the main
// thread will also execute one slice.
ParallelResult ExecuteForkJoinOp(JSContext *cx, ForkJoinOp &op);
class ForkJoinShared;
@ -111,7 +101,7 @@ namespace gc { struct ArenaLists; }
struct ForkJoinSlice
{
public:
public:
// PerThreadData corresponding to the current worker thread.
PerThreadData *perThreadData;
@ -125,31 +115,31 @@ public:
uintptr_t ionStackLimit;
// Arenas to use when allocating on this thread. See
// |ion::ParFunctions::ParNewGCThing()|. This should move
// into |perThreadData|.
// |ion::ParFunctions::ParNewGCThing()|. This should move into
// |perThreadData|.
gc::ArenaLists *const arenaLists;
ForkJoinSlice(PerThreadData *perThreadData, size_t sliceId, size_t numSlices,
uintptr_t stackLimit, gc::ArenaLists *arenaLists,
ForkJoinShared *shared);
// True if this is the main thread, false if it is one of the parallel workers
// True if this is the main thread, false if it is one of the parallel workers.
bool isMainThread();
// Generally speaking, if a thread returns false, that is
// interpreted as a "bailout"---meaning, a recoverable error. If
// however you call this function before returning false, then the
// error will be interpreted as *fatal*. This doesn't strike me
// as the most elegant solution here but I don't know what'd be better.
// Generally speaking, if a thread returns false, that is interpreted as a
// "bailout"---meaning, a recoverable error. If however you call this
// function before returning false, then the error will be interpreted as
// *fatal*. This doesn't strike me as the most elegant solution here but
// I don't know what'd be better.
//
// For convenience, *always* returns false.
bool setFatal();
// During the parallel phase, this method should be invoked
// periodically, for example on every backedge, similar to the
// interrupt check. If it returns false, then the parallel phase
// has been aborted and so you should bail out. The function may
// also rendezvous to perform GC or do other similar things.
// During the parallel phase, this method should be invoked periodically,
// for example on every backedge, similar to the interrupt check. If it
// returns false, then the parallel phase has been aborted and so you
// should bail out. The function may also rendezvous to perform GC or do
// other similar things.
bool check();
// Be wary, the runtime is shared between all threads!
@ -158,12 +148,13 @@ public:
static inline ForkJoinSlice *current();
static bool Initialize();
private:
private:
friend class AutoRendezvous;
friend class AutoSetForkJoinSlice;
#ifdef JS_THREADSAFE
static PRUintn ThreadPrivateIndex; // initialized by Initialize()
// Initialized by Initialize()
static PRUintn ThreadPrivateIndex;
#endif
ForkJoinShared *const shared;
@ -173,17 +164,16 @@ private:
// executed in a fork-join fashion.
struct ForkJoinOp
{
public:
// Invoked before parallel phase begins; informs the task set how
// many slices there will be and gives it a chance to initialize
// per-slice data structures.
public:
// Invoked before parallel phase begins; informs the task set how many
// slices there will be and gives it a chance to initialize per-slice data
// structures.
//
// Returns true on success, false to halt parallel execution.
virtual bool pre(size_t numSlices) = 0;
// Invoked from each parallel thread to process one slice. The
// |ForkJoinSlice| which is supplied will also be available using
// TLS.
// |ForkJoinSlice| which is supplied will also be available using TLS.
//
// Returns true on success, false to halt parallel execution.
virtual bool parallel(ForkJoinSlice &slice) = 0;
@ -195,16 +185,6 @@ public:
virtual bool post(size_t numSlices) = 0;
};
/* True if this thread is currently executing a ParallelArray
operation across multiple threads. */
static inline bool InParallelSection() {
# ifdef JS_THREADSAFE
return ForkJoinSlice::current() != NULL;
# else
return false;
# endif
}
} // namespace js
#endif
}
#endif // ForkJoin_h__
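
To make the callback protocol described above concrete, here is a hedged sketch of a complete |ForkJoinOp|. |SumOp| and its |results_| storage are invented for illustration, and it assumes the slice exposes its index as |slice.sliceId|, as the |ForkJoinSlice| constructor suggests:

#include "vm/ForkJoin.h"

class SumOp : public js::ForkJoinOp
{
    js::Vector<int32_t, 8, js::SystemAllocPolicy> results_;

  public:
    virtual bool pre(size_t numSlices) {
        // One cell per slice, so no two workers write the same slot.
        return results_.resize(numSlices);
    }

    virtual bool parallel(js::ForkJoinSlice &slice) {
        int32_t sum = 0;
        for (int32_t i = 0; i < 10000; i++) {
            sum += i;
            // Service interrupts/rendezvous periodically; false means the
            // parallel phase was aborted and we should bail out too.
            if (!slice.check())
                return false;
        }
        results_[slice.sliceId] = sum;
        return true;
    }

    virtual bool post(size_t numSlices) {
        int32_t total = 0;
        for (size_t i = 0; i < numSlices; i++)
            total += results_[i];
        // ...consume |total|...
        return true;
    }
};

A caller then honors the sequential-fallback contract described above:

    SumOp op;
    switch (js::ExecuteForkJoinOp(cx, op)) {
      case js::TP_SUCCESS:            break;  // results_ is fully populated
      case js::TP_RETRY_SEQUENTIALLY: /* rerun the op on one thread */ break;
      case js::TP_FATAL:              /* propagate the error */ break;
    }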

View File

@ -1,19 +1,13 @@
#include "Monitor.h"
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
* vim: set ts=8 sw=4 et tw=99:
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
namespace js {
#include "vm/Monitor.h"
Monitor::Monitor()
: lock_(NULL), condVar_(NULL)
{
}
Monitor::~Monitor()
{
#ifdef JS_THREADSAFE
PR_DestroyLock(lock_);
PR_DestroyCondVar(condVar_);
#endif
}
using namespace js;
bool
Monitor::init()
@ -30,5 +24,3 @@ Monitor::init()
return true;
}
}

View File

@ -5,8 +5,8 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef jsmonitor_h___
#define jsmonitor_h___
#ifndef Monitor_h__
#define Monitor_h__
#include <stdlib.h>
#include "mozilla/Util.h"
@ -15,35 +15,47 @@
namespace js {
/*
* A base class used for types intended to be used in a parallel
* fashion, such as the workers in the |ThreadPool| class. Combines a
* lock and a condition variable. You can acquire the lock or signal
* the condition variable using the |AutoLockMonitor| type.
*/
// A base class used for types intended to be used in a parallel
// fashion, such as the workers in the |ThreadPool| class. Combines a
// lock and a condition variable. You can acquire the lock or signal
// the condition variable using the |AutoLockMonitor| type.
class Monitor
{
protected:
protected:
friend class AutoLockMonitor;
friend class AutoUnlockMonitor;
PRLock *lock_;
PRCondVar *condVar_;
public:
Monitor();
~Monitor();
public:
Monitor()
: lock_(NULL),
condVar_(NULL)
{ }
~Monitor() {
#ifdef JS_THREADSAFE
if (lock_)
PR_DestroyLock(lock_);
if (condVar_)
PR_DestroyCondVar(condVar_);
#endif
}
bool init();
};
class AutoLockMonitor
{
private:
private:
Monitor &monitor;
public:
AutoLockMonitor(Monitor &monitor) : monitor(monitor) {
public:
AutoLockMonitor(Monitor &monitor)
: monitor(monitor)
{
#ifdef JS_THREADSAFE
PR_Lock(monitor.lock_);
#endif
@ -82,11 +94,14 @@ class AutoUnlockMonitor
Monitor &monitor;
public:
AutoUnlockMonitor(Monitor &monitor) : monitor(monitor) {
AutoUnlockMonitor(Monitor &monitor)
: monitor(monitor)
{
#ifdef JS_THREADSAFE
PR_Unlock(monitor.lock_);
#endif
}
~AutoUnlockMonitor() {
#ifdef JS_THREADSAFE
PR_Lock(monitor.lock_);
@ -94,6 +109,6 @@ class AutoUnlockMonitor
}
};
}
} // namespace js
#endif /* ndef jsmonitor_h___ */
#endif // Monitor_h__
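
A sketch of the intended subclassing pattern, mirroring the |lock.wait()| and |lock.notify()| calls that ForkJoin.cpp makes through |AutoLockMonitor|; the |WorkQueue| type is hypothetical:

#include "vm/Monitor.h"

class WorkQueue : public js::Monitor
{
    js::Vector<int, 8, js::SystemAllocPolicy> items_;

  public:
    // Callers must invoke init() and check its result before first use.

    bool push(int item) {
        js::AutoLockMonitor lock(*this);
        if (!items_.append(item))
            return false;
        lock.notify();   // wake one sleeping consumer
        return true;
    }

    int pop() {
        js::AutoLockMonitor lock(*this);
        while (items_.empty())
            lock.wait(); // releases the lock while blocked
        return items_.popCopy();
    }
};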

View File

@ -7,74 +7,74 @@
#include "jscntxt.h"
#include "jslock.h"
#include "Monitor.h"
#include "ThreadPool.h"
#include "vm/Monitor.h"
#include "vm/ThreadPool.h"
#ifdef JS_THREADSAFE
# include "prthread.h"
#endif
namespace js {
using namespace js;
/****************************************************************************
* ThreadPoolWorker
*
* Each |ThreadPoolWorker| just hangs around waiting for items to be added
* to its |worklist_|. Whenever something is added, it gets executed.
* Once the worker's state is set to |TERMINATING|, the worker will
* exit as soon as its queue is empty.
*/
/////////////////////////////////////////////////////////////////////////////
// ThreadPoolWorker
//
// Each |ThreadPoolWorker| just hangs around waiting for items to be added
// to its |worklist_|. Whenever something is added, it gets executed.
// Once the worker's state is set to |TERMINATING|, the worker will
// exit as soon as its queue is empty.
#define WORKER_THREAD_STACK_SIZE (1*1024*1024)
const size_t WORKER_THREAD_STACK_SIZE = 1*1024*1024;
enum WorkerState {
CREATED, ACTIVE, TERMINATING, TERMINATED
};
class ThreadPoolWorker : public Monitor
class js::ThreadPoolWorker : public Monitor
{
const size_t workerId_;
ThreadPool *const threadPool_;
/* Currrent point in the worker's lifecycle.
*
* Modified only while holding the ThreadPoolWorker's lock */
WorkerState state_;
// Current point in the worker's lifecycle.
//
// Modified only while holding the ThreadPoolWorker's lock.
enum WorkerState {
CREATED, ACTIVE, TERMINATING, TERMINATED
} state_;
/* Worklist for this thread.
*
* Modified only while holding the ThreadPoolWorker's lock */
// Worklist for this thread.
//
// Modified only while holding the ThreadPoolWorker's lock.
js::Vector<TaskExecutor*, 4, SystemAllocPolicy> worklist_;
/* The thread's main function */
// The thread's main function
static void ThreadMain(void *arg);
void run();
public:
public:
ThreadPoolWorker(size_t workerId, ThreadPool *tp);
~ThreadPoolWorker();
bool init();
/* Invoked from main thread; signals worker to start */
// Invoked from main thread; signals worker to start.
bool start();
/* Submit work to be executed. If this returns true, you are
guaranteed that the task will execute before the thread-pool
terminates (barring an infinite loop in some prior task) */
// Submit work to be executed. If this returns true, you are guaranteed
// that the task will execute before the thread-pool terminates (barring
// an infinite loop in some prior task).
bool submit(TaskExecutor *task);
/* Invoked from main thread; signals worker to terminate
* and blocks until termination completes */
// Invoked from main thread; signals worker to terminate and blocks until
// termination completes.
void terminate();
};
ThreadPoolWorker::ThreadPoolWorker(size_t workerId, ThreadPool *tp)
: workerId_(workerId), threadPool_(tp), state_(CREATED), worklist_()
{}
: workerId_(workerId),
threadPool_(tp),
state_(CREATED),
worklist_()
{ }
ThreadPoolWorker::~ThreadPoolWorker()
{}
{ }
bool
ThreadPoolWorker::init()
@ -173,28 +173,26 @@ ThreadPoolWorker::terminate()
} else if (state_ == ACTIVE) {
state_ = TERMINATING;
lock.notify();
while (state_ != TERMINATED) {
while (state_ != TERMINATED)
lock.wait();
}
} else {
JS_ASSERT(state_ == TERMINATED);
}
}
/****************************************************************************
* ThreadPool
*
* The |ThreadPool| starts up workers, submits work to them, and shuts
* them down when requested.
*/
/////////////////////////////////////////////////////////////////////////////
// ThreadPool
//
// The |ThreadPool| starts up workers, submits work to them, and shuts
// them down when requested.
ThreadPool::ThreadPool(JSRuntime *rt)
: runtime_(rt),
nextId_(0)
{
}
: runtime_(rt),
nextId_(0)
{ }
ThreadPool::~ThreadPool() {
ThreadPool::~ThreadPool()
{
terminateWorkers();
while (workers_.length() > 0) {
ThreadPoolWorker *worker = workers_.popCopy();
@ -209,11 +207,10 @@ ThreadPool::init()
// Compute desired number of workers based on env var or # of CPUs.
size_t numWorkers = 0;
char *pathreads = getenv("PATHREADS");
if (pathreads != NULL) {
if (pathreads != NULL)
numWorkers = strtol(pathreads, NULL, 10);
} else {
else
numWorkers = GetCPUCount() - 1;
}
// Allocate workers array and then start the worker threads.
// Ensure that the field numWorkers_ always tracks the number of
@ -228,9 +225,8 @@ ThreadPool::init()
js_delete(worker);
return false;
}
if (!worker->start()) {
if (!worker->start())
return false;
}
}
#endif
@ -240,13 +236,13 @@ ThreadPool::init()
void
ThreadPool::terminateWorkers()
{
for (size_t i = 0; i < workers_.length(); i++) {
for (size_t i = 0; i < workers_.length(); i++)
workers_[i]->terminate();
}
}
bool
ThreadPool::submitOne(TaskExecutor *executor) {
ThreadPool::submitOne(TaskExecutor *executor)
{
runtime_->assertValidThread();
if (numWorkers() == 0)
@ -258,7 +254,8 @@ ThreadPool::submitOne(TaskExecutor *executor) {
}
bool
ThreadPool::submitAll(TaskExecutor *executor) {
ThreadPool::submitAll(TaskExecutor *executor)
{
for (size_t id = 0; id < workers_.length(); id++) {
if (!workers_[id]->submit(executor))
return false;
@ -267,9 +264,8 @@ ThreadPool::submitAll(TaskExecutor *executor) {
}
bool
ThreadPool::terminate() {
ThreadPool::terminate()
{
terminateWorkers();
return true;
}
}
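
|run()| itself is untouched by this patch, so it does not appear in the hunks above. As a rough reconstruction of the lifecycle the class comment describes (a paraphrase, not the actual source; the stack-limit computation is elided), the loop has this shape:

void
ThreadPoolWorker::run()
{
    AutoLockMonitor lock(*this);
    state_ = ACTIVE;
    for (;;) {
        while (!worklist_.empty()) {
            TaskExecutor *task = worklist_.popCopy();
            {
                // Never hold the monitor while running user code.
                AutoUnlockMonitor unlock(*this);
                task->executeFromWorker(workerId_, /* stack limit elided */ 0);
            }
        }
        if (state_ == TERMINATING)
            break;
        lock.wait();     // sleep until submit() or terminate() notifies us
    }
    state_ = TERMINATED;
    lock.notify();       // wake the wait loop in terminate()
}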

View File

@ -5,8 +5,8 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef jsthreadpool_h___
#define jsthreadpool_h___
#ifndef ThreadPool_h__
#define ThreadPool_h__
#include <stddef.h>
#include "mozilla/StandardInteger.h"
@ -32,43 +32,41 @@ typedef void (*TaskFun)(void *userdata, size_t workerId, uintptr_t stackLimit);
class TaskExecutor
{
public:
public:
virtual void executeFromWorker(size_t workerId, uintptr_t stackLimit) = 0;
};
/*
* ThreadPool used for parallel JavaScript execution as well as
* parallel compilation. Unless you are building a new kind of
* parallel service, it is very likely that you do not wish to
* interact with the threadpool directly. In particular, if you wish
* to execute JavaScript in parallel, you probably want to look at
* |js::ForkJoin| in |forkjoin.cpp|.
*
* The ThreadPool always maintains a fixed pool of worker threads.
* You can query the number of worker threads via the method
* |numWorkers()|. Note that this number may be zero (generally if
* threads are disabled, or when manually specified for benchmarking
* purposes).
*
* You can submit jobs in one of two ways. The first is
* |submitOne()|, which submits a job to be executed by one worker
* thread (this will fail if there are no worker threads). The job
* will be enqueued and executed by some worker (the current scheduler
* uses round-robin load balancing; something more sophisticated,
* e.g. a central queue or work stealing, might be better).
*
* The second way to submit a job is using |submitAll()|---in this
* case, the job will be executed by all worker threads. This does
* not fail if there are no worker threads; it simply does nothing.
* Of course, each thread may have any number of previously submitted
* things that they are already working on, and so they will finish
* those before they get to this job. Therefore it is possible to
* have some worker threads pick up (and even finish) their piece of
* the job before others have even started.
*/
// ThreadPool used for parallel JavaScript execution as well as
// parallel compilation. Unless you are building a new kind of
// parallel service, it is very likely that you do not wish to
// interact with the threadpool directly. In particular, if you wish
// to execute JavaScript in parallel, you probably want to look at
// |js::ForkJoin| in |forkjoin.cpp|.
// The ThreadPool always maintains a fixed pool of worker threads.
// You can query the number of worker threads via the method
// |numWorkers()|. Note that this number may be zero (generally if
// threads are disabled, or when manually specified for benchmarking
// purposes).
// You can submit jobs in one of two ways. The first is
// |submitOne()|, which submits a job to be executed by one worker
// thread (this will fail if there are no worker threads). The job
// will be enqueued and executed by some worker (the current scheduler
// uses round-robin load balancing; something more sophisticated,
// e.g. a central queue or work stealing, might be better).
// The second way to submit a job is using |submitAll()|---in this
// case, the job will be executed by all worker threads. This does
// not fail if there are no worker threads; it simply does nothing.
// Of course, each thread may have any number of previously submitted
// things that they are already working on, and so they will finish
// those before they get to this job. Therefore it is possible to
// have some worker threads pick up (and even finish) their piece of
// the job before others have even started.
class ThreadPool
{
private:
private:
friend class ThreadPoolWorker;
// Initialized at startup only:
@ -80,7 +78,7 @@ private:
void terminateWorkers();
public:
public:
ThreadPool(JSRuntime *rt);
~ThreadPool();
@ -99,8 +97,6 @@ public:
bool terminate();
};
}
} // namespace js
#endif
#endif // ThreadPool_h__
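
Finally, a hedged sketch of driving the pool directly, per the comment above; |Ping| and |PingAll| are hypothetical, and most callers should go through |js::ForkJoin| instead:

#include <stdio.h>
#include "vm/ThreadPool.h"

class Ping : public js::TaskExecutor
{
  public:
    virtual void executeFromWorker(size_t workerId, uintptr_t stackLimit) {
        // With submitAll(), this runs once on every worker thread.
        printf("ping from worker %lu\n", (unsigned long) workerId);
    }
};

static void
PingAll(JSRuntime *rt)
{
    // Static so the task outlives this call: a worker may execute it after
    // PingAll() returns, any time before the pool terminates.
    static Ping task;

    // submitAll() is a no-op when numWorkers() == 0; submitOne() would
    // fail in that case instead.
    rt->threadPool.submitAll(&task);
}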