# -*- coding: utf-8 -*- # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import datetime import logging import requests from taskcluster.generated import _client_importer from taskcluster.generated.aio import _client_importer as _async_client_importer from taskcluster.utils import stringDate import urllib.parse
logger = logging.getLogger(__name__)
class TaskclusterConfig(object):
    """
    Local configuration used to access Taskcluster service and objects.

    The instance keeps the client options built by ``auth()`` and the last
    secrets loaded through ``load_secrets()``.
    """

    def __init__(self, url=None):
        """
        Remember which Taskcluster deployment to talk to.

        Args:
            url: Root url of the deployment. When ``None``, the
                ``TASKCLUSTER_ROOT_URL`` environment variable is used.

        Raises:
            AssertionError: when neither source provides a url.
        """
        # Client options; stays None until auth() is called.
        self.options = None
        # Result of the last load_secrets() call.
        self.secrets = None
        # An explicit url wins over the environment variable.
        self.default_url = (
            url if url is not None else os.environ.get("TASKCLUSTER_ROOT_URL")
        )
        assert (
            self.default_url is not None
        ), "You must specify a Taskcluster deployment url"

    def auth(self, client_id=None, access_token=None, max_retries=12):
        """
        Build Taskcluster credentials options.

        Supports, by order of preference:
         * directly provided credentials
         * credentials from environment variables
         * taskclusterProxy
         * no authentication

        Args:
            client_id: Taskcluster client id, or ``None``.
            access_token: Taskcluster access token, or ``None``.
            max_retries: Value stored under the ``maxRetries`` option.

        The resulting options are stored on ``self.options``; nothing is
        returned.
        """
        self.options = {"maxRetries": max_retries}

        if client_id is None and access_token is None:
            # Credentials preference: Use env. variables
            client_id = os.environ.get("TASKCLUSTER_CLIENT_ID")
            access_token = os.environ.get("TASKCLUSTER_ACCESS_TOKEN")
            logger.info("Using taskcluster credentials from environment")
        else:
            logger.info("Using taskcluster credentials from cli")

        if client_id is not None and access_token is not None:
            # Use provided credentials
            self.options["credentials"] = {
                "clientId": client_id,
                "accessToken": access_token,
            }
            self.options["rootUrl"] = self.default_url
        elif "TASK_ID" in os.environ:
            # Use Taskcluster Proxy when running in a task
            logger.info("Taskcluster Proxy enabled")
            self.options["rootUrl"] = os.environ.get(
                "TASKCLUSTER_PROXY_URL", "http://taskcluster"
            )
        else:
            # Anonymous access: the options must still name the deployment,
            # otherwise the "no authentication" mode advertised above yields
            # options without any rootUrl.
            logger.info("No Taskcluster authentication.")
            self.options["rootUrl"] = self.default_url

    def get_service(self, service_name, use_async=False):
        """
        Build a Taskcluster service instance using current authentication.

        Args:
            service_name: Name of the service, e.g. ``"queue"`` or
                ``"workerManager"``.
            use_async: Pick the class from the async client importer instead
                of the synchronous one.

        Returns:
            The service class instantiated with ``self.options``.

        Raises:
            AssertionError: when no matching service class is found.
        """
        if self.options is None:
            self.auth()

        client_importer = _async_client_importer if use_async else _client_importer

        # str.capitalize() lowercases everything after the first character, so
        # it can never match camelCase class names. Try it first to keep the
        # historical lookup, then fall back to less destructive spellings.
        candidates = (
            service_name.capitalize(),
            service_name[:1].upper() + service_name[1:],
            service_name,
        )
        service = None
        for candidate in candidates:
            service = getattr(client_importer, candidate, None)
            if service is not None:
                break

        assert service is not None, "Invalid Taskcluster service {}".format(
            service_name
        )
        return service(self.options)

    def load_secrets(
        self, secret_name, prefixes=None, required=None, existing=None, local_secrets=None
    ):
        """
        Shortcut to use load_secrets helper with current authentication.

        The arguments are forwarded to the module-level ``load_secrets``
        helper; the result is stored on ``self.secrets`` and returned.
        """
        # None sentinels avoid sharing mutable defaults between calls; the
        # helper still receives fresh empty containers.
        self.secrets = load_secrets(
            self.get_service('secrets'),
            secret_name,
            prefixes if prefixes is not None else [],
            required if required is not None else [],
            existing if existing is not None else {},
            local_secrets,
        )
        return self.secrets

    def upload_artifact(self, artifact_path, content, content_type, ttl):
        """
        Shortcut to use upload_artifact helper with current authentication.

        Returns:
            Whatever the module-level ``upload_artifact`` helper returns.
        """
        path = upload_artifact(
            self.get_service('queue'),
            artifact_path,
            content,
            content_type,
            ttl,
        )
        # Hand the helper's result back instead of silently discarding it.
        # NOTE(review): the helper's return value is not visible here; confirm
        # whether callers expect it joined with default_url.
        return path
