# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import hashlib
import json
import os
import tarfile
import tempfile
from pathlib
import Path
import requests
import yaml
from mozlint.pathutils
import expand_exclusions
# Source-tree-relative path of the taskcluster fetch definitions; the linter
# joins it to the checkout root and reads its "firefox-addons" entry.
BROWSERTIME_FETCHES_PATH = Path("taskcluster", "kinds", "fetch", "browsertime.yml")

# Source-tree-relative path of the condprof customization directory.
CUSTOMIZATIONS_PATH = Path("testing", "condprofile", "condprof", "customization")

# Passed as the `timeout` argument of the firefox-addons.tar download request.
DOWNLOAD_TIMEOUT = 30

# Messages for lint errors reported against the taskcluster fetch config.
ERR_FETCH_TASK_MISSING = (
    "firefox-addons taskcluster fetch config section not found"
)
ERR_FETCH_TASK_ADDPREFIX = (
    "firefox-addons taskcluster config 'add-prefix' attribute should be set to 'firefox-addons/'"
)
ERR_FETCH_TASK_ARCHIVE = (
    "Error downloading or opening archive from firefox-addons taskcluster fetch url"
)

# Linter name and rule description attached to every reported lint error.
LINTER_NAME = "condprof-addons"
RULE_DESC = "condprof addons all listed in firefox-addons.tar fetched archive"

# Environment captured once at import time: the fetches directory (None when
# the variable is unset) and whether MOZ_AUTOMATION is present at all.
MOZ_FETCHES_DIR = os.environ.get("MOZ_FETCHES_DIR")
MOZ_AUTOMATION = os.environ.get("MOZ_AUTOMATION") is not None

# Directory in which the downloaded firefox-addons.tar is stored and reused.
tempdir = tempfile.gettempdir()
def lint(paths, config, logger, fix=None, **lintargs):
    """Run the condprof-addons check on every non-excluded path.

    The paths are filtered through ``expand_exclusions`` using the ``root``
    entry of ``lintargs``. When nothing is left to check the function returns
    before a ``CondprofAddonsLinter`` is created. Problems are reported through
    ``logger`` by the linter object; this function itself returns ``None``.

    ``fix`` is accepted but not used.
    """
    root = lintargs["root"]
    filepaths = list(map(Path, expand_exclusions(paths, config, root)))
    if not filepaths:
        return

    linter = CondprofAddonsLinter(topsrcdir=root, logger=logger)
    for filepath in filepaths:
        linter.lint(filepath)
