Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  grouping.py   Sprache: Python

 
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import collections
import os
import platform
import subprocess
import sys

import six

from mozlog.formatters import base

DEFAULT_MOVE_UP_CODE = "\x1b[A"
DEFAULT_CLEAR_EOL_CODE = "\x1b[K"


class GroupingFormatter(base.BaseFormatter):
    """Formatter designed to produce unexpected test results grouped
    together in a readable format."""

    def __init__(self):
        super(GroupingFormatter, self).__init__()
        self.number_of_tests = 0
        self.completed_tests = 0
        self.need_to_erase_last_line = False
        self.current_display = ""
        self.running_tests = {}
        self.test_output = collections.defaultdict(str)
        self.subtest_failures = collections.defaultdict(list)
        self.test_failure_text = ""
        self.tests_with_failing_subtests = []
        self.interactive = os.isatty(sys.stdout.fileno())
        self.show_logs = False

        self.message_handler.register_message_handlers(
            "show_logs",
            {
                "on": self._enable_show_logs,
                "off": self._disable_show_logs,
            },
        )

        # TODO(mrobinson, 8313): We need to add support for Windows terminals here.
        if self.interactive:
            self.move_up, self.clear_eol = self.get_move_up_and_clear_eol_codes()
            if platform.system() != "Windows":
                self.line_width = int(
                    subprocess.check_output(["stty""size"]).split()[1]
                )
            else:
                # Until we figure out proper Windows support,
                # this makes things work well enough to run.
                self.line_width = 80

        self.expected = {
            "OK": 0,
            "PASS": 0,
            "FAIL": 0,
            "PRECONDITION_FAILED": 0,
            "ERROR": 0,
            "TIMEOUT": 0,
            "SKIP": 0,
            "CRASH": 0,
        }

        self.unexpected_tests = {
            "OK": [],
            "PASS": [],
            "FAIL": [],
            "PRECONDITION_FAILED": [],
            "ERROR": [],
            "TIMEOUT": [],
            "CRASH": [],
        }

        # Follows the format of {(<test>, <subtest>): <data>}, where
        # (<test>, None) represents a top level test.
        self.known_intermittent_results = {}

    def _enable_show_logs(self):
        self.show_logs = True

    def _disable_show_logs(self):
        self.show_logs = False

    def get_move_up_and_clear_eol_codes(self):
        try:
            import blessed
        except ImportError:
            return DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE

        try:
            self.terminal = blessed.Terminal()
            return self.terminal.move_up, self.terminal.clear_eol
        except Exception as exception:
            sys.stderr.write(
                "GroupingFormatter: Could not get terminal "
                "control characters: %s\n" % exception
            )
            return DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE

    def text_to_erase_display(self):
        if not self.interactive or not self.current_display:
            return ""
        return (self.move_up + self.clear_eol) * self.current_display.count("\n")

    def generate_output(self, text=None, new_display=None):
        if not self.interactive:
            return text

        output = self.text_to_erase_display()
        if text:
            output += text
        if new_display is not None:
            self.current_display = new_display
        return output + self.current_display

    def build_status_line(self):
        if self.number_of_tests == 0:
            new_display = " [%i] " % self.completed_tests
        else:
            new_display = " [%i/%i] " % (self.completed_tests, self.number_of_tests)

        if self.running_tests:
            indent = " " * len(new_display)
            if self.interactive:
                max_width = self.line_width - len(new_display)
            else:
                max_width = sys.maxsize
            return (
                new_display
                + ("\n%s" % indent).join(
                    val[:max_width] for val in self.running_tests.values()
                )
                + "\n"
            )
        else:
            return new_display + "No tests running.\n"

    def suite_start(self, data):
        self.number_of_tests = sum(
            len(tests) for tests in six.itervalues(data["tests"])
        )
        self.start_time = data["time"]

        if self.number_of_tests == 0:
            return "Running tests in %s\n\n" % data["source"]
        else:
            return "Running %i tests in %s\n\n" % (
                self.number_of_tests,
                data["source"],
            )

    def test_start(self, data):
        self.running_tests[data["thread"]] = data["test"]
        return self.generate_output(text=None, new_display=self.build_status_line())

    def wrap_and_indent_lines(self, lines, indent):
        assert len(lines) > 0

        output = indent + "\u25B6 %s\n" % lines[0]
        for line in lines[1:-1]:
            output += indent + "\u2502 %s\n" % line
        if len(lines) > 1:
            output += indent + "\u2514 %s\n" % lines[-1]
        return output

    def get_lines_for_unexpected_result(
        self, test_name, status, expected, message, stack
    ):
        # Test names sometimes contain control characters, which we want
        # to be printed in their raw form, and not their interpreted form.
        test_name = test_name.encode("unicode-escape").decode("utf-8")

        if expected:
            expected_text = " [expected %s]" % expected
        else:
            expected_text = ""

        lines = ["%s%s %s" % (status, expected_text, test_name)]
        if message:
            lines.append(" \u2192 %s" % message)
        if stack:
            lines.append("")
            lines += [stackline for stackline in stack.splitlines()]
        return lines

    def get_lines_for_known_intermittents(self, known_intermittent_results):
        lines = []

        for (test, subtest), data in six.iteritems(self.known_intermittent_results):
            status = data["status"]
            known_intermittent = ", ".join(data["known_intermittent"])
            expected = " [expected %s, known intermittent [%s]" % (
                data["expected"],
                known_intermittent,
            )
            lines += [
                "%s%s %s%s"
                % (
                    status,
                    expected,
                    test,
                    (", %s" % subtest) if subtest is not None else "",
                )
            ]
        output = self.wrap_and_indent_lines(lines, " ") + "\n"
        return output

    def get_output_for_unexpected_subtests(self, test_name, unexpected_subtests):
        if not unexpected_subtests:
            return ""

        def add_subtest_failure(lines, subtest, stack=None):
            lines += self.get_lines_for_unexpected_result(
                subtest.get("subtest"None),
                subtest.get("status"None),
                subtest.get("expected"None),
                subtest.get("message"None),
                stack,
            )

        def make_subtests_failure(test_name, subtests, stack=None):
            lines = ["Unexpected subtest result in %s:" % test_name]
            for subtest in subtests[:-1]:
                add_subtest_failure(lines, subtest, None)
            add_subtest_failure(lines, subtests[-1], stack)
            return self.wrap_and_indent_lines(lines, " ") + "\n"

        # Organize the failures by stack trace so we don't print the same stack trace
        # more than once. They are really tall and we don't want to flood the screen
        # with duplicate information.
        output = ""
        failures_by_stack = collections.defaultdict(list)
        for failure in unexpected_subtests:
            # Print stackless results first. They are all separate.
            if "stack" not in failure:
                output += make_subtests_failure(test_name, [failure], None)
            else:
                failures_by_stack[failure["stack"]].append(failure)

        for stack, failures in six.iteritems(failures_by_stack):
            output += make_subtests_failure(test_name, failures, stack)
        return output

    def test_end(self, data):
        self.completed_tests += 1
        test_status = data["status"]
        test_name = data["test"]
        known_intermittent_statuses = data.get("known_intermittent", [])
        subtest_failures = self.subtest_failures.pop(test_name, [])
        if "expected" in data and test_status not in known_intermittent_statuses:
            had_unexpected_test_result = True
        else:
            had_unexpected_test_result = False

        del self.running_tests[data["thread"]]
        new_display = self.build_status_line()

        if not had_unexpected_test_result and not subtest_failures:
            self.expected[test_status] += 1
            if self.interactive:
                return self.generate_output(text=None, new_display=new_display)
            else:
                return self.generate_output(
                    text=" %s\n" % test_name, new_display=new_display
                )

        if test_status in known_intermittent_statuses:
            self.known_intermittent_results[(test_name, None)] = data

        # If the test crashed or timed out, we also include any process output,
        # because there is a good chance that the test produced a stack trace
        # or other error messages.
        if test_status in ("CRASH""TIMEOUT"):
            stack = self.test_output[test_name] + data.get("stack""")
        else:
            stack = data.get("stack"None)

        output = ""
        if had_unexpected_test_result:
            self.unexpected_tests[test_status].append(data)
            lines = self.get_lines_for_unexpected_result(
                test_name,
                test_status,
                data.get("expected"None),
                data.get("message"None),
                stack,
            )
            output += self.wrap_and_indent_lines(lines, " ") + "\n"

        if subtest_failures:
            self.tests_with_failing_subtests.append(test_name)
            output += self.get_output_for_unexpected_subtests(
                test_name, subtest_failures
            )
        self.test_failure_text += output

        return self.generate_output(text=output, new_display=new_display)

    def test_status(self, data):
        if "expected" in data and data["status"not in data.get(
            "known_intermittent", []
        ):
            self.subtest_failures[data["test"]].append(data)
        elif data["status"in data.get("known_intermittent", []):
            self.known_intermittent_results[(data["test"], data["subtest"])] = data

    def suite_end(self, data):
        self.end_time = data["time"]

        if not self.interactive:
            output = "\n"
        else:
            output = ""

        output += "Ran %i tests finished in %.1f seconds.\n" % (
            self.completed_tests,
            (self.end_time - self.start_time) / 1000.0,
        )
        output += " \u2022 %i ran as expected. %i tests skipped.\n" % (
            sum(self.expected.values()),
            self.expected["SKIP"],
        )
        if self.known_intermittent_results:
            output += " \u2022 %i known intermittent results.\n" % (
                len(self.known_intermittent_results)
            )

        def text_for_unexpected_list(text, section):
            tests = self.unexpected_tests[section]
            if not tests:
                return ""
            return " \u2022 %i tests %s\n" % (len(tests), text)

        output += text_for_unexpected_list("crashed unexpectedly""CRASH")
        output += text_for_unexpected_list("had errors unexpectedly""ERROR")
        output += text_for_unexpected_list("failed unexpectedly""FAIL")
        output += text_for_unexpected_list(
            "precondition failed unexpectedly""PRECONDITION_FAILED"
        )
        output += text_for_unexpected_list("timed out unexpectedly""TIMEOUT")
        output += text_for_unexpected_list("passed unexpectedly""PASS")
        output += text_for_unexpected_list("unexpectedly okay""OK")

        num_with_failing_subtests = len(self.tests_with_failing_subtests)
        if num_with_failing_subtests:
            output += (
                " \u2022 %i tests had unexpected subtest results\n"
                % num_with_failing_subtests
            )
        output += "\n"

        # Repeat failing test output, so that it is easier to find, since the
        # non-interactive version prints all the test names.
        if not self.interactive and self.test_failure_text:
            output += "Tests with unexpected results:\n" + self.test_failure_text

        if self.known_intermittent_results:
            results = self.get_lines_for_known_intermittents(
                self.known_intermittent_results
            )
            output += "Tests with known intermittent results:\n" + results

        return self.generate_output(text=output, new_display="")

    def process_output(self, data):
        if data["thread"not in self.running_tests:
            return
        test_name = self.running_tests[data["thread"]]
        self.test_output[test_name] += data["data"] + "\n"

    def log(self, data):
        if data.get("component"):
            message = "%s %s %s" % (data["component"], data["level"], data["message"])
        else:
            message = "%s %s" % (data["level"], data["message"])
        if "stack" in data:
            message += "\n%s" % data["stack"]

        # We are logging messages that begin with STDERR, because that is how exceptions
        # in this formatter are indicated.
        if data["message"].startswith("STDERR"):
            return self.generate_output(text=message + "\n")

        if data["level"in ("CRITICAL""ERROR"):
            return self.generate_output(text=message + "\n")
        # Show all messages if show_logs switched on.
        if self.show_logs:
            return self.generate_output(text=message + "\n")

Messung V0.5
C=87 H=97 G=91

¤ Dauer der Verarbeitung: 0.1 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge