Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  parallel.py   Sprache: Python

 
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Helpers related to multiprocessing.

Based on: //tools/binary_size/libsupersize/parallel.py
"""

import atexit
import builtins
import logging
import multiprocessing
import os
import sys
import threading
import traceback

# True when the DISABLE_ASYNC environment variable is exactly '1'. In that
# mode ForkAndCall() and BulkForkAndCall() run jobs in the calling process.
DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
if DISABLE_ASYNC:
  logging.warning('Running in synchronous mode.')

# Pools created by _MakeProcessPool() that have not been unregistered yet.
# Stays None until the first pool is created.
_all_pools = None
# Set to True by _FuncWrapper.__init__; makes _TerminatePools() return early.
_is_child_process = False
# Set to True by _TerminatePools() and _CheckForException(). Once True,
# _CheckForException() exits without re-raising or logging.
_silence_exceptions = False

# Used to pass parameters to forked processes without pickling.
# _MakeProcessPool() sets these just before creating a pool and resets them to
# None right after the pool has been created.
_fork_params = None
_fork_kwargs = None


class _ImmediateResult(object):
  """Result object for a job whose value is already known.

  Offers the same get()/wait()/ready()/successful() methods that callers use
  on asynchronous pool results, backed by a value supplied up front.
  """

  def __init__(self, value):
    self._value = value

  def ready(self):
    """Returns True: the value was available at construction time."""
    return True

  def successful(self):
    """Returns True unconditionally."""
    return True

  def wait(self):
    """Returns immediately; there is nothing to wait for."""
    return None

  def get(self):
    """Returns the value passed to the constructor."""
    return self._value


class _ExceptionWrapper(object):
  """Used to marshal exception messages back to main process.

  Attributes:
    msg: Text describing the failure (_FuncWrapper passes a formatted
        traceback).
    exception_type: Name of a builtin exception class to re-raise in the
        receiving process, or None when nothing should be re-raised.
  """

  def __init__(self, msg, exception_type=None):
    self.msg = msg
    self.exception_type = exception_type

  def MaybeThrow(self):
    """Re-raises the recorded failure as its builtin exception type.

    Does nothing when no exception type was recorded, when the recorded name
    is not an exception class in the builtins module, or when that class
    cannot be constructed from a single message string.

    Raises:
      The builtin exception named by |exception_type|, with the message
      'Originally caused by: ' followed by |msg|.
    """
    if not self.exception_type:
      return
    # Look the name up on the builtins module itself. |__builtins__| is a
    # module in __main__ but a plain dict in imported modules, so getattr()
    # on it is not reliable.
    exception_class = getattr(builtins, self.exception_type, None)
    if not (isinstance(exception_class, type)
            and issubclass(exception_class, BaseException)):
      return
    try:
      error = exception_class('Originally caused by: ' + self.msg)
    except TypeError:
      # Some builtin exceptions (e.g. UnicodeDecodeError) need more than a
      # message to be constructed. Return without raising so the caller can
      # handle the failure without a typed exception.
      return
    raise error


class _FuncWrapper(object):
  """Runs on the fork()'ed side to catch exceptions and spread *args."""

  def __init__(self, func):
    # NOTE(review): __init__ runs in whichever process constructs the wrapper.
    # ForkAndCall() and BulkForkAndCall() construct it after the pool has been
    # created -- confirm that setting this flag here is intended.
    global _is_child_process
    _is_child_process = True
    self._func = func

  def __call__(self, index, _=None):
    """Runs the wrapped function for the job at |index|.

    Args:
      index: Position in |_fork_params| of the positional arguments to use.
      _: Ignored.

    Returns:
      The wrapped function's return value, or an _ExceptionWrapper holding
      the formatted traceback if the function raised.
    """
    try:
      return self._func(*_fork_params[index], **_fork_kwargs)
    except Exception as e:
      # Only keep the exception type for builtin exception types or else risk
      # further marshalling exceptions.
      exception_type = None
      type_name = type(e).__name__
      # Check identity against the builtins module rather than only the name,
      # so a user-defined class that shares a builtin's name is not recorded.
      if getattr(builtins, type_name, None) is type(e):
        exception_type = type_name
      # multiprocessing is supposed to catch and return exceptions automatically
      # but it doesn't seem to work properly :(.
      return _ExceptionWrapper(traceback.format_exc(), exception_type)
    except:  # pylint: disable=bare-except
      return _ExceptionWrapper(traceback.format_exc())


class _WrappedResult(object):
  """Result proxy that runs host-side bookkeeping once a job has finished.

  On top of delegating to the wrapped result object it:
  * Removes the associated pool from _all_pools after a wait().
  * Hands the job's value to _CheckForException() before returning it.
  """

  def __init__(self, result, pool=None):
    self._result = result
    self._pool = pool

  def ready(self):
    """Returns the wrapped result's ready() value."""
    return self._result.ready()

  def successful(self):
    """Returns the wrapped result's successful() value."""
    return self._result.successful()

  def wait(self):
    """Waits on the wrapped result, then unregisters the pool (once)."""
    self._result.wait()
    finished_pool = self._pool
    if not finished_pool:
      return
    _all_pools.remove(finished_pool)
    self._pool = None

  def get(self):
    """Waits for the job and returns its value.

    The value is passed through _CheckForException() before being returned.
    """
    self.wait()
    outcome = self._result.get()
    _CheckForException(outcome)
    return outcome