def load_secrets(
    secrets_service,
    secret_name,
    prefixes=None,
    required=None,
    existing=None,
    local_secrets=None,
):
    """
    Fetch a specific set of secrets by name and verify that the required
    secrets exist.

    Also supports providing local secrets to avoid using remote Taskcluster
    service for local development (or contributor onboarding).

    A user can specify prefixes to limit the part of secrets used (useful
    when a secret is shared amongst several services).

    Args:
        secrets_service: Object whose ``get(secret_name)`` returns a mapping
            holding the payload under the ``"secret"`` key. Only used when
            ``local_secrets`` is not a dict.
        secret_name: Name of the secret to fetch from ``secrets_service``.
        prefixes: Optional iterable of top-level keys; when non-empty, only
            the mappings stored under those keys are merged into the result.
        required: Optional iterable of keys that must be present in the
            final result.
        existing: Optional mapping of initial values; loaded secrets
            override these.
        local_secrets: Optional dict used instead of the remote service.

    Returns:
        A new dict with the merged secrets.

    Raises:
        AssertionError: when the remote service is needed but
            ``secret_name`` is ``None``.
        Exception: when a key listed in ``required`` is missing.
    """
    # Defaults are None rather than []/{} so no container is shared between
    # calls; the result is always a fresh dict.
    secrets = {}
    if existing:
        secrets.update(existing)

    if isinstance(local_secrets, dict):
        # Use local secrets file to avoid using Taskcluster secrets
        logger.info("Using provided local secrets")
        all_secrets = local_secrets
    else:
        # Use Taskcluster secret service
        assert secret_name is not None, "Missing Taskcluster secret secret_name"
        all_secrets = secrets_service.get(secret_name).get("secret", dict())
        logger.info("Loaded Taskcluster secret {}".format(secret_name))

    if prefixes:
        # Use secrets behind supported prefixes; later prefixes override
        # earlier ones on key collisions.
        for prefix in prefixes:
            secrets.update(all_secrets.get(prefix, dict()))
    else:
        # Use all secrets available
        secrets.update(all_secrets)

    # Check required secrets
    for required_secret in required or ():
        if required_secret not in secrets:
            raise Exception("Missing value {} in secrets.".format(required_secret))

    return secrets
def upload_artifact(queue_service, artifact_path, content, content_type, ttl):
    """
    DEPRECATED. Do not use.

    Checks that the call happens inside a Taskcluster task with a proxy
    (``TASK_ID``, ``RUN_ID`` and ``TASKCLUSTER_PROXY_URL`` all set) and that
    ``content`` and ``ttl`` have the expected types.

    Raises:
        AssertionError: when any of those checks fails.
    """
    # NOTE(review): only the validation steps are present in this copy of the
    # file; no upload logic follows them here.
    environment = os.environ
    task_id, run_id, proxy = (
        environment.get(variable)
        for variable in ("TASK_ID", "RUN_ID", "TASKCLUSTER_PROXY_URL")
    )
    assert all((task_id, run_id, proxy)), "Can only run in Taskcluster tasks with proxy"

    # Content can be str or bytes
    assert isinstance(content, (str, bytes))
    assert isinstance(ttl, datetime.timedelta)
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.