# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
import subprocess
import sys
from datetime
import datetime, timedelta
from threading
import Thread
from six.moves.queue
import Empty, Queue
from .adaptor
import xdr_annotate
from .progressbar
import ProgressBar
from .results
import NullTestOutput, TestOutput, escape_cmdline
class EndMarker:
pass
class TaskFinishedMarker:
pass
class MultiQueue:
def __init__(self, *queues):
self.queues = queues
self.output_queue = Queue(maxsize=1)
for q
in queues:
thread = Thread(target=self._queue_getter, args=(q,), daemon=
True)
thread.start()
def _queue_getter(self, q):
while True:
item = q.get()
self.output_queue.put(item)
if item
is EndMarker:
return
def get(self):
return self.output_queue.get()
def _do_work(
workerId,
qTasks,
qHeavyTasks,
qResults,
qWatch,
prefix,
tempdir,
run_skipped,
timeout,
show_cmd,
):
q = qTasks
required_end_markers = 1
if workerId == 0:
# Only one worker handles heavy tests.
q = MultiQueue(qTasks, qHeavyTasks)
required_end_markers = 2
num_end_markers = 0
while True:
test = q.get()
if test
is EndMarker:
num_end_markers += 1
if num_end_markers == required_end_markers:
qWatch.put(EndMarker)
qResults.put(EndMarker)
return
continue
if not test.enable
and not run_skipped:
qResults.put(NullTestOutput(test))
continue
# Spawn the test task.
cmd = test.get_command(prefix, tempdir)
if show_cmd:
print(escape_cmdline(cmd))
tStart = datetime.now()
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Push the task to the watchdog -- it will kill the task
# if it goes over the timeout while we keep its stdout
# buffer clear on the "main" worker thread.
qWatch.put(proc)
out, err = proc.communicate()
# We're not setting universal_newlines=True in subprocess.Popen due to
# still needing to support Python 3.5, which doesn't have the "encoding"
# parameter to the Popen constructor, so we have to decode the output
# here.
system_encoding =
"mbcs" if sys.platform ==
"win32" else "utf-8"
out = out.decode(system_encoding)
err = err.decode(system_encoding)
qWatch.put(TaskFinishedMarker)
# Create a result record and forward to result processing.
dt = datetime.now() - tStart
result = TestOutput(
test,
cmd,
out,
err,
proc.returncode,
dt.total_seconds(),
dt > timedelta(seconds=timeout),
)
qResults.put(result)
def _do_watch(qWatch, timeout):
while True:
proc = qWatch.get(
True)
if proc == EndMarker:
return
try:
fin = qWatch.get(block=
True, timeout=timeout)
assert fin
is TaskFinishedMarker,
"invalid finish marker"
except Empty:
# Timed out, force-kill the test.
try:
proc.terminate()
except WindowsError
as ex:
# If the process finishes after we time out but before we
# terminate, the terminate call will fail. We can safely
# ignore this.
if ex.winerror != 5:
raise
fin = qWatch.get()
assert fin
is TaskFinishedMarker,
"invalid finish marker"
def run_all_tests(tests, prefix, tempdir, pb, options):
"""
Uses scatter-gather to a thread-pool to manage children.
"""
qTasks, qHeavyTasks, qResults = Queue(), Queue(), Queue()
workers = []
watchdogs = []
for i
in range(options.worker_count):
qWatch = Queue()
watcher = Thread(target=_do_watch, args=(qWatch, options.timeout))
watcher.setDaemon(
True)
watcher.start()
watchdogs.append(watcher)
worker = Thread(
target=_do_work,
args=(
i,
qTasks,
qHeavyTasks,
qResults,
qWatch,
prefix,
tempdir,
options.run_skipped,
options.timeout,
options.show_cmd,
),
)
worker.setDaemon(
True)
worker.start()
workers.append(worker)
delay = ProgressBar.update_granularity().total_seconds()
# Before inserting all the tests cases, to be checked in parallel, we are
# only queueing the XDR encoding test case which would be responsible for
# recording the self-hosted code. Once completed, we will proceed by
# queueing the rest of the test cases.
if options.use_xdr:
tests = xdr_annotate(tests, options)
# This loop consumes the first elements of the `tests` iterator, until
# it reaches the self-hosted encoding test case, and leave the
# remaining tests in the iterator to be scheduled on multiple threads.
for test
in tests:
if test.selfhosted_xdr_mode ==
"encode":
qTasks.put(test)
yield qResults.get(block=
True)
break
assert not test.enable
and not options.run_skipped
yield NullTestOutput(test)
# Insert all jobs into the queue, followed by the queue-end
# marker, one per worker. This will not block on growing the
# queue, only on waiting for more items in the generator. The
# workers are already started, however, so this will process as
# fast as we can produce tests from the filesystem.
def _do_push(num_workers, qTasks):
for test
in tests:
if test.heavy:
qHeavyTasks.put(test)
else:
qTasks.put(test)
for _
in range(num_workers):
qTasks.put(EndMarker)
qHeavyTasks.put(EndMarker)
pusher = Thread(target=_do_push, args=(len(workers), qTasks))
pusher.setDaemon(
True)
pusher.start()
# Read from the results.
ended = 0
while ended < len(workers):
try:
result = qResults.get(block=
True, timeout=delay)
if result
is EndMarker:
ended += 1
else:
yield result
except Empty:
pb.poke()
# Cleanup and exit.
pusher.join()
for worker
in workers:
worker.join()
for watcher
in watchdogs:
watcher.join()
assert qTasks.empty(),
"Send queue not drained"
assert qHeavyTasks.empty(),
"Send queue (heavy tasks) not drained"
assert qResults.empty(),
"Result queue not drained"