def _TerminatePools():
  """Terminates every process pool still registered in _all_pools.

  According to the docs this should not be needed, but it seems to be
  required when a child process throws an exception or Ctrl-C is hit.
  """
  global _silence_exceptions
  _silence_exceptions = True
  # This function was registered with atexit before fork()ing, so child
  # processes run it too. They cannot have pools, so there is nothing to do.
  if _is_child_process:
    return

  def _terminate_quietly(target_pool):
    # Best effort: any failure while terminating is deliberately ignored.
    try:
      target_pool.terminate()
    except:  # pylint: disable=bare-except
      pass

  for index, active_pool in enumerate(_all_pools):
    # terminate() can block forever, so each call gets its own daemon thread.
    terminator = threading.Thread(target=_terminate_quietly,
                                  args=(active_pool, ),
                                  name='Pool-Terminate-%d' % index)
    terminator.daemon = True
    terminator.start()


def _CheckForException(value):
  """Stops the process if |value| is a failure marshalled from a job.

  Values that are not _ExceptionWrapper instances are ignored. For a wrapper,
  the first occurrence calls MaybeThrow() and, if that returns, logs the
  message; the process then exits with status 1. Once exceptions have been
  silenced, only the exit happens.
  """
  global _silence_exceptions
  if not isinstance(value, _ExceptionWrapper):
    return
  if not _silence_exceptions:
    value.MaybeThrow()
    _silence_exceptions = True
    logging.error('Subprocess raised an exception:\n%s', value.msg)
  sys.exit(1)


def _MakeProcessPool(job_params, **job_kwargs):
  """Creates a process pool whose workers can see the given job arguments.

  The arguments are stored in |_fork_params| / |_fork_kwargs| while the pool
  is being constructed, so that workers forked from this process can read
  them without pickling, and are cleared again afterwards. The pool is
  recorded in |_all_pools|; the first time a pool is made, _TerminatePools()
  is registered with atexit.

  Args:
    job_params: Sequence of positional-argument tuples, one per job.
    **job_kwargs: Keyword arguments shared by every job.

  Returns:
    The new multiprocessing.Pool, sized to the smaller of the job count and
    the CPU count.

  Raises:
    Whatever multiprocessing.Pool() raises, e.g. ValueError when
    |job_params| is empty and the pool size would be zero.
  """
  global _all_pools
  global _fork_params
  global _fork_kwargs
  assert _fork_params is None
  assert _fork_kwargs is None
  pool_size = min(len(job_params), multiprocessing.cpu_count())
  _fork_params = job_params
  _fork_kwargs = job_kwargs
  try:
    ret = multiprocessing.Pool(pool_size)
  finally:
    # Always clear the stored arguments, even if pool creation fails;
    # otherwise the asserts above would reject every later call.
    _fork_params = None
    _fork_kwargs = None
  if _all_pools is None:
    _all_pools = []
    atexit.register(_TerminatePools)
  _all_pools.append(ret)
  return ret


def ForkAndCall(func, args):
  """Runs |func| in a fork'ed process.

  When DISABLE_ASYNC is set, |func| is instead called right away in the
  current process and its value is wrapped in an already-finished result.

  Args:
    func: Callable to run.
    args: Positional arguments to call |func| with.

  Returns:
    A Result object (call .get() to get the return value)
  """
  if not DISABLE_ASYNC:
    worker_pool = _MakeProcessPool([args])  # Omit |kwargs|.
    pending = worker_pool.apply_async(_FuncWrapper(func), (0, ))
    # No further jobs will be submitted to this single-job pool.
    worker_pool.close()
    return _WrappedResult(pending, pool=worker_pool)

  return _WrappedResult(_ImmediateResult(func(*args)), pool=None)


def BulkForkAndCall(func, arg_tuples, **kwargs):
  """Runs |func| once per entry of |arg_tuples|, each in a fork'ed process.

  When DISABLE_ASYNC is set, the calls are made one after another in the
  current process instead.

  Args:
    func: Callable to run.
    arg_tuples: Iterable of positional-argument tuples, one per call.
    kwargs: Common keyword arguments to be passed to |func|.

  Yields the return values in order.
  """
  jobs = list(arg_tuples)
  if len(jobs) == 0:
    return

  if DISABLE_ASYNC:
    # Synchronous mode: run every job in this process.
    for job_args in jobs:
      yield func(*job_args, **kwargs)
  else:
    pool = _MakeProcessPool(jobs, **kwargs)
    job_runner = _FuncWrapper(func)
    job_indices = range(len(jobs))
    try:
      for value in pool.imap(job_runner, job_indices):
        _CheckForException(value)
        yield value
    finally:
      # Runs on normal completion, on errors, and when the generator is closed.
      pool.close()
      pool.join()
      _all_pools.remove(pool)

Messung V0.5
C=93 H=95 G=93

¤ Dauer der Verarbeitung: 0.19 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereitgestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge