Bug 827960 - Allow jit_test.py to run tests in parallel. r=terrence

This commit is contained in:
Christian Holler 2013-01-11 02:25:15 +01:00
parent 7f7b67b696
commit 303292da3b
2 changed files with 239 additions and 50 deletions

View File

@ -10,6 +10,14 @@ import datetime, os, re, sys, tempfile, traceback, time, shlex
import subprocess
from subprocess import *
from threading import Thread
import signal
try:
from multiprocessing import Process, Queue, Manager, cpu_count
HAVE_MULTIPROCESSING = True
except ImportError:
HAVE_MULTIPROCESSING = False
def add_libdir_to_path():
from os.path import dirname, exists, join, realpath
@ -178,6 +186,29 @@ def run_timeout_cmd(cmdline, options, timeout=60.0):
l = [ None, None ]
timed_out = False
th = Thread(target=th_run_cmd, args=(cmdline, options, l))
# If our SIGINT handler is set to SIG_IGN (ignore)
# then we are running as a child process for parallel
# execution and we must ensure to kill our child
# when we are signaled to exit.
import signal
sigint_handler = signal.getsignal(signal.SIGINT)
sigterm_handler = signal.getsignal(signal.SIGTERM)
if (sigint_handler == signal.SIG_IGN):
def handleChildSignal(sig, frame):
try:
if sys.platform != 'win32':
os.kill(l[0].pid, signal.SIGKILL)
else:
import ctypes
ctypes.windll.kernel32.TerminateProcess(int(l[0]._handle), -1)
except OSError:
pass
if (sig == signal.SIGTERM):
sys.exit(0)
signal.signal(signal.SIGINT, handleChildSignal)
signal.signal(signal.SIGTERM, handleChildSignal)
th.start()
th.join(timeout)
while th.isAlive():
@ -187,13 +218,23 @@ def run_timeout_cmd(cmdline, options, timeout=60.0):
import signal
if sys.platform != 'win32':
os.kill(l[0].pid, signal.SIGKILL)
else:
import ctypes
ctypes.windll.kernel32.TerminateProcess(int(l[0]._handle), -1)
time.sleep(.1)
timed_out = True
except OSError:
# Expecting a "No such process" error
pass
th.join()
# Restore old signal handlers
if (sigint_handler == signal.SIG_IGN):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, sigterm_handler)
(out, err, code) = l[1]
return (out, err, code, timed_out)
def run_cmd(cmdline, env, timeout):
@ -266,31 +307,191 @@ def check_output(out, err, rc, test):
return True
def print_tinderbox(label, test, message=None):
jitflags = " ".join(test.jitflags)
result = "%s | jit_test.py %-15s| %s" % (label, jitflags, test.path)
if (test != None):
jitflags = " ".join(test.jitflags)
result = "%s | jit_test.py %-15s| %s" % (label, jitflags, test.path)
else:
result = "%s | jit_test.py " % label
if message:
result += ": " + message
print result
def run_tests(tests, test_dir, lib_dir, shell_args):
def wrap_parallel_run_test(test, lib_dir, shell_args, resultQueue, options, js):
# This is necessary because on Windows global variables are not automatically
# available in the children, while on Linux and OSX this is the case (because of fork).
global OPTIONS
global JS
OPTIONS = options
JS = js
# Ignore SIGINT in the child
signal.signal(signal.SIGINT, signal.SIG_IGN)
result = run_test(test, lib_dir, shell_args) + (test,)
resultQueue.put(result)
return result
def run_tests_parallel(tests, test_dir, lib_dir, shell_args):
# This queue will contain the results of the various tests run.
# We could make this queue a global variable instead of using
# a manager to share, but this will not work on Windows.
queue_manager = Manager()
async_test_result_queue = queue_manager.Queue()
# This queue will be used by the result process to indicate
# that it has received a result and we can start a new process
# on our end. The advantage is that we don't have to sleep and
# check for worker completion ourselves regularly.
notify_queue = queue_manager.Queue()
# This queue will contain the return value of the function
# processing the test results.
result_process_return_queue = queue_manager.Queue()
result_process = Process(target=process_test_results_parallel, args=(async_test_result_queue, result_process_return_queue, notify_queue, len(tests), OPTIONS, JS))
result_process.start()
# Ensure that a SIGTERM is handled the same way as SIGINT
# to terminate all child processes.
sigint_handler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGTERM, sigint_handler)
worker_processes = []
def remove_completed_workers(workers):
new_workers = []
for worker in workers:
if worker.is_alive():
new_workers.append(worker)
else:
worker.join()
return new_workers
try:
testcnt = 0
# Initially start as many jobs as allowed to run parallel
for i in range(min(OPTIONS.max_jobs,len(tests))):
notify_queue.put(True)
# For every item in the notify queue, start one new worker.
# Every completed worker adds a new item to this queue.
while notify_queue.get():
if (testcnt < len(tests)):
# Start one new worker
worker_process = Process(target=wrap_parallel_run_test, args=(tests[testcnt], lib_dir, shell_args, async_test_result_queue, OPTIONS, JS))
worker_processes.append(worker_process)
worker_process.start()
testcnt += 1
# Collect completed workers
worker_processes = remove_completed_workers(worker_processes)
else:
break
# Wait for all processes to terminate
while len(worker_processes) > 0:
worker_processes = remove_completed_workers(worker_processes)
# Signal completion to result processor, then wait for it to complete on its own
async_test_result_queue.put(None)
result_process.join()
# Return what the result process has returned to us
return result_process_return_queue.get()
except (Exception, KeyboardInterrupt) as e:
# Print the exception if it's not an interrupt,
# might point to a bug or other faulty condition
if not isinstance(e,KeyboardInterrupt):
traceback.print_exc()
for worker in worker_processes:
try:
worker.terminate()
except:
pass
result_process.terminate()
return False
def get_parallel_results(async_test_result_queue, notify_queue):
while True:
async_test_result = async_test_result_queue.get()
# Check if we are supposed to terminate
if (async_test_result == None):
return
# Notify parent that we got a result
notify_queue.put(True)
yield async_test_result
def process_test_results_parallel(async_test_result_queue, return_queue, notify_queue, num_tests, options, js):
gen = get_parallel_results(async_test_result_queue, notify_queue)
ok = process_test_results(gen, num_tests, options, js)
return_queue.put(ok)
def print_test_summary(failures, complete, doing, options):
if failures:
if options.write_failures:
try:
out = open(options.write_failures, 'w')
# Don't write duplicate entries when we are doing multiple failures per job.
written = set()
for test, fout, ferr, fcode, _ in failures:
if test.path not in written:
out.write(os.path.relpath(test.path, test_dir) + '\n')
if options.write_failure_output:
out.write(fout)
out.write(ferr)
out.write('Exit code: ' + str(fcode) + "\n")
written.add(test.path)
out.close()
except IOError:
sys.stderr.write("Exception thrown trying to write failure file '%s'\n"%
options.write_failures)
traceback.print_exc()
sys.stderr.write('---\n')
def show_test(test):
if options.show_failed:
print(' ' + subprocess.list2cmdline(get_test_cmd(test.path, test.jitflags, lib_dir, shell_args)))
else:
print(' ' + ' '.join(test.jitflags + [ test.path ]))
print('FAILURES:')
for test, _, __, ___, timed_out in failures:
if not timed_out:
show_test(test)
print('TIMEOUTS:')
for test, _, __, ___, timed_out in failures:
if timed_out:
show_test(test)
return False
else:
print('PASSED ALL' + ('' if complete else ' (partial run -- interrupted by user %s)'%doing))
return True
def process_test_results(results, num_tests, options, js):
pb = NullProgressBar()
if not OPTIONS.hide_progress and not OPTIONS.show_cmd and ProgressBar.conservative_isatty():
if not options.hide_progress and not options.show_cmd and ProgressBar.conservative_isatty():
fmt = [
{'value': 'PASS', 'color': 'green'},
{'value': 'FAIL', 'color': 'red'},
{'value': 'TIMEOUT', 'color': 'blue'},
{'value': 'SKIP', 'color': 'brightgray'},
]
pb = ProgressBar(len(tests), fmt)
pb = ProgressBar(num_tests, fmt)
failures = []
timeouts = 0
complete = False
doing = 'before starting'
try:
for i, test in enumerate(tests):
doing = 'on %s'%test.path
ok, out, err, code, timed_out = run_test(test, lib_dir, shell_args)
for i, (ok, out, err, code, timed_out, test) in enumerate(results):
doing = 'after %s'%test.path
if not ok:
@ -299,7 +500,7 @@ def run_tests(tests, test_dir, lib_dir, shell_args):
if timed_out:
timeouts += 1
if OPTIONS.tinderbox:
if options.tinderbox:
if ok:
print_tinderbox("TEST-PASS", test);
else:
@ -320,51 +521,21 @@ def run_tests(tests, test_dir, lib_dir, shell_args):
)
complete = True
except KeyboardInterrupt:
print_tinderbox("TEST-UNEXPECTED-FAIL", test);
print_tinderbox("TEST-UNEXPECTED-FAIL", None, "Test execution interrupted by user");
pb.finish(True)
return print_test_summary(failures, complete, doing, options)
if failures:
if OPTIONS.write_failures:
try:
out = open(OPTIONS.write_failures, 'w')
# Don't write duplicate entries when we are doing multiple failures per job.
written = set()
for test, fout, ferr, fcode, _ in failures:
if test.path not in written:
out.write(os.path.relpath(test.path, test_dir) + '\n')
if OPTIONS.write_failure_output:
out.write(fout)
out.write(ferr)
out.write('Exit code: ' + str(fcode) + "\n")
written.add(test.path)
out.close()
except IOError:
sys.stderr.write("Exception thrown trying to write failure file '%s'\n"%
OPTIONS.write_failures)
traceback.print_exc()
sys.stderr.write('---\n')
def show_test(test):
if OPTIONS.show_failed:
print(' ' + subprocess.list2cmdline(get_test_cmd(test.path, test.jitflags, lib_dir, shell_args)))
else:
print(' ' + ' '.join(test.jitflags + [ test.path ]))
def get_serial_results(tests, lib_dir, shell_args):
for test in tests:
result = run_test(test, lib_dir, shell_args)
yield result + (test,)
print('FAILURES:')
for test, _, __, ___, timed_out in failures:
if not timed_out:
show_test(test)
print('TIMEOUTS:')
for test, _, __, ___, timed_out in failures:
if timed_out:
show_test(test)
return False
else:
print('PASSED ALL' + ('' if complete else ' (partial run -- interrupted by user %s)'%doing))
return True
def run_tests(tests, test_dir, lib_dir, shell_args):
gen = get_serial_results(tests, lib_dir, shell_args)
ok = process_test_results(gen, len(tests), OPTIONS, JS)
return ok
def parse_jitflags():
jitflags = [ [ '-' + flag for flag in flags ]
@ -400,6 +571,15 @@ def main(argv):
test_dir = os.path.join(script_dir, 'tests')
lib_dir = os.path.join(script_dir, 'lib')
# If no multiprocessing is available, fallback to serial test execution
max_jobs_default = 1
if HAVE_MULTIPROCESSING:
try:
max_jobs_default = cpu_count()
print "Defaulting to %d jobs in parallel" % max_jobs_default
except NotImplementedError:
pass
# The [TESTS] optional arguments are paths of test files relative
# to the jit-test/tests directory.
@ -446,6 +626,9 @@ def main(argv):
help='Run tests once with --ion-eager and once with --no-jm (ignores --jitflags)')
op.add_option('--tbpl', dest='tbpl', action='store_true',
help='Run tests with all IonMonkey option combinations (ignores --jitflags)')
op.add_option('-j', '--worker-count', dest='max_jobs', type=int, default=max_jobs_default,
help='Number of tests to run in parallel (default %default)')
(OPTIONS, args) = op.parse_args(argv)
if len(args) < 1:
op.error('missing JS_SHELL argument')
@ -560,7 +743,11 @@ def main(argv):
sys.exit()
try:
ok = run_tests(job_list, test_dir, lib_dir, shell_args)
ok = None
if OPTIONS.max_jobs > 1 and HAVE_MULTIPROCESSING:
ok = run_tests_parallel(job_list, test_dir, lib_dir, shell_args)
else:
ok = run_tests(job_list, test_dir, lib_dir, shell_args)
if not ok:
sys.exit(2)
except OSError:

View File

@ -1,3 +1,5 @@
// |jit-test| slow;
eval(" function x() {}" + Array(241).join(" "));
for (var i = 0; i < 100; i++) {
gczeal(4, 2);