Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/testing/mozharness/test/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 35 kB image not shown  

Quelle  test_base_script.py   Sprache: Python

 
import gc
import os
import re
import shutil
import tempfile
import types
import unittest
from unittest import mock

import mozunit

PYWIN32 = False
if os.name == "nt":
    try:
        import win32file

        PYWIN32 = True
    except ImportError:
        pass


import mozharness.base.errors as errors
import mozharness.base.log as log
import mozharness.base.script as script
from mozharness.base.config import parse_config_file
from mozharness.base.log import CRITICAL, DEBUG, ERROR, FATAL, IGNORE, INFO, WARNING

here = os.path.dirname(os.path.abspath(__file__))

test_string = """foo
bar
baz"""


class CleanupObj(script.ScriptMixin, log.LogMixin):
    def __init__(self):
        super(CleanupObj, self).__init__()
        self.log_obj = None
        self.config = {"log_level": ERROR}


def cleanup(files=None):
    files = files or []
    files.extend(("test_logs""test_dir""tmpfile_stdout""tmpfile_stderr"))
    gc.collect()
    c = CleanupObj()
    for f in files:
        c.rmtree(f)


def get_debug_script_obj():
    s = script.BaseScript(
        config={"log_type""multi""log_level": DEBUG},
        initial_config_file="test/test.json",
    )
    return s


def _post_fatal(self, **kwargs):
    fh = open("tmpfile_stdout""w")
    print(test_string, file=fh)
    fh.close()


