# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""
def __call__(self, index, _=None):
  """Runs the wrapped function on the argument tuple stored at |index|.

  Positional arguments are read from the module-level _fork_params[index]
  and keyword arguments from _fork_kwargs; they are not passed in directly.

  Args:
    index: Position within _fork_params of the arguments to use.
    _: Ignored.

  Returns:
    The wrapped function's return value, or an _ExceptionWrapper carrying
    the formatted traceback if the call raised.
  """
  try:
    return self._func(*_fork_params[index], **_fork_kwargs)
  except Exception as e:
    # Only keep the exception type for builtin exception types or else risk
    # further marshalling exceptions.
    # NOTE(review): __builtins__ may be a dict rather than a module when this
    # file is imported instead of run directly, in which case this hasattr()
    # check would never match -- confirm.
    type_name = type(e).__name__
    exception_type = type_name if hasattr(__builtins__, type_name) else None
    # multiprocessing is supposed to catch and return exceptions automatically
    # but it doesn't seem to work properly :(.
    return _ExceptionWrapper(traceback.format_exc(), exception_type)
  except:  # pylint: disable=bare-except
    # Anything that is not an Exception subclass is still reported, but
    # without a type name.
    return _ExceptionWrapper(traceback.format_exc())
class _WrappedResult(object):
  """Allows for host-side logic to be run after child process has terminated.

  * Unregisters the associated pool from _all_pools.
  * Raises the exception caught by _FuncWrapper.
  """

  def __init__(self, result, pool=None):
    """Stores the underlying result object and, optionally, its pool.

    Args:
      result: Object providing wait() and get().
      pool: Pool to remove from _all_pools once the result is ready, or None.
    """
    self._pool = pool
    self._result = result

  def get(self):
    """Waits for the result, checks it for a wrapped exception, returns it."""
    self.wait()
    outcome = self._result.get()
    _CheckForException(outcome)
    return outcome

  def wait(self):
    """Blocks until the result is ready, then unregisters the pool once."""
    self._result.wait()
    if not self._pool:
      # No pool was given, or it has already been unregistered.
      return
    _all_pools.remove(self._pool)
    self._pool = None
def _TerminatePools():
  """Calls .terminate() on all active process pools.

  The docs say this should not be necessary, but it seems to be required when
  a child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True

  # atexit runs this function in child processes too, because it was
  # registered before fork()ing, but child processes cannot have pools.
  if _is_child_process:
    return

  # NOTE(review): close_pool is not defined inside this block; presumably it
  # is a helper that calls pool.terminate() -- verify where it is declared.
  for pool_index, active_pool in enumerate(_all_pools):
    # terminate() can block forever unless it is called on a separate thread.
    terminator = threading.Thread(
        name='Pool-Terminate-{}'.format(pool_index),
        target=close_pool,
        args=(active_pool, ))
    terminator.daemon = True
    terminator.start()
def _CheckForException(value):
  """Exits the process if |value| is an exception marshalled from a child.

  Values that are not _ExceptionWrapper instances are ignored. For a wrapped
  exception seen while _silence_exceptions is still False, value.MaybeThrow()
  is called first (its behaviour is defined outside this block); if it
  returns, the flag is set and the traceback text is logged. When the flag is
  already set, the process exits without logging anything further.

  Args:
    value: A result object produced by a job.

  Raises:
    SystemExit: With status 1 when |value| is an _ExceptionWrapper and
      MaybeThrow() did not raise something else first.
  """
  global _silence_exceptions
  if not isinstance(value, _ExceptionWrapper):
    return
  if not _silence_exceptions:
    value.MaybeThrow()
    _silence_exceptions = True
    logging.error('Subprocess raised an exception:\n%s', value.msg)
  sys.exit(1)
def _MakeProcessPool(job_params, **job_kwargs):
  """Creates a process pool and registers it in _all_pools.

  The job arguments are stored in the module-level _fork_params and
  _fork_kwargs only while the pool is being constructed (presumably so that
  the worker processes created by Pool() inherit them -- confirm), and are
  cleared again afterwards.

  Args:
    job_params: Sequence of positional-argument tuples, one per job. Its
      length caps the pool size.
    **job_kwargs: Keyword arguments common to every job.

  Returns:
    The newly created multiprocessing.Pool.
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    ret = multiprocessing.Pool(pool_size)
  finally:
    # Clear the globals even when Pool() raises; otherwise the asserts above
    # would fail on every subsequent call.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    # First pool ever created: start tracking pools and make sure they get
    # terminated at interpreter exit.
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret
def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  When DISABLE_ASYNC is set, |func| is instead called immediately in the
  current process and its return value is wrapped.

  Args:
    func: Callable to run.
    args: Positional arguments to pass to |func|.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if DISABLE_ASYNC:
    return _WrappedResult(_ImmediateResult(func(*args)), pool=None)

  pool = _MakeProcessPool([args])  # Omit |kwargs|.
  # The single job's arguments live at index 0 of the list given above.
  async_result = pool.apply_async(_FuncWrapper(func), (0, ))
  pool.close()
  return _WrappedResult(async_result, pool=pool)
def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.

  When DISABLE_ASYNC is set, |func| is instead called sequentially in the
  current process.

  Args:
    func: Callable to run once per entry of |arg_tuples|.
    arg_tuples: Iterable of positional-argument tuples. It is fully consumed
      up front.
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order. Nothing is yielded (and no pool is
  created) when |arg_tuples| is empty.
  """
  arg_tuples = list(arg_tuples)
  if not arg_tuples:
    return

  if DISABLE_ASYNC:
    for args in arg_tuples:
      yield func(*args, **kwargs)
    return

  pool = _MakeProcessPool(arg_tuples, **kwargs)
  wrapped_func = _FuncWrapper(func)
  try:
    # Workers receive only an index; _FuncWrapper looks up the actual
    # arguments by that index.
    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
      _CheckForException(result)
      yield result
  finally:
    pool.close()
    pool.join()
    _all_pools.remove(pool)
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.