import argparse
import ast
import json
import logging
import multiprocessing
import os
import re
import subprocess
import sys
import tempfile
from collections import defaultdict
from typing import (Any, Callable, Dict, IO, Iterable, List, Optional,
                    Sequence, Set, Text, Tuple, Type, TypeVar)
from urllib.parse import urlsplit, urljoin, parse_qs

try:
    from xml.etree import cElementTree as ElementTree
except ImportError:
    from xml.etree import ElementTree as ElementTree  # type: ignore

from . import fnmatch
from . import rules
from .. import localpaths
from ..ci.tc.github_checks_output import get_gh_checks_outputter, GitHubChecksOutputter
from ..gitignore.gitignore import PathFilter
from ..wpt import testfiles
from ..manifest.mputil import max_parallelism
from ..manifest.vcs import walk
from ..manifest.sourcefile import SourceFile, js_meta_re, python_meta_re, space_chars, get_any_variants
from ..metadata.yaml.load import load_data_to_dict
from ..metadata.meta.schema import META_YML_FILENAME, MetaFile
from ..metadata.webfeatures.schema import WEB_FEATURES_YML_FILENAME, WebFeaturesFile
# The Ignorelist is a two level dictionary. The top level is indexed by
# error names (e.g. 'TRAILING WHITESPACE'). Each of those then has a map of
# file patterns (e.g. 'foo/*') to a set of specific line numbers for the
# exception. The line numbers are optional; if missing the entire file
# ignores the error.
Ignorelist = Dict[str, Dict[str, Set[Optional[int]]]]
# Define an arbitrary typevar
T = TypeVar("T")
# Module-level logger; None until setup_logging() is called.
logger: Optional[logging.Logger] = None
def setup_logging(prefix: bool = False) -> None:
    """Configure the module-level ``logger`` to write to stdout.

    :param prefix: if True, use ``logging.BASIC_FORMAT`` (which includes the
                   level and logger name) instead of the bare message.
    """
    global logger
    if logger is None:
        logger = logging.getLogger(os.path.basename(os.path.splitext(__file__)[0]))
    # Only add a handler if the parent logger is missing a handler
    parent = logger.parent
    assert isinstance(parent, logging.Logger)
    if parent and len(parent.handlers) == 0:
        handler: logging.Handler = logging.StreamHandler(sys.stdout)
        logger.addHandler(handler)
    if prefix:
        fmt = logging.BASIC_FORMAT
    else:
        fmt = "%(message)s"
    formatter = logging.Formatter(fmt)
    # Re-apply the formatter to every handler so a repeated call with a
    # different `prefix` takes effect.
    for handler in logger.handlers:
        handler.setFormatter(formatter)
    logger.setLevel(logging.DEBUG)
# Template printed after lint failures, explaining how to silence an error
# via lint.ignore.  Takes four substitutions: error type, path, error type,
# path (in that order); see the use in lint()'s summary output.
ERROR_MSG = """However, instead of fixing a particular error, it's sometimes
OK to add a line to the lint.ignore file in the root of the
web-platform-tests directory to make the lint tool ignore it.

For example, to make the lint tool ignore all '%s'
errors in the %s file,
you could add the following line to the lint.ignore file.

%s: %s"""
def all_filesystem_paths(repo_root: Text, subdir: Optional[Text] = None) -> Iterable[Text]:
    """Yield every lintable path under ``repo_root`` as a relative utf-8 str.

    :param repo_root: the repository root
    :param subdir: optional subdirectory to restrict the walk to; yielded
                   paths are still relative to ``repo_root``
    """
    path_filter = PathFilter(repo_root.encode("utf8"),
                             extras=[b".git/"])
    if subdir:
        expanded_path = subdir.encode("utf8")
        subdir_str = expanded_path
    else:
        expanded_path = repo_root.encode("utf8")
    for dirpath, dirnames, filenames in path_filter(walk(expanded_path)):
        for filename, _ in filenames:
            path = os.path.join(dirpath, filename)
            if subdir:
                # walk() yielded the path relative to subdir; re-anchor it so
                # it is relative to repo_root.
                path = os.path.join(subdir_str, path)
            assert not os.path.isabs(path), path
            yield path.decode("utf8")