# TestScript {{{1
class TestScript(unittest.TestCase):
    def setUp(self):
        cleanup()
        self.s = None
        self.tmpdir = tempfile.mkdtemp(suffix=".mozharness")

    def tearDown(self):
        # Close the logfile handles, or windows can't remove the logs
        if hasattr(self, "s"and isinstance(self.s, object):
            del self.s
        cleanup([self.tmpdir])

    # test _dump_config_hierarchy() when --dump-config-hierarchy is passed
    def test_dump_config_hierarchy_valid_files_len(self):
        try:
            self.s = script.BaseScript(
                initial_config_file="test/test.json",
                option_args=["--cfg""test/test_override.py,test/test_override2.py"],
                config={"dump_config_hierarchy"True},
            )
        except SystemExit:
            local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
            # first let's see if the correct number of config files were
            # realized
            self.assertEqual(
                len(local_cfg_files),
                4,
                msg="--dump-config-hierarchy dumped wrong number of config files",
            )

    def test_dump_config_hierarchy_keys_unique_and_valid(self):
        try:
            self.s = script.BaseScript(
                initial_config_file="test/test.json",
                option_args=["--cfg""test/test_override.py,test/test_override2.py"],
                config={"dump_config_hierarchy"True},
            )
        except SystemExit:
            local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
            # now let's see if only unique items were added from each config
            t_override = local_cfg_files.get("test/test_override.py", {})
            self.assertTrue(
                t_override.get("keep_string") == "don't change me"
                and len(t_override.keys()) == 1,
                msg="--dump-config-hierarchy dumped wrong keys/value for "
                "`test/test_override.py`. There should only be one "
                "item and it should be unique to all the other "
                "items in test_log/localconfigfiles.json.",
            )

    def test_dump_config_hierarchy_matches_self_config(self):
        try:
            ######
            # we need temp_cfg because self.s will be gcollected (NoneType) by
            # the time we get to SystemExit exception
            # temp_cfg will differ from self.s.config because of
            # 'dump_config_hierarchy'. we have to make a deepcopy because
            # config is a locked dict
            temp_s = script.BaseScript(
                initial_config_file="test/test.json",
                option_args=["--cfg""test/test_override.py,test/test_override2.py"],
            )
            from copy import deepcopy

            temp_cfg = deepcopy(temp_s.config)
            temp_cfg.update({"dump_config_hierarchy"True})
            ######
            self.s = script.BaseScript(
                initial_config_file="test/test.json",
                option_args=["--cfg""test/test_override.py,test/test_override2.py"],
                config={"dump_config_hierarchy"True},
            )
        except SystemExit:
            local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
            # finally let's just make sure that all the items added up, equals
            # what we started with: self.config
            target_cfg = {}
            for cfg_file in local_cfg_files:
                target_cfg.update(local_cfg_files[cfg_file])
            self.assertEqual(
                target_cfg,
                temp_cfg,
                msg="all of the items (combined) in each cfg file dumped via "
                "--dump-config-hierarchy does not equal self.config ",
            )

    # test _dump_config() when --dump-config is passed
    def test_dump_config_equals_self_config(self):
        try:
            ######
            # we need temp_cfg because self.s will be gcollected (NoneType) by
            # the time we get to SystemExit exception
            # temp_cfg will differ from self.s.config because of
            # 'dump_config_hierarchy'. we have to make a deepcopy because
            # config is a locked dict
            temp_s = script.BaseScript(
                initial_config_file="test/test.json",
                option_args=["--cfg""test/test_override.py,test/test_override2.py"],
            )
            from copy import deepcopy

            temp_cfg = deepcopy(temp_s.config)
            temp_cfg.update({"dump_config"True})
            ######
            self.s = script.BaseScript(
                initial_config_file="test/test.json",
                option_args=["--cfg""test/test_override.py,test/test_override2.py"],
                config={"dump_config"True},
            )
        except SystemExit:
            target_cfg = parse_config_file("test_logs/localconfig.json")
            self.assertEqual(
                target_cfg,
                temp_cfg,
                msg="all of the items (combined) in each cfg file dumped via "
                "--dump-config does not equal self.config ",
            )

    def test_nonexistent_mkdir_p(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        self.s.mkdir_p("test_dir/foo/bar/baz")
        self.assertTrue(os.path.isdir("test_dir/foo/bar/baz"), msg="mkdir_p error")

    def test_existing_mkdir_p(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        os.makedirs("test_dir/foo/bar/baz")
        self.s.mkdir_p("test_dir/foo/bar/baz")
        self.assertTrue(
            os.path.isdir("test_dir/foo/bar/baz"), msg="mkdir_p error when dir exists"
        )

    def test_chdir(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        cwd = os.getcwd()
        self.s.chdir("test_logs")
        self.assertEqual(os.path.join(cwd, "test_logs"), os.getcwd(), msg="chdir error")
        self.s.chdir(cwd)

    def _test_log_helper(self, obj):
        obj.debug("Testing DEBUG")
        obj.warning("Testing WARNING")
        obj.error("Testing ERROR")
        obj.critical("Testing CRITICAL")
        try:
            obj.fatal("Testing FATAL")
        except SystemExit:
            pass
        else:
            self.assertTrue(False, msg="fatal() didn't SystemExit!")

    def test_log(self):
        self.s = get_debug_script_obj()
        self.s.log_obj = None
        self._test_log_helper(self.s)
        del self.s
        self.s = script.BaseScript(initial_config_file="test/test.json")
        self._test_log_helper(self.s)

    def test_run_nonexistent_command(self):
        self.s = get_debug_script_obj()
        self.s.run_command(
            command="this_cmd_should_not_exist --help",
            env={"GARBLE""FARG"},
            error_list=errors.PythonErrorList,
        )
        error_logsize = os.path.getsize("test_logs/test_info.log")
        self.assertTrue(error_logsize > 0, msg="command not found error not hit")

    def test_run_command_in_bad_dir(self):
        self.s = get_debug_script_obj()
        self.s.run_command(
            command="ls",
            cwd="/this_dir_should_not_exist",
            error_list=errors.PythonErrorList,
        )
        error_logsize = os.path.getsize("test_logs/test_error.log")
        self.assertTrue(error_logsize > 0, msg="bad dir error not hit")

    def test_get_output_from_command_in_bad_dir(self):
        self.s = get_debug_script_obj()
        self.s.get_output_from_command(command="ls", cwd="/this_dir_should_not_exist")
        error_logsize = os.path.getsize("test_logs/test_error.log")
        self.assertTrue(error_logsize > 0, msg="bad dir error not hit")

    def test_get_output_from_command_with_missing_file(self):
        self.s = get_debug_script_obj()
        self.s.get_output_from_command(command="ls /this_file_should_not_exist")
        error_logsize = os.path.getsize("test_logs/test_error.log")
        self.assertTrue(error_logsize > 0, msg="bad file error not hit")

    def test_get_output_from_command_with_missing_file2(self):
        self.s = get_debug_script_obj()
        self.s.run_command(
            command="cat mozharness/base/errors.py",
            error_list=[
                {"substr""error""level": ERROR},
                {
                    "regex": re.compile(",$"),
                    "level": IGNORE,
                },
                {
                    "substr""]$",
                    "level": WARNING,
                },
            ],
        )
        error_logsize = os.path.getsize("test_logs/test_error.log")
        self.assertTrue(error_logsize > 0, msg="error list not working properly")

    def test_download_unpack(self):
        # NOTE: The action is called *download*, however, it can work for files in disk
        self.s = get_debug_script_obj()

        archives_path = os.path.join(here, "helper_files""archives")

        # Test basic decompression
        for archive in (
            "archive.tar",
            "archive.tar.bz2",
            "archive.tar.gz",
            "archive.tar.xz",
            "archive.zip",
        ):
            self.s.download_unpack(
                url=os.path.join(archives_path, archive), extract_to=self.tmpdir
            )
            self.assertIn("script.sh", os.listdir(os.path.join(self.tmpdir, "bin")))
            self.assertIn("lorem.txt", os.listdir(self.tmpdir))
            shutil.rmtree(self.tmpdir)

        # Test permissions for extracted entries from zip archive
        self.s.download_unpack(
            url=os.path.join(archives_path, "archive.zip"),
            extract_to=self.tmpdir,
        )
        file_stats = os.stat(os.path.join(self.tmpdir, "bin""script.sh"))
        orig_fstats = os.stat(
            os.path.join(archives_path, "reference""bin""script.sh")
        )
        self.assertEqual(file_stats.st_mode, orig_fstats.st_mode)
        shutil.rmtree(self.tmpdir)

        # Test unzip specific dirs only
        self.s.download_unpack(
            url=os.path.join(archives_path, "archive.zip"),
            extract_to=self.tmpdir,
            extract_dirs=["bin/*"],
        )
        self.assertIn("bin", os.listdir(self.tmpdir))
        self.assertNotIn("lorem.txt", os.listdir(self.tmpdir))
        shutil.rmtree(self.tmpdir)

        # Test for invalid filenames (Windows only)
        if PYWIN32:
            with self.assertRaises(IOError):
                self.s.download_unpack(
                    url=os.path.join(archives_path, "archive_invalid_filename.zip"),
                    extract_to=self.tmpdir,
                )

        for archive in (
            "archive-setuid.tar",
            "archive-escape.tar",
            "archive-link.tar",
            "archive-link-abs.tar",
            "archive-double-link.tar",
        ):
            with self.assertRaises(Exception):
                self.s.download_unpack(
                    url=os.path.join(archives_path, archive),
                    extract_to=self.tmpdir,
                )

    def test_unpack(self):
        self.s = get_debug_script_obj()

        archives_path = os.path.join(here, "helper_files""archives")

        # Test basic decompression
        for archive in (
            "archive.tar",
            "archive.tar.bz2",
            "archive.tar.gz",
            "archive.tar.xz",
            "archive.zip",
        ):
            self.s.unpack(os.path.join(archives_path, archive), self.tmpdir)
            self.assertIn("script.sh", os.listdir(os.path.join(self.tmpdir, "bin")))
            self.assertIn("lorem.txt", os.listdir(self.tmpdir))
            shutil.rmtree(self.tmpdir)

        # Test permissions for extracted entries from zip archive
        self.s.unpack(os.path.join(archives_path, "archive.zip"), self.tmpdir)
        file_stats = os.stat(os.path.join(self.tmpdir, "bin""script.sh"))
        orig_fstats = os.stat(
            os.path.join(archives_path, "reference""bin""script.sh")
        )
        self.assertEqual(file_stats.st_mode, orig_fstats.st_mode)
        shutil.rmtree(self.tmpdir)

        # Test extract specific dirs only
        self.s.unpack(
            os.path.join(archives_path, "archive.zip"),
            self.tmpdir,
            extract_dirs=["bin/*"],
        )
        self.assertIn("bin", os.listdir(self.tmpdir))
        self.assertNotIn("lorem.txt", os.listdir(self.tmpdir))
        shutil.rmtree(self.tmpdir)

        # Test for invalid filenames (Windows only)
        if PYWIN32:
            with self.assertRaises(IOError):
                self.s.unpack(
                    os.path.join(archives_path, "archive_invalid_filename.zip"),
                    self.tmpdir,
                )

        for archive in (
            "archive-setuid.tar",
            "archive-escape.tar",
            "archive-link.tar",
            "archive-link-abs.tar",
            "archive-double-link.tar",
        ):
            with self.assertRaises(Exception):
                self.s.unpack(os.path.join(archives_path, archive), self.tmpdir)


# TestHelperFunctions {{{1
class TestHelperFunctions(unittest.TestCase):
    temp_file = "test_dir/mozilla"

    def setUp(self):
        cleanup()
        self.s = None

    def tearDown(self):
        # Close the logfile handles, or windows can't remove the logs
        if hasattr(self, "s"and isinstance(self.s, object):
            del self.s
        cleanup()

    def _create_temp_file(self, contents=test_string):
        os.mkdir("test_dir")
        fh = open(self.temp_file, "w+")
        fh.write(contents)
        fh.close

    def test_mkdir_p(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        self.s.mkdir_p("test_dir")
        self.assertTrue(os.path.isdir("test_dir"), msg="mkdir_p error")

    def test_get_output_from_command(self):
        self._create_temp_file()
        self.s = script.BaseScript(initial_config_file="test/test.json")
        contents = self.s.get_output_from_command(
            ["bash""-c""cat %s" % self.temp_file]
        )
        self.assertEqual(
            test_string,
            contents,
            msg="get_output_from_command('cat file') differs from fh.write",
        )

    def test_run_command(self):
        self._create_temp_file()
        self.s = script.BaseScript(initial_config_file="test/test.json")
        temp_file_name = os.path.basename(self.temp_file)
        self.assertEqual(
            self.s.run_command("cat %s" % temp_file_name, cwd="test_dir"),
            0,
            msg="run_command('cat file') did not exit 0",
        )

    def test_move1(self):
        self._create_temp_file()
        self.s = script.BaseScript(initial_config_file="test/test.json")
        temp_file2 = "%s2" % self.temp_file
        self.s.move(self.temp_file, temp_file2)
        self.assertFalse(
            os.path.exists(self.temp_file),
            msg="%s still exists after move()" % self.temp_file,
        )

    def test_move2(self):
        self._create_temp_file()
        self.s = script.BaseScript(initial_config_file="test/test.json")
        temp_file2 = "%s2" % self.temp_file
        self.s.move(self.temp_file, temp_file2)
        self.assertTrue(
            os.path.exists(temp_file2), msg="%s doesn't exist after move()" % temp_file2
        )

    def test_copyfile(self):
        self._create_temp_file()
        self.s = script.BaseScript(initial_config_file="test/test.json")
        temp_file2 = "%s2" % self.temp_file
        self.s.copyfile(self.temp_file, temp_file2)
        self.assertEqual(
            os.path.getsize(self.temp_file),
            os.path.getsize(temp_file2),
            msg="%s and %s are different sizes after copyfile()"
            % (self.temp_file, temp_file2),
        )

    def test_existing_rmtree(self):
        self._create_temp_file()
        self.s = script.BaseScript(initial_config_file="test/test.json")
        self.s.mkdir_p("test_dir/foo/bar/baz")
        self.s.rmtree("test_dir")
        self.assertFalse(os.path.exists("test_dir"), msg="rmtree unsuccessful")

    def test_nonexistent_rmtree(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        status = self.s.rmtree("test_dir")
        self.assertFalse(status, msg="nonexistent rmtree error")

    @unittest.skipUnless(PYWIN32, "PyWin32 specific")
    def test_long_dir_rmtree(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        # create a very long path that the command-prompt cannot delete
        # by using unicode format (max path length 32000)
        path = "\\\\?\\%s\\test_dir" % os.getcwd()
        win32file.CreateDirectoryExW(".", path)

        for x in range(0, 20):
            print("path=%s" % path)
            path = path + "\\%sxxxxxxxxxxxxxxxxxxxx" % x
            win32file.CreateDirectoryExW(".", path)
        self.s.rmtree("test_dir")
        self.assertFalse(os.path.exists("test_dir"), msg="rmtree unsuccessful")

    @unittest.skipUnless(PYWIN32, "PyWin32 specific")
    def test_chmod_rmtree(self):
        self._create_temp_file()
        win32file.SetFileAttributesW(self.temp_file, win32file.FILE_ATTRIBUTE_READONLY)
        self.s = script.BaseScript(initial_config_file="test/test.json")
        self.s.rmtree("test_dir")
        self.assertFalse(os.path.exists("test_dir"), msg="rmtree unsuccessful")

    @unittest.skipIf(os.name == "nt""Not for Windows")
    def test_chmod(self):
        self._create_temp_file()
        self.s = script.BaseScript(initial_config_file="test/test.json")
        self.s.chmod(self.temp_file, 0o100700)
        self.assertEqual(os.stat(self.temp_file)[0], 33216, msg="chmod unsuccessful")

    def test_env_normal(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        script_env = self.s.query_env()
        self.assertEqual(
            script_env,
            os.environ,
            msg="query_env() != env\n%s\n%s" % (script_env, os.environ),
        )

    def test_env_normal2(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        self.s.query_env()
        script_env = self.s.query_env()
        self.assertEqual(
            script_env,
            os.environ,
            msg="Second query_env() != env\n%s\n%s" % (script_env, os.environ),
        )

    def test_env_partial(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        script_env = self.s.query_env(partial_env={"foo""bar"})
        self.assertTrue("foo" in script_env and script_env["foo"] == "bar")

    def test_env_path(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        partial_path = "yaddayadda:%(PATH)s"
        full_path = partial_path % {"PATH": os.environ["PATH"]}
        script_env = self.s.query_env(partial_env={"PATH": partial_path})
        self.assertEqual(script_env["PATH"], full_path)

    def test_query_exe(self):
        self.s = script.BaseScript(
            initial_config_file="test/test.json",
            config={"exes": {"foo""bar"}},
        )
        path = self.s.query_exe("foo")
        self.assertEqual(path, "bar")

    def test_query_exe_string_replacement(self):
        self.s = script.BaseScript(
            initial_config_file="test/test.json",
            config={
                "base_work_dir""foo",
                "work_dir""bar",
                "exes": {"foo": os.path.join("%(abs_work_dir)s""baz")},
            },
        )
        path = self.s.query_exe("foo")
        self.assertEqual(path, os.path.join("foo""bar""baz"))

    def test_read_from_file(self):
        self._create_temp_file()
        self.s = script.BaseScript(initial_config_file="test/test.json")
        contents = self.s.read_from_file(self.temp_file)
        self.assertEqual(contents, test_string)

    def test_read_from_nonexistent_file(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        contents = self.s.read_from_file("nonexistent_file!!!")
        self.assertEqual(contents, None)


# TestScriptLogging {{{1
class TestScriptLogging(unittest.TestCase):
    # I need a log watcher helper function, here and in test_log.
    def setUp(self):
        cleanup()
        self.s = None

    def tearDown(self):
        # Close the logfile handles, or windows can't remove the logs
        if hasattr(self, "s"and isinstance(self.s, object):
            del self.s
        cleanup()

    def test_info_logsize(self):
        self.s = script.BaseScript(
            config={"log_type""multi"}, initial_config_file="test/test.json"
        )
        info_logsize = os.path.getsize("test_logs/test_info.log")
        self.assertTrue(info_logsize > 0, msg="initial info logfile missing/size 0")

    def test_add_summary_info(self):
        self.s = script.BaseScript(
            config={"log_type""multi"}, initial_config_file="test/test.json"
        )
        info_logsize = os.path.getsize("test_logs/test_info.log")
        self.s.add_summary("one")
        info_logsize2 = os.path.getsize("test_logs/test_info.log")
        self.assertTrue(
            info_logsize < info_logsize2, msg="add_summary() info not logged"
        )

    def test_add_summary_warning(self):
        self.s = script.BaseScript(
            config={"log_type""multi"}, initial_config_file="test/test.json"
        )
        warning_logsize = os.path.getsize("test_logs/test_warning.log")
        self.s.add_summary("two", level=WARNING)
        warning_logsize2 = os.path.getsize("test_logs/test_warning.log")
        self.assertTrue(
            warning_logsize < warning_logsize2,
            msg="add_summary(level=%s) not logged in warning log" % WARNING,
        )

    def test_summary(self):
        self.s = script.BaseScript(
            config={"log_type""multi"}, initial_config_file="test/test.json"
        )
        self.s.add_summary("one")
        self.s.add_summary("two", level=WARNING)
        info_logsize = os.path.getsize("test_logs/test_info.log")
        warning_logsize = os.path.getsize("test_logs/test_warning.log")
        self.s.summary()
        info_logsize2 = os.path.getsize("test_logs/test_info.log")
        warning_logsize2 = os.path.getsize("test_logs/test_warning.log")
        msg = ""
        if info_logsize >= info_logsize2:
            msg += "summary() didn't log to info!\n"
        if warning_logsize >= warning_logsize2:
            msg += "summary() didn't log to warning!\n"
        self.assertEqual(msg, "", msg=msg)

    def _test_log_level(self, log_level, log_level_file_list):
        self.s = script.BaseScript(
            config={"log_type""multi"}, initial_config_file="test/test.json"
        )
        if log_level != FATAL:
            self.s.log("testing", level=log_level)
        else:
            self.s._post_fatal = types.MethodType(_post_fatal, self.s)
            try:
                self.s.fatal("testing")
            except SystemExit:
                contents = None
                if os.path.exists("tmpfile_stdout"):
                    fh = open("tmpfile_stdout")
                    contents = fh.read()
                    fh.close()
                self.assertEqual(contents.rstrip(), test_string, "_post_fatal failed!")
        del self.s
        msg = ""
        for level in log_level_file_list:
            log_path = "test_logs/test_%s.log" % level
            if not os.path.exists(log_path):
                msg += "%s doesn't exist!\n" % log_path
            else:
                filesize = os.path.getsize(log_path)
                if not filesize > 0:
                    msg += "%s is size 0!\n" % log_path
        self.assertEqual(msg, "", msg=msg)

    def test_debug(self):
        self._test_log_level(DEBUG, [])

    def test_ignore(self):
        self._test_log_level(IGNORE, [])

    def test_info(self):
        self._test_log_level(INFO, [INFO])

    def test_warning(self):
        self._test_log_level(WARNING, [INFO, WARNING])

    def test_error(self):
        self._test_log_level(ERROR, [INFO, WARNING, ERROR])

    def test_critical(self):
        self._test_log_level(CRITICAL, [INFO, WARNING, ERROR, CRITICAL])

    def test_fatal(self):
        self._test_log_level(FATAL, [INFO, WARNING, ERROR, CRITICAL, FATAL])


# TestRetry {{{1
class NewError(Exception):
    pass


class OtherError(Exception):
    pass


class TestRetry(unittest.TestCase):
    def setUp(self):
        self.ATTEMPT_N = 1
        self.s = script.BaseScript(initial_config_file="test/test.json")

    def tearDown(self):
        # Close the logfile handles, or windows can't remove the logs
        if hasattr(self, "s"and isinstance(self.s, object):
            del self.s
        cleanup()

    def _succeedOnSecondAttempt(self, foo=None, exception=Exception):
        if self.ATTEMPT_N == 2:
            self.ATTEMPT_N += 1
            return
        self.ATTEMPT_N += 1
        raise exception("Fail")

    def _raiseCustomException(self):
        return self._succeedOnSecondAttempt(exception=NewError)

    def _alwaysPass(self):
        self.ATTEMPT_N += 1
        return True

    def _mirrorArgs(self, *args, **kwargs):
        return args, kwargs

    def _alwaysFail(self):
        raise Exception("Fail")

    def testRetrySucceed(self):
        # Will raise if anything goes wrong
        self.s.retry(self._succeedOnSecondAttempt, attempts=2, sleeptime=0)

    def testRetryFailWithoutCatching(self):
        self.assertRaises(
            Exception, self.s.retry, self._alwaysFail, sleeptime=0, exceptions=()
        )

    def testRetryFailEnsureRaisesLastException(self):
        self.assertRaises(
            SystemExit, self.s.retry, self._alwaysFail, sleeptime=0, error_level=FATAL
        )

    def testRetrySelectiveExceptionSucceed(self):
        self.s.retry(
            self._raiseCustomException,
            attempts=2,
            sleeptime=0,
            retry_exceptions=(NewError,),
        )

    def testRetrySelectiveExceptionFail(self):
        self.assertRaises(
            NewError,
            self.s.retry,
            self._raiseCustomException,
            attempts=2,
            sleeptime=0,
            retry_exceptions=(OtherError,),
        )

    # TODO: figure out a way to test that the sleep actually happened
    def testRetryWithSleep(self):
        self.s.retry(self._succeedOnSecondAttempt, attempts=2, sleeptime=1)

    def testRetryOnlyRunOnce(self):
        """Tests that retry() doesn't call the action again after success"""
        self.s.retry(self._alwaysPass, attempts=3, sleeptime=0)
        # self.ATTEMPT_N gets increased regardless of pass/fail
        self.assertEqual(2, self.ATTEMPT_N)

    def testRetryReturns(self):
        ret = self.s.retry(self._alwaysPass, sleeptime=0)
        self.assertEqual(ret, True)

    def testRetryCleanupIsCalled(self):
        cleanup = mock.Mock()
        self.s.retry(self._succeedOnSecondAttempt, cleanup=cleanup, sleeptime=0)
        self.assertEqual(cleanup.call_count, 1)

    def testRetryArgsPassed(self):
        args = (1, "two", 3)
        kwargs = dict(foo="a", bar=7)
        ret = self.s.retry(
            self._mirrorArgs, args=args, kwargs=kwargs.copy(), sleeptime=0
        )
        print(ret)
        self.assertEqual(ret[0], args)
        self.assertEqual(ret[1], kwargs)


class BaseScriptWithDecorators(script.BaseScript):
    def __init__(self, *args, **kwargs):
        super(BaseScriptWithDecorators, self).__init__(*args, **kwargs)

        self.pre_run_1_args = []
        self.raise_during_pre_run_1 = False
        self.pre_action_1_args = []
        self.raise_during_pre_action_1 = False
        self.pre_action_2_args = []
        self.pre_action_3_args = []
        self.post_action_1_args = []
        self.raise_during_post_action_1 = False
        self.post_action_2_args = []
        self.post_action_3_args = []
        self.post_run_1_args = []
        self.raise_during_post_run_1 = False
        self.post_run_2_args = []
        self.raise_during_build = False

    @script.PreScriptRun
    def pre_run_1(self, *args, **kwargs):
        self.pre_run_1_args.append((args, kwargs))

        if self.raise_during_pre_run_1:
            raise Exception(self.raise_during_pre_run_1)

    @script.PreScriptAction
    def pre_action_1(self, *args, **kwargs):
        self.pre_action_1_args.append((args, kwargs))

        if self.raise_during_pre_action_1:
            raise Exception(self.raise_during_pre_action_1)

    @script.PreScriptAction
    def pre_action_2(self, *args, **kwargs):
        self.pre_action_2_args.append((args, kwargs))

    @script.PreScriptAction("clobber")
    def pre_action_3(self, *args, **kwargs):
        self.pre_action_3_args.append((args, kwargs))

    @script.PostScriptAction
    def post_action_1(self, *args, **kwargs):
        self.post_action_1_args.append((args, kwargs))

        if self.raise_during_post_action_1:
            raise Exception(self.raise_during_post_action_1)

    @script.PostScriptAction
    def post_action_2(self, *args, **kwargs):
        self.post_action_2_args.append((args, kwargs))

    @script.PostScriptAction("build")
    def post_action_3(self, *args, **kwargs):
        self.post_action_3_args.append((args, kwargs))

    @script.PostScriptRun
    def post_run_1(self, *args, **kwargs):
        self.post_run_1_args.append((args, kwargs))

        if self.raise_during_post_run_1:
            raise Exception(self.raise_during_post_run_1)

    @script.PostScriptRun
    def post_run_2(self, *args, **kwargs):
        self.post_run_2_args.append((args, kwargs))

    def build(self):
        if self.raise_during_build:
            raise Exception(self.raise_during_build)


class TestScriptDecorators(unittest.TestCase):
    def setUp(self):
        cleanup()
        self.s = None

    def tearDown(self):
        if hasattr(self, "s"and isinstance(self.s, object):
            del self.s

        cleanup()

    def test_decorators_registered(self):
        self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")

        self.assertEqual(len(self.s._listeners["pre_run"]), 1)
        self.assertEqual(len(self.s._listeners["pre_action"]), 3)
        self.assertEqual(len(self.s._listeners["post_action"]), 3)
        self.assertEqual(len(self.s._listeners["post_run"]), 2)

    def test_pre_post_fired(self):
        self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
        self.s.run()

        self.assertEqual(len(self.s.pre_run_1_args), 1)
        self.assertEqual(len(self.s.pre_action_1_args), 2)
        self.assertEqual(len(self.s.pre_action_2_args), 2)
        self.assertEqual(len(self.s.pre_action_3_args), 1)
        self.assertEqual(len(self.s.post_action_1_args), 2)
        self.assertEqual(len(self.s.post_action_2_args), 2)
        self.assertEqual(len(self.s.post_action_3_args), 1)
        self.assertEqual(len(self.s.post_run_1_args), 1)

        self.assertEqual(self.s.pre_run_1_args[0], ((), {}))

        self.assertEqual(self.s.pre_action_1_args[0], (("clobber",), {}))
        self.assertEqual(self.s.pre_action_1_args[1], (("build",), {}))

        # pre_action_3 should only get called for the action it is registered
        # with.
        self.assertEqual(self.s.pre_action_3_args[0], (("clobber",), {}))

        self.assertEqual(self.s.post_action_1_args[0][0], ("clobber",))
        self.assertEqual(self.s.post_action_1_args[0][1], dict(success=True))
        self.assertEqual(self.s.post_action_1_args[1][0], ("build",))
        self.assertEqual(self.s.post_action_1_args[1][1], dict(success=True))

        # post_action_3 should only get called for the action it is registered
        # with.
        self.assertEqual(self.s.post_action_3_args[0], (("build",), dict(success=True)))

        self.assertEqual(self.s.post_run_1_args[0], ((), {}))

    def test_post_always_fired(self):
        self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
        self.s.raise_during_build = "Testing post always fired."

        with self.assertRaises(SystemExit):
            self.s.run()

        self.assertEqual(len(self.s.pre_run_1_args), 1)
        self.assertEqual(len(self.s.pre_action_1_args), 2)
        self.assertEqual(len(self.s.post_action_1_args), 2)
        self.assertEqual(len(self.s.post_action_2_args), 2)
        self.assertEqual(len(self.s.post_run_1_args), 1)
        self.assertEqual(len(self.s.post_run_2_args), 1)

        self.assertEqual(self.s.post_action_1_args[0][1], dict(success=True))
        self.assertEqual(self.s.post_action_1_args[1][1], dict(success=False))
        self.assertEqual(self.s.post_action_2_args[1][1], dict(success=False))

    def test_pre_run_exception(self):
        self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
        self.s.raise_during_pre_run_1 = "Error during pre run 1"

        with self.assertRaises(SystemExit):
            self.s.run()

        self.assertEqual(len(self.s.pre_run_1_args), 1)
        self.assertEqual(len(self.s.pre_action_1_args), 0)
        self.assertEqual(len(self.s.post_run_1_args), 1)
        self.assertEqual(len(self.s.post_run_2_args), 1)

    def test_pre_action_exception(self):
        self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
        self.s.raise_during_pre_action_1 = "Error during pre 1"

        with self.assertRaises(SystemExit):
            self.s.run()

        self.assertEqual(len(self.s.pre_run_1_args), 1)
        self.assertEqual(len(self.s.pre_action_1_args), 1)
        self.assertEqual(len(self.s.pre_action_2_args), 0)
        self.assertEqual(len(self.s.post_action_1_args), 1)
        self.assertEqual(len(self.s.post_action_2_args), 1)
        self.assertEqual(len(self.s.post_run_1_args), 1)
        self.assertEqual(len(self.s.post_run_2_args), 1)

    def test_post_action_exception(self):
        self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
        self.s.raise_during_post_action_1 = "Error during post 1"

        with self.assertRaises(SystemExit):
            self.s.run()

        self.assertEqual(len(self.s.pre_run_1_args), 1)
        self.assertEqual(len(self.s.post_action_1_args), 1)
        self.assertEqual(len(self.s.post_action_2_args), 1)
        self.assertEqual(len(self.s.post_run_1_args), 1)
        self.assertEqual(len(self.s.post_run_2_args), 1)

    def test_post_run_exception(self):
        self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
        self.s.raise_during_post_run_1 = "Error during post run 1"

        with self.assertRaises(SystemExit):
            self.s.run()

        self.assertEqual(len(self.s.post_run_1_args), 1)
        self.assertEqual(len(self.s.post_run_2_args), 1)


# main {{{1
if __name__ == "__main__":
    mozunit.main()

Messung V0.5
C=92 H=97 G=94

¤ Dauer der Verarbeitung: 0.9 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.