class CondprofAddonsLinter:
    """Check that addons referenced by JSON files exist in firefox-addons.tar.

    On construction the linter reads the taskcluster fetch config and collects
    the names of the ``.xpi`` files provided by the ``firefox-addons`` fetch
    (either from ``MOZ_FETCHES_DIR`` or from the downloaded archive). Each call
    to :meth:`lint` then compares the addons listed in one JSON file against
    that collection and reports the missing ones through ``logger``.
    """

    def __init__(self, topsrcdir, logger):
        """Store the inputs and eagerly resolve the list of available xpi files.

        :param topsrcdir: root of the source checkout; the module-level
            relative paths are resolved against it.
        :param logger: object providing ``info``, ``warning`` and
            ``lint_error``.
        """
        self.topsrcdir = topsrcdir
        self.logger = logger
        self.BROWSERTIME_FETCHES_FULLPATH = Path(
            self.topsrcdir, BROWSERTIME_FETCHES_PATH
        )
        self.CUSTOMIZATIONS_FULLPATH = Path(self.topsrcdir, CUSTOMIZATIONS_PATH)
        self.tar_xpi_filenames = self.get_firefox_addons_tar_names()

    def lint(self, filepath):
        """Report every addon of ``filepath`` missing from the known xpi files.

        :param filepath: ``Path`` of a JSON file. Files without an "addons"
            key are ignored. Each value under "addons" is treated as a url
            whose last "/"-separated component is the xpi file name.
        """
        data = self.read_json(filepath)

        if "addons" not in data:
            return

        for xpi_url in data["addons"].values():
            xpi_filename = xpi_url.split("/")[-1]
            self.logger.info(f"Found addon {xpi_filename}")
            if xpi_filename not in self.tar_xpi_filenames:
                self.logger.lint_error(
                    self.get_missing_xpi_msg(xpi_filename),
                    lineno=0,
                    column=None,
                    path=str(filepath),
                    linter=LINTER_NAME,
                    rule=RULE_DESC,
                )

    def get_missing_xpi_msg(self, xpi_filename):
        """Return the lint error message used for a missing xpi file."""
        return f"{xpi_filename} is missing from the firefox-addons.tar archive"

    def read_json(self, filepath):
        """Parse the JSON file at ``filepath`` (a ``Path``) and return its data."""
        # Decode explicitly as UTF-8 instead of relying on the locale default.
        with filepath.open("r", encoding="utf-8") as f:
            return json.load(f)

    def read_yaml(self, filepath):
        """Parse the YAML file at ``filepath`` (a ``Path``) and return its data."""
        # Decode explicitly as UTF-8 instead of relying on the locale default.
        with filepath.open("r", encoding="utf-8") as f:
            return yaml.safe_load(f)

    def download_firefox_addons_tar(self, firefox_addons_tar_url, tar_tmp_path):
        """Download ``firefox_addons_tar_url`` into the file ``tar_tmp_path``.

        :param firefox_addons_tar_url: url of the archive.
        :param tar_tmp_path: destination ``Path``; overwritten if it exists.
        :raises requests.exceptions.RequestException: on HTTP error statuses
            (via ``raise_for_status``) and on any failure raised by requests
            while connecting or streaming the body.
        """
        self.logger.info(f"Downloading {firefox_addons_tar_url} to {tar_tmp_path}")
        res = requests.get(
            firefox_addons_tar_url, stream=True, timeout=DOWNLOAD_TIMEOUT
        )
        try:
            res.raise_for_status()
            with tar_tmp_path.open("wb") as f:
                for chunk in res.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        finally:
            # A streamed response keeps its connection until it is fully read
            # or closed, so always close it, including on the error paths.
            res.close()

    def _report_fetch_error(self, message):
        """Report ``message`` as a lint error against the fetch config file."""
        self.logger.lint_error(
            message,
            lineno=0,
            column=None,
            path=BROWSERTIME_FETCHES_PATH,
            linter=LINTER_NAME,
            rule=RULE_DESC,
        )

    def get_firefox_addons_tar_names(self):
        """Return the names of the xpi files provided by the firefox-addons fetch.

        The fetch config is validated first. Then, when ``MOZ_AUTOMATION`` is
        set and ``MOZ_FETCHES_DIR`` contains a ``firefox-addons/`` directory,
        the names are read from that directory. Otherwise the archive is
        downloaded to the temp directory (unless a copy with the expected
        sha256 is already there) and the names are read from it.

        :returns: list of xpi file names; an empty list after a lint error has
            been reported for an invalid config, a failed download or an
            unreadable archive.
        """
        # Get firefox-addons fetch task config.
        browsertime_fetches = self.read_yaml(self.BROWSERTIME_FETCHES_FULLPATH)

        if not (
            "firefox-addons" in browsertime_fetches
            and "fetch" in browsertime_fetches["firefox-addons"]
        ):
            self._report_fetch_error(ERR_FETCH_TASK_MISSING)
            return []

        fetch_config = browsertime_fetches["firefox-addons"]["fetch"]

        if not (
            "add-prefix" in fetch_config
            and fetch_config["add-prefix"] == "firefox-addons/"
        ):
            self._report_fetch_error(ERR_FETCH_TASK_ADDPREFIX)
            return []

        firefox_addons_tar_url = fetch_config["url"]
        firefox_addons_tar_sha256 = fetch_config["sha256"]

        tar_xpi_files = []

        # When running on the CI, try to retrieve the list of xpi files from the
        # target MOZ_FETCHES_DIR subdirectory instead of downloading the archive
        # from the fetch url.
        if MOZ_AUTOMATION:
            fetches_path = (
                Path(MOZ_FETCHES_DIR) if MOZ_FETCHES_DIR is not None else None
            )
            if fetches_path is not None and fetches_path.exists():
                self.logger.info(
                    "Detected MOZ_FETCHES_DIR, look for pre-downloaded firefox-addons fetch results"
                )
                # add-prefix presence and value have been enforced at the start
                # of this method.
                fetches_addons = Path(fetches_path, "firefox-addons/")
                if fetches_addons.exists():
                    self.logger.info(
                        f"Retrieve list of xpi files from firefox-addons fetch result at {str(fetches_addons)}"
                    )
                    for xpi_path in fetches_addons.iterdir():
                        if xpi_path.suffix == ".xpi":
                            tar_xpi_files.append(xpi_path.name)
                    return tar_xpi_files
                else:
                    self.logger.warning(
                        "No 'firefox-addons/' subdir found in MOZ_FETCHES_DIR"
                    )

        # Fall back to downloading the tar archive and retrieving the list of xpi
        # files from it (e.g. when linting local changes in a developer
        # environment).
        tar_tmp_path = Path(tempdir, "firefox-addons.tar")
        tar_tmp_ready = False

        # If the firefox-addons.tar file is found in the tempdir, check if the
        # file hash matches; if it does then don't download it again.
        if tar_tmp_path.exists():
            tar_tmp_hash = hashlib.sha256()
            with tar_tmp_path.open("rb") as f:
                while chunk := f.read(1024):
                    tar_tmp_hash.update(chunk)
            if tar_tmp_hash.hexdigest() == firefox_addons_tar_sha256:
                self.logger.info(
                    f"Pre-downloaded file for {tar_tmp_path} found and sha256 matching"
                )
                tar_tmp_ready = True
            else:
                self.logger.info(
                    f"{tar_tmp_path} sha256 does not match the fetch config"
                )

        # If the file is not found or the hash doesn't match, download it from
        # the fetch task url.
        if not tar_tmp_ready:
            try:
                self.download_firefox_addons_tar(firefox_addons_tar_url, tar_tmp_path)
            except requests.exceptions.RequestException as request_err:
                # RequestException is the base class of HTTPError, so error
                # statuses are still reported here, and connection failures
                # and timeouts no longer escape as unhandled exceptions.
                self._report_fetch_error(
                    f"{ERR_FETCH_TASK_ARCHIVE}, {str(request_err)}"
                )
                return []

        # Retrieve and return the list of xpi file names.
        try:
            with tarfile.open(tar_tmp_path, "r") as tf:
                names = tf.getnames()
                for name in names:
                    file_path = Path(name)
                    if file_path.suffix == ".xpi":
                        tar_xpi_files.append(file_path.name)
        except tarfile.ReadError as read_err:
            self._report_fetch_error(f"{ERR_FETCH_TASK_ARCHIVE}, {str(read_err)}")
            return []

        return tar_xpi_files