def cleanup(files=None):
    """Remove per-test artifacts (log dirs, temp files) plus any extra paths.

    Args:
        files (list or None): additional paths to remove; the default test
            artifact names are always appended.
    """
    # Copy before extending so we never mutate a list the caller passed in.
    files = list(files or [])
    files.extend(("test_logs", "test_dir", "tmpfile_stdout", "tmpfile_stderr"))
    # Collect first so lingering objects (e.g. open log handles) release
    # their files before we try to delete them.
    gc.collect()
    c = CleanupObj()
    for f in files:
        c.rmtree(f)
def get_debug_script_obj():
    """Build and return a BaseScript wired for multi-target DEBUG logging."""
    return script.BaseScript(
        config={"log_type": "multi", "log_level": DEBUG},
        initial_config_file="test/test.json",
    )
def tearDown(self):
    # Close the logfile handles, or windows can't remove the logs
    if hasattr(self, "s") and isinstance(self.s, object):
        del self.s
    cleanup([self.tmpdir])
# test _dump_config_hierarchy() when --dump-config-hierarchy is passed
def test_dump_config_hierarchy_valid_files_len(self):
    # BaseScript exits after dumping, so the assertions live in the
    # SystemExit handler.
    try:
        self.s = script.BaseScript(
            initial_config_file="test/test.json",
            option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
            config={"dump_config_hierarchy": True},
        )
    except SystemExit:
        local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
        # first let's see if the correct number of config files were
        # realized
        self.assertEqual(
            len(local_cfg_files),
            4,
            msg="--dump-config-hierarchy dumped wrong number of config files",
        )
def test_dump_config_hierarchy_keys_unique_and_valid(self):
    # BaseScript exits after dumping, so the assertions live in the
    # SystemExit handler.
    try:
        self.s = script.BaseScript(
            initial_config_file="test/test.json",
            option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
            config={"dump_config_hierarchy": True},
        )
    except SystemExit:
        local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
        # now let's see if only unique items were added from each config
        t_override = local_cfg_files.get("test/test_override.py", {})
        self.assertTrue(
            t_override.get("keep_string") == "don't change me" and len(t_override.keys()) == 1,
            msg="--dump-config-hierarchy dumped wrong keys/value for "
            "`test/test_override.py`. There should only be one "
            "item and it should be unique to all the other "
            "items in test_log/localconfigfiles.json.",
        )
def test_dump_config_hierarchy_matches_self_config(self):
    try:
        ######
        # we need temp_cfg because self.s will be gcollected (NoneType) by
        # the time we get to SystemExit exception
        # temp_cfg will differ from self.s.config because of
        # 'dump_config_hierarchy'. we have to make a deepcopy because
        # config is a locked dict
        temp_s = script.BaseScript(
            initial_config_file="test/test.json",
            option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
        )
        from copy import deepcopy

        temp_cfg = deepcopy(temp_s.config)
        temp_cfg.update({"dump_config_hierarchy": True})
        ######
        self.s = script.BaseScript(
            initial_config_file="test/test.json",
            option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
            config={"dump_config_hierarchy": True},
        )
    except SystemExit:
        local_cfg_files = parse_config_file("test_logs/localconfigfiles.json")
        # finally let's just make sure that all the items added up, equals
        # what we started with: self.config
        target_cfg = {}
        for cfg_file in local_cfg_files:
            target_cfg.update(local_cfg_files[cfg_file])
        self.assertEqual(
            target_cfg,
            temp_cfg,
            msg="all of the items (combined) in each cfg file dumped via "
            "--dump-config-hierarchy does not equal self.config ",
        )
