# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Code for parsing metrics.yaml files.
"""

import functools
import textwrap
from pathlib import Path
from typing import Any, cast, Dict, Generator, Iterable, Optional, Set, Tuple, Union

# NOTE(review): `jsonschema` is used below (validator_for, iter_errors) but its
# import was lost in the damaged paste — restored here; confirm against upstream.
import jsonschema

from . import util
from .metrics import Metric, ObjectTree
from .pings import Ping, RESERVED_PING_NAMES
from .tags import Tag
from .util import DictWrapper

# Restored: `SCHEMAS_DIR` is referenced by `_load_schemas` but its definition
# was lost in the damaged paste. The schemas ship next to this module.
ROOT_DIR = Path(__file__).parent
SCHEMAS_DIR = ROOT_DIR / "schemas"
if filetype notin ("metrics", "pings", "tags"):
filetype = None
for error in validate(content, filepath):
content = {} yield error
return content, filetype
@functools.lru_cache(maxsize=1)
def _load_schemas() -> Dict[str, Tuple[Any, Any]]:
    """
    Load all of the known schemas from disk, and put them in a map based on
    the schema's $id.

    :return: dict mapping each schema's ``$id`` to a ``(schema, validator)``
        pair, where ``validator`` is a jsonschema validator instance bound to
        that schema.
    """
    # Cached (maxsize=1, no arguments) so the schemas are read from disk at
    # most once per process.
    schemas = {}
    for schema_path in SCHEMAS_DIR.glob("*.yaml"):
        schema = util.load_yaml_or_json(schema_path)
        # Null resolver: schema refs are resolved locally only, never fetched.
        resolver = util.get_null_resolver(schema)
        validator_class = jsonschema.validators.validator_for(schema)
        # NOTE(review): `_update_validator` is defined elsewhere in this file
        # (not visible in this chunk); presumably it installs custom checks.
        _update_validator(validator_class)
        # Fail fast if one of our own bundled schemas is itself invalid.
        validator_class.check_schema(schema)
        validator = validator_class(schema, resolver=resolver)
        schemas[schema["$id"]] = (schema, validator)
    return schemas
def _get_schema(
    schema_id: str, filepath: Union[str, Path] = ""
) -> Tuple[Any, Any]:
    """
    Get the schema for the given schema $id.

    :param schema_id: the ``$schema`` identifier to look up.
    :param filepath: path of the file being validated, used only to format
        the error message.
    :return: the ``(schema, validator)`` pair for ``schema_id``.
    :raises ValueError: if ``schema_id`` is not one of the known schemas.
    """
    schemas = _load_schemas()
    if schema_id not in schemas:
        raise ValueError(
            util.format_error(
                filepath,
                "",
                f"$schema key must be one of {', '.join(schemas.keys())}",
            )
        )
    return schemas[schema_id]
def _get_schema_for_content(
    content: Dict[str, util.JSONType], filepath: Union[str, Path]
) -> Tuple[Any, Any]:
    """
    Get the appropriate schema for the given JSON content.

    :param content: parsed YAML/JSON content; must carry a string ``$schema``
        key identifying which schema to validate against.
    :param filepath: path of the source file, for error messages.
    :return: the ``(schema, validator)`` pair for the content's ``$schema``.
    :raises TypeError: if ``$schema`` is missing or not a string.
    :raises ValueError: (from ``_get_schema``) if the schema id is unknown.
    """
    schema_url = content.get("$schema")
    if not isinstance(schema_url, str):
        # Bug fix: the message was missing its f-prefix, so "{schema_url}"
        # was emitted literally instead of the offending value.
        raise TypeError(f"Invalid $schema type {schema_url}")
    return _get_schema(schema_url, filepath)
def validate(
    content: Dict[str, util.JSONType], filepath: Union[str, Path] = ""
) -> Generator[str, None, None]:
    """
    Validate the given content against the appropriate schema.

    :param content: parsed YAML/JSON content to validate.
    :param filepath: path of the source file, for error messages.
    :return: generator yielding one formatted error string per problem;
        yields nothing when the content is valid.
    """
    try:
        _schema, validator = _get_schema_for_content(content, filepath)
    except ValueError as e:
        # Unknown $schema id: surface it as a single validation error rather
        # than raising, so callers can collect all errors uniformly.
        yield str(e)
    else:
        yield from (
            util.format_error(filepath, "", util.pprint_validation_error(e))
            for e in validator.iter_errors(content)
        )
def _instantiate_metrics(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of metrics.yaml files, convert the JSON information into
    Metric objects, and merge them into a single tree.

    :param all_objects: tree of category -> metric name -> Metric, mutated
        in place.
    :param sources: map of (category, metric) -> defining filepath, used to
        detect duplicates; mutated in place.
    :param content: parsed content of one metrics.yaml file.
    :param filepath: path of the file being loaded.
    :param config: parser options (``allow_reserved`` is consulted here).
    :return: generator yielding formatted error strings.
    :raises TypeError: if a category's value is not a dict.
    """
    global_no_lint = content.get("no_lint", [])
    global_tags = content.get("$tags", [])
    assert isinstance(global_tags, list)

    for category_key, category_val in sorted(content.items()):
        # Skip directives ($schema, $tags, ...) and the file-level no_lint.
        if category_key.startswith("$"):
            continue
        if category_key == "no_lint":
            continue

        # The "glean.*" category namespace is reserved for internal metrics.
        if not config.get("allow_reserved") and category_key.split(".")[0] == "glean":
            yield util.format_error(
                filepath,
                f"For category '{category_key}'",
                "Categories beginning with 'glean' are reserved for "
                "Glean internal use.",
            )
            continue

        all_objects.setdefault(category_key, DictWrapper())

        if not isinstance(category_val, dict):
            raise TypeError(f"Invalid content for {category_key}")

        for metric_key, metric_val in sorted(category_val.items()):
            try:
                metric_obj = Metric.make_metric(
                    category_key, metric_key, metric_val, validated=True, config=config
                )
            except Exception as e:
                yield util.format_error(
                    filepath,
                    f"On instance {category_key}.{metric_key}",
                    str(e),
                    metric_val.defined_in["line"],
                )
                metric_obj = None
            else:
                # "all-pings" is only meaningful for internal Glean metrics.
                if (
                    not config.get("allow_reserved")
                    and "all-pings" in metric_obj.send_in_pings
                ):
                    yield util.format_error(
                        filepath,
                        f"On instance {category_key}.{metric_key}",
                        'Only internal metrics may specify "all-pings" '
                        'in "send_in_pings"',
                        metric_val.defined_in["line"],
                    )
                    metric_obj = None

            if metric_obj is not None:
                # Merge the file-level no_lint / $tags lists into the metric.
                metric_obj.no_lint = sorted(set(metric_obj.no_lint + global_no_lint))
                if len(global_tags):
                    metric_obj.metadata["tags"] = sorted(
                        set(metric_obj.metadata.get("tags", []) + global_tags)
                    )

                if isinstance(filepath, Path):
                    metric_obj.defined_in["filepath"] = str(filepath)

            already_seen = sources.get((category_key, metric_key))
            if already_seen is not None:
                # We've seen this metric name already.
                # NOTE(review): metric_obj can be None here when construction
                # failed above; `.defined_in` would then raise — confirm
                # whether duplicates + failures can co-occur in practice.
                yield util.format_error(
                    filepath,
                    "",
                    (
                        f"Duplicate metric name '{category_key}.{metric_key}' "
                        f"already defined in '{already_seen}'"
                    ),
                    metric_obj.defined_in["line"],
                )
            else:
                all_objects[category_key][metric_key] = metric_obj
                sources[(category_key, metric_key)] = filepath
