SSL test_files.py
Interaktion und PortierbarkeitPython
# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/.
from mozbuild.dirutils import ensureParentDir from mozbuild.util import ensure_bytes from mozpack.errors import ErrorMessage, errors from mozpack.files import (
AbsoluteSymlinkFile,
ComposedFinder,
DeflatedFile,
Dest,
ExistingFile,
ExtractedTarFile,
File,
FileFinder,
GeneratedFile,
HardlinkFile,
JarFinder,
ManifestFile,
MercurialFile,
MercurialRevisionFinder,
MinifiedCommentStripped,
MinifiedJavaScript,
PreprocessedFile,
TarFinder,
)
# We don't have hglib installed everywhere. try: import hglib except ImportError:
hglib = None
import os import platform import random import sys import tarfile import unittest from io import BytesIO from tempfile import mkdtemp
import mozfile import mozunit import six
import mozpack.path as mozpath from mozpack.chrome.manifest import (
ManifestContent,
ManifestLocale,
ManifestOverride,
ManifestResource,
) from mozpack.mozjar import JarReader, JarWriter
class TestWithTmpDir(unittest.TestCase): def setUp(self):
self.tmpdir = mkdtemp()
# See comment in mozpack.files.AbsoluteSymlinkFile if hasattr(os, "symlink") and platform.system() != "Windows":
dummy_path = self.tmppath("dummy_file") with open(dummy_path, "a"): pass
rand = bytes(
random.choice(b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") for i in six.moves.xrange(131597)
)
samples = [
b"",
b"test",
b"fooo",
b"same",
b"same",
b"Different and longer",
rand,
rand,
rand[:-1] + b"_",
b"test",
]
class TestFile(TestWithTmpDir): def test_file(self): """
Check that File.copy yields the proper content in the destination file in all situations that trigger different code paths:
- different content
- different content of the same size
- same content
- long content """
src = self.tmppath("src")
dest = self.tmppath("dest")
for content in samples: with open(src, "wb") as tmp:
tmp.write(content) # Ensure the destination file, when it exists, is older than the # source if os.path.exists(dest):
time = os.path.getmtime(src) - 1
os.utime(dest, (time, time))
f = File(src)
f.copy(dest)
self.assertEqual(content, open(dest, "rb").read())
self.assertEqual(content, f.open().read())
self.assertEqual(content, f.open().read())
def test_file_dest(self): """
Similar to test_file, but for a destination object instead of
a destination file. This ensures the destination object is being
used properly by File.copy, ensuring that other subclasses of Dest
will work. """
src = self.tmppath("src")
dest = MockDest()
for content in samples: with open(src, "wb") as tmp:
tmp.write(content)
f = File(src)
f.copy(dest)
self.assertEqual(content, dest.getvalue())
def test_file_open(self): """
Test whether File.open returns an appropriately reset file object. """
src = self.tmppath("src")
content = b"".join(samples) with open(src, "wb") as tmp:
tmp.write(content)
f = File(src)
self.assertEqual(content[:42], f.open().read(42))
self.assertEqual(content, f.open().read())
def test_file_no_write(self): """
Test various conditions where File.copy is expected not to write in the destination file. """
src = self.tmppath("src")
dest = self.tmppath("dest")
# When the source file is newer, but with the same content, no copy # should occur
time = os.path.getmtime(src) - 1
os.utime(dest, (time, time))
f.copy(DestNoWrite(dest))
self.assertEqual(b"test", open(dest, "rb").read())
# When the source file is older than the destination file, even with # different content, no copy should occur. with open(src, "wb") as tmp:
tmp.write(b"fooo")
time = os.path.getmtime(dest) - 1
os.utime(src, (time, time))
f.copy(DestNoWrite(dest))
self.assertEqual(b"test", open(dest, "rb").read())
# Double check that under conditions where a copy occurs, we would get # an exception.
time = os.path.getmtime(src) - 1
os.utime(dest, (time, time))
self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest))
# skip_if_older=False is expected to force a copy in this situation.
f.copy(dest, skip_if_older=False)
self.assertEqual(b"fooo", open(dest, "rb").read())
class TestAbsoluteSymlinkFile(TestWithTmpDir): def test_absolute_relative(self):
AbsoluteSymlinkFile("/foo")
with self.assertRaisesRegex(ValueError, "Symlink target not absolute"):
AbsoluteSymlinkFile("./foo")
def test_symlink_file(self):
source = self.tmppath("test_path") with open(source, "wt") as fh:
fh.write("Hello world")
s = AbsoluteSymlinkFile(source)
dest = self.tmppath("symlink")
self.assertTrue(s.copy(dest))
if self.symlink_supported:
self.assertTrue(os.path.islink(dest))
link = os.readlink(dest)
self.assertEqual(link, source) else:
self.assertTrue(os.path.isfile(dest))
content = open(dest).read()
self.assertEqual(content, "Hello world")
def test_replace_file_with_symlink(self): # If symlinks are supported, an existing file should be replaced by a # symlink.
source = self.tmppath("test_path") with open(source, "wt") as fh:
fh.write("source")
dest = self.tmppath("dest") with open(dest, "a"): pass
s = AbsoluteSymlinkFile(source)
s.copy(dest, skip_if_older=False)
if self.symlink_supported:
self.assertTrue(os.path.islink(dest))
link = os.readlink(dest)
self.assertEqual(link, source) else:
self.assertTrue(os.path.isfile(dest))
content = open(dest).read()
self.assertEqual(content, "source")
s = AbsoluteSymlinkFile(source)
self.assertTrue(s.copy(dest))
self.assertTrue(os.path.islink(dest))
link = os.readlink(dest)
self.assertEqual(link, source)
def test_noop(self): ifnot hasattr(os, "symlink") or sys.platform == "win32": return
source = self.tmppath("source")
dest = self.tmppath("dest")
with open(source, "a"): pass
os.symlink(source, dest)
link = os.readlink(dest)
self.assertEqual(link, source)
s = AbsoluteSymlinkFile(source)
self.assertFalse(s.copy(dest))
link = os.readlink(dest)
self.assertEqual(link, source)
class TestHardlinkFile(TestWithTmpDir): def test_absolute_relative(self):
HardlinkFile("/foo")
HardlinkFile("./foo")
def test_hardlink_file(self):
source = self.tmppath("test_path") with open(source, "wt") as fh:
fh.write("Hello world")
s = HardlinkFile(source)
dest = self.tmppath("hardlink")
self.assertTrue(s.copy(dest))
if self.hardlink_supported:
source_stat = os.stat(source)
dest_stat = os.stat(dest)
self.assertEqual(source_stat.st_dev, dest_stat.st_dev)
self.assertEqual(source_stat.st_ino, dest_stat.st_ino) else:
self.assertTrue(os.path.isfile(dest)) with open(dest) as f:
content = f.read()
self.assertEqual(content, "Hello world")
def test_replace_file_with_hardlink(self): # If hardlink are supported, an existing file should be replaced by a # symlink.
source = self.tmppath("test_path") with open(source, "wt") as fh:
fh.write("source")
dest = self.tmppath("dest") with open(dest, "a"): pass
s = HardlinkFile(source)
s.copy(dest, skip_if_older=False)
if self.hardlink_supported:
source_stat = os.stat(source)
dest_stat = os.stat(dest)
self.assertEqual(source_stat.st_dev, dest_stat.st_dev)
self.assertEqual(source_stat.st_ino, dest_stat.st_ino) else:
self.assertTrue(os.path.isfile(dest)) with open(dest) as f:
content = f.read()
self.assertEqual(content, "source")
def test_replace_hardlink(self): ifnot self.hardlink_supported: raise unittest.SkipTest("hardlink not supported")
source = self.tmppath("source") with open(source, "a"): pass
dest = self.tmppath("dest")
os.link(source, dest)
s = HardlinkFile(source)
self.assertFalse(s.copy(dest))
class TestPreprocessedFile(TestWithTmpDir): def test_preprocess(self): """
Test that copying the file invokes the preprocessor """
src = self.tmppath("src")
dest = self.tmppath("dest")
with open(src, "wb") as tmp:
tmp.write(b"#ifdef FOO\ntest\n#endif")
f = PreprocessedFile(src, depfile_path=None, marker="#", defines={"FOO": True})
self.assertTrue(f.copy(dest))
def test_preprocess_file_no_write(self): """
Test various conditions where PreprocessedFile.copy is expected not to
write in the destination file. """
src = self.tmppath("src")
dest = self.tmppath("dest")
depfile = self.tmppath("depfile")
with open(src, "wb") as tmp:
tmp.write(b"#ifdef FOO\ntest\n#endif")
# When the source file is older than the destination file, even with # different content, no copy should occur. with open(src, "wb") as tmp:
tmp.write(b"#ifdef FOO\nfooo\n#endif")
time = os.path.getmtime(dest) - 1
os.utime(src, (time, time))
self.assertFalse(f.copy(DestNoWrite(dest)))
self.assertEqual(b"test\n", open(dest, "rb").read())
# skip_if_older=False is expected to force a copy in this situation.
self.assertTrue(f.copy(dest, skip_if_older=False))
self.assertEqual(b"fooo\n", open(dest, "rb").read())
def test_preprocess_file_dependencies(self): """
Test that the preprocess runs if the dependencies of the source change """
src = self.tmppath("src")
dest = self.tmppath("dest")
incl = self.tmppath("incl")
deps = self.tmppath("src.pp")
with open(src, "wb") as tmp:
tmp.write(b"#ifdef FOO\ntest\n#endif")
with open(incl, "wb") as tmp:
tmp.write(b"foo bar")
# Update the source so it #includes the include file. with open(src, "wb") as tmp:
tmp.write(b"#include incl\n")
time = os.path.getmtime(dest) + 1
os.utime(src, (time, time))
self.assertTrue(f.copy(dest))
self.assertEqual(b"foo bar", open(dest, "rb").read())
# If one of the dependencies changes, the file should be updated. The # mtime of the dependency is set after the destination file, to avoid # both files having the same time. with open(incl, "wb") as tmp:
tmp.write(b"quux")
time = os.path.getmtime(dest) + 1
os.utime(incl, (time, time))
self.assertTrue(f.copy(dest))
self.assertEqual(b"quux", open(dest, "rb").read())
# Perform one final copy to confirm that we don't run the preprocessor # again. We update the mtime of the destination so it's newer than the # input files. This would "just work" if we weren't changing
time = os.path.getmtime(incl) + 1
os.utime(dest, (time, time))
self.assertFalse(f.copy(DestNoWrite(dest)))
def test_replace_symlink(self): """
Test that if the destination exists, andis a symlink, the target of
the symlink isnot overwritten by the preprocessor output. """ ifnot self.symlink_supported: return
class TestExistingFile(TestWithTmpDir): def test_required_missing_dest(self): with self.assertRaisesRegex(ErrorMessage, "Required existing file"):
f = ExistingFile(required=True)
f.copy(self.tmppath("dest"))
def test_required_existing_dest(self):
p = self.tmppath("dest") with open(p, "a"): pass
f = ExistingFile(required=True)
f.copy(p)
def test_optional_missing_dest(self):
f = ExistingFile(required=False)
f.copy(self.tmppath("dest"))
def test_optional_existing_dest(self):
p = self.tmppath("dest") with open(p, "a"): pass
f = ExistingFile(required=False)
f.copy(p)
class TestGeneratedFile(TestWithTmpDir): def test_generated_file(self): """
Check that GeneratedFile.copy yields the proper content in the
destination file in all situations that trigger different code paths
(see TestFile.test_file) """
dest = self.tmppath("dest")
for content in samples:
f = GeneratedFile(content)
f.copy(dest)
self.assertEqual(content, open(dest, "rb").read())
def test_generated_file_open(self): """
Test whether GeneratedFile.open returns an appropriately reset file
object. """
content = b"".join(samples)
f = GeneratedFile(content)
self.assertEqual(content[:42], f.open().read(42))
self.assertEqual(content, f.open().read())
def test_generated_file_no_write(self): """
Test various conditions where GeneratedFile.copy is expected not to
write in the destination file. """
dest = self.tmppath("dest")
# Initial copy
f = GeneratedFile(b"test")
f.copy(dest)
# When using a new instance with the same content, no copy should occur
f = GeneratedFile(b"test")
f.copy(DestNoWrite(dest))
self.assertEqual(b"test", open(dest, "rb").read())
# Double check that under conditions where a copy occurs, we would get # an exception.
f = GeneratedFile(b"fooo")
self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest))
def test_generated_file_function(self): """
Test GeneratedFile behavior with functions. """
dest = self.tmppath("dest")
data = { "num_calls": 0,
}
class TestDeflatedFile(TestWithTmpDir): def test_deflated_file(self): """
Check that DeflatedFile.copy yields the proper content in the
destination file in all situations that trigger different code paths
(see TestFile.test_file) """
src = self.tmppath("src.jar")
dest = self.tmppath("dest")
contents = {} with JarWriter(src) as jar: for content in samples:
name = "".join(
random.choice( "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
) for i in range(8)
)
jar.add(name, content, compress=True)
contents[name] = content
for j in JarReader(src):
f = DeflatedFile(j)
f.copy(dest)
self.assertEqual(contents[j.filename], open(dest, "rb").read())
def test_deflated_file_open(self): """
Test whether DeflatedFile.open returns an appropriately reset file
object. """
src = self.tmppath("src.jar")
content = b"".join(samples) with JarWriter(src) as jar:
jar.add("content", content)
f = DeflatedFile(JarReader(src)["content"])
self.assertEqual(content[:42], f.open().read(42))
self.assertEqual(content, f.open().read())
def test_deflated_file_no_write(self): """
Test various conditions where DeflatedFile.copy is expected not to
write in the destination file. """
src = self.tmppath("src.jar")
dest = self.tmppath("dest")
with JarWriter(src) as jar:
jar.add("test", b"test")
jar.add("test2", b"test")
jar.add("fooo", b"fooo")
jar = JarReader(src) # Initial copy
f = DeflatedFile(jar["test"])
f.copy(dest)
# When using a different file with the same content, no copy should # occur
f = DeflatedFile(jar["test2"])
f.copy(DestNoWrite(dest))
self.assertEqual(b"test", open(dest, "rb").read())
# Double check that under conditions where a copy occurs, we would get # an exception.
f = DeflatedFile(jar["fooo"])
self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest))
# Compiled typelib for the following IDL: # interface foo; # [scriptable, uuid(5f70da76-519c-4858-b71e-e3c92333e2d6)] # interface bar { # void bar(in foo f); # }; # We need to make this [scriptable] so it doesn't get deleted from the # typelib. We don't need to make the foo interfaces below [scriptable], # because they will be automatically included by virtue of being an # argument to a method of |bar|.
bar_xpt = GeneratedFile(
b"\x58\x50\x43\x4F\x4D\x0A\x54\x79\x70\x65\x4C\x69\x62\x0D\x0A\x1A"
+ b"\x01\x02\x00\x02\x00\x00\x00\x7B\x00\x00\x00\x24\x00\x00\x00\x5C"
+ b"\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
+ b"\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x5F"
+ b"\x70\xDA\x76\x51\x9C\x48\x58\xB7\x1E\xE3\xC9\x23\x33\xE2\xD6\x00"
+ b"\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x0D\x00\x66\x6F\x6F\x00"
+ b"\x62\x61\x72\x00\x62\x61\x72\x00\x00\x00\x00\x01\x00\x00\x00\x00"
+ b"\x09\x01\x80\x92\x00\x01\x80\x06\x00\x00\x80"
)
mini_lines = [six.ensure_text(s) for s in min_f.open().readlines()]
self.assertTrue(mini_lines)
self.assertTrue(len(mini_lines) < len(self.orig_lines))
def do_finder_test(self, finder):
self.assertTrue(finder.contains("foo/.foo"))
self.assertTrue(finder.contains("foo/.bar"))
self.assertTrue("foo/.foo"in [f for f, c in finder.find("foo/.foo")])
self.assertTrue("foo/.bar/foo"in [f for f, c in finder.find("foo/.bar")])
self.assertEqual(
sorted([f for f, c in finder.find("foo/.*")]), ["foo/.bar/foo", "foo/.foo"]
) for pattern in ["foo", "**", "**/*", "**/foo", "foo/*"]:
self.assertFalse("foo/.foo"in [f for f, c in finder.find(pattern)])
self.assertFalse("foo/.bar/foo"in [f for f, c in finder.find(pattern)])
self.assertEqual(
sorted([f for f, c in finder.find(pattern)]),
sorted([f for f, c in finder if mozpath.match(f, pattern)]),
)
def do_check(test, finder, pattern, result): if result:
test.assertTrue(finder.contains(pattern)) else:
test.assertFalse(finder.contains(pattern))
test.assertEqual(sorted(list(f for f, c in finder.find(pattern))), sorted(result))
class TestFileFinder(MatchTestTemplate, TestWithTmpDir): def add(self, path):
ensureParentDir(self.tmppath(path))
open(self.tmppath(path), "wb").write(six.ensure_binary(path))
def test_composed_finder(self):
self.prepare_match_test() # Also add files in $tmp/a/foo/qux because ComposedFinder is # expected to mask foo/qux entirely with content from $tmp/b.
ensureParentDir(self.tmppath("a/foo/qux/hoge"))
open(self.tmppath("a/foo/qux/hoge"), "wb").write(b"hoge")
open(self.tmppath("a/foo/qux/bar"), "wb").write(b"not the right content")
self.finder = ComposedFinder(
{ "": FileFinder(self.tmppath("a")), "foo/qux": FileFinder(self.tmppath("b")),
}
)
self.do_match_test()
@unittest.skipUnless(hglib, "hglib not available")
@unittest.skipIf(
six.PY3 and os.name == "nt", "Does not currently work in Python3 on Windows"
) class TestMercurialRevisionFinder(MatchTestTemplate, TestWithTmpDir): def setUp(self):
super(TestMercurialRevisionFinder, self).setUp()
hglib.init(self.tmpdir)
self._clients = []
def tearDown(self): # Ensure the hg client process is closed. Otherwise, Windows # may have trouble removing the repo directory because the process # has an open handle on it. for client in getattr(self, "_clients", []): if client.server:
client.close()
def add(self, path): with self._client() as c:
ensureParentDir(self.tmppath(path)) with open(self.tmppath(path), "wb") as fh:
fh.write(six.ensure_binary(path))
c.add(six.ensure_binary(self.tmppath(path)))
def test_old_revision(self):
with self._client() as c:
with open(self.tmppath("foo"), "wb") as fh:
fh.write(b"foo initial")
c.add(six.ensure_binary(self.tmppath("foo")))
c.commit("initial")
with open(self.tmppath("foo"), "wb") as fh:
fh.write(b"foo second")
with open(self.tmppath("bar"), "wb") as fh:
fh.write(b"bar second")
c.add(six.ensure_binary(self.tmppath("bar")))
c.commit("second")
# This wipes out the working directory, ensuring the finder isn't
# finding anything from the filesystem.
c.rawcommand([b"update", b"null"])
finder = self._get_finder(self.tmpdir, "0")
f = finder.get("foo")
self.assertEqual(f.read(), b"foo initial")
self.assertEqual(f.read(), b"foo initial", "read again for good measure")
self.assertIsNone(finder.get("bar"))
finder = self._get_finder(self.tmpdir, rev="1")
f = finder.get("foo")
self.assertEqual(f.read(), b"foo second")
f = finder.get("bar")
self.assertEqual(f.read(), b"bar second")
f = None
def test_recognize_repo_paths(self):
with self._client() as c:
with open(self.tmppath("foo"), "wb") as fh:
fh.write(b"initial")
c.add(six.ensure_binary(self.tmppath("foo")))
c.commit("initial")
c.rawcommand([b"update", b"null"])
finder = self._get_finder(self.tmpdir, "0", recognize_repo_paths=True)
with self.assertRaises(NotImplementedError):
list(finder.find(""))
with self.assertRaises(ValueError):
finder.get("foo")
with self.assertRaises(ValueError):
finder.get("")
f = finder.get(self.tmppath("foo"))
self.assertIsInstance(f, MercurialFile)
self.assertEqual(f.read(), b"initial")
f = None
if __name__ == "__main__":
mozunit.main()
¤ Diese beiden folgenden Angebotsgruppen bietet das Unternehmen0.19Angebot
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.