Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/python/taskcluster/taskcluster/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 6 kB image not shown  

Quelle  helper.py   Sprache: Python

 
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import os

import datetime
import logging
import requests
from taskcluster.generated import _client_importer
from taskcluster.generated.aio import _client_importer as _async_client_importer
from taskcluster.utils import stringDate
import urllib.parse

logger = logging.getLogger(__name__)


class TaskclusterConfig(object):
    """
    Local configuration used to access Taskcluster service and objects
    """

    def __init__(self, url=None):
        self.options = None
        self.secrets = None
        self.default_url = url if url is not None else os.environ.get("TASKCLUSTER_ROOT_URL")
        assert self.default_url is not None"You must specify a Taskcluster deployment url"

    def auth(self, client_id=None, access_token=None, max_retries=12):
        """
        Build Taskcluster credentials options
        Supports, by order of preference:
         * directly provided credentials
         * credentials from environment variables
         * taskclusterProxy
         * no authentication
        """
        self.options = {"maxRetries": max_retries}

        if client_id is None and access_token is None:
            # Credentials preference: Use env. variables
            client_id = os.environ.get("TASKCLUSTER_CLIENT_ID")
            access_token = os.environ.get("TASKCLUSTER_ACCESS_TOKEN")
            logger.info("Using taskcluster credentials from environment")
        else:
            logger.info("Using taskcluster credentials from cli")

        if client_id is not None and access_token is not None:
            # Use provided credentials
            self.options["credentials"] = {
                "clientId": client_id,
                "accessToken": access_token,
            }
            self.options["rootUrl"] = self.default_url

        elif "TASK_ID" in os.environ:
            # Use Taskcluster Proxy when running in a task
            logger.info("Taskcluster Proxy enabled")
            self.options["rootUrl"] = os.environ.get("TASKCLUSTER_PROXY_URL""http://taskcluster")

        else:
            logger.info("No Taskcluster authentication.")
            self.options["rootUrl"] = self.default_url

    def get_service(self, service_name, use_async=False):
        """
        Build a Taskcluster service instance using current authentication
        """
        if self.options is None:
            self.auth()

        client_importer = _async_client_importer if use_async else _client_importer
        service = getattr(client_importer, service_name.capitalize(), None)
        assert service is not None"Invalid Taskcluster service {}".format(
            service_name
        )
        return service(self.options)

    def load_secrets(
        self, secret_name, prefixes=[], required=[], existing={}, local_secrets=None
    ):
        """Shortcut to use load_secrets helper with current authentication"""
        self.secrets = load_secrets(
            self.get_service('secrets'),
            secret_name,
            prefixes,
            required,
            existing,
            local_secrets,
        )
        return self.secrets

    def upload_artifact(self, artifact_path, content, content_type, ttl):
        """Shortcut to use upload_artifact helper with current authentication"""
        path = upload_artifact(
            self.get_service('queue'),
            artifact_path,
            content,
            content_type,
            ttl,
        )

        return urllib.parse.urljoin(self.default_url, path)


def load_secrets(
    secrets_service, secret_name, prefixes=[], required=[], existing={}, local_secrets=None
):
    """
    Fetch a specific set of secrets by name and verify that the required
    secrets exist.
    Also supports providing local secrets to avoid using remote Taskcluster service
    for local development (or contributor onboarding)
    A user can specify prefixes to limit the part of secrets used (useful when a secret
    is shared amongst several services)
    """
    secrets = {}
    if existing:
        secrets.update(existing)

    if isinstance(local_secrets, dict):
        # Use local secrets file to avoid using Taskcluster secrets
        logger.info("Using provided local secrets")
        all_secrets = local_secrets
    else:
        # Use Taskcluster secret service
        assert secret_name is not None"Missing Taskcluster secret secret_name"
        all_secrets = secrets_service.get(secret_name).get("secret", dict())
        logger.info("Loaded Taskcluster secret {}".format(secret_name))

    if prefixes:
        # Use secrets behind supported prefixes
        for prefix in prefixes:
            secrets.update(all_secrets.get(prefix, dict()))

    else:
        # Use all secrets available
        secrets.update(all_secrets)

    # Check required secrets
    for required_secret in required:
        if required_secret not in secrets:
            raise Exception("Missing value {} in secrets.".format(required_secret))

    return secrets


def upload_artifact(queue_service, artifact_path, content, content_type, ttl):
    """
    DEPRECATED. Do not use.
    """
    task_id = os.environ.get("TASK_ID")
    run_id = os.environ.get("RUN_ID")
    proxy = os.environ.get("TASKCLUSTER_PROXY_URL")
    assert task_id and run_id and proxy, "Can only run in Taskcluster tasks with proxy"
    # Contet can be str or bytes
    assert isinstance(content, str) or isinstance(content, bytes)
    assert isinstance(ttl, datetime.timedelta)

    # Create S3 artifact on Taskcluster
    resp = queue_service.createArtifact(
        task_id,
        run_id,
        artifact_path,
        {
            "storageType""s3",
            "expires": stringDate(datetime.datetime.utcnow() + ttl),
            "contentType": content_type,
        },
    )
    assert resp["storageType"] == "s3""Not an s3 storage"
    assert "putUrl" in resp, "Missing putUrl"
    assert "contentType" in resp, "Missing contentType"

    # Push the artifact on storage service
    headers = {"Content-Type": resp["contentType"]}
    push = requests.put(url=resp["putUrl"], headers=headers, data=content)
    push.raise_for_status()

    # Build the absolute url
    return "/api/queue/v1/task/{task_id}/runs/{run_id}/artifacts/{path}".format(
        task_id=task_id,
        run_id=run_id,
        path=artifact_path,
    )

95%


¤ Dauer der Verarbeitung: 0.16 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.