# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import datetime
import logging
import requests
from taskcluster.generated
import _client_importer
from taskcluster.generated.aio
import _client_importer
as _async_client_importer
from taskcluster.utils
import stringDate
import urllib.parse
logger = logging.getLogger(__name__)
class TaskclusterConfig(object):
"""
Local configuration used to access Taskcluster service
and objects
"""
def __init__(self, url=
None):
self.options =
None
self.secrets =
None
self.default_url = url
if url
is not None else os.environ.get(
"TASKCLUSTER_ROOT_URL")
assert self.default_url
is not None,
"You must specify a Taskcluster deployment url"
def auth(self, client_id=
None, access_token=
None, max_retries=12):
"""
Build Taskcluster credentials options
Supports, by order of preference:
* directly provided credentials
* credentials
from environment variables
* taskclusterProxy
* no authentication
"""
self.options = {
"maxRetries": max_retries}
if client_id
is None and access_token
is None:
# Credentials preference: Use env. variables
client_id = os.environ.get(
"TASKCLUSTER_CLIENT_ID")
access_token = os.environ.get(
"TASKCLUSTER_ACCESS_TOKEN")
logger.info(
"Using taskcluster credentials from environment")
else:
logger.info(
"Using taskcluster credentials from cli")
if client_id
is not None and access_token
is not None:
# Use provided credentials
self.options[
"credentials"] = {
"clientId": client_id,
"accessToken": access_token,
}
self.options[
"rootUrl"] = self.default_url
elif "TASK_ID" in os.environ:
# Use Taskcluster Proxy when running in a task
logger.info(
"Taskcluster Proxy enabled")
self.options[
"rootUrl"] = os.environ.get(
"TASKCLUSTER_PROXY_URL",
"http://taskcluster")
else:
logger.info(
"No Taskcluster authentication.")
self.options[
"rootUrl"] = self.default_url
def get_service(self, service_name, use_async=
False):
"""
Build a Taskcluster service instance using current authentication
"""
if self.options
is None:
self.auth()
client_importer = _async_client_importer
if use_async
else _client_importer
service = getattr(client_importer, service_name.capitalize(),
None)
assert service
is not None,
"Invalid Taskcluster service {}".format(
service_name
)
return service(self.options)
def load_secrets(
self, secret_name, prefixes=[], required=[], existing={}, local_secrets=
None
):
"""Shortcut to use load_secrets helper with current authentication"""
self.secrets = load_secrets(
self.get_service(
'secrets'),
secret_name,
prefixes,
required,
existing,
local_secrets,
)
return self.secrets
def upload_artifact(self, artifact_path, content, content_type, ttl):
"""Shortcut to use upload_artifact helper with current authentication"""
path = upload_artifact(
self.get_service(
'queue'),
artifact_path,
content,
content_type,
ttl,
)
return urllib.parse.urljoin(self.default_url, path)
def load_secrets(
secrets_service, secret_name, prefixes=[], required=[], existing={}, local_secrets=
None
):
"""
Fetch a specific set of secrets by name
and verify that the required
secrets exist.
Also supports providing local secrets to avoid using remote Taskcluster service
for local development (
or contributor onboarding)
A user can specify prefixes to limit the part of secrets used (useful when a secret
is shared amongst several services)
"""
secrets = {}
if existing:
secrets.update(existing)
if isinstance(local_secrets, dict):
# Use local secrets file to avoid using Taskcluster secrets
logger.info(
"Using provided local secrets")
all_secrets = local_secrets
else:
# Use Taskcluster secret service
assert secret_name
is not None,
"Missing Taskcluster secret secret_name"
all_secrets = secrets_service.get(secret_name).get(
"secret", dict())
logger.info(
"Loaded Taskcluster secret {}".format(secret_name))
if prefixes:
# Use secrets behind supported prefixes
for prefix
in prefixes:
secrets.update(all_secrets.get(prefix, dict()))
else:
# Use all secrets available
secrets.update(all_secrets)
# Check required secrets
for required_secret
in required:
if required_secret
not in secrets:
raise Exception(
"Missing value {} in secrets.".format(required_secret))
return secrets
def upload_artifact(queue_service, artifact_path, content, content_type, ttl):
"""
DEPRECATED. Do
not use.
"""
task_id = os.environ.get(
"TASK_ID")
run_id = os.environ.get(
"RUN_ID")
proxy = os.environ.get(
"TASKCLUSTER_PROXY_URL")
assert task_id
and run_id
and proxy,
"Can only run in Taskcluster tasks with proxy"
# Contet can be str or bytes
assert isinstance(content, str)
or isinstance(content, bytes)
assert isinstance(ttl, datetime.timedelta)
# Create S3 artifact on Taskcluster
resp = queue_service.createArtifact(
task_id,
run_id,
artifact_path,
{
"storageType":
"s3",
"expires": stringDate(datetime.datetime.utcnow() + ttl),
"contentType": content_type,
},
)
assert resp[
"storageType"] ==
"s3",
"Not an s3 storage"
assert "putUrl" in resp,
"Missing putUrl"
assert "contentType" in resp,
"Missing contentType"
# Push the artifact on storage service
headers = {
"Content-Type": resp[
"contentType"]}
push = requests.put(url=resp[
"putUrl"], headers=headers, data=content)
push.raise_for_status()
# Build the absolute url
return "/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts/{path}".format(
task_id=task_id,
run_id=run_id,
path=artifact_path,
)