def _all_files_equal(paths: Iterable[Text]) -> bool: """
Checks all the paths are files that are byte-for-byte identical
:param paths: the list of paths to compare
:returns: Trueif they are all identical """
paths = list(paths) if len(paths) < 2: returnTrue
first = paths.pop()
size = os.path.getsize(first) if any(os.path.getsize(path) != size for path in paths): returnFalse
# Chunk this to avoid eating up memory and file descriptors
bufsize = 4096*4 # 16KB, a "reasonable" number of disk sectors
groupsize = 8 # Hypothesised to be large enough in the common case that everything fits in one group with open(first, "rb") as first_f: for start in range(0, len(paths), groupsize):
path_group = paths[start:start+groupsize]
first_f.seek(0) try:
files = [open(x, "rb") for x in path_group] for _ in range(0, size, bufsize):
a = first_f.read(bufsize) for f in files:
b = f.read(bufsize) if a != b: returnFalse finally: for f in files:
f.close()
def check_git_ignore(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
    """Report paths that git would ignore (test files must not be ignored).

    Feeds ``paths`` to ``git check-ignore --verbose`` and emits an
    IGNORED PATH error for every path matched by a non-negated ignore rule.

    NOTE(review): the ``def`` line and ``errors`` initialisation were lost to
    formatting corruption; this header is reconstructed — verify the function
    name against callers/upstream history.

    :param repo_root: the repository root
    :param paths: list of paths (relative to ``repo_root``) to check
    :returns: a list of errors found in ``paths``
    """
    errors = []
    with tempfile.TemporaryFile('w+', newline='') as f:
        for path in paths:
            f.write('%s\n' % os.path.join(repo_root, path))
        f.seek(0)
        try:
            matches = subprocess.check_output(
                ["git", "check-ignore", "--verbose", "--no-index", "--stdin"],
                stdin=f)
            for match in matches.strip().split(b'\n'):
                match_filter, path_bytes = match.split()
                _, _, filter_string = match_filter.split(b':')
                # If the matching filter reported by check-ignore is a
                # special-case exception, that's fine. Otherwise, it requires
                # a new special-case exception.
                if filter_string[0:1] != b'!':
                    path = path_bytes.decode("utf8")
                    errors.append(rules.IgnoredPath.error(path, (path,)))
        except subprocess.CalledProcessError:
            # Nonzero return code means that no match exists.
            pass
    return errors
def check_unique_testharness_basenames(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
    """
    Checks that all testharness files have unique basename paths.

    The 'basename path' refers to the entire path excluding the extension. For
    example, 'foo/bar/baz.html' and 'foo/bar/baz.xhtml' have the same basename
    path, but 'foo/bar/baz.html' and 'foo/qux/baz.html' do not.

    NOTE(review): the body of this function was lost to formatting corruption
    and has been reconstructed from the documented contract — verify against
    upstream history before relying on it.

    :param repo_root: the repository root
    :param paths: list of all paths to check
    :returns: a list of errors found in ``paths``
    """
    errors = []
    # Map basename path -> list of extensions seen for it.
    file_dict = defaultdict(list)
    for path in paths:
        source_file = SourceFile(repo_root, path, "/")
        if "testharness" not in source_file.possible_types:
            continue
        file_name, file_extension = os.path.splitext(path)
        file_dict[file_name].append(file_extension)
    for file_name, extensions in file_dict.items():
        if len(extensions) == 1:
            continue
        context = (', '.join(extensions),)
        for extension in extensions:
            errors.append(rules.DuplicateBasenamePath.error(file_name + extension, context))
    return errors
def parse_ignorelist(f: IO[Text]) -> Tuple[Ignorelist, Set[Text]]:
    """Parse an open lint.ignore file into its lookup structures.

    Each non-comment line has the form ``TYPE[,TYPE...]: glob[: line]``.
    An error type of ``*`` means the matching files are skipped entirely.

    NOTE(review): the ``def`` line and the two initialisations were lost to
    formatting corruption and are reconstructed here.

    :param f: an open text-mode file containing the ignore rules
    :returns: a tuple of (Ignorelist data, set of fully-skipped file globs)
    """
    data: Ignorelist = defaultdict(lambda: defaultdict(set))
    skipped_files: Set[Text] = set()

    for line in f:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = [item.strip() for item in line.split(":")]

        if len(parts) == 2:
            # No line number: the whole file ignores this error type.
            error_types_s, file_match = parts
            line_number: Optional[int] = None
        else:
            error_types_s, file_match, line_number_s = parts
            line_number = int(line_number_s)

        error_types = {item.strip() for item in error_types_s.split(",")}
        file_match = os.path.normcase(file_match)

        if "*" in error_types:
            skipped_files.add(file_match)
        else:
            for error_type in error_types:
                data[error_type][file_match].add(line_number)

    return data, skipped_files
def filter_ignorelist_errors(data: Ignorelist, errors: Sequence[rules.Error]) -> List[rules.Error]:
    """
    Filter out those errors that are ignored in `data`.

    :param data: an Ignorelist (error type -> file glob -> line numbers)
    :param errors: the error tuples (type, message, path, line) to filter
    :returns: the errors that are not covered by an ignore rule
    """
    if not errors:
        return []

    skipped = [False for item in range(len(errors))]

    for i, (error_type, msg, path, line) in enumerate(errors):
        normpath = os.path.normcase(path)
        # Allow skipping all lint errors except the IGNORED PATH lint,
        # which explains how to fix it correctly and shouldn't be skipped.
        if error_type in data and error_type != "IGNORED PATH":
            wl_files = data[error_type]
            for file_match, allowed_lines in wl_files.items():
                # A None entry means the whole file ignores this error type.
                if None in allowed_lines or line in allowed_lines:
                    if fnmatch.fnmatchcase(normpath, file_match):
                        skipped[i] = True

    return [item for i, item in enumerate(errors) if not skipped[i]]
# NOTE(review): the statements below appear to be the interior of a
# check_parsed(repo_root, path, f) lint function; its `def` line, the
# source_file construction, and several intermediate sections were lost to
# formatting corruption.  The lines are left byte-identical (still garbled —
# many contain fused tokens such as `isNone`/`notin`) pending reconstruction
# from upstream history.
# css/ tests must link to a spec unless they are support/reference files.
if path.startswith("css/"): if (source_file.type != "support"and not source_file.name_is_reference and not source_file.name_is_tentative and not source_file.name_is_crashtest and not source_file.spec_links): return [rules.MissingLink.error(path)]
if source_file.name_is_non_test: return []
if source_file.markup_type isNone: return []
if source_file.root isNone: return [rules.ParseFailed.error(path)]
test_type = source_file.type
# File-name / content-type consistency checks for manual and visual tests.
if test_type == "manual"andnot source_file.name_is_manual:
errors.append(rules.ContentManual.error(path))
if test_type == "visual"andnot source_file.name_is_visual:
errors.append(rules.ContentVisual.error(path))
# Reftest <link rel=match/mismatch> hrefs must be relative (about:blank is
# explicitly tolerated).
about_blank_parts = urlsplit("about:blank") for reftest_node in source_file.reftest_nodes:
href = reftest_node.attrib.get("href", "").strip(space_chars)
parts = urlsplit(href)
if parts == about_blank_parts: continue
if (parts.scheme or parts.netloc):
errors.append(rules.AbsoluteUrlRef.error(path, (href,))) continue
if source_file.reftest_nodes: if test_type notin ("print-reftest", "reftest"):
errors.append(rules.ReferenceInOtherType.error(path, (test_type,)))
# <meta name=timeout> checks: at most one, and only the value "long".
if len(source_file.timeout_nodes) > 1:
errors.append(rules.MultipleTimeout.error(path))
for timeout_node in source_file.timeout_nodes:
timeout_value = timeout_node.attrib.get("content", "").lower() if timeout_value != "long":
errors.append(rules.InvalidTimeout.error(path, (timeout_value,)))
# <meta name=variant> checks.
if source_file.content_is_ref_node or source_file.content_is_testharness: for element in source_file.variant_nodes: if"content"notin element.attrib:
errors.append(rules.VariantMissing.error(path)) else:
variant = element.attrib["content"] if is_variant_malformed(variant):
value = f"{path} `` 'content' attribute"
errors.append(rules.MalformedVariant.error(path, (value,)))
# Build the list of script elements that must appear in the document, then
# walk the tree recording which have been seen and in what order.
required_elements: List[Text] = []
testharnessreport_nodes: List[ElementTree.Element] = [] if source_file.testharness_nodes: if test_type notin ("testharness", "manual"):
errors.append(rules.TestharnessInOtherType.error(path, (test_type,))) if len(source_file.testharness_nodes) > 1:
errors.append(rules.MultipleTestharness.error(path))
required_elements.extend(key for key, value in {"testharness": True, "testharnessreport": len(testharnessreport_nodes) > 0, "timeout": len(source_file.timeout_nodes) > 0}.items() if value)
testdriver_vendor_nodes: List[ElementTree.Element] = [] if source_file.testdriver_nodes: if test_type notin {"testharness", "reftest", "print-reftest", "crashtest", "support"}:
errors.append(rules.TestdriverInUnsupportedType.error(path, (test_type,)))
if len(source_file.testdriver_nodes) > 1:
errors.append(rules.MultipleTestdriver.error(path))
required_elements.append("testdriver") if len(testdriver_vendor_nodes) > 0:
required_elements.append("testdriver-vendor")
if required_elements:
seen_elements = defaultdict(bool)
# A timeout meta appearing after testharness.js is a LateTimeout error.
for elem in source_file.root.iter(): if source_file.timeout_nodes and elem == source_file.timeout_nodes[0]:
seen_elements["timeout"] = True if seen_elements["testharness"]:
errors.append(rules.LateTimeout.error(path))
elif source_file.testharness_nodes and elem == source_file.testharness_nodes[0]:
seen_elements["testharness"] = True
def is_path_correct(script: Text, src: Text) -> bool:
    """ If the `src` relevant to the `script`, check that the `src` is the
    correct path for `script`.

    :param script: the script name to check the `src` for.
    :param src: the included path.
    :return: if the `src` irrelevant to the `script`, or if the `src`
             path is the correct path.
    """
    if script == src:
        # The src does not provide the full path.
        return False

    if "/%s" % script not in src:
        # The src is not relevant to the script.
        return True

    # NOTE(review): the final check was lost to formatting corruption and is
    # reconstructed: the src must be the canonical /resources/ path,
    # optionally followed by a query string.
    return (src == "/resources/%s" % script or
            src.startswith("/resources/%s?" % script))
def is_query_string_correct(script: Text, src: Text,
                            allowed_query_string_params: Dict[str, List[str]]) -> bool:
    """
    Checks if the query string in a script tag's `src` is valid.

    Specifically, it verifies that the query string parameters and their
    values are among those allowed for the given script. It handles vendor
    prefixes (parameters or values containing a colon) by allowing them
    unconditionally.

    :param script: the name of the script (e.g., "testharness.js"). Used
                   to verify is the given `src` is related to the
                   script.
    :param src: the full `src` attribute value from the script tag.
    :param allowed_query_string_params: A dictionary where keys are
                                        allowed parameter names and
                                        values are lists of allowed
                                        values for each parameter.
    :return: if the query string is empty or contains only allowed
             params.
    """
    if not ("%s" % src).startswith("/resources/%s?" % script):
        # The src is not related to the script.
        return True
    try:
        query_string = urlsplit(urljoin(source_file.url, src)).query
        query_string_params = parse_qs(query_string,
                                       keep_blank_values=True)
    except ValueError:
        # Parsing error means that the query string is incorrect.
        return False
    for param_name in query_string_params:
        if param_name not in allowed_query_string_params:
            return False
        for param_value in query_string_params[param_name]:
            if ':' in param_value:
                # Allow for vendor-specific values in query parameters.
                continue
            if param_value not in allowed_query_string_params[param_name]:
                return False
    return True
# NOTE(review): continuation of the check_parsed script-src validation; the
# surrounding loop over script elements (which binds `src`) was lost to
# formatting corruption.  Lines left byte-identical (still garbled) pending
# reconstruction from upstream history.
if (not is_path_correct("testharness.js", src) or not is_query_string_correct("testharness.js", src, {})):
errors.append(rules.TestharnessPath.error(path))
if (not is_path_correct("testharnessreport.js", src) or not is_query_string_correct("testharnessreport.js", src, {})):
errors.append(rules.TestharnessReportPath.error(path))
if (not is_path_correct("testdriver-vendor.js", src) or not is_query_string_correct("testdriver-vendor.js", src, {})):
errors.append(rules.TestdriverVendorPath.error(path))
# Tests including /common/reftest-wait.js must also carry the
# class=reftest-wait marker on the root element.
script_path = None try:
script_path = urlsplit(urljoin(source_file.url, src)).path except ValueError: # This happens if the contents of src isn't something that looks like a URL to Python pass if (script_path == "/common/reftest-wait.js"and "reftest-wait"notin source_file.root.attrib.get("class", "").split()):
errors.append(rules.MissingReftestWait.error(path))
class OpenModeCheck(ASTCheck):
    """AST check flagging open()/file() calls without an explicit mode."""
    rule = rules.OpenNoMode

    def check(self, root: ast.AST) -> List[int]:
        """Return line numbers of open()/file() calls lacking a mode argument.

        :param root: the parsed module AST to scan
        """
        errors = []
        for node in ast.walk(root):
            if isinstance(node, ast.Call):
                # Only bare-name calls (`open(...)`) carry .id; attribute
                # calls such as `io.open(...)` are intentionally not matched.
                if hasattr(node.func, "id") and node.func.id in ("open", "file"):
                    if (len(node.args) < 2 and
                            all(item.arg != "mode" for item in node.keywords)):
                        errors.append(node.lineno)
        return errors


ast_checkers = [item() for item in [OpenModeCheck]]
def check_global_metadata(value: bytes) -> Iterable[Tuple[Type[rules.Rule], Tuple[Any, ...]]]:
    """Validate a ``META: global=...`` value, yielding (rule, context) pairs.

    :param value: the raw bytes of the ``global`` metadata value
               (comma-separated variant names)
    """
    global_values = {item.strip().decode("utf8") for item in value.split(b",") if item.strip()}

    # TODO: this could check for duplicates and such
    for global_value in global_values:
        if not get_any_variants(global_value):
            yield (rules.UnknownGlobalMetadata, ())
def check_script_metadata(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    """Lint ``META:`` comment metadata at the top of script tests.

    NOTE(review): the opening of this function (the dispatch on file
    extension) was lost to formatting corruption and has been reconstructed —
    verify against upstream history.

    :param repo_root: the repository root
    :param path: the path of the file within the repository
    :param f: the open (binary) file
    :returns: a list of errors found in ``f``
    """
    if path.endswith((".worker.js", ".any.js")):
        meta_re = js_meta_re
        broken_metadata = broken_js_metadata
    elif path.endswith(".py"):
        meta_re = python_meta_re
        broken_metadata = broken_python_metadata
    else:
        return []

    done = False
    errors = []
    for line_no, line in enumerate(f, 1):
        assert isinstance(line, bytes), line

        m = meta_re.match(line)
        if m:
            key, value = m.groups()
            if key == b"global":
                for rule_class, context in check_global_metadata(value):
                    errors.append(rule_class.error(path, context, line_no))
            elif key == b"timeout":
                if value != b"long":
                    errors.append(rules.UnknownTimeoutMetadata.error(path,
                                                                     line_no=line_no))
            elif key == b"variant":
                if is_variant_malformed(value.decode()):
                    value = f"{path} `META: variant=...` value"
                    errors.append(rules.MalformedVariant.error(path, (value,), line_no))
            elif key == b"script":
                # The harness scripts are injected by the runner and must not
                # be included manually.
                if value == b"/resources/testharness.js":
                    errors.append(rules.MultipleTestharness.error(path, line_no=line_no))
                elif value == b"/resources/testharnessreport.js":
                    errors.append(rules.MultipleTestharnessReport.error(path, line_no=line_no))
            elif key not in (b"title", b"quic"):
                errors.append(rules.UnknownMetadata.error(path, line_no=line_no))
        else:
            # First non-metadata line ends the META block.
            done = True

        if done:
            if meta_re.match(line):
                errors.append(rules.StrayMetadata.error(path, line_no=line_no))
            elif meta_re.search(line):
                errors.append(rules.IndentedMetadata.error(path, line_no=line_no))
            elif broken_metadata.search(line):
                errors.append(rules.BrokenMetadata.error(path, line_no=line_no))

    return errors
# Matches CSS declarations that set an Ahem font, case-insensitively.
ahem_font_re = re.compile(br"font.*:.*ahem", flags=re.IGNORECASE)
# Ahem can appear either in the global location or in the support
# directory for legacy Mozilla imports
ahem_stylesheet_re = re.compile(br"\/fonts\/ahem\.css|support\/ahem.css",
                                flags=re.IGNORECASE)
def check_web_features_file(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    """Validate a WEB_FEATURES.yml file against the tests in its directory.

    :param repo_root: the repository root
    :param path: the path of the file within the repository
    :param f: the open (binary) file
    :returns: a list of errors found in ``f``
    """
    if os.path.basename(path) != WEB_FEATURES_YML_FILENAME:
        return []
    try:
        web_features_file: WebFeaturesFile = WebFeaturesFile(load_data_to_dict(f))
    except Exception:
        return [rules.InvalidWebFeaturesFile.error(path)]

    errors = []
    base_dir = os.path.join(repo_root, os.path.dirname(path))
    # Candidate test files are the regular files directly in this directory.
    # (Renamed the comprehension variable: the original shadowed parameter `f`.)
    files_in_directory = [entry for entry in os.listdir(base_dir)
                          if os.path.isfile(os.path.join(base_dir, entry))]
    for feature in web_features_file.features:
        if isinstance(feature.files, list):
            for file in feature.files:
                if not file.match_files(files_in_directory):
                    # Bug fix: the context must be a one-element tuple
                    # `(file,)` — `(file)` is just a parenthesised value.
                    errors.append(rules.MissingTestInWebFeaturesFile.error(path, (file,)))
    return errors
def check_path(repo_root: Text, path: Text) -> List[rules.Error]:
    """
    Runs lints that check the file path.

    :param repo_root: the repository root
    :param path: the path of the file within the repository
    :returns: a list of errors found in ``path``
    """
    errors = []
    for path_fn in path_lints:
        errors.extend(path_fn(repo_root, path))
    return errors
def check_all_paths(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
    """
    Runs lints that check all paths globally.

    :param repo_root: the repository root
    :param paths: a list of all the paths within the repository
    :returns: a list of errors found in ``f``
    """
    errors = []
    for paths_fn in all_paths_lints():
        errors.extend(paths_fn(repo_root, paths))
    return errors
def check_file_contents(repo_root: Text, path: Text, f: Optional[IO[bytes]] = None) -> List[rules.Error]:
    """
    Runs lints that check the file contents.

    :param repo_root: the repository root
    :param path: the path of the file within the repository
    :param f: a file-like object with the file contents
    :returns: a list of errors found in ``f``
    """
    if f is None:
        f = open(os.path.join(repo_root, path), 'rb')
    with f:
        errors = []
        for file_fn in file_lints:
            errors.extend(file_fn(repo_root, path, f))
            # Each lint consumes the stream; rewind for the next one.
            f.seek(0)
        return errors
def output_errors_markdown(log: Callable[[Any], None], errors: List[rules.Error]) -> None:
    """Output errors as a markdown table, one line at a time, via ``log``.

    NOTE(review): the function header and the opening of the heading string
    were lost to formatting corruption and have been reconstructed.

    :param log: callable invoked with each output line
    :param errors: a list of error tuples (error type, message, path, line number)
    """
    heading = """| Error Type | Position | Message |
|------------|----------|---------|"""
    for line in heading.split("\n"):
        log(line)
    for error_type, description, path, line_number in errors:
        pos_string = path
        if line_number:
            pos_string += ":%s" % line_number
        log(f"{error_type} | {pos_string} | {description} |")
def output_errors_json(log: Callable[[Any], None], errors: List[rules.Error]) -> None:
    """Output one JSON object per error, one per line, on stdout.

    :param log: unused here; kept for signature parity with the other
                output_errors_* functions
    :param errors: a list of error tuples (error type, message, path, line number)
    """
    for error_type, error, path, line_number in errors:
        # We use 'print' rather than the log function to ensure that the output
        # is valid JSON (e.g. with no logger preamble).
        print(json.dumps({"path": path, "lineno": line_number,
                          "rule": error_type, "message": error}))
def output_errors_github_checks(outputter: GitHubChecksOutputter, errors: List[rules.Error], first_reported: bool) -> None:
    """Output errors to the GitHub Checks output markdown format.

    :param outputter: the GitHub Checks outputter
    :param errors: a list of error tuples (error type, message, path, line number)
    :param first_reported: True if these are the first reported errors
    """
    if first_reported:
        outputter.output(
            "\nChanges in this PR contain lint errors, listed below. These "
            "errors must either be fixed or added to the list of ignored "
            "errors; see [the documentation]("
            "https://web-platform-tests.org/writing-tests/lint-tool.html). "
            "For help, please tag `@web-platform-tests/wpt-core-team` in a "
            "comment.\n")
        # Open the code fence around the plain-text error listing.
        outputter.output("```")
    output_errors_text(outputter.output, errors)
def lint_paths(kwargs: Dict[Text, Any], wpt_root: Text) -> List[Text]:
    """Resolve the set of paths to lint from the parsed CLI arguments.

    Precedence: explicit ``paths`` > ``--all`` > ``--paths-file`` > the files
    changed relative to the upstream branch (escalating to everything if the
    lint tool or lint.ignore itself changed).

    :param kwargs: parsed command-line arguments
    :param wpt_root: the repository root
    :returns: the list of paths, relative to ``wpt_root``
    """
    if kwargs.get("paths"):
        paths = []
        for path in kwargs.get("paths", []):
            if os.path.isdir(path):
                path_dir = list(all_filesystem_paths(wpt_root, path))
                paths.extend(path_dir)
            elif os.path.isfile(path):
                paths.append(os.path.relpath(os.path.abspath(path), wpt_root))
    elif kwargs["all"]:
        paths = list(all_filesystem_paths(wpt_root))
    elif kwargs["paths_file"]:
        paths = []
        with open(kwargs["paths_file"], 'r', newline='') as f:
            for line in f.readlines():
                path = line.strip()
                if os.path.isdir(path):
                    path_dir = list(all_filesystem_paths(wpt_root, path))
                    paths.extend(path_dir)
                elif os.path.isfile(path):
                    paths.append(os.path.relpath(os.path.abspath(path), wpt_root))
    else:
        changed_paths = changed_files(wpt_root)
        force_all = False
        for path in changed_paths:
            path = path.replace(os.path.sep, "/")
            # A change to the lint tool itself (or its ignore list) may alter
            # the result for any file, so re-lint everything.
            if path == "lint.ignore" or path.startswith("tools/lint/"):
                force_all = True
                break
        paths = (list(changed_paths) if not force_all
                 else list(all_filesystem_paths(wpt_root)))

    return paths
def create_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the lint tool."""
    parser = argparse.ArgumentParser()
    parser.add_argument("paths", nargs="*",
                        help="List of paths to lint")
    parser.add_argument("--json", action="store_true",
                        help="Output machine-readable JSON format")
    parser.add_argument("--markdown", action="store_true",
                        help="Output markdown")
    parser.add_argument("--repo-root", type=str,
                        help="The WPT directory. Use this "
                             "option if the lint script exists outside the repository")
    parser.add_argument("--ignore-glob", type=str, action="append",
                        help="Additional file glob to ignore (repeat to add more). "
                             "Globs are matched against paths relative to REPO_ROOT "
                             "using fnmatch, except that path separators are normalized.")
    parser.add_argument("--all", action="store_true",
                        help="If no paths are passed, try to lint the whole "
                             "working directory, not just files that changed")
    parser.add_argument("--github-checks-text-file", type=str,
                        help="Path to GitHub checks output file for Taskcluster runs")
    parser.add_argument("-j", "--jobs", type=int, default=0,
                        help="Level to parallelism to use (defaults to 0, which detects the number of CPUs)")
    parser.add_argument("--paths-file",
                        help="File containing a list of files to lint, one per line")
    return parser
# NOTE(review): fragment of main(); most of the body (argument parsing and
# the call into lint()) was lost to formatting corruption.  The lines below
# are left byte-identical (still garbled) pending reconstruction.
def main(venv: Any = None, **kwargs: Any) -> int:
# --json and --markdown are mutually exclusive output formats; exit code 2
# signals a usage error.
assert logger isnotNone if kwargs.get("json") and kwargs.get("markdown"):
logger.critical("Cannot specify --json and --markdown")
sys.exit(2)
# NOTE(review): nested helper of lint(); its body was lost to formatting
# corruption — only the signature and docstring survive.
def process_errors(errors: List[rules.Error]) -> Optional[Tuple[Text, Text]]:
    """
    Filters and prints the errors, and updates the ``error_count`` object.

    :param errors: a list of error tuples (error type, message, path, line number)
    :returns: ``None`` if there were no errors, or
              a tuple of the error type and the path otherwise
    """
# NOTE(review): fragment of lint()'s body (its `def`, the setup of `jobs`,
# `to_check_content`, `last`, `error_count` etc. were lost to formatting
# corruption).  Lines left byte-identical (still garbled) pending
# reconstruction from upstream history.
# Parallel path: farm file-content checks out to a process pool.
if jobs > 1 and len(to_check_content) >= MIN_FILES_FOR_PARALLEL:
# submit this job first, as it's the longest running
pool = multiprocessing.Pool(jobs)
all_paths_result = pool.apply_async(check_all_paths, (repo_root, paths))
# each item tends to be quick, so pass things in large chunks to avoid too much IPC overhead
errors_it = pool.imap_unordered(check_file_contents_apply, to_check_content, chunksize=40)
pool.close() for errors in errors_it:
last = process_errors(errors) or last
errors = all_paths_result.get()
pool.join()
last = process_errors(errors) or last else: for item in to_check_content:
# Serial fallback: run every check in-process.
errors = check_file_contents(*item)
last = process_errors(errors) or last
errors = check_all_paths(repo_root, paths)
last = process_errors(errors) or last
# Human-readable summary explaining how to silence the last error type.
if output_format in ("normal", "markdown"):
output_error_count(error_count) if error_count: assert last isnotNone assert logger isnotNone for line in (ERROR_MSG % (last[0], last[1], last[0], last[1])).split("\n"):
logger.info(line)
# Close the code fence opened by output_errors_github_checks.
if error_count and github_checks_outputter:
github_checks_outputter.output("```")
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.