# test _dump_config() when --dump-config is passed
def test_dump_config_equals_self_config(self):
    try:
        ######
        # we need temp_cfg because self.s will be gcollected (NoneType) by
        # the time we get to SystemExit exception
        # temp_cfg will differ from self.s.config because of
        # 'dump_config_hierarchy'. we have to make a deepcopy because
        # config is a locked dict
        temp_s = script.BaseScript(
            initial_config_file="test/test.json",
            option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
        )
        from copy import deepcopy

        temp_cfg = deepcopy(temp_s.config)
        temp_cfg.update({"dump_config": True})
        ######
        self.s = script.BaseScript(
            initial_config_file="test/test.json",
            option_args=["--cfg", "test/test_override.py,test/test_override2.py"],
            config={"dump_config": True},
        )
    except SystemExit:
        target_cfg = parse_config_file("test_logs/localconfig.json")
        self.assertEqual(
            target_cfg,
            temp_cfg,
            msg="all of the items (combined) in each cfg file dumped via "
            "--dump-config does not equal self.config ",
        )
# Test basic decompression
for archive in (
    "archive.tar",
    "archive.tar.bz2",
    "archive.tar.gz",
    "archive.tar.xz",
    "archive.zip",
):
    self.s.download_unpack(
        url=os.path.join(archives_path, archive), extract_to=self.tmpdir
    )
    self.assertIn("script.sh", os.listdir(os.path.join(self.tmpdir, "bin")))
    self.assertIn("lorem.txt", os.listdir(self.tmpdir))
    shutil.rmtree(self.tmpdir)

# Test permissions for extracted entries from zip archive
self.s.download_unpack(
    url=os.path.join(archives_path, "archive.zip"),
    extract_to=self.tmpdir,
)
file_stats = os.stat(os.path.join(self.tmpdir, "bin", "script.sh"))
orig_fstats = os.stat(
    os.path.join(archives_path, "reference", "bin", "script.sh")
)
self.assertEqual(file_stats.st_mode, orig_fstats.st_mode)
shutil.rmtree(self.tmpdir)

# Test unzip specific dirs only
self.s.download_unpack(
    url=os.path.join(archives_path, "archive.zip"),
    extract_to=self.tmpdir,
    extract_dirs=["bin/*"],
)
self.assertIn("bin", os.listdir(self.tmpdir))
self.assertNotIn("lorem.txt", os.listdir(self.tmpdir))
shutil.rmtree(self.tmpdir)

# Test for invalid filenames (Windows only)
if PYWIN32:
    with self.assertRaises(IOError):
        self.s.download_unpack(
            url=os.path.join(archives_path, "archive_invalid_filename.zip"),
            extract_to=self.tmpdir,
        )

# Malicious archives (setuid bits, path escapes, links) must be rejected.
for archive in (
    "archive-setuid.tar",
    "archive-escape.tar",
    "archive-link.tar",
    "archive-link-abs.tar",
    "archive-double-link.tar",
):
    with self.assertRaises(Exception):
        self.s.download_unpack(
            url=os.path.join(archives_path, archive),
            extract_to=self.tmpdir,
        )
# Test basic decompression
for archive in (
    "archive.tar",
    "archive.tar.bz2",
    "archive.tar.gz",
    "archive.tar.xz",
    "archive.zip",
):
    self.s.unpack(os.path.join(archives_path, archive), self.tmpdir)
    self.assertIn("script.sh", os.listdir(os.path.join(self.tmpdir, "bin")))
    self.assertIn("lorem.txt", os.listdir(self.tmpdir))
    shutil.rmtree(self.tmpdir)

# Test permissions for extracted entries from zip archive
self.s.unpack(os.path.join(archives_path, "archive.zip"), self.tmpdir)
file_stats = os.stat(os.path.join(self.tmpdir, "bin", "script.sh"))
orig_fstats = os.stat(
    os.path.join(archives_path, "reference", "bin", "script.sh")
)
self.assertEqual(file_stats.st_mode, orig_fstats.st_mode)
shutil.rmtree(self.tmpdir)

