Bug 1207871 - Process files in descending file size order; r=ted

Large files take longer to process. Scheduling large files after smaller
files means there is a higher chance a large file may be a long pole
during processing.

This commit changes the scheduling logic to exhaustively obtain the set
of files to be processed. It then sorts them by descending file size and
schedules them in the resulting order, thus minimizing the chances for a
large file to be the long pole holding up processing completion.

On my machine this doesn't change wall execution time. However,
automation may be different. And the logic of the new behavior is sound.
This commit is contained in:
Gregory Szorc 2015-09-23 22:26:41 -04:00
parent 84426d366d
commit fa967a4cfd
2 changed files with 61 additions and 20 deletions

View File

@ -395,13 +395,13 @@ class Dumper:
symbol files--|copy_debug|, mostly useful for creating a
Microsoft Symbol Server from the resulting output.
You don't want to use this directly if you intend to call
ProcessDir. Instead, call GetPlatformSpecificDumper to
get an instance of a subclass.
You don't want to use this directly if you intend to process files.
Instead, call GetPlatformSpecificDumper to get an instance of a
subclass.
Processing is performed asynchronously via worker processes; in
order to wait for processing to finish and cleanup correctly, you
must call Finish after all Process/ProcessDir calls have been made.
must call Finish after all ProcessFiles calls have been made.
You must also call Dumper.GlobalInit before creating or using any
instances."""
def __init__(self, dump_syms, symbol_path,
@ -555,26 +555,40 @@ class Dumper:
if stop_pool:
JobPool.shutdown()
def Process(self, file_or_dir):
    """Process one file, or every valid file under a directory.

    Processing happens asynchronously; call Finish afterwards to wait
    for it to complete and to clean up.
    """
    if os.path.isfile(file_or_dir):
        self.ProcessFiles((file_or_dir,))
    elif os.path.isdir(file_or_dir) and not self.ShouldSkipDir(file_or_dir):
        self.ProcessDir(file_or_dir)
def Process(self, *args):
    """Process files recursively in args.

    Each argument may be a file or a directory; directories are walked
    for eligible files (see get_files_to_process). Processing is
    performed asynchronously, so Finish must be called afterwards to
    wait for completion and clean up.
    """
    # We collect all files to process first then sort by size to schedule
    # larger files first because larger files tend to take longer and we
    # don't like long pole stragglers.
    files = set()
    for arg in args:
        for f in self.get_files_to_process(arg):
            files.add(f)

    for f in sorted(files, key=os.path.getsize, reverse=True):
        self.ProcessFiles((f,))
def get_files_to_process(self, file_or_dir):
    """Generate the files to process from an input."""
    if os.path.isfile(file_or_dir):
        yield file_or_dir
    elif os.path.isdir(file_or_dir) and not self.ShouldSkipDir(file_or_dir):
        # A directory contributes every eligible file it contains.
        for path in self.get_files_to_process_in_dir(file_or_dir):
            yield path
def get_files_to_process_in_dir(self, path):
    """Generate the files to process in a directory.

    Valid files are determined by calling ShouldProcess. Directories
    for which ShouldSkipDir is true are pruned from the walk.
    """
    for root, dirs, files in os.walk(path):
        # Iterate over a copy so we can prune `dirs` in place, which
        # tells os.walk not to descend into skipped directories.
        for d in dirs[:]:
            if self.ShouldSkipDir(d):
                dirs.remove(d)
        for f in files:
            fullpath = os.path.join(root, f)
            if self.ShouldProcess(fullpath):
                yield fullpath
def SubmitJob(self, file_key, func_name, args, callback):
"""Submits a job to the pool of workers"""
@ -1011,8 +1025,8 @@ to canonical locations in the source repository. Specify
exclude=options.exclude,
repo_manifest=options.repo_manifest,
file_mapping=file_mapping)
for arg in args[2:]:
dumper.Process(arg)
dumper.Process(*args[2:])
dumper.Finish()
# run main if run directly

View File

@ -71,6 +71,31 @@ class HelperMixin(object):
os.makedirs(d)
writer(f)
class TestSizeOrder(HelperMixin, unittest.TestCase):
    def test_size_order(self):
        """
        Test that files are processed ordered by size on disk.
        """
        seen = []

        def record_processed(filenames):
            # Record each processed path relative to the test dir,
            # normalized to forward slashes for cross-platform compare.
            for name in filenames:
                if name.startswith(self.test_dir):
                    name = name[len(self.test_dir):]
                seen.append(name.replace('\\', '/'))
            return True

        # Create three files with distinct sizes; scheduling should be
        # by descending size, not by directory order.
        for relpath, nbytes in (('a/one', 10), ('b/c/two', 30), ('c/three', 20)):
            fullpath = os.path.join(self.test_dir, relpath)
            parent = os.path.dirname(fullpath)
            if parent and not os.path.exists(parent):
                os.makedirs(parent)
            open(fullpath, 'wb').write('x' * nbytes)

        dumper = symbolstore.GetPlatformSpecificDumper(dump_syms="dump_syms",
                                                       symbol_path="symbol_path")
        dumper.ShouldProcess = lambda f: True
        dumper.ProcessFiles = record_processed
        dumper.Process(self.test_dir)
        dumper.Finish(stop_pool=False)
        self.assertEqual(seen, ['b/c/two', 'c/three', 'a/one'])
class TestExclude(HelperMixin, unittest.TestCase):
def test_exclude_wildcard(self):
"""
@ -93,6 +118,7 @@ class TestExclude(HelperMixin, unittest.TestCase):
expected.sort()
self.assertEqual(processed, expected)
def test_exclude_filenames(self):
"""
Test that excluding a filename without a wildcard works.
@ -114,6 +140,7 @@ class TestExclude(HelperMixin, unittest.TestCase):
expected.sort()
self.assertEqual(processed, expected)
def mock_dump_syms(module_id, filename, extra=[]):
return ["MODULE os x86 %s %s" % (module_id, filename)
] + extra + [