def _instantiate_pings(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of pings.yaml files, convert the JSON information into Ping
    objects.

    :param all_objects: tree of objects; pings go under the "pings" key.
        Mutated in place.
    :param sources: map of ping name -> defining filepath, used to detect
        duplicates; mutated in place.
    :param content: parsed content of one pings.yaml file.
    :param filepath: path of the file being loaded.
    :param config: parser options (``allow_reserved`` is consulted here).
    :return: generator yielding formatted error strings.
    :raises TypeError: if a ping's value is not a dict.
    """
    global_no_lint = content.get("no_lint", [])
    assert isinstance(global_no_lint, list)

    # Reverse map: scheduler ping name -> set of ping names it schedules.
    ping_schedule_reverse_map: Dict[str, Set[str]] = dict()

    for ping_key, ping_val in sorted(content.items()):
        # Skip directives ($schema, ...) and the file-level no_lint key.
        if ping_key.startswith("$"):
            continue
        if ping_key == "no_lint":
            continue

        if not config.get("allow_reserved"):
            if ping_key in RESERVED_PING_NAMES:
                yield util.format_error(
                    filepath,
                    f"For ping '{ping_key}'",
                    f"Ping uses a reserved name ({RESERVED_PING_NAMES})",
                )
                continue

        if not isinstance(ping_val, dict):
            raise TypeError(f"Invalid content for ping {ping_key}")
        ping_val["name"] = ping_key

        if "metadata" in ping_val and "ping_schedule" in ping_val["metadata"]:
            # A ping cannot schedule itself.
            if ping_key in ping_val["metadata"]["ping_schedule"]:
                yield util.format_error(
                    filepath,
                    f"For ping '{ping_key}'",
                    "ping_schedule contains its own ping name",
                )
                continue
            for ping_schedule in ping_val["metadata"]["ping_schedule"]:
                if ping_schedule not in ping_schedule_reverse_map:
                    ping_schedule_reverse_map[ping_schedule] = set()
                ping_schedule_reverse_map[ping_schedule].add(ping_key)

        # Restored: `ping_obj` was referenced below but never assigned — the
        # construction block was lost. Mirrors the Tag construction in
        # `_instantiate_tags`; confirm against upstream source control.
        try:
            ping_obj: Optional[Ping] = Ping(
                defined_in=getattr(ping_val, "defined_in", None),
                _validated=True,
                **ping_val,
            )
        except Exception as e:
            yield util.format_error(filepath, f"On instance '{ping_key}'", str(e))
            ping_obj = None

        if ping_obj is not None:
            # Merge the file-level no_lint list into the ping.
            ping_obj.no_lint = sorted(set(ping_obj.no_lint + global_no_lint))

            if isinstance(filepath, Path) and ping_obj.defined_in is not None:
                ping_obj.defined_in["filepath"] = str(filepath)

        already_seen = sources.get(ping_key)
        if already_seen is not None:
            # We've seen this ping name already.
            yield util.format_error(
                filepath,
                "",
                f"Duplicate ping name '{ping_key}' "
                f"already defined in '{already_seen}'",
            )
        else:
            all_objects.setdefault("pings", {})[ping_key] = ping_obj
            sources[ping_key] = filepath

    # Attach the collected schedule information to the scheduler pings.
    # Using .get() so a file that produced no pings cannot raise KeyError.
    pings_tree = all_objects.get("pings", {})
    for scheduler, scheduled in ping_schedule_reverse_map.items():
        if scheduler in pings_tree and isinstance(pings_tree[scheduler], Ping):
            scheduler_obj: Ping = cast(Ping, pings_tree[scheduler])
            scheduler_obj.schedules_pings = sorted(list(scheduled))
def _instantiate_tags(
    all_objects: ObjectTree,
    sources: Dict[Any, Path],
    content: Dict[str, util.JSONType],
    filepath: Path,
    config: Dict[str, Any],
) -> Generator[str, None, None]:
    """
    Load a list of tags.yaml files, convert the JSON information into Tag
    objects.

    :param all_objects: tree of objects; tags go under the "tags" key.
        Mutated in place.
    :param sources: map of tag name -> defining filepath, used to detect
        duplicates; mutated in place.
    :param content: parsed content of one tags.yaml file.
    :param filepath: path of the file being loaded.
    :param config: parser options (unused here, kept for a uniform signature
        with the other _instantiate_* functions).
    :return: generator yielding formatted error strings.
    :raises TypeError: if a tag's value is not a dict.
    """
    global_no_lint = content.get("no_lint", [])
    assert isinstance(global_no_lint, list)

    for tag_key, tag_val in sorted(content.items()):
        # Skip directives ($schema, ...) and the file-level no_lint key.
        if tag_key.startswith("$"):
            continue
        if tag_key == "no_lint":
            continue

        if not isinstance(tag_val, dict):
            raise TypeError(f"Invalid content for tag {tag_key}")
        tag_val["name"] = tag_key

        try:
            tag_obj = Tag(
                defined_in=getattr(tag_val, "defined_in", None),
                _validated=True,
                **tag_val,
            )
        except Exception as e:
            yield util.format_error(filepath, f"On instance '{tag_key}'", str(e))
            continue

        if tag_obj is not None:
            # Merge the file-level no_lint list into the tag.
            tag_obj.no_lint = sorted(set(tag_obj.no_lint + global_no_lint))

            if isinstance(filepath, Path) and tag_obj.defined_in is not None:
                tag_obj.defined_in["filepath"] = str(filepath)

        already_seen = sources.get(tag_key)
        if already_seen is not None:
            # We've seen this tag name already.
            yield util.format_error(
                filepath,
                "",
                f"Duplicate tag name '{tag_key}' "
                f"already defined in '{already_seen}'",
            )
        else:
            all_objects.setdefault("tags", {})[tag_key] = tag_obj
            sources[tag_key] = filepath
def _preprocess_objects(objs: ObjectTree, config: Dict[str, Any]) -> ObjectTree:
    """
    Preprocess the object tree to better set defaults.

    Expands the special "default" entry in each metric's ``send_in_pings``
    to that metric type's default store names, then de-duplicates and sorts
    the list.

    :param objs: the object tree; Metric entries are mutated in place.
    :param config: parser options (unused here, kept for a uniform signature).
    :return: the same ``objs`` tree, for convenience.
    """
    for category in objs.values():
        for obj in category.values():
            # Only metrics carry send_in_pings; skip pings/tags.
            if not isinstance(obj, Metric):
                continue

            if hasattr(obj, "send_in_pings"):
                if "default" in obj.send_in_pings:
                    obj.send_in_pings = obj.default_store_names + [
                        x for x in obj.send_in_pings if x != "default"
                    ]
                obj.send_in_pings = sorted(set(obj.send_in_pings))
    return objs
@util.keep_value def parse_objects(
filepaths: Iterable[Path], config: Optional[Dict[str, Any]] = None
) -> Generator[str, None, ObjectTree]: """
Parse one or more metrics.yaml and/or pings.yaml files, returning a tree of
`metrics.Metric`, `pings.Ping`, and `tags.Tag` instances.
The result is a generator over any errors. If there are no errors, the
actual metrics can be obtained from `result.value`. For example::
result = metrics.parse_metrics(filepaths) for err in result:
print(err)
all_metrics = result.value
The result value is a dictionary of category names to categories, where
each category is a dictionary from metric name to `metrics.Metric`
instances. There are also the special categories `pings` and `tags`
containing all of the `pings.Ping` and `tags.Tag` instances, respectively.
:param filepaths: list of Path objects to metrics.yaml, pings.yaml, and/or
tags.yaml files
:param config: A dictionary of options that change parsing behavior.
Supported keys are:
- `allow_reserved`: Allow values reserved for internal Glean use.
- `do_not_disable_expired`: Don't mark expired metrics as disabled.
This is useful when you want to retain the original "disabled"
value from the `metrics.yaml`, rather than having it overridden when
the metric expires.
- `allow_missing_files`: Do notraise a `FileNotFoundError` if any of
the input `filepaths` do not exist.
- `interesting`: Contains an array of interesting metrics/ping files.
Probes not included in these files will be marked as disabled. """ if config isNone:
config = {}
if config.get("interesting"): # We're configured to disable probes not included in the interesting list.
filepaths = util.ensure_list(config.get("interesting"))
interesting_metrics_dict: Dict[str, Dict[str, Any]] = dict()
interesting_metrics_dict.setdefault("metrics", DictWrapper())
interesting_metrics_dict.setdefault("pings", DictWrapper()) for filepath in filepaths:
content, filetype = yieldfrom _load_file(filepath, config)
ifnot isinstance(content, dict): raise TypeError(f"Invalid content for {filepath}")
for category_key, category_val in sorted(content.items()): if category_key.startswith("$"): continue
# NOTE(review): the file is truncated here. The remainder of `parse_objects`
# (the per-filetype dispatch to _instantiate_metrics / _instantiate_pings /
# _instantiate_tags and the final return of the object tree) is missing; the
# original text was replaced by unrelated German website boilerplate.
# Restore the tail of this function from source control.