# Test extract specific dirs only
self.s.unpack(
    os.path.join(archives_path, "archive.zip"),
    self.tmpdir,
    extract_dirs=["bin/*"],
)
self.assertIn("bin", os.listdir(self.tmpdir))
self.assertNotIn("lorem.txt", os.listdir(self.tmpdir))
shutil.rmtree(self.tmpdir)

# Test for invalid filenames (Windows only)
if PYWIN32:
    with self.assertRaises(IOError):
        self.s.unpack(
            os.path.join(archives_path, "archive_invalid_filename.zip"),
            self.tmpdir,
        )

# Malicious archives (setuid bits, path escapes, links) must be rejected.
for archive in (
    "archive-setuid.tar",
    "archive-escape.tar",
    "archive-link.tar",
    "archive-link-abs.tar",
    "archive-double-link.tar",
):
    with self.assertRaises(Exception):
        self.s.unpack(os.path.join(archives_path, archive), self.tmpdir)
# TestHelperFunctions {{{1
class TestHelperFunctions(unittest.TestCase):
    temp_file = "test_dir/mozilla"

    def setUp(self):
        cleanup()
        self.s = None

    def tearDown(self):
        # Close the logfile handles, or windows can't remove the logs
        if hasattr(self, "s") and isinstance(self.s, object):
            del self.s
        cleanup()

    @unittest.skipUnless(PYWIN32, "PyWin32 specific")
    def test_long_dir_rmtree(self):
        self.s = script.BaseScript(initial_config_file="test/test.json")
        # create a very long path that the command-prompt cannot delete
        # by using unicode format (max path length 32000)
        path = "\\\\?\\%s\\test_dir" % os.getcwd()
        win32file.CreateDirectoryExW(".", path)
        for x in range(0, 20):
            print("path=%s" % path)
            path = path + "\\%sxxxxxxxxxxxxxxxxxxxx" % x
            win32file.CreateDirectoryExW(".", path)
        self.s.rmtree("test_dir")
        self.assertFalse(os.path.exists("test_dir"), msg="rmtree unsuccessful")
# TestScriptLogging {{{1
class TestScriptLogging(unittest.TestCase):
    # I need a log watcher helper function, here and in test_log.
    def setUp(self):
        cleanup()
        self.s = None

    def tearDown(self):
        # Close the logfile handles, or windows can't remove the logs
        if hasattr(self, "s") and isinstance(self.s, object):
            del self.s
        cleanup()
class TestRetry(unittest.TestCase):
    def setUp(self):
        self.ATTEMPT_N = 1
        self.s = script.BaseScript(initial_config_file="test/test.json")

    def tearDown(self):
        # Close the logfile handles, or windows can't remove the logs
        if hasattr(self, "s") and isinstance(self.s, object):
            del self.s
        cleanup()

    # TODO: figure out a way to test that the sleep actually happened
    def testRetryWithSleep(self):
        self.s.retry(self._succeedOnSecondAttempt, attempts=2, sleeptime=1)

    def testRetryOnlyRunOnce(self):
        """Tests that retry() doesn't call the action again after success"""
        self.s.retry(self._alwaysPass, attempts=3, sleeptime=0)
        # self.ATTEMPT_N gets increased regardless of pass/fail
        self.assertEqual(2, self.ATTEMPT_N)

    def testRetryReturns(self):
        ret = self.s.retry(self._alwaysPass, sleeptime=0)
        self.assertEqual(ret, True)
# post_action_3 should only get called for the action it is registered
# with.
self.assertEqual(self.s.post_action_3_args[0], (("build",), dict(success=True)))
def test_pre_run_exception(self):
    # Arm the decorated script so its pre-run hook raises.
    self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
    self.s.raise_during_pre_run_1 = "Error during pre run 1"
def test_post_run_exception(self):
    # Arm the decorated script so its post-run hook raises.
    self.s = BaseScriptWithDecorators(initial_config_file="test/test.json")
    self.s.raise_during_post_run_1 = "Error during post run 1"
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.