# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""Implements Auth0 Device Code flow and Lando try submission.
# The supported variants of `Repository` for this workflow.
SupportedVcsRepository = Union[GitRepository, HgRepository]
here = os.path.abspath(os.path.dirname(__file__))
build = MozbuildObject.from_environment(cwd=here)
def convert_bytes_patch_to_base64(patch_bytes: bytes) -> str: """Return a base64 encoded `str` representing the passed `bytes` patch.""" return base64.b64encode(patch_bytes).decode("ascii")
def load_token_from_disk() -> Optional[dict]: """Load and validate an existing Auth0 token from disk.
Return the token as a `dict` if it can be validated, orreturn `None` if any error was encountered. """ ifnot TOKEN_FILE.exists():
print("No existing Auth0 token found.") returnNone
try:
user_token = json.loads(TOKEN_FILE.read_bytes()) except json.JSONDecodeError:
print("Existing Auth0 token could not be decoded as JSON.") returnNone
return user_token
def get_stack_info(
vcs: SupportedVcsRepository, head: Optional[str]
) -> Tuple[str, List[str]]: """Retrieve information about the current stack for submission via Lando.
Returns a tuple of the current public base commit as a Mercurial SHA, and a list of ordered base64 encoded patches. """
base_commit = vcs.base_ref_as_hg() ifnot base_commit: raise ValueError( "Could not determine base Mercurial commit hash for submission."
)
print("Using", base_commit, "as the hg base commit.")
# Reuse the base revision when on Mercurial to avoid multiple calls to `hg log`.
branch_nodes_kwargs = {} if isinstance(vcs, HgRepository):
branch_nodes_kwargs["base_ref"] = base_commit
nodes = vcs.get_branch_nodes(head, **branch_nodes_kwargs) ifnot nodes: raise ValueError("Could not find any commit hashes for submission.") elif len(nodes) == 1:
print("Submitting a single try config commit.") elif len(nodes) == 2:
print("Submitting 1 node and the try commit.") else:
print("Submitting stack of", len(nodes) - 1, "nodes and the try commit.")
patches = vcs.get_commit_patches(nodes)
base64_patches = [
convert_bytes_patch_to_base64(patch_bytes) for patch_bytes in patches
]
print("Patches gathered for submission.")
return base_commit, base64_patches
@dataclass class Auth0Config: """Helper class to interact with Auth0."""
@property def jwks_url(self) -> str: """URL of the JWKS file.""" return f"{self.base_url}/.well-known/jwks.json"
@property def oauth_token_url(self) -> str: """URL of the OAuth Token endpoint.""" return f"{self.base_url}/oauth/token"
def request_device_code(self) -> dict: """Request authorization from Auth0 using the Device Code Flow.
See https://auth0.com/docs/api/authentication#get-device-code for more. """
response = requests.post(
self.device_code_url,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={ "audience": self.audience, "client_id": self.client_id, "scope": self.scope,
},
)
response.raise_for_status()
return response.json()
def validate_token(self, user_token: dict) -> Optional[dict]: """Verify the given user token is valid.
Validate the ID token, and validate the access token's expiration claim. """ # Import `auth0-python` here to avoid `ImportError` in tests, since # the `python-test` site won't have `auth0-python` installed. import jwt from auth0.authentication.token_verifier import (
AsymmetricSignatureVerifier,
TokenVerifier,
) from auth0.exceptions import (
TokenValidationError,
)
try:
token_verifier.verify(user_token["id_token"]) except TokenValidationError as e:
print("Could not validate existing Auth0 ID token:", str(e)) returnNone
# Assert that the access token isn't expired or expiring within a minute. if time.time() > access_token_expiration + 60:
print("Access token is expired.") returnNone
device_code_data = self.request_device_code()
print( "1. On your computer or mobile device navigate to:",
device_code_data["verification_uri_complete"],
)
print("2. Enter the following code:", device_code_data["user_code"])
# Print successive periods on the same line to avoid moving the link # while the user is trying to click it.
print("Waiting...", end="", flush=True) while time.perf_counter() - start < device_code_lifetime_s:
response = requests.post(
self.oauth_token_url,
data={ "client_id": self.client_id, "device_code": device_code_data["device_code"], "grant_type": "urn:ietf:params:oauth:grant-type:device_code", "scope": self.scope,
},
)
response_data = response.json()
if response.status_code == 200:
print("\nLogin successful.") return response_data
if response_data["error"] notin ("authorization_pending", "slow_down"): raise RuntimeError(response_data["error_description"])
raise ValueError("Timed out waiting for Auth0 device code authentication!")
def get_token(self) -> dict: """Retrieve an access token for authentication.
If a cached token is found and can be confirmed to be valid, return it.
Otherwise, perform the Device Code Flow authorization to request a new
token, validate it and save it to disk. """ # Load a cached token and validate it if one is available.
cached_token = load_token_from_disk()
user_token = self.validate_token(cached_token) if cached_token elseNone
# Login with the Device Authorization Flow if an existing token isn't found. ifnot user_token:
new_token = self.device_authorization_flow()
user_token = self.validate_token(new_token)
ifnot user_token: raise ValueError("Could not get an Auth0 token.")
# Save token to disk. with TOKEN_FILE.open("w") as f:
json.dump(user_token, f, indent=2, sort_keys=True)
return user_token
class LandoAPIException(Exception): """Raised when Lando throws an exception."""
@property def api_headers(self) -> dict[str, str]: """Headers for use accessing and authenticating against the API.""" return { "Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json",
}
@classmethod def from_lando_config_file(cls, config_path: Path, section: str) -> LandoAPI: """Build a `LandoConfig` from `section` in the file at `config_path`.""" ifnot config_path.exists(): raise ValueError(f"Could not find a Lando config file at `{config_path}`.")
def post(self, url: str, body: dict) -> dict: """Make a POST request to Lando."""
response = requests.post(url, headers=self.api_headers, json=body)
try:
response_json = response.json() except json.JSONDecodeError: # If the server didn't send back a valid JSON object, raise a stack # trace to the terminal which includes error details.
response.raise_for_status()
# Raise `ValueError` if the response wasn't JSON and we didn't raise # from an invalid status. raise LandoAPIException(
detail="Response was not valid JSON yet status was valid."
)
if response.status_code >= 400: raise LandoAPIException(detail=response_json["detail"])
Send the list of base64-encoded `patches` in `patch_format` to Lando, to be applied to
the Mercurial `base_commit`, using the Auth0 `access_token` for authorization. """
request_json_body = { "base_commit": base_commit, "patch_format": patch_format, "patches": patches,
}
print("Submitting patches to Lando.")
response_json = self.post(self.lando_try_api_url, request_json_body)
return response_json
def push_to_lando_try(
vcs: SupportedVcsRepository, commit_message: str, changed_files: dict
): """Push a set of patches to Lando's try endpoint.""" # Map `Repository` subclasses to the `patch_format` value Lando expects.
PATCH_FORMAT_STRING_MAPPING = {
GitRepository: "git-format-patch",
HgRepository: "hgexport",
}
patch_format = PATCH_FORMAT_STRING_MAPPING.get(type(vcs)) ifnot patch_format: # Other VCS types (namely `src`) are unsupported. raise ValueError(f"Try push via Lando is not supported for `{vcs.name}`.")
# Use Lando Prod unless the `LANDO_TRY_USE_DEV` environment variable is defined.
lando_config_section = ( "lando-prod"ifnot os.getenv("LANDO_TRY_USE_DEV") else"lando-dev"
)
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.