# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import bz2 import gzip import os import signal import socket import sys import time from subprocess import PIPE, Popen from urllib.request import urlretrieve
def transform_platform(
str_to_transform, config_platform, config_processor=None, mitmproxy_version=None
): """Transform platform name i.e. 'mitmproxy-rel-bin-{platform}.manifest'
transforms to 'mitmproxy-rel-bin-osx.manifest'.
Also transform '{x64}'if needed for 64 bit / win 10 """ if"{platform}"notin str_to_transform and"{x64}"notin str_to_transform: return str_to_transform
if"win"in config_platform:
platform_id = "win" elif config_platform == "mac": # Bug 1920821 # If we are using mitmproxy 11 we need to ensure platform_id is configured # correctly for the folder structure. Having this check also keeps the ability to # playback on older versions which don't have ARM support if config_processor == "arm"and mitmproxy_version == "11.0.0":
platform_id = "osx-arm64" else:
platform_id = "osx" else:
platform_id = "linux64"
@retriable(sleeptime=2) def tooltool_download(manifest, run_local, raptor_dir): """Download a file from tooltool using the provided tooltool manifest"""
tooltool_path = None
for path in TOOLTOOL_PATHS: if os.path.exists(path):
tooltool_path = path break if tooltool_path isNone: raise Exception("Could not find tooltool path!")
if run_local:
command = [sys.executable, tooltool_path, "fetch", "-o", "-m", manifest] else: # Attempt to determine the tooltool cache path: # - TOOLTOOLCACHE is used by Raptor tests # - TOOLTOOL_CACHE is automatically set for taskcluster jobs # - fallback to a hardcoded path
_cache = next(
x for x in (
os.environ.get("TOOLTOOLCACHE"),
os.environ.get("TOOLTOOL_CACHE"), "/builds/tooltool_cache",
) if x isnotNone
)
# We pipe input to the decompressor program so that we can apply # custom decompressors that the program may not know about. if typ == "tar": if suffix == ".bz2":
ifh = bz2.open(str(path), "rb") elif suffix == ".gz":
ifh = gzip.open(str(path), "rb") elif suffix == ".xz": ifnot lzma: raise ValueError("lzma Python package not available")
ifh = lzma.open(str(path), "rb") elif suffix == ".zst": ifnot zstandard: raise ValueError("zstandard Python package not available")
dctx = zstandard.ZstdDecompressor()
ifh = dctx.stream_reader(path.open("rb")) elif suffix == ".tar":
ifh = path.open("rb") else: raise ValueError("unknown archive format for tar file: %s" % path)
args = ["tar", "xf", "-"]
pipe_stdin = True elif typ == "zip": # unzip from stdin has wonky behavior. We don't use a pipe for it.
ifh = open(os.devnull, "rb")
args = ["unzip", "-o", str(path)]
pipe_stdin = False else: raise ValueError("unknown archive format: %s" % path)
LOG.info("Extracting %s to %s using %r" % (path, dest_dir, args))
t0 = time.time() with ifh:
p = Popen(args, cwd=str(dest_dir), bufsize=0, stdin=PIPE) whileTrue: ifnot pipe_stdin: break
chunk = ifh.read(131072) ifnot chunk: break
p.stdin.write(chunk) # make sure we wait for the command to finish
p.communicate()
if p.returncode: raise Exception("%r exited %d" % (args, p.returncode))
LOG.info("%s extracted in %.3fs" % (path, time.time() - t0))
def download_file_from_url(url, local_dest, extract=False): """Receive a file in a URL and download it, i.e. for the hostutils tooltool manifest
the url received would be formatted like this:
config/tooltool-manifests/linux64/hostutils.manifest""" if os.path.exists(local_dest):
LOG.info("file already exists at: %s" % local_dest) ifnot extract: returnTrue else:
LOG.info("downloading: %s to %s" % (url, local_dest)) try:
retry(urlretrieve, args=(url, local_dest), attempts=3, sleeptime=5) except Exception:
LOG.error("Failed to download file: %s" % local_dest, exc_info=True) if os.path.exists(local_dest): # delete partial downloaded file
os.remove(local_dest) returnFalse
ifnot extract: return os.path.exists(local_dest)
typ = archive_type(local_dest) if typ isNone:
LOG.info("Not able to determine archive type for: %s" % local_dest) returnFalse
def get_available_port():
    """Return a TCP port number that was unused at the time of the call.

    A temporary IPv4 stream socket is bound to port 0, which asks the
    operating system to pick a free port. The assigned port number is read
    back and the socket is closed again before returning.

    The socket is managed by a ``with`` block so that it is always closed,
    even if ``bind``/``listen``/``getsockname`` raises.

    Returns:
        int: the port number the operating system assigned.

    Note:
        The port is released before this function returns, so nothing
        prevents another process from claiming it before the caller does.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Port 0 lets the OS choose any currently free port.
        sock.bind(("", 0))
        sock.listen(1)
        return sock.getsockname()[1]
¤ Dauer der Verarbeitung: 0.15 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.