Bug 985312 - PJS: Remove the bounds function from ForkJoin. (r=nmatsakis)

--HG--
rename : js/src/builtin/Parallel.js => js/src/builtin/ParallelUtilities.js
This commit is contained in:
Shu-yu Guo 2014-03-26 05:30:22 -07:00
parent d767bb2e5a
commit 714c15c5d3
12 changed files with 254 additions and 331 deletions

View File

@ -366,6 +366,7 @@ selfhosting:: selfhosted.out.h
selfhosting_srcs := \
$(srcdir)/builtin/Utilities.js \
$(srcdir)/builtin/ParallelUtilities.js \
$(srcdir)/builtin/Array.js \
$(srcdir)/builtin/Date.js \
$(srcdir)/builtin/Intl.js \
@ -373,7 +374,6 @@ selfhosting_srcs := \
$(srcdir)/builtin/Iterator.js \
$(srcdir)/builtin/Map.js \
$(srcdir)/builtin/Number.js \
$(srcdir)/builtin/Parallel.js \
$(srcdir)/builtin/String.js \
$(srcdir)/builtin/Set.js \
$(srcdir)/builtin/TypedObject.js \

View File

@ -595,7 +595,7 @@ function ArrayMapPar(func, mode) {
break parallel;
var slicesInfo = ComputeSlicesInfo(length);
ForkJoin(mapThread, ShrinkLeftmost(slicesInfo), ForkJoinMode(mode));
ForkJoin(mapThread, 0, slicesInfo.count, ForkJoinMode(mode));
return buffer;
}
@ -605,17 +605,16 @@ function ArrayMapPar(func, mode) {
UnsafePutElements(buffer, i, func(self[i], i, self));
return buffer;
function mapThread(_, warmup) {
function mapThread(workerId, sliceStart, sliceEnd) {
var sliceShift = slicesInfo.shift;
var sliceId;
while (GET_SLICE(slicesInfo, sliceId)) {
var indexStart = SLICE_START(slicesInfo, sliceId);
var indexEnd = SLICE_END(slicesInfo, indexStart, length);
while (GET_SLICE(sliceStart, sliceEnd, sliceId)) {
var indexStart = SLICE_START_INDEX(sliceShift, sliceId);
var indexEnd = SLICE_END_INDEX(sliceShift, indexStart, length);
for (var i = indexStart; i < indexEnd; i++)
UnsafePutElements(buffer, i, func(self[i], i, self));
MARK_SLICE_DONE(slicesInfo, sliceId);
if (warmup)
return;
}
return sliceId;
}
return undefined;
@ -642,10 +641,10 @@ function ArrayReducePar(func, mode) {
break parallel;
var slicesInfo = ComputeSlicesInfo(length);
var numSlices = SLICE_COUNT(slicesInfo);
var numSlices = slicesInfo.count;
var subreductions = NewDenseArray(numSlices);
ForkJoin(reduceThread, ShrinkLeftmost(slicesInfo), ForkJoinMode(mode));
ForkJoin(reduceThread, 0, numSlices, ForkJoinMode(mode));
var accumulator = subreductions[0];
for (var i = 1; i < numSlices; i++)
@ -660,19 +659,18 @@ function ArrayReducePar(func, mode) {
accumulator = func(accumulator, self[i]);
return accumulator;
function reduceThread(_, warmup) {
function reduceThread(workerId, sliceStart, sliceEnd) {
var sliceShift = slicesInfo.shift;
var sliceId;
while (GET_SLICE(slicesInfo, sliceId)) {
var indexStart = SLICE_START(slicesInfo, sliceId);
var indexEnd = SLICE_END(slicesInfo, indexStart, length);
while (GET_SLICE(sliceStart, sliceEnd, sliceId)) {
var indexStart = SLICE_START_INDEX(sliceShift, sliceId);
var indexEnd = SLICE_END_INDEX(sliceShift, indexStart, length);
var accumulator = self[indexStart];
for (var i = indexStart + 1; i < indexEnd; i++)
accumulator = func(accumulator, self[i]);
UnsafePutElements(subreductions, sliceId, accumulator);
MARK_SLICE_DONE(slicesInfo, sliceId);
if (warmup)
return;
}
return sliceId;
}
return undefined;
@ -702,10 +700,10 @@ function ArrayScanPar(func, mode) {
break parallel;
var slicesInfo = ComputeSlicesInfo(length);
var numSlices = SLICE_COUNT(slicesInfo);
var numSlices = slicesInfo.count;
// Scan slices individually (see comment on phase1()).
ForkJoin(phase1, ShrinkLeftmost(slicesInfo), ForkJoinMode(mode));
ForkJoin(phase1, 0, numSlices, ForkJoinMode(mode));
// Compute intermediates array (see comment on phase2()).
var intermediates = [];
@ -716,14 +714,12 @@ function ArrayScanPar(func, mode) {
ARRAY_PUSH(intermediates, accumulator);
}
// Clear the slices' statuses in between phases.
SlicesInfoClearStatuses(slicesInfo);
// There is no work to be done for slice 0, so mark it as done.
MARK_SLICE_DONE(slicesInfo, 0);
// Complete each slice using intermediates array (see comment on phase2()).
ForkJoin(phase2, ShrinkLeftmost(slicesInfo), ForkJoinMode(mode));
//
// We start from slice 1 instead of 0 since there is no work to be done
// for slice 0.
if (numSlices > 1)
ForkJoin(phase2, 1, numSlices, ForkJoinMode(mode));
return buffer;
}
@ -757,23 +753,23 @@ function ArrayScanPar(func, mode) {
*
* Read on in phase2 to see what we do next!
*/
function phase1(_, warmup) {
function phase1(workerId, sliceStart, sliceEnd) {
var sliceShift = slicesInfo.shift;
var sliceId;
while (GET_SLICE(slicesInfo, sliceId)) {
var indexStart = SLICE_START(slicesInfo, sliceId);
var indexEnd = SLICE_END(slicesInfo, indexStart, length);
while (GET_SLICE(sliceStart, sliceEnd, sliceId)) {
var indexStart = SLICE_START_INDEX(sliceShift, sliceId);
var indexEnd = SLICE_END_INDEX(sliceShift, indexStart, length);
scan(self[indexStart], indexStart, indexEnd);
MARK_SLICE_DONE(slicesInfo, sliceId);
if (warmup)
return;
}
return sliceId;
}
/**
* Computes the index of the final element computed by the slice |sliceId|.
*/
function finalElement(sliceId) {
return SLICE_END(slicesInfo, SLICE_START(slicesInfo, sliceId), length) - 1;
var sliceShift = slicesInfo.shift;
return SLICE_END_INDEX(sliceShift, SLICE_START_INDEX(sliceShift, sliceId), length) - 1;
}
/**
@ -809,20 +805,17 @@ function ArrayScanPar(func, mode) {
* result is [(A+B+C)+D, (A+B+C)+(D+E), (A+B+C)+(D+E+F)]. Again I
* am using parentheses to clarify how these results were reduced.
*/
function phase2(_, warmup) {
function phase2(workerId, sliceStart, sliceEnd) {
var sliceShift = slicesInfo.shift;
var sliceId;
while (GET_SLICE(slicesInfo, sliceId)) {
var indexPos = SLICE_START(slicesInfo, sliceId);
var indexEnd = SLICE_END(slicesInfo, indexPos, length);
while (GET_SLICE(sliceStart, sliceEnd, sliceId)) {
var indexPos = SLICE_START_INDEX(sliceShift, sliceId);
var indexEnd = SLICE_END_INDEX(sliceShift, indexPos, length);
var intermediate = intermediates[sliceId - 1];
for (; indexPos < indexEnd; indexPos++)
UnsafePutElements(buffer, indexPos, func(intermediate, buffer[indexPos]));
MARK_SLICE_DONE(slicesInfo, sliceId);
if (warmup)
return;
}
return sliceId;
}
return undefined;
@ -937,15 +930,12 @@ function ArrayFilterPar(func, mode) {
// preserved from within one slice.
//
// FIXME(bug 844890): Use typed arrays here.
var numSlices = SLICE_COUNT(slicesInfo);
var numSlices = slicesInfo.count;
var counts = NewDenseArray(numSlices);
for (var i = 0; i < numSlices; i++)
UnsafePutElements(counts, i, 0);
var survivors = NewDenseArray(computeNum32BitChunks(length));
ForkJoin(findSurvivorsThread, ShrinkLeftmost(slicesInfo), ForkJoinMode(mode));
// Clear the slices' statuses in between phases.
SlicesInfoClearStatuses(slicesInfo);
ForkJoin(findSurvivorsThread, 0, numSlices, ForkJoinMode(mode));
// Step 2. Compress the slices into one contiguous set.
var count = 0;
@ -953,7 +943,7 @@ function ArrayFilterPar(func, mode) {
count += counts[i];
var buffer = NewDenseArray(count);
if (count > 0)
ForkJoin(copySurvivorsThread, ShrinkLeftmost(slicesInfo), ForkJoinMode(mode));
ForkJoin(copySurvivorsThread, 0, numSlices, ForkJoinMode(mode));
return buffer;
}
@ -984,12 +974,13 @@ function ArrayFilterPar(func, mode) {
* time. When we finish a chunk, we record our current count and
* the next chunk sliceId, lest we should bail.
*/
function findSurvivorsThread(_, warmup) {
function findSurvivorsThread(workerId, sliceStart, sliceEnd) {
var sliceShift = slicesInfo.shift;
var sliceId;
while (GET_SLICE(slicesInfo, sliceId)) {
while (GET_SLICE(sliceStart, sliceEnd, sliceId)) {
var count = 0;
var indexStart = SLICE_START(slicesInfo, sliceId);
var indexEnd = SLICE_END(slicesInfo, indexStart, length);
var indexStart = SLICE_START_INDEX(sliceShift, sliceId);
var indexEnd = SLICE_END_INDEX(sliceShift, indexStart, length);
var chunkStart = computeNum32BitChunks(indexStart);
var chunkEnd = computeNum32BitChunks(indexEnd);
for (var chunkPos = chunkStart; chunkPos < chunkEnd; chunkPos++, indexStart += 32) {
@ -1002,16 +993,14 @@ function ArrayFilterPar(func, mode) {
UnsafePutElements(survivors, chunkPos, chunkBits);
}
UnsafePutElements(counts, sliceId, count);
MARK_SLICE_DONE(slicesInfo, sliceId);
if (warmup)
return;
}
return sliceId;
}
function copySurvivorsThread(_, warmup) {
function copySurvivorsThread(workerId, sliceStart, sliceEnd) {
var sliceShift = slicesInfo.shift;
var sliceId;
while (GET_SLICE(slicesInfo, sliceId)) {
while (GET_SLICE(sliceStart, sliceEnd, sliceId)) {
// Copies the survivors from this slice into the correct position.
// Note that this is an idempotent operation that does not invoke
// user code. Therefore, we don't expect bailouts and make an
@ -1024,18 +1013,16 @@ function ArrayFilterPar(func, mode) {
// Compute the final index we expect to write.
var count = total - counts[sliceId];
if (count === total) {
MARK_SLICE_DONE(slicesInfo, sliceId);
if (count === total)
continue;
}
// Iterate over the chunks assigned to us. Read the bitset for
// each chunk. Copy values where a 1 appears until we have
// written all the values that we expect to. We can just iterate
// from 0...CHUNK_SIZE without fear of a truncated final chunk
// because we are already checking for when count==total.
var indexStart = SLICE_START(slicesInfo, sliceId);
var indexEnd = SLICE_END(slicesInfo, indexStart, length);
var indexStart = SLICE_START_INDEX(sliceShift, sliceId);
var indexEnd = SLICE_END_INDEX(sliceShift, indexStart, length);
var chunkStart = computeNum32BitChunks(indexStart);
var chunkEnd = computeNum32BitChunks(indexEnd);
for (var chunkPos = chunkStart; chunkPos < chunkEnd; chunkPos++, indexStart += 32) {
@ -1054,11 +1041,9 @@ function ArrayFilterPar(func, mode) {
if (count == total)
break;
}
MARK_SLICE_DONE(slicesInfo, sliceId);
if (warmup)
return;
}
return sliceId;
}
return undefined;
@ -1101,7 +1086,7 @@ function ArrayStaticBuildPar(length, func, mode) {
break parallel;
var slicesInfo = ComputeSlicesInfo(length);
ForkJoin(constructThread, ShrinkLeftmost(slicesInfo), ForkJoinMode(mode));
ForkJoin(constructThread, 0, slicesInfo.count, ForkJoinMode(mode));
return buffer;
}
@ -1111,17 +1096,16 @@ function ArrayStaticBuildPar(length, func, mode) {
UnsafePutElements(buffer, i, func(i));
return buffer;
function constructThread(_, warmup) {
function constructThread(workerId, sliceStart, sliceEnd) {
var sliceShift = slicesInfo.shift;
var sliceId;
while (GET_SLICE(slicesInfo, sliceId)) {
var indexStart = SLICE_START(slicesInfo, sliceId);
var indexEnd = SLICE_END(slicesInfo, indexStart, length);
while (GET_SLICE(sliceStart, sliceEnd, sliceId)) {
var indexStart = SLICE_START_INDEX(sliceShift, sliceId);
var indexEnd = SLICE_END_INDEX(sliceShift, indexStart, length);
for (var i = indexStart; i < indexEnd; i++)
UnsafePutElements(buffer, i, func(i));
MARK_SLICE_DONE(slicesInfo, sliceId);
if (warmup)
return;
}
return sliceId;
}
return undefined;

View File

@ -1,67 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// Shared utility functions for parallel operations in `Array.js`
// and `TypedObject.js`.
/**
 * Decide how an operation over |length| elements is partitioned into slices.
 *
 * The returned info object has the following properties:
 *
 * - shift: log2 of the (power of 2) slice size
 * - statuses: a Uint8Array holding one status entry per slice
 * - lastSequentialId: initialized to 0
 */
function ComputeSlicesInfo(length) {
    // First guess at the number of slices, clamped to the range
    // [numWorkers, numWorkers * MAX_SLICES_PER_WORKER].
    var estimate = length >>> MAX_SLICE_SHIFT;
    var numWorkers = ForkJoinNumWorkers();
    var maxSlices = numWorkers * MAX_SLICES_PER_WORKER;
    if (estimate < numWorkers)
        estimate = numWorkers;
    else if (estimate >= maxSlices)
        estimate = maxSlices;

    // The slice size is rounded down to a power of 2; the shift is never
    // allowed to go below 1.
    var sizeLog2 = std_Math_log2(length / estimate) | 0;
    var shift = std_Math_max(sizeLog2, 1);

    // Derive the real slice count from the rounded size, adding one slice
    // for any leftover elements.
    var sliceCount = length >>> shift;
    if ((sliceCount << shift) !== length)
        sliceCount++;

    return { shift: shift, statuses: new Uint8Array(sliceCount), lastSequentialId: 0 };
}
/**
 * Zero every entry of |info.statuses| and rewind |info.lastSequentialId| to 0.
 */
function SlicesInfoClearStatuses(info) {
    var statuses = info.statuses;
    var numStatuses = statuses.length;
    var id = 0;
    while (id < numStatuses) {
        UnsafePutElements(statuses, id, 0);
        id++;
    }
    info.lastSequentialId = 0;
}
/**
 * Find the first slice, scanning from |info.lastSequentialId|, that is not
 * marked SLICE_STATUS_DONE. When one is found it is cached in
 * |info.lastSequentialId| and returned.
 *
 * When every remaining slice is done, |info.lastSequentialId| is left alone
 * and |doneMarker| is returned, or the number of slices if |doneMarker| is
 * null or undefined.
 */
function NextSequentialSliceId(info, doneMarker) {
    var statuses = info.statuses;
    var numSlices = statuses.length;

    // Skip over the run of completed slices.
    var id = info.lastSequentialId;
    while (id < numSlices && statuses[id] === SLICE_STATUS_DONE)
        id++;

    if (id < numSlices) {
        info.lastSequentialId = id;
        return id;
    }

    // Loose comparison on purpose: null and undefined both select the default.
    if (doneMarker == undefined)
        return numSlices;
    return doneMarker;
}
/**
 * Determinism-preserving bounds function.
 *
 * Returns a closure over |info| that, each time it is called, yields the
 * two-element array [next sequential slice id, slice count].
 */
function ShrinkLeftmost(info) {
    return function () {
        var start = NextSequentialSliceId(info);
        var end = SLICE_COUNT(info);
        return [start, end];
    };
}

View File

@ -0,0 +1,74 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// Shared utility functions and macros for parallel operations in `Array.js`
// and `TypedObject.js`.
#ifdef ENABLE_PARALLEL_JS
/* The mode asserts options object. */
#define TRY_PARALLEL(MODE) \
((!MODE || MODE.mode !== "seq"))
#define ASSERT_SEQUENTIAL_IS_OK(MODE) \
do { if (MODE) AssertSequentialIsOK(MODE) } while(false)
/**
* The ParallelSpew intrinsic is only defined in debug mode, so define a dummy
* if debug is not on.
*/
#ifndef DEBUG
#define ParallelSpew(args)
#endif
#define MAX_SLICE_SHIFT 6
#define MAX_SLICE_SIZE 64
#define MAX_SLICES_PER_WORKER 8
/**
* Macros to help compute the start and end indices of slices based on id. Pass
* the |shift| property of the object returned by ComputeSlicesInfo.
*/
#define SLICE_START_INDEX(shift, id) \
(id << shift)
#define SLICE_END_INDEX(shift, start, length) \
std_Math_min(start + (1 << shift), length)
/**
* ForkJoinGetSlice acts as identity when we are not in a parallel section, so
* pass in the next sequential value when we are in sequential mode. The
* reason for this odd API is because intrinsics *need* to be called during
* ForkJoin's warmup to fill the TI info.
*/
#define GET_SLICE(sliceStart, sliceEnd, id) \
((id = ForkJoinGetSlice((InParallelSection() ? -1 : sliceStart++) | 0)) < sliceEnd)
/**
 * Decide how an operation over |length| elements is partitioned into slices.
 *
 * The returned info object has the following properties:
 *
 * - shift: log2 of the (power of 2) slice size; shift a slice id left by
 *          this amount to get the index of the slice's first element
 * - count: number of slices
 */
function ComputeSlicesInfo(length) {
    // First guess at the number of slices, clamped to the range
    // [numWorkers, numWorkers * MAX_SLICES_PER_WORKER].
    var estimate = length >>> MAX_SLICE_SHIFT;
    var numWorkers = ForkJoinNumWorkers();
    var maxSlices = numWorkers * MAX_SLICES_PER_WORKER;
    if (estimate < numWorkers)
        estimate = numWorkers;
    else if (estimate >= maxSlices)
        estimate = maxSlices;

    // The slice size is rounded down to a power of 2; the shift is never
    // allowed to go below 1.
    var sizeLog2 = std_Math_log2(length / estimate) | 0;
    var shift = std_Math_max(sizeLog2, 1);

    // Derive the real slice count from the rounded size, adding one slice
    // for any leftover elements.
    var sliceCount = length >>> shift;
    if ((sliceCount << shift) !== length)
        sliceCount++;

    return { shift: shift, count: sliceCount };
}
#endif // ENABLE_PARALLEL_JS

View File

@ -1434,23 +1434,26 @@ function MapTypedParImplDepth1(inArray, inArrayType, outArrayType, func) {
// relative to its owner (which is often but not always 0).
const inBaseOffset = TYPEDOBJ_BYTEOFFSET(inArray);
ForkJoin(mapThread, ShrinkLeftmost(slicesInfo), ForkJoinMode(mode));
ForkJoin(mapThread, 0, slicesInfo.count, ForkJoinMode(mode));
return outArray;
function mapThread(workerId, warmup) {
function mapThread(workerId, sliceStart, sliceEnd) {
assert(TO_INT32(workerId) === workerId,
"workerId not int: " + workerId);
assert(workerId >= 0 && workerId < pointers.length,
"workerId too large: " + workerId + " >= " + pointers.length);
assert(!!pointers[workerId],
assert(workerId < pointers.length,
"workerId too large: " + workerId + " >= " + pointers.length);
var pointerIndex = InParallelSection() ? workerId : 0;
assert(!!pointers[pointerIndex],
"no pointer data for workerId: " + workerId);
const { inTypedObject, outTypedObject } = pointers[pointerIndex];
const sliceShift = slicesInfo.shift;
var sliceId;
const { inTypedObject, outTypedObject } = pointers[workerId];
while (GET_SLICE(slicesInfo, sliceId)) {
const indexStart = SLICE_START(slicesInfo, sliceId);
const indexEnd = SLICE_END(slicesInfo, indexStart, length);
while (GET_SLICE(sliceStart, sliceEnd, sliceId)) {
const indexStart = SLICE_START_INDEX(sliceShift, sliceId);
const indexEnd = SLICE_END_INDEX(sliceShift, indexStart, length);
var inOffset = inBaseOffset + std_Math_imul(inGrainTypeSize, indexStart);
var outOffset = std_Math_imul(outGrainTypeSize, indexStart);
@ -1482,7 +1485,7 @@ function MapTypedParImplDepth1(inArray, inArrayType, outArrayType, func) {
if (outGrainTypeIsComplex)
SetTypedObjectValue(outGrainType, outArray, outOffset, r);
else
UnsafePutElements(outArray, i, r);
UnsafePutElements(outArray, i, r);
}
inOffset += inGrainTypeSize;
outOffset += outGrainTypeSize;
@ -1493,11 +1496,9 @@ function MapTypedParImplDepth1(inArray, inArrayType, outArrayType, func) {
// to escape.
if (outGrainTypeIsTransparent)
ClearThreadLocalArenas();
MARK_SLICE_DONE(slicesInfo, sliceId);
if (warmup)
return;
}
return sliceId;
}
return undefined;

View File

@ -96,57 +96,6 @@ var std_Set_iterator_next = Object.getPrototypeOf(Set()[std_iterator]()).next;
#define ARRAY_SLICE(ARRAY, ELEMENT) \
callFunction(std_Array_slice, ARRAY, ELEMENT);
/********** Parallel JavaScript macros and so on **********/
#ifdef ENABLE_PARALLEL_JS
/* The mode asserts options object. */
#define TRY_PARALLEL(MODE) \
((!MODE || MODE.mode !== "seq"))
#define ASSERT_SEQUENTIAL_IS_OK(MODE) \
do { if (MODE) AssertSequentialIsOK(MODE) } while(false)
/**
* The ParallelSpew intrinsic is only defined in debug mode, so define a dummy
* if debug is not on.
*/
#ifndef DEBUG
#define ParallelSpew(args)
#endif
#define MAX_SLICE_SHIFT 6
#define MAX_SLICE_SIZE 64
#define MAX_SLICES_PER_WORKER 8
/**
* Macros to help compute the start and end indices of slices based on id. Use
* with the object returned by ComputeSliceInfo.
*/
#define SLICE_START(info, id) \
(id << info.shift)
#define SLICE_END(info, start, length) \
std_Math_min(start + (1 << info.shift), length)
#define SLICE_COUNT(info) \
info.statuses.length
/**
* ForkJoinGetSlice acts as identity when we are not in a parallel section, so
* pass in the next sequential value when we are in sequential mode. The
* reason for this odd API is because intrinsics *need* to be called during
* ForkJoin's warmup to fill the TI info.
*/
#define GET_SLICE(info, id) \
((id = ForkJoinGetSlice(InParallelSection() ? -1 : NextSequentialSliceId(info, -1))) >= 0)
#define SLICE_STATUS_DONE 1
/**
* Macro to mark a slice as completed in the info object.
*/
#define MARK_SLICE_DONE(info, id) \
UnsafePutElements(info.statuses, id, SLICE_STATUS_DONE)
#endif // ENABLE_PARALLEL_JS
/********** List specification type **********/

View File

@ -2010,9 +2010,9 @@ JitRuntime::generateForkJoinGetSliceStub(JSContext *cx)
masm.pop(cxReg);
masm.ret();
// There's no more slices to give out, return -1.
// There's no more slices to give out, return a sentinel value.
masm.bind(&noMoreWork);
masm.move32(Imm32(-1), output);
masm.move32(Imm32(ThreadPool::MAX_SLICE_ID), output);
masm.pop(cxReg);
masm.ret();

View File

@ -51,14 +51,20 @@ using mozilla::ThreadLocal;
// altogether.
static bool
ExecuteSequentially(JSContext *cx_, HandleValue funVal);
ExecuteSequentially(JSContext *cx_, HandleValue funVal, uint16_t *sliceStart,
uint16_t sliceEnd);
#if !defined(JS_THREADSAFE) || !defined(JS_ION)
bool
js::ForkJoin(JSContext *cx, CallArgs &args)
{
RootedValue argZero(cx, args[0]);
return ExecuteSequentially(cx, argZero);
uint16_t sliceStart = uint16_t(args[1].toInt32());
uint16_t sliceEnd = uint16_t(args[2].toInt32());
if (!ExecuteSequentially(cx, argZero, &sliceStart, sliceEnd))
return false;
MOZ_ASSERT(sliceStart == sliceEnd);
return true;
}
JSContext *
@ -184,17 +190,22 @@ JS_JITINFO_NATIVE_PARALLEL(js::intrinsic_ClearThreadLocalArenasInfo,
// Some code that is shared between degenerate and parallel configurations.
static bool
ExecuteSequentially(JSContext *cx, HandleValue funVal)
ExecuteSequentially(JSContext *cx, HandleValue funVal, uint16_t *sliceStart,
uint16_t sliceEnd)
{
FastInvokeGuard fig(cx, funVal);
InvokeArgs &args = fig.args();
if (!args.init(2))
if (!args.init(3))
return false;
args.setCallee(funVal);
args.setThis(UndefinedValue());
args[0].setInt32(0); // always worker 0 in seq
args[1].setBoolean(!!cx->runtime()->forkJoinWarmup);
return fig.invoke(cx);
args[0].setInt32(0);
args[1].setInt32(*sliceStart);
args[2].setInt32(sliceEnd);
if (!fig.invoke(cx))
return false;
*sliceStart = (uint16_t)(args.rval().toInt32());
return true;
}
ThreadLocal<ForkJoinContext*> ForkJoinContext::tlsForkJoinContext;
@ -267,8 +278,8 @@ class ForkJoinOperation
RootedScript bailoutScript;
jsbytecode *bailoutBytecode;
ForkJoinOperation(JSContext *cx, HandleFunction fun, HandleFunction boundsFun,
ForkJoinMode mode);
ForkJoinOperation(JSContext *cx, HandleFunction fun, uint16_t sliceStart,
uint16_t sliceEnd, ForkJoinMode mode);
ExecutionStatus apply();
private:
@ -307,7 +318,8 @@ class ForkJoinOperation
JSContext *cx_;
HandleFunction fun_;
HandleFunction boundsFun_;
uint16_t sliceStart_;
uint16_t sliceEnd_;
Vector<ParallelBailoutRecord, 16> bailoutRecords_;
AutoScriptVector worklist_;
Vector<WorklistData, 16> worklistData_;
@ -329,8 +341,6 @@ class ForkJoinOperation
TrafficLight appendCallTargetToWorklist(HandleScript script, ExecutionStatus *status);
bool addToWorklist(HandleScript script);
inline bool hasScript(Vector<types::RecompileInfo> &scripts, JSScript *script);
bool computeBounds(uint16_t *start, uint16_t *end);
}; // class ForkJoinOperation
class ForkJoinShared : public ParallelJob, public Monitor
@ -341,8 +351,8 @@ class ForkJoinShared : public ParallelJob, public Monitor
JSContext *const cx_; // Current context
ThreadPool *const threadPool_; // The thread pool
HandleFunction fun_; // The JavaScript function to execute
uint16_t sliceFrom_; // The starting slice id.
uint16_t sliceTo_; // The ending slice id + 1.
uint16_t sliceStart_; // The starting slice id.
uint16_t sliceEnd_; // The ending slice id + 1.
PRLock *cxLock_; // Locks cx_ for parallel VM calls
ParallelBailoutRecord *const records_; // Bailout records for each worker
@ -377,8 +387,8 @@ class ForkJoinShared : public ParallelJob, public Monitor
ForkJoinShared(JSContext *cx,
ThreadPool *threadPool,
HandleFunction fun,
uint16_t sliceFrom,
uint16_t sliceTo,
uint16_t sliceStart,
uint16_t sliceEnd,
ParallelBailoutRecord *records);
~ForkJoinShared();
@ -492,19 +502,24 @@ static const char *ForkJoinModeString(ForkJoinMode mode);
bool
js::ForkJoin(JSContext *cx, CallArgs &args)
{
JS_ASSERT(args.length() == 3); // else the self-hosted code is wrong
JS_ASSERT(args.length() == 4); // else the self-hosted code is wrong
JS_ASSERT(args[0].isObject());
JS_ASSERT(args[0].toObject().is<JSFunction>());
JS_ASSERT(args[1].isObject());
JS_ASSERT(args[1].toObject().is<JSFunction>());
JS_ASSERT(args[1].isInt32());
JS_ASSERT(args[2].isInt32());
JS_ASSERT(args[2].toInt32() < NumForkJoinModes);
JS_ASSERT(args[3].isInt32());
JS_ASSERT(args[3].toInt32() < NumForkJoinModes);
RootedFunction fun(cx, &args[0].toObject().as<JSFunction>());
RootedFunction boundsFun(cx, &args[1].toObject().as<JSFunction>());
ForkJoinMode mode = (ForkJoinMode) args[2].toInt32();
uint16_t sliceStart = (uint16_t)(args[1].toInt32());
uint16_t sliceEnd = (uint16_t)(args[2].toInt32());
ForkJoinMode mode = (ForkJoinMode)(args[3].toInt32());
ForkJoinOperation op(cx, fun, boundsFun, mode);
MOZ_ASSERT(sliceStart == args[1].toInt32());
MOZ_ASSERT(sliceEnd == args[2].toInt32());
MOZ_ASSERT(sliceStart < sliceEnd);
ForkJoinOperation op(cx, fun, sliceStart, sliceEnd, mode);
ExecutionStatus status = op.apply();
if (status == ExecutionFatal)
return false;
@ -562,15 +577,16 @@ ForkJoinModeString(ForkJoinMode mode) {
return "???";
}
ForkJoinOperation::ForkJoinOperation(JSContext *cx, HandleFunction fun, HandleFunction boundsFun,
ForkJoinMode mode)
ForkJoinOperation::ForkJoinOperation(JSContext *cx, HandleFunction fun, uint16_t sliceStart,
uint16_t sliceEnd, ForkJoinMode mode)
: bailouts(0),
bailoutCause(ParallelBailoutNone),
bailoutScript(cx),
bailoutBytecode(nullptr),
cx_(cx),
fun_(fun),
boundsFun_(boundsFun),
sliceStart_(sliceStart),
sliceEnd_(sliceEnd),
bailoutRecords_(cx),
worklist_(cx),
worklistData_(cx),
@ -1005,9 +1021,13 @@ ForkJoinOperation::sequentialExecution(bool disqualified)
Spew(SpewOps, "Executing sequential execution (disqualified=%d).",
disqualified);
if (sliceStart_ == sliceEnd_)
return ExecutionSequential;
RootedValue funVal(cx_, ObjectValue(*fun_));
if (!ExecuteSequentially(cx_, funVal))
if (!ExecuteSequentially(cx_, funVal, &sliceStart_, sliceEnd_))
return ExecutionFatal;
MOZ_ASSERT(sliceStart_ == sliceEnd_);
return ExecutionSequential;
}
@ -1166,13 +1186,7 @@ ForkJoinOperation::warmupExecution(bool stopIfComplete, ExecutionStatus *status)
// GreenLight: warmup succeeded, still more work to do
// RedLight: fatal error or warmup completed all work (check status)
uint16_t from, to;
if (!computeBounds(&from, &to)) {
*status = ExecutionFatal;
return RedLight;
}
if (from == to) {
if (sliceStart_ == sliceEnd_) {
Spew(SpewOps, "Warmup execution finished all the work.");
if (stopIfComplete) {
@ -1192,11 +1206,11 @@ ForkJoinOperation::warmupExecution(bool stopIfComplete, ExecutionStatus *status)
return GreenLight;
}
Spew(SpewOps, "Executing warmup.");
Spew(SpewOps, "Executing warmup from slice %d.", sliceStart_);
AutoEnterWarmup warmup(cx_->runtime());
RootedValue funVal(cx_, ObjectValue(*fun_));
if (!ExecuteSequentially(cx_, funVal)) {
if (!ExecuteSequentially(cx_, funVal, &sliceStart_, sliceStart_ + 1)) {
*status = ExecutionFatal;
return RedLight;
}
@ -1215,13 +1229,7 @@ ForkJoinOperation::parallelExecution(ExecutionStatus *status)
// functions such as ForkJoin().
JS_ASSERT(ForkJoinContext::current() == nullptr);
uint16_t from, to;
if (!computeBounds(&from, &to)) {
*status = ExecutionFatal;
return RedLight;
}
if (from == to) {
if (sliceStart_ == sliceEnd_) {
Spew(SpewOps, "Warmup execution finished all the work.");
*status = ExecutionWarmup;
return RedLight;
@ -1229,7 +1237,7 @@ ForkJoinOperation::parallelExecution(ExecutionStatus *status)
ForkJoinActivation activation(cx_);
ThreadPool *threadPool = &cx_->runtime()->threadPool;
ForkJoinShared shared(cx_, threadPool, fun_, from, to, &bailoutRecords_[0]);
ForkJoinShared shared(cx_, threadPool, fun_, sliceStart_, sliceEnd_, &bailoutRecords_[0]);
if (!shared.init()) {
*status = ExecutionFatal;
return RedLight;
@ -1290,36 +1298,6 @@ ForkJoinOperation::hasScript(Vector<types::RecompileInfo> &scripts, JSScript *sc
return false;
}
bool
ForkJoinOperation::computeBounds(uint16_t *start, uint16_t *end)
{
RootedValue funVal(cx_, ObjectValue(*boundsFun_));
FastInvokeGuard fig(cx_, funVal);
InvokeArgs &args = fig.args();
if (!args.init(0))
return false;
args.setCallee(funVal);
args.setThis(UndefinedValue());
if (!fig.invoke(cx_))
return false;
MOZ_ASSERT(args.rval().toObject().is<ArrayObject>());
MOZ_ASSERT(args.rval().toObject().getDenseInitializedLength() == 2);
int32_t start32 = args.rval().toObject().getDenseElement(0).toInt32();
int32_t end32 = args.rval().toObject().getDenseElement(1).toInt32();
MOZ_ASSERT(int32_t(uint16_t(start32)) == start32);
MOZ_ASSERT(int32_t(uint16_t(end32)) == end32);
*start = uint16_t(start32);
*end = uint16_t(end32);
return true;
}
// Can only enter callees with a valid IonScript.
template <uint32_t maxArgc>
class ParallelIonInvoke
@ -1368,14 +1346,14 @@ class ParallelIonInvoke
ForkJoinShared::ForkJoinShared(JSContext *cx,
ThreadPool *threadPool,
HandleFunction fun,
uint16_t sliceFrom,
uint16_t sliceTo,
uint16_t sliceStart,
uint16_t sliceEnd,
ParallelBailoutRecord *records)
: cx_(cx),
threadPool_(threadPool),
fun_(fun),
sliceFrom_(sliceFrom),
sliceTo_(sliceTo),
sliceStart_(sliceStart),
sliceEnd_(sliceEnd),
cxLock_(nullptr),
records_(records),
allocators_(cx),
@ -1445,7 +1423,7 @@ ForkJoinShared::execute()
AutoUnlockMonitor unlock(*this);
// Push parallel tasks and wait until they're all done.
jobResult = threadPool_->executeJob(cx_, this, sliceFrom_, sliceTo_);
jobResult = threadPool_->executeJob(cx_, this, sliceStart_, sliceEnd_);
if (jobResult == TP_FATAL)
return TP_FATAL;
}
@ -1461,7 +1439,7 @@ ForkJoinShared::execute()
#ifdef DEBUG
Spew(SpewOps, "Completed parallel job [slices: %d, threads: %d, stolen: %d (work stealing:%s)]",
sliceTo_ - sliceFrom_ + 1,
sliceEnd_ - sliceStart_,
threadPool_->numWorkers(),
threadPool_->stolenSlices(),
threadPool_->workStealing() ? "ON" : "OFF");
@ -1557,10 +1535,11 @@ ForkJoinShared::executePortion(PerThreadData *perThread, ThreadPoolWorker *worke
cx.bailoutRecord->setCause(ParallelBailoutMainScriptNotPresent);
setAbortFlagAndRequestInterrupt(false);
} else {
ParallelIonInvoke<2> fii(cx_->runtime(), fun_, 2);
ParallelIonInvoke<3> fii(cx_->runtime(), fun_, 3);
fii.args[0] = Int32Value(worker->id());
fii.args[1] = BooleanValue(false);
fii.args[1] = Int32Value(sliceStart_);
fii.args[2] = Int32Value(sliceEnd_);
bool ok = fii.invoke(perThread);
JS_ASSERT(ok == !cx.bailoutRecord->topScript);

View File

@ -30,7 +30,7 @@
// to enable parallel execution. At the top-level, it consists of a native
// function (exposed as the ForkJoin intrinsic) that is used like so:
//
// ForkJoin(func, boundsFunc, mode)
// ForkJoin(func, sliceStart, sliceEnd, mode)
//
// The intention of this statement is to start some number (usually the
// number of hardware threads) of copies of |func()| running in parallel. Each
@ -41,38 +41,39 @@
// is not something you should rely upon---if work-stealing is enabled it
// could be that a single worker thread winds up handling multiple slices.
//
// The second argument, |boundsFunc|, is a function that must return an array
// of exactly two integers. This function is called before every attempt at
// execution: warmup, sequential, or parallel. The bounds are taken from a
// function call instead of taken as two static integers so that the bounds
// may be shrunk when recovering from bailout.
// The second and third arguments, |sliceStart| and |sliceEnd|, are the slice
// boundaries. These numbers must each fit inside an uint16_t.
//
// The third argument, |mode|, is an internal mode integer giving finer
// The fourth argument, |mode|, is an internal mode integer giving finer
// control over the behavior of ForkJoin. See the |ForkJoinMode| enum.
//
// func() should expect the following arguments:
//
// func(warmup)
// func(workerId, sliceStart, sliceEnd)
//
// The parameter |warmup| is true for a *warmup or recovery phase*. Warmup
// phases are discussed below in more detail, but the general idea is that if
// |warmup| is true, |func| should only do a fixed amount of work. If |warmup|
// is false, |func| should try to do all remaining work is assigned.
// The |workerId| parameter is the id of the worker executing the function. It
// is 0 in sequential mode.
//
// The |sliceStart| and |sliceEnd| parameters are the current bounds that
// the worker is handling. In parallel execution, these parameters are not
// used. In sequential execution, they tell the worker what slices should be
// processed. During the warm up phase, sliceEnd == sliceStart + 1.
//
// |func| can keep asking for more work from the scheduler by calling the
// intrinsic |GetForkJoinSlice(id)|. When there are no more slices to hand
// out, -1 is returned as a sentinel value. By exposing this function as an
// intrinsic, we reduce the number of JS-C++ boundary crossings incurred by
// workstealing, which may have many slices.
// intrinsic |GetForkJoinSlice(sliceStart, sliceEnd, id)|. When there are no
// more slices to hand out, ThreadPool::MAX_SLICE_ID is returned as a sentinel
// value. By exposing this function as an intrinsic, we reduce the number of
// JS-C++ boundary crossings incurred by workstealing, which may have many
// slices.
//
// |func| MUST PROCESS ALL SLICES BEFORE RETURNING! Not doing so is an error
// |and is protected by debug asserts in ThreadPool.
// In sequential execution, |func| should return the maximum computed slice id
// S for which all slices with id < S have already been processed. This is so
// ThreadPool can track the leftmost completed slice id to maintain
// determinism. Slices which have been completed in sequential execution
// cannot be re-run in parallel execution.
//
// Note well that there is a separation of concern between *scheduling* slices
// and *interpreting* slices. ForkJoin only schedules slices by handing out
// slice ids; it does not interpret what slice ids mean. Instead, |func|
// should track how much work it has accomplished thus far; consult |Array.js|
// for some examples.
// In parallel execution, |func| MUST PROCESS ALL SLICES BEFORE RETURNING!
// Not doing so is an error and is protected by debug asserts in ThreadPool.
//
// Warmups and Sequential Fallbacks
// --------------------------------
@ -192,7 +193,7 @@
// during parallel execution. But we're not there yet.
//
// Load balancing (work stealing):
//
// The ForkJoin job is dynamically divided into a fixed number of slices,
// and is submitted for parallel execution in the pool. When the number
// of slices is big enough (typically greater than the number of workers

View File

@ -335,7 +335,7 @@ intrinsic_ForkJoinGetSlicePar(ForkJoinContext *cx, unsigned argc, Value *vp)
if (cx->getSlice(&sliceId))
args.rval().setInt32(sliceId);
else
args.rval().setInt32(-1);
args.rval().setInt32(ThreadPool::MAX_SLICE_ID);
return true;
}

View File

@ -99,8 +99,8 @@ ThreadPoolWorker::discardSlices()
bool
ThreadPoolWorker::stealFrom(ThreadPoolWorker *victim, uint16_t *sliceId)
{
// Instead of popping the slice from the front by incrementing sliceFrom_,
// decrement sliceTo_. Usually this gives us better locality.
// Instead of popping the slice from the front by incrementing sliceStart_,
// decrement sliceEnd_. Usually this gives us better locality.
if (!victim->popSliceBack(sliceId))
return false;
#ifdef DEBUG
@ -198,10 +198,10 @@ ThreadPoolWorker::helperLoop()
}
void
ThreadPoolWorker::submitSlices(uint16_t sliceFrom, uint16_t sliceTo)
ThreadPoolWorker::submitSlices(uint16_t sliceStart, uint16_t sliceEnd)
{
MOZ_ASSERT(!hasWork());
sliceBounds_ = ComposeSliceBounds(sliceFrom, sliceTo);
sliceBounds_ = ComposeSliceBounds(sliceStart, sliceEnd);
}
bool
@ -392,9 +392,9 @@ ThreadPool::waitForWorkers(AutoLockMonitor &lock)
}
ParallelResult
ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceFrom, uint16_t sliceMax)
ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceStart, uint16_t sliceMax)
{
MOZ_ASSERT(sliceFrom < sliceMax);
MOZ_ASSERT(sliceStart < sliceMax);
MOZ_ASSERT(CurrentThreadCanAccessRuntime(runtime_));
MOZ_ASSERT(activeWorkers_ == 0);
MOZ_ASSERT(!hasWork());
@ -403,19 +403,19 @@ ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceFrom, uint
return TP_FATAL;
// Evenly distribute slices to the workers.
uint16_t numSlices = sliceMax - sliceFrom;
uint16_t numSlices = sliceMax - sliceStart;
uint16_t slicesPerWorker = numSlices / numWorkers();
uint16_t leftover = numSlices % numWorkers();
uint16_t sliceTo = sliceFrom;
uint16_t sliceEnd = sliceStart;
for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
if (leftover > 0) {
sliceTo += slicesPerWorker + 1;
sliceEnd += slicesPerWorker + 1;
leftover--;
} else {
sliceTo += slicesPerWorker;
sliceEnd += slicesPerWorker;
}
workers_[workerId]->submitSlices(sliceFrom, sliceTo);
sliceFrom = sliceTo;
workers_[workerId]->submitSlices(sliceStart, sliceEnd);
sliceStart = sliceEnd;
}
MOZ_ASSERT(leftover == 0);

View File

@ -81,7 +81,7 @@ class ThreadPoolWorker
bool isMainThread() const { return id() == 0; }
// Submits a new set of slices. Assumes !hasWork().
void submitSlices(uint16_t sliceFrom, uint16_t sliceTo);
void submitSlices(uint16_t sliceStart, uint16_t sliceEnd);
// Get the next slice; work stealing happens here if work stealing is
// on. Returns false if there are no more slices to hand out.
@ -208,6 +208,8 @@ class ThreadPool : public Monitor
return offsetof(ThreadPool, workers_);
}
static const uint16_t MAX_SLICE_ID = UINT16_MAX;
ThreadPool(JSRuntime *rt);
~ThreadPool();