...
 
Commits (3)
......@@ -89,3 +89,9 @@ class Config(object):
self.check_ge("max_diff_block_lines", "max_page_diff_block_lines")
self.check_ge("max_report_size", "max_page_size")
self.check_ge("max_report_size", "max_page_size_child")
def check_parallel(self):
from importlib import util
multiprocess = util.find_spec("multiprocess")
dill = util.find_spec("dill")
self.parallel = (multiprocess and dill)
......@@ -24,6 +24,8 @@ from . import feeders
from .exc import RequiredToolNotFound
from .diff import diff, reverse_unified_diff, diff_split_lines
from .excludes import command_excluded
from .config import Config
logger = logging.getLogger(__name__)
......@@ -230,6 +232,7 @@ class Difference(object):
@staticmethod
def from_command_exc(klass, path1, path2, *args, **kwargs):
logger.debug("Starting execution of command %s", klass)
command_args = []
if 'command_args' in kwargs:
command_args = kwargs['command_args']
......@@ -247,8 +250,31 @@ class Difference(object):
command.start()
return feeder, command, False
feeder1, command1, excluded1 = command_and_feeder(path1)
feeder2, command2, excluded2 = command_and_feeder(path2)
results = {}
if Config().parallel:
import dill
from .parallel.comparison_pool import ComparisonPool, \
CommandFailedToExecute
pool = ComparisonPool()
try:
pool.map(command_and_feeder, args=[path1, path2], callback=results)
except CommandFailedToExecute as e:
logger.debug("Command failed while executing %s", e)
if not results:
logger.debug("Parallel execution failed or disabled. \
Falling back to serial execution.")
cmd1 = command_and_feeder(path1)
cmd2 = command_and_feeder(path2)
else:
cmd1, cmd2 = results[0], results[1]
feeder1, command1, excluded1 = cmd1
feeder2, command2, excluded2 = cmd2
if not feeder1 or not feeder2:
assert excluded1 or excluded2
return None, True
......
......@@ -371,6 +371,7 @@ def run_diffoscope(parsed_args):
Config().exclude_commands = parsed_args.exclude_commands
Config().exclude_directory_metadata = parsed_args.exclude_directory_metadata
Config().compute_visual_diffs = PresenterManager().compute_visual_diffs()
Config().check_parallel()
Config().check_constraints()
tool_prepend_prefix(parsed_args.tool_prefix_binutils, *"ar as ld ld.bfd nm objcopy objdump ranlib readelf strip".split())
set_path()
......
import logging
import dill
from multiprocess import Pool
from diffoscope.config import Config
from functools import partial
from pickle import PickleError
logger = logging.getLogger(__name__)
class CommandFailedToExecute(Exception):
def __init__(self, err):
self.err = err
def __str__(self):
return repr(self.err)
class ComparisonPool(object):
def __init__(self):
self._pool = Pool()
logger.debug("ComparisonPool initialized.")
def map(self, fun, args=[], callback=None):
logger.debug("Invoking parallel map for function %s", fun)
pool = self._pool
jobs = []
def _callback(result, index):
callback[index] = result
for index, arg in enumerate(args):
logger.debug("Adding new process for %s (%d: %s)", fun, index, arg)
new_callback = partial(_callback, index=index)
jobs.append(pool.apply_async(fun, args=(arg,), callback= new_callback))
for job in jobs:
try:
job.get()
except PickleError as e:
raise CommandFailedToExecute(e)
if not job.successful():
raise CommandFailedToExecute(job._err_callback)
logger.debug("Closing Pools")
pool.close()
pool.join()
jobs = None
logger.debug("Ending ComparisonPool.map")