# A unix-oriented process dispatcher. Uses a single thread with select and
# waitpid to dispatch tasks. This avoids several deadlocks that are possible
# with fork/exec + threads + Python.
import errno
import os
import select
import signal
import sys
from datetime import datetime, timedelta

from .adaptor import xdr_annotate
from .progressbar import ProgressBar
from .results import NullTestOutput, TestOutput, escape_cmdline
def get_max_wait(tasks, timeout):
    """
    Return the maximum time, in seconds, we can wait before any task should
    time out.
    """
    # If we have a progress-meter, we need to wake up to update it frequently.
    wait = ProgressBar.update_granularity()

    # If a timeout is supplied, we need to wake up for the first task to
    # timeout if that is sooner.
    if timeout:
        now = datetime.now()
        timeout_delta = timedelta(seconds=timeout)
        for task in tasks:
            remaining = task.start + timeout_delta - now
            if remaining < wait:
                wait = remaining

    # Return the wait time in seconds, clamped at zero so a task already past
    # its deadline yields an immediate (non-negative) wait.
    return max(wait.total_seconds(), 0)
def flush_input(fd, frags):
    """
    Read any pages sitting in the file descriptor 'fd' into the list 'frags'.
    """
    rv = os.read(fd, 4096)
    frags.append(rv)
    while len(rv) == 4096:
        # If read() returns a full buffer, it may indicate there was 1 buffer
        # worth of data, or that there is more data to read.  Poll the socket
        # before we read again to ensure that we will not block indefinitely.
        readable, _, _ = select.select([fd], [], [], 0)
        if not readable:
            return

        rv = os.read(fd, 4096)
        frags.append(rv)
def read_input(tasks, timeout):
    """
    Select on input or errors from the given task list for a max of timeout
    seconds.
    """
    rlist = []
    exlist = []
    outmap = {}  # Fast access to fragment list given fd.
    for t in tasks:
        rlist.append(t.stdout)
        rlist.append(t.stderr)
        outmap[t.stdout] = t.out
        outmap[t.stderr] = t.err
        # This will trigger with a close event when the child dies, allowing
        # us to respond immediately and not leave cores idle.
        exlist.append(t.stdout)

    # NOTE(review): the select/drain step below was missing from this copy of
    # the file even though the docstring describes it — confirm against
    # upstream.  Wait up to `timeout` seconds for readable output, then drain
    # each ready descriptor into its task's fragment list.
    readable, _, _ = select.select(rlist, [], exlist, timeout)
    for fd in readable:
        flush_input(fd, outmap[fd])
def remove_task(tasks, pid):
    """
    Remove a task with the given pid from the tasks list and return it.

    Raises KeyError if no task has that pid.
    """
    index = None
    for i, t in enumerate(tasks):
        if t.pid == pid:
            index = i
            break
    else:
        # for/else: we fell off the end without breaking, so no task matched.
        raise KeyError("No such pid: {}".format(pid))

    out = tasks[index]
    tasks.pop(index)
    return out
def timed_out(task, timeout):
    """
    Return a timedelta with the amount we are overdue, or False if the timeout
    has not yet been reached (or timeout is falsy, indicating there is no
    timeout.)
    """
    if not timeout:
        return False
    elapsed = datetime.now() - task.start
    over = elapsed - timedelta(seconds=timeout)
    # Positive `over` means the task has exceeded its budget.
    return over if over.total_seconds() > 0 else False
def reap_zombies(tasks, timeout):
    """
    Search for children of this process that have finished. If they are tasks,
    then this routine will clean up the child. This method returns a new task
    list that has had the ended tasks removed, followed by the list of finished
    tasks.
    """
    finished = []
    while True:
        try:
            # Non-blocking reap of any child; pid == 0 means no child has
            # exited yet.
            pid, status = os.waitpid(0, os.WNOHANG)
            if pid == 0:
                break
        except OSError as e:
            # ECHILD: this process has no children left to wait for.
            if e.errno == errno.ECHILD:
                break
            raise e
        # NOTE(review): the code that removes the reaped task from `tasks`,
        # drains/closes its pipes, and appends a TestOutput to `finished`
        # appears to be truncated from this copy of the file — restore it
        # from upstream before relying on this function.
    # Return shape per the docstring: (remaining tasks, finished outputs).
    return tasks, finished
