"""Utility functions for handling and fetching repo archives in zip format."""
import os import tempfile from pathlib import Path from typing import Optional from zipfile import BadZipFile, ZipFile
import requests
from cookiecutter.exceptions import InvalidZipRepository from cookiecutter.prompt import prompt_and_delete, read_repo_password from cookiecutter.utils import make_sure_path_exists
def unzip(
zip_uri: str,
is_url: bool,
clone_to_dir: "os.PathLike[str]" = ".",
no_input: bool = False,
password: Optional[str] = None,
): """Download and unpack a zipfile at a given URI.
This will download the zipfile to the cookiecutter repository, and unpack into a temporary directory.
:param zip_uri: The URI for the zipfile.
:param is_url: Is the zip URI a URL or a file?
:param clone_to_dir: The cookiecutter repository directory
to put the archive into.
:param no_input: Do not prompt for user input and eventually force a refresh of
cached resources.
:param password: The password to use when unpacking the repository. """ # Ensure that clone_to_dir exists
clone_to_dir = Path(clone_to_dir).expanduser()
make_sure_path_exists(clone_to_dir)
if is_url: # Build the name of the cached zipfile, # and prompt to delete if it already exists.
identifier = zip_uri.rsplit('/', 1)[1]
zip_path = os.path.join(clone_to_dir, identifier)
if os.path.exists(zip_path):
download = prompt_and_delete(zip_path, no_input=no_input) else:
download = True
if download: # (Re) download the zipfile
r = requests.get(zip_uri, stream=True, timeout=100) with open(zip_path, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks
f.write(chunk) else: # Just use the local zipfile as-is.
zip_path = os.path.abspath(zip_uri)
# Now unpack the repository. The zipfile will be unpacked # into a temporary directory try:
zip_file = ZipFile(zip_path)
if len(zip_file.namelist()) == 0: raise InvalidZipRepository(f'Zip repository {zip_uri} is empty')
# The first record in the zipfile should be the directory entry for # the archive. If it isn't a directory, there's a problem.
first_filename = zip_file.namelist()[0] ifnot first_filename.endswith('/'): raise InvalidZipRepository(
f"Zip repository {zip_uri} does not include a top-level directory"
)
# Construct the final target directory
project_name = first_filename[:-1]
unzip_base = tempfile.mkdtemp()
unzip_path = os.path.join(unzip_base, project_name)
# Extract the zip file into the temporary directory try:
zip_file.extractall(path=unzip_base) except RuntimeError: # File is password protected; try to get a password from the # environment; if that doesn't work, ask the user. if password isnotNone: try:
zip_file.extractall(path=unzip_base, pwd=password.encode('utf-8')) except RuntimeError: raise InvalidZipRepository( 'Invalid password provided for protected repository'
) elif no_input: raise InvalidZipRepository( 'Unable to unlock password protected repository'
) else:
retry = 0 while retry isnotNone: try:
password = read_repo_password('Repo password')
zip_file.extractall(
path=unzip_base, pwd=password.encode('utf-8')
)
retry = None except RuntimeError:
retry += 1 if retry == 3: raise InvalidZipRepository( 'Invalid password provided for protected repository'
)
except BadZipFile: raise InvalidZipRepository(
f'Zip repository {zip_uri} is not a valid zip archive:'
)
return unzip_path
Messung V0.5
¤ Dauer der Verarbeitung: 0.19 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.