def kill_undead(tasks, timeout):
    """
    Signal all children that are over the given timeout. Use SIGABRT first to
    generate a stack dump. If it still doesn't die for another 30 seconds,
    kill with SIGKILL.
    """
    for task in tasks:
        over = timed_out(task, timeout)
        if over:
            if over.total_seconds() < 30:
                # First 30 seconds past the deadline: ask for an abort so the
                # child produces a stack dump.
                os.kill(task.pid, signal.SIGABRT)
            else:
                # Still alive 30+ seconds past the deadline: force-kill.
                os.kill(task.pid, signal.SIGKILL)
def run_all_tests(tests, prefix, tempdir, pb, options):
    """
    Spawn and supervise test tasks, yielding a result object for each test as
    it completes (NullTestOutput for tests that could not be spawned).

    This is a generator; `pb` is a progress bar that is poked on idle
    iterations so the harness does not appear frozen.
    """
    # Copy and reverse for fast pop off end.
    tests = list(tests)
    tests.reverse()

    # The set of currently running tests.
    tasks = []

    # Piggy back on the first test to generate the XDR content needed for all
    # other tests to run. To avoid read/write races, we temporarily limit the
    # number of workers.
    wait_for_encoding = False
    worker_count = options.worker_count
    if options.use_xdr and len(tests) > 1:
        # The next loop pops tests, thus we iterate over the tests in reversed
        # order.
        tests = list(xdr_annotate(reversed(tests), options))
        tests.reverse()
        wait_for_encoding = True
        worker_count = 1

    def running_heavy_test():
        # At most one "heavy" test may run at any time.
        return any(task.test.heavy for task in tasks)

    heavy_tests = [t for t in tests if t.heavy]
    light_tests = [t for t in tests if not t.heavy]
    encoding_test = None

    while light_tests or heavy_tests or tasks:
        new_tests = []
        max_new_tests = worker_count - len(tasks)
        if (
            heavy_tests
            and not running_heavy_test()
            and len(new_tests) < max_new_tests
            and not wait_for_encoding
        ):
            # Schedule a heavy test if available.
            new_tests.append(heavy_tests.pop())
        while light_tests and len(new_tests) < max_new_tests:
            # Schedule as many more light tests as we can.
            new_tests.append(light_tests.pop())

        assert len(tasks) + len(new_tests) <= worker_count
        assert len([x for x in new_tests if x.heavy]) <= 1

        for test in new_tests:
            task = spawn_test(
                test,
                prefix,
                tempdir,
                options.passthrough,
                options.run_skipped,
                options.show_cmd,
            )
            if task:
                tasks.append(task)
                # The first successfully spawned test carries the XDR
                # encoding when wait_for_encoding is set.
                if not encoding_test:
                    encoding_test = test
            else:
                yield NullTestOutput(test)

        # NOTE(review): this wait/kill/reap step was missing from this copy of
        # the file (the `finished` loop below depends on it); restored from
        # the surrounding helpers — confirm against upstream.
        timeout = get_max_wait(tasks, options.timeout)
        read_input(tasks, timeout)
        kill_undead(tasks, options.timeout)
        tasks, finished = reap_zombies(tasks, options.timeout)

        for out in finished:
            yield out
            if wait_for_encoding and out.test == encoding_test:
                # The encoding test is done: lift the single-worker limit so
                # remaining tests can run (and decode) in parallel.
                assert encoding_test.selfhosted_xdr_mode == "encode"
                wait_for_encoding = False
                worker_count = options.worker_count

        # If we did not finish any tasks, poke the progress bar to show that
        # the test harness is at least not frozen.
        if len(finished) == 0:
            pb.poke()
# NOTE(review): the following German disclaimer appears to be unrelated
# web-page boilerplate accidentally appended to this source file; it is
# commented out (and translated) here and should probably be removed.
# "The information on this web page has been carefully compiled to the best
# of our knowledge. However, neither completeness, nor correctness, nor
# quality of the information provided is guaranteed.
# Remark: the colored syntax display and the measurement are still
# experimental."