#
# Copyright (C) 2012-2023 The Python Software Foundation.
# See LICENSE.txt and CONTRIBUTORS.txt.
#
import codecs
from collections
import deque
import contextlib
import csv
from glob
import iglob
as std_iglob
import io
import json
import logging
import os
import py_compile
import re
import socket
try:
import ssl
except ImportError:
# pragma: no cover
ssl =
None
import subprocess
import sys
import tarfile
import tempfile
import textwrap
try:
import threading
except ImportError:
# pragma: no cover
import dummy_threading
as threading
import time
from .
import DistlibException
from .compat
import (string_types, text_type, shutil, raw_input, StringIO,
cache_from_source, urlopen, urljoin, httplib, xmlrpclib,
HTTPHandler, BaseConfigurator, valid_ident,
Container, configparser, URLError, ZipFile, fsdecode,
unquote, urlparse)
logger = logging.getLogger(__name__)

#
# Requirement parsing code as per PEP 508
#

# Every pattern captures the token of interest in group 1 and also consumes
# any whitespace that follows it, so callers can simply slice at m.end().
IDENTIFIER = re.compile(r'^([\w\.-]+)\s*')
VERSION_IDENTIFIER = re.compile(r'^([\w\.*+-]+)\s*')
COMPARE_OP = re.compile(r'^(<=?|>=?|={2,3}|[~!]=)\s*')
MARKER_OP = re.compile(r'^((<=?)|(>=?)|={2,3}|[~!]=|in|not\s+in)\s*')
OR = re.compile(r'^or\b\s*')
AND = re.compile(r'^and\b\s*')
NON_SPACE = re.compile(r'(\S+)\s*')
# A run of characters allowed inside a quoted marker string (no quote chars).
STRING_CHUNK = re.compile(r'([\s\w\.{}()*+#:;,/?!~`@$%^&=|<>\[\]-]+)')
def parse_marker(marker_string):
    """
    Parse a marker string and return the marker expression it starts with.

    Non-terminals of the expression grammar are represented as dictionaries
    with the keys "op", "lhs" and "rhs"; terminals are strings. A string
    contained in quotes (the quotes are kept) is to be interpreted as a
    literal string, and a string not contained in quotes is a variable (such
    as os_name).

    :param marker_string: The text to parse.
    :return: A 2-tuple ``(expression, remaining)`` where ``remaining`` is the
             part of the input which was not consumed.
    :raises SyntaxError: If the input is not a well-formed marker.
    """

    def marker_var(remaining):
        # either identifier, or literal string
        m = IDENTIFIER.match(remaining)
        if m:
            result = m.groups()[0]
            remaining = remaining[m.end():]
        elif not remaining:
            raise SyntaxError('unexpected end of input')
        else:
            q = remaining[0]
            if q not in '\'"':
                raise SyntaxError('invalid expression: %s' % remaining)
            # oq is "the other quote", which may appear unescaped inside
            oq = '\'"'.replace(q, '')
            remaining = remaining[1:]
            parts = [q]
            while remaining:
                # either a string chunk, or oq, or q to terminate
                if remaining[0] == q:
                    break
                elif remaining[0] == oq:
                    parts.append(oq)
                    remaining = remaining[1:]
                else:
                    m = STRING_CHUNK.match(remaining)
                    if not m:
                        raise SyntaxError('error in string literal: %s' %
                                          remaining)
                    parts.append(m.groups()[0])
                    remaining = remaining[m.end():]
            else:
                # input ran out before the closing quote was seen
                s = ''.join(parts)
                raise SyntaxError('unterminated string: %s' % s)
            parts.append(q)
            result = ''.join(parts)
            remaining = remaining[1:].lstrip()  # skip past closing quote
        return result, remaining

    def marker_expr(remaining):
        if remaining and remaining[0] == '(':
            result, remaining = marker(remaining[1:].lstrip())
            # The input may end before the group is closed; guard the index
            # so that this is reported as a SyntaxError, not an IndexError.
            if not remaining or remaining[0] != ')':
                raise SyntaxError('unterminated parenthesis: %s' % remaining)
            remaining = remaining[1:].lstrip()
        else:
            lhs, remaining = marker_var(remaining)
            while remaining:
                m = MARKER_OP.match(remaining)
                if not m:
                    break
                op = m.groups()[0]
                remaining = remaining[m.end():]
                rhs, remaining = marker_var(remaining)
                lhs = {'op': op, 'lhs': lhs, 'rhs': rhs}
            result = lhs
        return result, remaining

    def marker_and(remaining):
        lhs, remaining = marker_expr(remaining)
        while remaining:
            m = AND.match(remaining)
            if not m:
                break
            remaining = remaining[m.end():]
            rhs, remaining = marker_expr(remaining)
            lhs = {'op': 'and', 'lhs': lhs, 'rhs': rhs}
        return lhs, remaining

    def marker(remaining):
        # 'or' binds more loosely than 'and', so it is handled outermost
        lhs, remaining = marker_and(remaining)
        while remaining:
            m = OR.match(remaining)
            if not m:
                break
            remaining = remaining[m.end():]
            rhs, remaining = marker_and(remaining)
            lhs = {'op': 'or', 'lhs': lhs, 'rhs': rhs}
        return lhs, remaining

    return marker(marker_string)
def parse_requirement(req):
    """
    Parse a requirement passed in as a string.

    :param req: The requirement text.
    :return: ``None`` for a blank line or a comment line, otherwise a
             Container whose attributes (name, extras, constraints, marker,
             url, requirement) contain the various parts of the requirement.
    :raises SyntaxError: If the text is not a well-formed requirement.
    """

    def get_versions(ver_remaining):
        """
        Return a list of operator, version tuples if any are
        specified, else None, together with the unparsed remainder.
        """
        m = COMPARE_OP.match(ver_remaining)
        if not m:
            return None, ver_remaining
        constraints = []
        while True:
            op = m.group(1)
            ver_remaining = ver_remaining[m.end():]
            m = VERSION_IDENTIFIER.match(ver_remaining)
            if not m:
                raise SyntaxError('invalid version: %s' % ver_remaining)
            constraints.append((op, m.group(1)))
            ver_remaining = ver_remaining[m.end():]
            if not ver_remaining or ver_remaining[0] != ',':
                break
            ver_remaining = ver_remaining[1:].lstrip()
            # Some packages have a trailing comma which would break things
            # See issue #148
            if not ver_remaining:
                break
            m = COMPARE_OP.match(ver_remaining)
            if not m:
                raise SyntaxError('invalid constraint: %s' % ver_remaining)
        return (constraints or None), ver_remaining

    remaining = req.strip()
    if not remaining or remaining.startswith('#'):
        return None

    m = IDENTIFIER.match(remaining)
    if not m:
        raise SyntaxError('name expected: %s' % remaining)
    distname = m.group(1)
    remaining = remaining[m.end():]
    extras = None
    mark_expr = None
    versions = None
    uri = None

    # Optional extras: [a, b, c]
    if remaining and remaining[0] == '[':
        close = remaining.find(']', 1)
        if close < 0:
            raise SyntaxError('unterminated extra: %s' % remaining)
        s = remaining[1:close]
        remaining = remaining[close + 1:].lstrip()
        found = []
        while s:
            m = IDENTIFIER.match(s)
            if not m:
                raise SyntaxError('malformed extra: %s' % s)
            found.append(m.group(1))
            s = s[m.end():]
            if not s:
                break
            if s[0] != ',':
                raise SyntaxError('comma expected in extras: %s' % s)
            s = s[1:].lstrip()
        extras = found or None

    if remaining:
        if remaining[0] == '@':
            # it's a URI
            remaining = remaining[1:].lstrip()
            m = NON_SPACE.match(remaining)
            if not m:
                raise SyntaxError('invalid URI: %s' % remaining)
            uri = m.group(1)
            t = urlparse(uri)
            # there are issues with Python and URL parsing, so this test
            # is a bit crude. See bpo-20271, bpo-23505. Python doesn't
            # always parse invalid URLs correctly - it should raise
            # exceptions for malformed URLs
            if not (t.scheme and t.netloc):
                raise SyntaxError('Invalid URL: %s' % uri)
            remaining = remaining[m.end():].lstrip()
        elif remaining[0] != '(':
            versions, remaining = get_versions(remaining)
        else:
            close = remaining.find(')', 1)
            if close < 0:
                raise SyntaxError('unterminated parenthesis: %s' % remaining)
            s = remaining[1:close]
            remaining = remaining[close + 1:].lstrip()
            # As a special diversion from PEP 508, allow a version number
            # a.b.c in parentheses as a synonym for ~= a.b.c (because this
            # is allowed in earlier PEPs)
            if COMPARE_OP.match(s):
                versions, _ = get_versions(s)
            else:
                m = VERSION_IDENTIFIER.match(s)
                if not m:
                    raise SyntaxError('invalid constraint: %s' % s)
                v = m.group(1)
                s = s[m.end():].lstrip()
                if s:
                    raise SyntaxError('invalid constraint: %s' % s)
                versions = [('~=', v)]

    # Optional environment marker, introduced by a semicolon
    if remaining:
        if remaining[0] != ';':
            raise SyntaxError('invalid requirement: %s' % remaining)
        remaining = remaining[1:].lstrip()
        mark_expr, remaining = parse_marker(remaining)

    # Anything left over must be a trailing comment
    if remaining and remaining[0] != '#':
        raise SyntaxError('unexpected trailing data: %s' % remaining)

    if versions:
        rs = '%s %s' % (distname,
                        ', '.join(['%s %s' % con for con in versions]))
    else:
        rs = distname
    return Container(name=distname,
                     extras=extras,
                     constraints=versions,
                     marker=mark_expr,
                     url=uri,
                     requirement=rs)
def get_resources_dests(resources_root, rules):
    """
    Find destinations for resources files.

    :param resources_root: The directory which the rules are relative to.
    :param rules: An iterable of ``(base, suffix, dest)`` triples; ``base``
                  and ``suffix`` are glob patterns, and a ``dest`` of
                  ``None`` removes any matching resource from the result.
    :return: A dictionary mapping '/'-separated resource paths (relative to
             ``resources_root``) to their '/'-separated destinations.
    """

    def get_rel_path(root, path):
        # normalizes and returns a lstripped-/-separated path
        root = root.replace(os.path.sep, '/')
        path = path.replace(os.path.sep, '/')
        assert path.startswith(root)
        return path[len(root):].lstrip('/')

    destinations = {}
    for base, suffix, dest in rules:
        base_pattern = os.path.join(resources_root, base)
        for abs_base in iglob(base_pattern):
            for abs_path in iglob(os.path.join(abs_base, suffix)):
                resource_file = get_rel_path(resources_root, abs_path)
                if dest is None:
                    # remove the entry if it was here
                    destinations.pop(resource_file, None)
                    continue
                rel_dest = dest.replace(os.path.sep, '/').rstrip('/')
                rel_path = get_rel_path(abs_base, abs_path)
                destinations[resource_file] = rel_dest + '/' + rel_path
    return destinations
def in_venv():
    """
    Return whether the running interpreter belongs to a virtual environment.
    """
    # virtualenv venvs set sys.real_prefix
    if hasattr(sys, 'real_prefix'):
        return True
    # PEP 405 venvs have a base_prefix which differs from prefix
    return sys.prefix != getattr(sys, 'base_prefix', sys.prefix)
def get_executable():
    """
    Return ``sys.executable`` as text, decoding it with ``fsdecode`` if it
    is not already of the text type.
    """
    # The __PYVENV_LAUNCHER__ dance is apparently no longer needed, as
    # changes to the stub launcher mean that sys.executable always points
    # to the stub on OS X.
    # Avoid normcasing: see issue #143
    executable = sys.executable
    if isinstance(executable, text_type):
        return executable
    return fsdecode(executable)
def proceed(prompt, allowed_chars, error_prompt=None, default=None):
    """
    Keep prompting until the user gives an answer whose first character
    (lower-cased) is one of the allowed characters, and return that character.

    :param prompt: The text shown when asking for input.
    :param allowed_chars: A container of acceptable (lower-case) characters.
    :param error_prompt: If given, a message shown before the prompt is
                         repeated following an unacceptable answer.
    :param default: If given, the answer assumed when nothing is entered.
    :return: The accepted character.
    """
    p = prompt
    while True:
        s = raw_input(p)
        p = prompt
        if not s and default:
            s = default
        if s:
            c = s[0].lower()
            if c in allowed_chars:
                return c
            if error_prompt:
                p = '%c: %s\n%s' % (c, error_prompt, prompt)
        elif error_prompt:
            # Nothing was entered and there is no default, so there is no
            # character to echo back - just show the error message.
            p = '%s\n%s' % (error_prompt, prompt)
def extract_by_key(d, keys):
    """
    Return a new dictionary holding the items of ``d`` whose key is in
    ``keys``.

    :param d: The source mapping.
    :param keys: An iterable of keys, or a single string which is split on
                 whitespace to obtain the keys.
    """
    if isinstance(keys, string_types):
        keys = keys.split()
    return {key: d[key] for key in keys if key in d}
def read_exports(stream):
    """
    Read exports from a stream, trying JSON first and then falling back on
    the legacy INI-style format.

    :param stream: The stream to read from; on Python 3 it is wrapped in a
                   UTF-8 reader before being read.
    :return: A dictionary mapping group names to dictionaries which map
             entry names to the result of ``get_export_entry``.
    """
    if sys.version_info[0] >= 3:
        # needs to be a text stream
        stream = codecs.getreader('utf-8')(stream)
    # Try to load as JSON, falling back on legacy format
    data = stream.read()
    stream = StringIO(data)
    try:
        exports = json.load(stream)['extensions']['python.exports']['exports']
        for entries in exports.values():
            for k, v in entries.items():
                entry = get_export_entry('%s = %s' % (k, v))
                assert entry is not None
                entries[k] = entry
        return exports
    except Exception:
        # Not usable as JSON exports - rewind and parse the legacy format
        stream.seek(0, 0)

    cp = configparser.ConfigParser()
    # readfp is the older spelling of read_file
    load = cp.read_file if hasattr(cp, 'read_file') else cp.readfp
    try:
        load(stream)
    except configparser.MissingSectionHeaderError:
        # The content may simply be indented: dedent it and retry
        stream.close()
        data = textwrap.dedent(data)
        stream = StringIO(data)
        load(stream)

    result = {}
    for section in cp.sections():
        entries = result[section] = {}
        for name, value in cp.items(section):
            entry = get_export_entry('%s = %s' % (name, value))
            assert entry is not None
            entries[name] = entry
    return result
def write_exports(exports, stream):
    """
    Write exports to a stream in INI-style format.

    :param exports: A dictionary mapping group names to dictionaries of
                    entries; each entry provides ``name``, ``prefix``,
                    ``suffix`` and ``flags`` attributes.
    :param stream: The stream to write to; on Python 3 it is wrapped in a
                   UTF-8 writer.
    """
    if sys.version_info[0] >= 3:
        # needs to be a text stream
        stream = codecs.getwriter('utf-8')(stream)
    cp = configparser.ConfigParser()
    for group, entries in exports.items():
        # TODO check k, v for valid values
        cp.add_section(group)
        for entry in entries.values():
            spec = entry.prefix
            if entry.suffix is not None:
                spec = '%s:%s' % (entry.prefix, entry.suffix)
            if entry.flags:
                spec = '%s [%s]' % (spec, ', '.join(entry.flags))
            cp.set(group, entry.name, spec)
    cp.write(stream)
@contextlib.contextmanager
def tempdir():
    """
    Context manager which creates a temporary directory, yields its path and
    removes the directory tree on exit.
    """
    path = tempfile.mkdtemp()
    try:
        yield path
    finally:
        shutil.rmtree(path)
@contextlib.contextmanager
def chdir(d):
    """
    Context manager which changes the current directory to ``d`` and changes
    back to the previous current directory on exit.
    """
    previous = os.getcwd()
    try:
        os.chdir(d)
        yield
    finally:
        os.chdir(previous)
@contextlib.contextmanager
def socket_timeout(seconds=15):
    """
    Context manager which sets the default socket timeout to ``seconds`` and
    restores the previous default on exit.
    """
    previous = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        socket.setdefaulttimeout(previous)
class cached_property(object):
    """
    Descriptor which computes a value on first access and stores it on the
    instance under the wrapped function's name.
    """

    def __init__(self, func):
        self.func = func

    def __get__(self, obj, cls=None):
        # Accessed on the class rather than an instance: return the descriptor
        if obj is None:
            return self
        func = self.func
        value = func(obj)
        # Store the value as an instance attribute named after the function
        object.__setattr__(obj, func.__name__, value)
        return value
def convert_path(pathname):
    """Return 'pathname' as a name that will work on the native filesystem.

    The path is split on '/' and put back together again using the current
    directory separator. Needed because filenames in the setup script are
    always supplied in Unix style, and have to be converted to the local
    convention before we can actually use them in the filesystem. Raises
    ValueError on non-Unix-ish systems if 'pathname' either starts or
    ends with a slash.
    """
    # Nothing to do where '/' already is the separator, or for an empty path
    if os.sep == '/' or not pathname:
        return pathname
    if pathname[0] == '/':
        raise ValueError("path '%s' cannot be absolute" % pathname)
    if pathname[-1] == '/':
        raise ValueError("path '%s' cannot end with '/'" % pathname)
    # Drop every "current directory" component
    parts = [part for part in pathname.split('/') if part != os.curdir]
    if not parts:
        return os.curdir
    return os.path.join(*parts)
class FileOperator(object):
    """
    Performs file system operations, honouring a dry-run flag and optionally
    recording the files written and directories created so that the changes
    can be committed or rolled back.
    """

    def __init__(self, dry_run=False):
        self.dry_run = dry_run
        # Directories already handled by ensure_dir
        self.ensured = set()
        self._init_record()

    def _init_record(self):
        """Turn recording off and forget everything recorded so far."""
        self.record = False
        self.files_written = set()
        self.dirs_created = set()

    def record_as_written(self, path):
        """Note ``path`` as written, if recording is on."""
        if self.record:
            self.files_written.add(path)

    def newer(self, source, target):
        """Tell if the source is newer than the target.

        Returns true if 'source' exists and is more recently modified than
        'target', or if 'source' exists and 'target' doesn't.

        Returns false if both exist and 'target' is the same age or younger
        than 'source'. Raise DistlibException if 'source' does not exist.

        Note that this test is not very accurate: files created in the same
        second will have the same "age".
        """
        if not os.path.exists(source):
            raise DistlibException("file '%r' does not exist" %
                                   os.path.abspath(source))
        if not os.path.exists(target):
            return True
        source_mtime = os.stat(source).st_mtime
        return source_mtime > os.stat(target).st_mtime

    def copy_file(self, infile, outfile, check=True):
        """Copy a file respecting the dry-run flag.

        If ``check`` is true, refuse (with ValueError) to overwrite a symlink
        or anything else which exists but is not a regular file.
        """
        self.ensure_dir(os.path.dirname(outfile))
        logger.info('Copying %s to %s', infile, outfile)
        if not self.dry_run:
            msg = None
            if check:
                if os.path.islink(outfile):
                    msg = '%s is a symlink' % outfile
                elif os.path.exists(outfile) and not os.path.isfile(outfile):
                    msg = '%s is a non-regular file' % outfile
            if msg:
                raise ValueError(msg + ' which would be overwritten')
            shutil.copyfile(infile, outfile)
        self.record_as_written(outfile)

    def copy_stream(self, instream, outfile, encoding=None):
        """Copy a stream to a file, as bytes unless an encoding is given."""
        assert not os.path.isdir(outfile)
        self.ensure_dir(os.path.dirname(outfile))
        logger.info('Copying stream %s to %s', instream, outfile)
        if not self.dry_run:
            if encoding is not None:
                outstream = codecs.open(outfile, 'w', encoding=encoding)
            else:
                outstream = open(outfile, 'wb')
            try:
                shutil.copyfileobj(instream, outstream)
            finally:
                outstream.close()
        self.record_as_written(outfile)

    def write_binary_file(self, path, data):
        """Write bytes to ``path``, replacing any existing file."""
        self.ensure_dir(os.path.dirname(path))
        if not self.dry_run:
            if os.path.exists(path):
                os.remove(path)
            with open(path, 'wb') as f:
                f.write(data)
        self.record_as_written(path)

    def write_text_file(self, path, data, encoding):
        """Encode ``data`` using ``encoding`` and write it to ``path``."""
        self.write_binary_file(path, data.encode(encoding))

    def set_mode(self, bits, mask, files):
        """
        On POSIX, OR ``bits`` into the mode of each of ``files`` and AND the
        result with ``mask``. On other platforms, do nothing.
        """
        on_posix = os.name == 'posix' or (os.name == 'java' and
                                          os._name == 'posix')
        if not on_posix:
            return
        for f in files:
            if self.dry_run:
                logger.info("changing mode of %s", f)
                continue
            mode = (os.stat(f).st_mode | bits) & mask
            logger.info("changing mode of %s to %o", f, mode)
            os.chmod(f, mode)

    # Set the executable bits (owner, group, and world) on all the files
    # specified.
    set_executable_mode = lambda s, f: s.set_mode(0o555, 0o7777, f)

    def ensure_dir(self, path):
        """
        Create the directory ``path`` and any missing parents, unless it
        already exists or has already been handled by this instance.
        """
        path = os.path.abspath(path)
        if path in self.ensured or os.path.exists(path):
            return
        self.ensured.add(path)
        parent = os.path.split(path)[0]
        self.ensure_dir(parent)
        logger.info('Creating %s' % path)
        if not self.dry_run:
            os.mkdir(path)
        if self.record:
            self.dirs_created.add(path)

    def byte_compile(self,
                     path,
                     optimize=False,
                     force=False,
                     prefix=None,
                     hashed_invalidation=False):
        """
        Byte-compile ``path`` if forced, or if it is newer than its cached
        bytecode file, and return the path of the bytecode file.
        """
        dpath = cache_from_source(path, not optimize)
        logger.info('Byte-compiling %s to %s', path, dpath)
        if not self.dry_run and (force or self.newer(path, dpath)):
            if prefix:
                assert path.startswith(prefix)
                diagpath = path[len(prefix):]
            else:
                diagpath = None
            compile_kwargs = {}
            if hashed_invalidation and hasattr(py_compile,
                                               'PycInvalidationMode'):
                compile_kwargs['invalidation_mode'] = \
                    py_compile.PycInvalidationMode.CHECKED_HASH
            # The fourth argument (doraise) makes compile errors raise
            py_compile.compile(path, dpath, diagpath, True, **compile_kwargs)
        self.record_as_written(dpath)
        return dpath

    def ensure_removed(self, path):
        """Remove the file, link or directory tree at ``path``, if any."""
        if not os.path.exists(path):
            return
        if os.path.isdir(path) and not os.path.islink(path):
            logger.debug('Removing directory tree at %s', path)
            if not self.dry_run:
                shutil.rmtree(path)
            if self.record:
                self.dirs_created.discard(path)
        else:
            kind = 'link' if os.path.islink(path) else 'file'
            logger.debug('Removing %s %s', kind, path)
            if not self.dry_run:
                os.remove(path)
            if self.record:
                self.files_written.discard(path)

    def is_writable(self, path):
        """
        Return whether the nearest existing ancestor of ``path`` (or ``path``
        itself, if it exists) is writable.
        """
        while True:
            if os.path.exists(path):
                return os.access(path, os.W_OK)
            parent = os.path.dirname(path)
            if parent == path:
                # Ran out of ancestors without finding anything existing
                return False
            path = parent

    def commit(self):
        """
        Commit recorded changes, turn off recording, return
        changes.
        """
        assert self.record
        changes = self.files_written, self.dirs_created
        self._init_record()
        return changes

    def rollback(self):
        """
        Remove recorded files and directories (unless in dry-run mode), then
        turn off recording.
        """
        if not self.dry_run:
            for f in list(self.files_written):
                if os.path.exists(f):
                    os.remove(f)
            # dirs should all be empty now, except perhaps for
            # __pycache__ subdirs
            # reverse so that subdirs appear before their parents
            for d in sorted(self.dirs_created, reverse=True):
                flist = os.listdir(d)
                if flist:
                    assert flist == ['__pycache__']
                    os.rmdir(os.path.join(d, flist[0]))
                os.rmdir(d)  # should fail if non-empty
        self._init_record()
def resolve(module_name, dotted_path):
    """
    Import a module and return it, or an object reachable from it.

    :param module_name: The (possibly dotted) name of the module.
    :param dotted_path: ``None`` to return the module itself, else a dotted
                        attribute path which is followed from the module.
    :return: The module, or the object found at the end of the path.
    :raises ImportError: If the module cannot be imported.
    :raises AttributeError: If the path cannot be followed.
    """
    if module_name not in sys.modules:
        # __import__ returns the top-level package for a dotted name, so
        # import for the side effect and then look the module up by its
        # full name.
        __import__(module_name)
    mod = sys.modules[module_name]
    if dotted_path is None:
        return mod
    result = mod
    for part in dotted_path.split('.'):
        result = getattr(result, part)
    return result
class ExportEntry(object):
    """
    An exported entry: a name bound to an object which is identified by a
    module name (``prefix``) and an optional dotted attribute path within
    that module (``suffix``), together with a list of flags.
    """

    def __init__(self, name, prefix, suffix, flags):
        self.name = name
        self.prefix = prefix
        self.suffix = suffix
        self.flags = flags

    @cached_property
    def value(self):
        """The object this entry refers to, obtained using ``resolve``."""
        return resolve(self.prefix, self.suffix)

    def __repr__(self):  # pragma: no cover
        return '<ExportEntry %s = %s:%s %s>' % (self.name, self.prefix,
                                                self.suffix, self.flags)

    def __eq__(self, other):
        if not isinstance(other, ExportEntry):
            result = False
        else:
            result = (self.name == other.name and
                      self.prefix == other.prefix and
                      self.suffix == other.suffix and
                      self.flags == other.flags)
        return result

    # Defining __eq__ would otherwise make instances unhashable on Python 3;
    # keep the default identity-based hash.
    __hash__ = object.__hash__
# Matches an export specification of the form
#     name = callable [flag1, flag2=value]
# where the callable is module[:attr.path] and the flags part is optional.
ENTRY_RE = re.compile(
    r'''(?P<name>([^\[]\S*))
        \s*=\s*(?P<callable>(\w+)([:\.]\w+)*)
        \s*(\[\s*(?P<flags>[\w-]+(=\w+)?(,\s*\w+(=\w+)?)*)\s*\])?
        ''', re.VERBOSE)
def get_export_entry(specification):
    """
    Parse an export specification of the form ``name = module:attr [flags]``.

    :param specification: The text to parse.
    :return: An ExportEntry, or ``None`` if the text does not look like a
             specification at all.
    :raises DistlibException: If the text contains brackets which are not a
                              valid flags part, or more than one colon in
                              the callable.
    """
    has_brackets = '[' in specification or ']' in specification
    m = ENTRY_RE.search(specification)
    if not m:
        if has_brackets:
            raise DistlibException("Invalid specification '%s'" %
                                   specification)
        return None

    d = m.groupdict()
    path = d['callable']
    colons = path.count(':')
    if colons == 0:
        prefix, suffix = path, None
    elif colons == 1:
        prefix, suffix = path.split(':')
    else:
        raise DistlibException("Invalid specification '%s'" % specification)

    flags = d['flags']
    if flags is not None:
        flags = [f.strip() for f in flags.split(',')]
    elif has_brackets:
        # Brackets are present, but not as a well-formed flags part
        raise DistlibException("Invalid specification '%s'" % specification)
    else:
        flags = []
    return ExportEntry(d['name'], prefix, suffix, flags)
def get_cache_base(suffix=None):
    """
    Return the default base location for distlib caches. If the parent
    directory does not exist, it is created. Use the suffix provided for the
    base directory, and default to '.distlib' if it isn't provided.

    On Windows, if LOCALAPPDATA is defined in the environment, then it is
    assumed to be a directory, and will be the parent directory of the result.
    On POSIX, and on Windows if LOCALAPPDATA is not defined, the user's home
    directory - using os.expanduser('~') - will be the parent directory of
    the result.

    If that parent is not writable or cannot be created, a new temporary
    directory is used as the parent instead.

    The result is just the directory '.distlib' in the parent directory as
    determined above, or with the name specified with ``suffix``.
    """
    if suffix is None:
        suffix = '.distlib'
    on_windows = os.name == 'nt' and 'LOCALAPPDATA' in os.environ
    if on_windows:
        parent = os.path.expandvars('$localappdata')
    else:
        # Assume posix, or old Windows
        parent = os.path.expanduser('~')
    # we use 'isdir' instead of 'exists', because we want to
    # fail if there's a file with that name
    if not os.path.isdir(parent):
        try:
            os.makedirs(parent)
            usable = True
        except OSError:
            logger.warning('Unable to create %s', parent, exc_info=True)
            usable = False
    else:
        usable = os.access(parent, os.W_OK)
        if not usable:
            logger.warning('Directory exists but is not writable: %s', parent)
    if not usable:
        parent = tempfile.mkdtemp()
        logger.warning('Default location unusable, using %s', parent)
    return os.path.join(parent, suffix)
def path_to_cache_dir(path):
    """
    Convert an absolute path to a directory name for use in a cache.

    The algorithm used is:

    #. On Windows, any ``':'`` in the drive is replaced with ``'---'``.
    #. Any occurrence of ``os.sep`` is replaced with ``'--'``.
    #. ``'.cache'`` is appended.
    """
    drive, tail = os.path.splitdrive(os.path.abspath(path))
    if drive:
        drive = drive.replace(':', '---')
    return ''.join([drive, tail.replace(os.sep, '--'), '.cache'])
def ensure_slash(s):
    """Return ``s``, with a '/' appended if it doesn't already end with one."""
    return s if s.endswith('/') else s + '/'
def parse_credentials(netloc):
    """
    Split any credentials off a network location.

    :param netloc: Text of the form ``[username[:password]@]host``.
    :return: A 3-tuple ``(username, password, netloc)``, where the returned
             netloc has the credentials removed. A non-empty username or
             password is URL-unquoted; ``None`` is used for whatever is
             absent.
    """
    username = None
    password = None
    if '@' in netloc:
        # The last '@' separates the credentials from the host
        credentials, netloc = netloc.rsplit('@', 1)
        if ':' in credentials:
            username, password = credentials.split(':', 1)
        else:
            username = credentials
    if username:
        username = unquote(username)
    if password:
        password = unquote(password)
    return username, password, netloc
def get_process_umask():
    """Return the umask of the current process, leaving it unchanged."""
    # The umask can only be read by setting it, so set a value temporarily
    # and then restore what was there.
    current = os.umask(0o22)
    os.umask(current)
    return current
def is_string_sequence(seq):
    """
    Return whether every item of ``seq`` is a string. Checking stops at the
    first non-string item. An empty ``seq`` fails an assertion.
    """
    seen_any = False
    all_strings = True
    for item in seq:
        seen_any = True
        if not isinstance(item, string_types):
            all_strings = False
            break
    assert seen_any
    return all_strings
# Group 1 is the project name and group 3 the version in "name-version"
PROJECT_NAME_AND_VERSION = re.compile(
    '([a-z0-9_]+([.-][a-z_][a-z0-9_]*)*)-'
    '([a-z0-9_.+-]+)',
    re.I)
# Group 1 is the Python version in a "-pyX.Y" tag
PYTHON_VERSION = re.compile(r'-py(\d\.?\d?)')
def split_filename(filename, project_name=None):
    """
    Extract name, version, python version from a filename (no extension)

    :param filename: The filename; it is URL-unquoted and has spaces replaced
                     by hyphens before being examined.
    :param project_name: If given and the filename starts with it, it is used
                         to decide where the name ends.
    :return: A ``(name, version, pyver)`` tuple, or ``None`` if the filename
             could not be split. ``pyver`` is ``None`` if there is no
             "-pyX.Y" tag.
    """
    pyver = None
    filename = unquote(filename).replace(' ', '-')
    tag = PYTHON_VERSION.search(filename)
    if tag:
        pyver = tag.group(1)
        filename = filename[:tag.start()]
    if project_name and len(filename) > len(project_name) + 1:
        m = re.match(re.escape(project_name) + r'\b', filename)
        if m:
            n = m.end()
            # Skip the separator character which follows the name
            return filename[:n], filename[n + 1:], pyver
    m = PROJECT_NAME_AND_VERSION.match(filename)
    if m:
        return m.group(1), m.group(3), pyver
    return None
# Allow spaces in name because of legacy dists like "Twisted Core"
# Matches text of the form 'name (version)'.
NAME_VERSION_RE = re.compile(r'(?P<name>[\w .-]+)\s*'
                             r'\(\s*(?P<ver>[^\s)]+)\)$')
def parse_name_and_version(p):
    """
    A utility method used to get name and version from a string.

    From e.g. a Provides-Dist value.

    :param p: A value in a form 'foo (1.0)'
    :return: The name (stripped and lower-cased) and version as a tuple.
    :raises DistlibException: If the value is not of the expected form.
    """
    m = NAME_VERSION_RE.match(p)
    if m is None:
        raise DistlibException("Ill-formed name/version string: '%s'" % p)
    parts = m.groupdict()
    name = parts['name'].strip().lower()
    return name, parts['ver']
def get_extras(requested, available):
    """
    Work out the set of extras to use.

    :param requested: An iterable of requested extras (or ``None``). ``'*'``
                      selects everything available, ``'-name'`` removes
                      ``name`` from the result, and a bare ``'-'`` is added
                      to the result as it is.
    :param available: An iterable of the declared extras (or ``None``).
    :return: The resulting set. A warning is logged for each extra named
             which has not been declared.
    """
    requested = set(requested or [])
    available = set(available or [])
    result = set()
    if '*' in requested:
        requested.remove('*')
        result |= available
    for r in requested:
        if r == '-':
            result.add(r)
            continue
        if not r.startswith('-'):
            if r not in available:
                logger.warning('undeclared extra: %s' % r)
            result.add(r)
            continue
        unwanted = r[1:]
        if unwanted not in available:
            logger.warning('undeclared extra: %s' % unwanted)
        result.discard(unwanted)
    return result
#
# Extended metadata functionality
#
def _get_external_data(url):
    """
    Fetch and decode JSON from a URL.

    :param url: The URL to fetch.
    :return: The decoded JSON, or an empty dictionary if the request fails
             or the response is not declared as JSON. Failures are logged
             rather than raised.
    """
    result = {}
    try:
        # urlopen might fail if it runs into redirections,
        # because of Python issue #13696. Fixed in locators
        # using a custom redirect handler.
        # closing() ensures the response is released on every path.
        with contextlib.closing(urlopen(url)) as resp:
            ct = resp.info().get('Content-Type')
            # The header may be absent altogether, in which case ct is None
            if not ct or not ct.startswith('application/json'):
                logger.debug('Unexpected response for JSON request: %s', ct)
            else:
                reader = codecs.getreader('utf-8')(resp)
                result = json.load(reader)
    except Exception as e:
        # Deliberately best-effort: report the problem and return {}
        logger.exception('Failed to get external data for %s: %s', url, e)
    return result
# Base URL against which the project/package metadata URLs are resolved
_external_data_base_url = 'https://www.red-dove.com/pypi/projects/'
def get_project_data(name):
    """
    Return the external metadata for the project ``name``, fetched from
    ``<FIRST LETTER>/<name>/project.json`` below the external data base URL.
    """
    relative = '%s/%s/project.json' % (name[0].upper(), name)
    return _get_external_data(urljoin(_external_data_base_url, relative))
def get_package_data(name, version):
    """
    Return the external metadata for ``version`` of the project ``name``,
    fetched from ``<FIRST LETTER>/<name>/package-<version>.json`` below the
    external data base URL.
    """
    relative = '%s/%s/package-%s.json' % (name[0].upper(), name, version)
    full_url = urljoin(_external_data_base_url, relative)
    return _get_external_data(full_url)
class Cache(object):
    """
    A class implementing a cache for resources that need to live in the file
    system e.g. shared libraries. This class was moved from resources to here
    because it could be used by other modules, e.g. the wheel module.
    """

    def __init__(self, base):
        """
        Initialise an instance.

        :param base: The base directory where the cache should be located.
                     It is created if it is not an existing directory, and a
                     warning is logged if group or others have any access.
        """
        # we use 'isdir' instead of 'exists', because we want to
        # fail if there's a file with that name
        if not os.path.isdir(base):  # pragma: no cover
            os.makedirs(base)
        group_and_other_bits = os.stat(base).st_mode & 0o77
        if group_and_other_bits != 0:
            logger.warning("Directory '%s' is not private", base)
        self.base = os.path.abspath(os.path.normpath(base))

    def prefix_to_dir(self, prefix):
        """
        Converts a resource prefix to a directory name in the cache.
        """
        return path_to_cache_dir(prefix)

    def clear(self):
        """
        Clear the cache.

        :return: A list of the paths which could not be removed.
        """
        not_removed = []
        for entry in os.listdir(self.base):
            path = os.path.join(self.base, entry)
            try:
                if os.path.islink(path) or os.path.isfile(path):
                    os.remove(path)
                elif os.path.isdir(path):
                    shutil.rmtree(path)
            except Exception:
                not_removed.append(path)
        return not_removed
class EventMixin(object):
    """
    A very simple publish/subscribe system.
    """

    def __init__(self):
        # Maps each event to a deque of its subscribers
        self._subscribers = {}

    def add(self, event, subscriber, append=True):
        """
        Add a subscriber for an event.

        :param event: The name of an event.
        :param subscriber: The subscriber to be added (and called when the
                           event is published).
        :param append: Whether to append or prepend the subscriber to an
                       existing subscriber list for the event.
        """
        registry = self._subscribers
        if event in registry:
            queue = registry[event]
            if append:
                queue.append(subscriber)
            else:
                queue.appendleft(subscriber)
        else:
            registry[event] = deque([subscriber])

    def remove(self, event, subscriber):
        """
        Remove a subscriber for an event.

        :param event: The name of an event.
        :param subscriber: The subscriber to be removed.
        :raises ValueError: If the event has no subscribers, or the
                            subscriber is not among them.
        """
        registry = self._subscribers
        if event not in registry:
            raise ValueError('No subscribers: %r' % event)
        registry[event].remove(subscriber)

    def get_subscribers(self, event):
        """
        Return an iterator for the subscribers for an event.

        :param event: The event to return subscribers for.
        """
        return iter(self._subscribers.get(event, ()))

    def publish(self, event, *args, **kwargs):
        """
        Publish a event and return a list of values returned by its
        subscribers.

        A subscriber which raises an exception contributes ``None`` to the
        result; the exception is logged.

        :param event: The event to publish.
        :param args: The positional arguments to pass to the event's
                     subscribers.
        :param kwargs: The keyword arguments to pass to the event's
                       subscribers.
        """
        result = []
        for subscriber in self.get_subscribers(event):
            try:
                value = subscriber(event, *args, **kwargs)
            except Exception:
                logger.exception('Exception during event publication')
                value = None
            result.append(value)
        logger.debug('publish %s: args = %s, kwargs = %s, result = %s', event,
                     args, kwargs, result)
        return result
#
# Simple sequencing
#
class Sequencer(object):
    """
    Holds a directed graph of steps, and can work out which steps lead up to
    a given step, find strongly connected components and render the graph in
    the DOT language.
    """

    def __init__(self):
        self._preds = {}  # step -> set of its immediate predecessors
        self._succs = {}  # step -> set of its immediate successors
        self._nodes = set()  # nodes with no preds/succs

    def add_node(self, node):
        """Add a standalone node."""
        self._nodes.add(node)

    def remove_node(self, node, edges=False):
        """
        Remove a standalone node and, if ``edges`` is true, every edge into
        or out of it.
        """
        self._nodes.discard(node)
        if not edges:
            return
        # Iterate over copies, because remove() mutates the underlying sets
        for pred in set(self._preds.get(node, ())):
            self.remove(pred, node)
        for succ in set(self._succs.get(node, ())):
            self.remove(node, succ)
        # Remove empties
        for mapping in (self._preds, self._succs):
            for key, steps in list(mapping.items()):
                if not steps:
                    del mapping[key]

    def add(self, pred, succ):
        """Add an edge saying that ``pred`` comes before ``succ``."""
        assert pred != succ
        self._preds.setdefault(succ, set()).add(pred)
        self._succs.setdefault(pred, set()).add(succ)

    def remove(self, pred, succ):
        """
        Remove the edge from ``pred`` to ``succ``.

        :raises ValueError: If there is no such edge.
        """
        assert pred != succ
        try:
            preds = self._preds[succ]
            succs = self._succs[pred]
        except KeyError:  # pragma: no cover
            raise ValueError('%r not a successor of anything' % succ)
        try:
            preds.remove(pred)
            succs.remove(succ)
        except KeyError:  # pragma: no cover
            raise ValueError('%r not a successor of %r' % (succ, pred))

    def is_step(self, step):
        """Return whether ``step`` is known, as a node or in any edge."""
        known = (self._preds, self._succs, self._nodes)
        return any(step in container for container in known)

    def get_steps(self, final):
        """
        Return an iterator over the steps leading up to and including
        ``final``, with predecessors first.

        :raises ValueError: If ``final`` is not a known step.
        """
        if not self.is_step(final):
            raise ValueError('Unknown: %r' % final)
        ordered = []
        seen = set()
        todo = [final]
        while todo:
            step = todo.pop(0)
            if step not in seen:
                seen.add(step)
                ordered.append(step)
                todo.extend(self._preds.get(step, ()))
            elif step != final:
                # if a step was already seen,
                # move it to the end (so it will appear earlier
                # when reversed on return) ... but not for the
                # final step, as that would be confusing for
                # users
                ordered.remove(step)
                ordered.append(step)
        return reversed(ordered)

    @property
    def strong_connections(self):
        """
        The strongly connected components of the successor graph, as a list
        of tuples of steps.
        """
        # http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
        index_counter = [0]
        stack = []
        lowlinks = {}
        index = {}
        result = []

        graph = self._succs

        def strongconnect(node):
            # set the depth index for this node to the smallest unused index
            index[node] = index_counter[0]
            lowlinks[node] = index_counter[0]
            index_counter[0] += 1
            stack.append(node)

            # Consider successors
            try:
                successors = graph[node]
            except Exception:
                successors = []
            for successor in successors:
                if successor not in lowlinks:
                    # Successor has not yet been visited
                    strongconnect(successor)
                    lowlinks[node] = min(lowlinks[node], lowlinks[successor])
                elif successor in stack:
                    # the successor is in the stack and hence in the current
                    # strongly connected component (SCC)
                    lowlinks[node] = min(lowlinks[node], index[successor])

            # If `node` is a root node, pop the stack and generate an SCC
            if lowlinks[node] == index[node]:
                connected_component = []
                while True:
                    member = stack.pop()
                    connected_component.append(member)
                    if member == node:
                        break
                # storing the result
                result.append(tuple(connected_component))

        for node in graph:
            if node not in lowlinks:
                strongconnect(node)

        return result

    @property
    def dot(self):
        """The graph rendered as text in the DOT language."""
        lines = ['digraph G {']
        for succ, preds in self._preds.items():
            for pred in preds:
                lines.append(' %s -> %s;' % (pred, succ))
        for node in self._nodes:
            lines.append(' %s;' % node)
        lines.append('}')
        return '\n'.join(lines)
#
# Unarchiving functionality for zip, tar, tgz, tbz, whl
#
# Filename extensions of the archive formats which are recognised
ARCHIVE_EXTENSIONS = ('.tar.gz', '.tar.bz2', '.tar', '.zip', '.tgz', '.tbz',
                      '.whl')
def unarchive(archive_filename, dest_dir, format=None, check=True):
    """
    Extract an archive into a directory.

    :param archive_filename: The path of the archive.
    :param dest_dir: The directory to extract into.
    :param format: One of 'zip', 'tgz', 'tbz' or 'tar'. If ``None``, the
                   format is deduced from the filename extension.
    :param check: Whether to verify, before extracting anything, that every
                  member would be extracted inside ``dest_dir``.
    :raises ValueError: If the format is not known, or if a member would be
                        extracted outside ``dest_dir``.
    """
    # The tarfile mode to use for each of the tar-based formats
    tar_modes = {'tgz': 'r:gz', 'tbz': 'r:bz2', 'tar': 'r'}

    def check_path(path):
        if not isinstance(path, text_type):
            path = path.decode('utf-8')
        p = os.path.abspath(os.path.join(dest_dir, path))
        # A member such as '.' resolves to the destination itself, which is
        # fine. Anything else has to be strictly below the destination.
        if p != dest_dir and not p.startswith(dest_prefix):
            raise ValueError('path outside destination: %r' % p)

    dest_dir = os.path.abspath(dest_dir)
    # The destination with exactly one trailing separator (the root
    # directory already ends with one)
    if dest_dir.endswith(os.sep):
        dest_prefix = dest_dir
    else:
        dest_prefix = dest_dir + os.sep

    archive = None
    if format is None:
        if archive_filename.endswith(('.zip', '.whl')):
            format = 'zip'
        elif archive_filename.endswith(('.tar.gz', '.tgz')):
            format = 'tgz'
        elif archive_filename.endswith(('.tar.bz2', '.tbz')):
            format = 'tbz'
        elif archive_filename.endswith('.tar'):
            format = 'tar'
        else:  # pragma: no cover
            raise ValueError('Unknown format for %r' % archive_filename)
    if format != 'zip' and format not in tar_modes:
        raise ValueError('Unknown format: %r' % format)
    try:
        if format == 'zip':
            archive = ZipFile(archive_filename, 'r')
            if check:
                names = archive.namelist()
                for name in names:
                    check_path(name)
        else:
            # Look the mode up from the format, so that it is also available
            # when the format was passed in rather than deduced.
            archive = tarfile.open(archive_filename, tar_modes[format])
            if check:
                names = archive.getnames()
                for name in names:
                    check_path(name)
        if format != 'zip' and sys.version_info[0] < 3:
            # See Python issue 17153. If the dest path contains Unicode,
            # tarfile extraction fails on Python 2.x if a member path name
            # contains non-ASCII characters - it leads to an implicit
            # bytes -> unicode conversion using ASCII to decode.
            for tarinfo in archive.getmembers():
                if not isinstance(tarinfo.name, text_type):
                    tarinfo.name = tarinfo.name.decode('utf-8')

        # Limit extraction of dangerous items, if this Python
        # allows it easily. If not, just trust the input.
        # See: https://docs.python.org/3/library/tarfile.html#extraction-filters
        def extraction_filter(member, path):
            """Run tarfile.tar_filter, but raise the expected ValueError"""
            # This is only called if the current Python has tarfile filters
            try:
                return tarfile.tar_filter(member, path)
            except tarfile.FilterError as exc:
                raise ValueError(str(exc))

        archive.extraction_filter = extraction_filter

        archive.extractall(dest_dir)

    finally:
        if archive is not None:
            archive.close()
def zip_dir(directory):
    """
    Zip every file found under a directory tree into memory.

    :param directory: Root of the tree to walk.
    :return: An ``io.BytesIO`` holding the zip data. Each file is stored
             under its walk directory (with the ``directory`` prefix cut
             off) joined with its file name.
    """
    buffer = io.BytesIO()
    prefix_length = len(directory)
    with ZipFile(buffer, "w") as archive:
        for dirpath, _subdirs, filenames in os.walk(directory):
            # Path of the directory being visited with the root prefix
            # sliced off.
            relative_dir = dirpath[prefix_length:]
            for filename in filenames:
                source = os.path.join(dirpath, filename)
                archive.write(source, os.path.join(relative_dir, filename))
    return buffer
#
# Simple progress bar
#
# Unit prefixes used by Progress.speed when formatting a rate: the value is
# divided by 1000 for each step taken through this tuple, and the prefix that
# was reached is put in front of "B/s".
UNITS = (
'',
'K',
'M',
'G',
'T',
'P')
class Progress(object):
    """
    Track progress between a minimum and an (optionally unknown) maximum.

    The instance records the current value, the time of the first update and
    the time elapsed since then, and offers formatted percentage, ETA and
    speed strings for display.
    """
    # Returned by ``maximum`` when no maximum was given.
    unknown = 'UNKNOWN'

    def __init__(self, minval=0, maxval=100):
        """
        :param minval: The starting value.
        :param maxval: The final value, or ``None`` if it is not known.
        """
        assert maxval is None or maxval >= minval
        self.min = self.cur = minval
        self.max = maxval
        self.started = None
        self.elapsed = 0
        self.done = False

    def update(self, curval):
        """Set the current value and refresh the elapsed time."""
        assert self.min <= curval
        assert self.max is None or curval <= self.max
        self.cur = curval
        now = time.time()
        if self.started is None:
            # The first update only starts the clock.
            self.started = now
        else:
            self.elapsed = now - self.started

    def increment(self, incr):
        """Advance the current value by a non-negative amount."""
        assert incr >= 0
        self.update(self.cur + incr)

    def start(self):
        """Reset the current value to the minimum and return ``self``."""
        self.update(self.min)
        return self

    def stop(self):
        """Mark the operation as done, moving to the maximum if it is known."""
        if self.max is not None:
            self.update(self.max)
        self.done = True

    @property
    def maximum(self):
        """The maximum value, or ``unknown`` if there is none."""
        return self.unknown if self.max is None else self.max

    @property
    def percentage(self):
        """Progress as a string such as ``' 25 %'``."""
        if self.done:
            result = '100 %'
        elif self.max is None:
            result = ' ?? %'
        elif self.max == self.min:
            # An empty range has nothing left to do; reporting it as complete
            # avoids dividing by zero below.
            result = '100 %'
        else:
            v = 100.0 * (self.cur - self.min) / (self.max - self.min)
            result = '%3d %%' % v
        return result

    def format_duration(self, duration):
        """
        Format a number of seconds as ``HH:MM:SS``.

        ``'??:??:??'`` is returned when no progress has been made yet, or
        when the duration is not positive and the maximum is unknown.
        """
        unknown_duration = (duration <= 0) and self.max is None
        if unknown_duration or self.cur == self.min:
            result = '??:??:??'
        else:
            result = time.strftime('%H:%M:%S', time.gmtime(duration))
        return result

    @property
    def ETA(self):
        """
        The estimated time remaining (``'ETA : ...'``) or, once done, the
        total elapsed time (``'Done: ...'``).
        """
        if self.done:
            prefix = 'Done'
            t = self.elapsed
        else:
            prefix = 'ETA '
            if self.max is None:
                t = -1
            elif self.elapsed == 0 or (self.cur == self.min):
                t = 0
            else:
                # Scale the elapsed time by the ratio of remaining work to
                # completed work.
                t = float(self.max - self.min)
                t /= self.cur - self.min
                t = (t - 1) * self.elapsed
        return '%s: %s' % (prefix, self.format_duration(t))

    @property
    def speed(self):
        """The average rate since the first update, e.g. ``'12 KB/s'``."""
        if self.elapsed == 0:
            result = 0.0
        else:
            result = (self.cur - self.min) / self.elapsed
        last = len(UNITS) - 1
        for index, unit in enumerate(UNITS):
            # Never scale past the largest unit: doing so would divide the
            # value again without a prefix to match.
            if result < 1000 or index == last:
                break
            result /= 1000.0
        return '%d %sB/s' % (result, unit)
#
# Glob functionality
#
# Matches one "{...}" group and captures the text between the braces;
# _iglob() splits a pattern on it to expand comma-separated alternatives.
RICH_GLOB = re.compile(r
'\{([^}]*)\}')
# Matches "**" directly preceded by a character other than "/", "\", "," or
# "{", or directly followed by a character other than "/", "\", "," or "}".
# iglob() rejects patterns for which this matches.
_CHECK_RECURSIVE_GLOB = re.compile(r
'[^/\\,{]\*\*|\*\*[^/\\,}]')
# Matches a "}" with no "{" anywhere before it, or a "{" with no "}" anywhere
# after it. iglob() rejects patterns for which this matches.
_CHECK_MISMATCH_SET = re.compile(r
'^[^{]*\}|\{[^}]*$')
def iglob(path_glob):
    """
    Extended globbing function that supports ** and {opt1,opt2,opt3}.

    The pattern is validated first and then handed to ``_iglob``.

    :param path_glob: The glob pattern to expand.
    :return: Whatever ``_iglob`` returns for the pattern (an iterator over
             the matching paths).
    :raises ValueError: If ``**`` is glued to other characters, or if the
                        pattern contains an unbalanced ``{`` or ``}``.
    """
    if _CHECK_RECURSIVE_GLOB.search(path_glob):
        # The message is kept on a single line so that the quoted "**" is
        # not broken up by a line break.
        msg = """invalid glob %r: recursive glob "**" must be used alone"""
        raise ValueError(msg % path_glob)
    if _CHECK_MISMATCH_SET.search(path_glob):
        msg = """invalid glob %r: mismatching set marker '{' or '}'"""
        raise ValueError(msg % path_glob)
    return _iglob(path_glob)
def _iglob(path_glob):
    """
    Generate the paths matching an already validated extended glob.

    The first ``{a,b,...}`` group is expanded into one pattern per
    alternative; a ``**`` is expanded by walking the directory tree below the
    part of the pattern in front of it. Patterns with neither are passed to
    the standard library ``iglob``.
    """
    pieces = RICH_GLOB.split(path_glob, 1)
    if len(pieces) > 1:
        # One capturing group and at most one split: before, inside, after.
        assert len(pieces) == 3, pieces
        head, alternatives, tail = pieces
        for alternative in alternatives.split(','):
            for match in _iglob(''.join((head, alternative, tail))):
                yield match
        return

    if '**' not in path_glob:
        for match in std_iglob(path_glob):
            yield match
        return

    base, remainder = path_glob.split('**', 1)
    if base == '':
        base = '.'
    if remainder == '':
        remainder = '*'
    else:
        # we support both kinds of separator after the "**"
        remainder = remainder.lstrip('/').lstrip('\\')
    for dirpath, _subdirs, _filenames in os.walk(base):
        dirpath = os.path.normpath(dirpath)
        for match in _iglob(os.path.join(dirpath, remainder)):
            yield match
if ssl:
from .compat
import (HTTPSHandler
as BaseHTTPSHandler, match_hostname,
CertificateError)
#
# HTTPSConnection which verifies certificates/matches domains
#
class HTTPSConnection(httplib.HTTPSConnection):
    """
    HTTPS connection which can verify the server certificate and check that
    it matches the host being connected to.
    """
    # Set this to the path to the certs file (.pem). While it is None, no CA
    # file is loaded and the host name check is skipped.
    ca_certs = None
    # Whether to match the peer certificate against the host name; only used
    # if ca_certs is not None.
    check_domain = True

    # noinspection PyPropertyAccess
    def connect(self):
        """
        Connect to the host, set up any tunnel, wrap the socket with SSL and
        optionally verify the peer's host name.

        :raises CertificateError: If the host name check fails; the socket is
                                  shut down and closed first.
        """
        raw_socket = socket.create_connection((self.host, self.port),
                                              self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = raw_socket
            self._tunnel()

        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        if hasattr(ssl, 'OP_NO_SSLv2'):
            context.options |= ssl.OP_NO_SSLv2
        if getattr(self, 'cert_file', None):
            context.load_cert_chain(self.cert_file, self.key_file)

        ca_certs = self.ca_certs
        wrap_options = {}
        if ca_certs:
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(cafile=ca_certs)
            if getattr(ssl, 'HAS_SNI', False):
                wrap_options['server_hostname'] = self.host
        self.sock = context.wrap_socket(raw_socket, **wrap_options)

        if not (ca_certs and self.check_domain):
            return
        try:
            match_hostname(self.sock.getpeercert(), self.host)
            logger.debug('Host verified: %s', self.host)
        except CertificateError:  # pragma: no cover
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            raise
class HTTPSHandler(BaseHTTPSHandler):
    """
    HTTPS handler whose connections are ``HTTPSConnection`` instances
    configured with this handler's certificate settings.
    """

    def __init__(self, ca_certs, check_domain=True):
        """
        :param ca_certs: Path to the certs file handed to each connection.
        :param check_domain: Whether connections should check that the
                             certificate matches the host name.
        """
        BaseHTTPSHandler.__init__(self)
        self.ca_certs = ca_certs
        self.check_domain = check_domain

    def _conn_maker(self, *args, **kwargs):
        """
        This is called to create a connection instance. Normally you'd pass a
        connection class to do_open, but it doesn't actually check for a
        class, and just expects a callable. As long as we behave just as a
        constructor would have, we should be OK. If it ever changes so that
        we *must* pass a class, we'll create an UnsafeHTTPSConnection class
        which just sets check_domain to False in the class definition, and
        choose which one to pass to do_open.
        """
        connection = HTTPSConnection(*args, **kwargs)
        if self.ca_certs:
            connection.ca_certs = self.ca_certs
            connection.check_domain = self.check_domain
        return connection

    def https_open(self, req):
        """
        Open the request, reporting a certificate verification failure as a
        ``CertificateError``; any other ``URLError`` is re-raised unchanged.
        """
        try:
            return self.do_open(self._conn_maker, req)
        except URLError as e:
            if 'certificate verify failed' not in str(e.reason):
                raise
            raise CertificateError('Unable to verify server certificate '
                                   'for %s' % req.host)
#
# To guard against mixing HTTP traffic with HTTPS (examples: a Man-In-The-
# Middle proxy using HTTP listens on port 443, or an index mistakenly serves
# HTML containing an http://xyz link when it should be https://xyz),
# you can use the following handler class, which does not allow HTTP traffic.
#
# It works by inheriting from HTTPHandler - so build_opener won't add a
# handler for HTTP itself.
#
class HTTPSOnlyHandler(HTTPSHandler, HTTPHandler):
    """HTTPS handler which refuses to open plain HTTP requests."""

    def http_open(self, req):
        """Always raise ``URLError``: HTTP is not allowed by this handler."""
        message = ('Unexpected HTTP request on what should be a secure '
                   'connection: %s' % req)
        raise URLError(message)
#
# XML-RPC with timeouts
#
class Transport(xmlrpclib.Transport):
    """XML-RPC transport over plain HTTP which applies a connection timeout."""

    def __init__(self, timeout, use_datetime=0):
        """
        :param timeout: Timeout for the connections made by this transport,
                        or ``None`` to leave the HTTP library's default
                        in place.
        :param use_datetime: Passed on to ``xmlrpclib.Transport``.
        """
        self.timeout = timeout
        xmlrpclib.Transport.__init__(self, use_datetime)

    def make_connection(self, host):
        """
        Return a connection to ``host``, reusing the cached one when it was
        made for the same host.
        """
        h, eh, _x509 = self.get_host_info(host)
        if not self._connection or host != self._connection[0]:
            self._extra_headers = eh
            options = {}
            if self.timeout is not None:
                # Without this the stored timeout would never reach the
                # connection and so would have no effect.
                options['timeout'] = self.timeout
            self._connection = host, httplib.HTTPConnection(h, **options)
        return self._connection[1]
if ssl:
    class SafeTransport(xmlrpclib.SafeTransport):
        """XML-RPC transport over HTTPS which applies a connection timeout."""

        def __init__(self, timeout, use_datetime=0):
            """
            :param timeout: Timeout passed to the connections made by this
                            transport.
            :param use_datetime: Passed on to ``xmlrpclib.SafeTransport``.
            """
            self.timeout = timeout
            xmlrpclib.SafeTransport.__init__(self, use_datetime)

        def make_connection(self, host):
            """
            Return a connection to ``host``, reusing the cached one when it
            was made for the same host.
            """
            h, eh, x509 = self.get_host_info(host)
            if not self._connection or host != self._connection[0]:
                self._extra_headers = eh
                # Work on a copy: the dictionary returned by get_host_info
                # can be the one the caller supplied along with the host, and
                # must not acquire a 'timeout' entry as a side effect.
                options = dict(x509 or {})
                options['timeout'] = self.timeout
                self._connection = host, httplib.HTTPSConnection(
                    h, None, **options)
            return self._connection[1]
class ServerProxy(xmlrpclib.ServerProxy):
    """XML-RPC server proxy which accepts an optional ``timeout`` keyword."""

    def __init__(self, uri, **kwargs):
        """
        :param uri: The URI of the XML-RPC server.
        :param kwargs: Keyword arguments for ``xmlrpclib.ServerProxy``, plus
                       an optional ``timeout``. When a timeout is given, a
                       timeout-aware transport matching the URI scheme is
                       created and passed on as ``transport``.
        """
        timeout = kwargs.pop('timeout', None)
        self.timeout = timeout
        # The timeout-aware transports only come into play if a timeout
        # is specified
        if timeout is not None:
            # urlparse is used because splittype is deprecated as of
            # Python 3.8
            scheme = urlparse(uri)[0]
            use_datetime = kwargs.get('use_datetime', 0)
            transport_class = SafeTransport if scheme == 'https' else Transport
            transport = transport_class(timeout, use_datetime=use_datetime)
            kwargs['transport'] = transport
            self.transport = transport
        xmlrpclib.ServerProxy.__init__(self, uri, **kwargs)
#
# CSV functionality. This is provided because on 2.x, the csv module can't
# handle Unicode. However, we need to deal with Unicode in e.g. RECORD files.
#
def _csv_open(fn, mode, **kwargs):
if sys.version_info[0] < 3:
mode +=
'b'
else:
kwargs[
'newline'] =
''
# Python 3 determines encoding from locale. Force 'utf-8'
# file encoding to match other forced utf-8 encoding
kwargs[
'encoding'] =
'utf-8'
return open(fn, mode, **kwargs)
class CSVBase(object):
    """
    Shared csv settings and context manager support for the CSV reader and
    writer. Leaving the ``with`` block closes ``self.stream``, which the
    subclass is expected to have set.
    """
    # The values go through str() because we need native str in the csv API
    # (2.x won't take Unicode).
    defaults = dict(
        delimiter=str(','),
        quotechar=str('"'),
        lineterminator=str('\n'),
    )

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.stream.close()
class CSVReader(CSVBase):
    """Iterator over the rows of a CSV file or stream, yielding text."""

    def __init__(self, **kwargs):
        """
        :param kwargs: Either ``stream``, an already open stream (wrapped in
                       a UTF-8 reader on Python 3), or ``path``, the path of
                       a file to open for reading.
        """
        if 'stream' not in kwargs:
            source = _csv_open(kwargs['path'], 'r')
        else:
            source = kwargs['stream']
            if sys.version_info[0] >= 3:
                # needs to be a text stream
                source = codecs.getreader('utf-8')(source)
        self.stream = source
        self.reader = csv.reader(self.stream, **self.defaults)

    def __iter__(self):
        return self

    def next(self):
        """Return the next row; on Python 2 its items are decoded to text."""
        row = next(self.reader)
        if sys.version_info[0] < 3:
            row[:] = [
                cell if isinstance(cell, text_type) else cell.decode('utf-8')
                for cell in row
            ]
        return row

    __next__ = next
class CSVWriter(CSVBase):
    """Writer of CSV rows to a file opened via ``_csv_open``."""

    def __init__(self, fn, **kwargs):
        """
        :param fn: Path of the file to write.
        :param kwargs: Accepted but not used.
        """
        self.stream = _csv_open(fn, 'w')
        self.writer = csv.writer(self.stream, **self.defaults)

    def writerow(self, row):
        """Write one row; on Python 2 text items are encoded as UTF-8."""
        if sys.version_info[0] < 3:
            row = [
                item.encode('utf-8') if isinstance(item, text_type) else item
                for item in row
            ]
        self.writer.writerow(row)
#
# Configurator functionality
#
class Configurator(BaseConfigurator):
    """
    Configurator which adds an ``inc`` value converter and builds custom
    objects from dictionaries that have a ``'()'`` key.
    """
    # The base class's converters plus 'inc', handled by inc_convert().
    value_converters = dict(BaseConfigurator.value_converters,
                            inc='inc_convert')

    def __init__(self, config, base=None):
        """
        :param config: The configuration dictionary.
        :param base: Directory against which relative ``inc`` paths are
                     resolved; the current directory if not given.
        """
        super(Configurator, self).__init__(config)
        self.base = base or os.getcwd()

    def configure_custom(self, config):
        """
        Build an object from a configuration dictionary.

        ``'()'`` is the factory (resolved if it is not callable), ``'[]'``
        holds positional arguments and ``'.'`` holds attributes to set on the
        result. The remaining keys which are valid identifiers become keyword
        arguments. All three special keys are removed from ``config``.
        """
        def convert(value):
            # Convert containers recursively, building nested custom objects
            # on the way.
            if isinstance(value, (list, tuple)):
                return type(value)([convert(member) for member in value])
            if isinstance(value, dict):
                if '()' in value:
                    return self.configure_custom(value)
                return dict((key, convert(value[key])) for key in value)
            return self.convert(value)

        factory = config.pop('()')
        if not callable(factory):
            factory = self.resolve(factory)
        attributes = config.pop('.', None)
        positional = config.pop('[]', ())
        if positional:
            positional = tuple(convert(arg) for arg in positional)
        # Check for valid identifiers
        keywords = dict((name, convert(config[name]))
                        for name in config if valid_ident(name))
        instance = factory(*positional, **keywords)
        if attributes:
            for attr_name, attr_value in attributes.items():
                setattr(instance, attr_name, convert(attr_value))
        return instance

    def __getitem__(self, key):
        """
        Return the configured value for ``key``. A dictionary with a ``'()'``
        key is replaced by the object built from it, both in the
        configuration and in the return value.
        """
        value = self.config[key]
        if isinstance(value, dict) and '()' in value:
            value = self.configure_custom(value)
            self.config[key] = value
        return value

    def inc_convert(self, value):
        """Default converter for the inc:// protocol."""
        if os.path.isabs(value):
            path = value
        else:
            path = os.path.join(self.base, value)
        with codecs.open(path, 'r', encoding='utf-8') as stream:
            return json.load(stream)
class SubprocessMixin(object):
    """
    Mixin for running subprocesses and capturing their output
    """

    def __init__(self, verbose=False, progress=None):
        """
        :param verbose: If true (and there is no ``progress`` callable), the
                        subprocess output is echoed to ``sys.stderr``.
        :param progress: Optional callable taking a line of output and a
                         context string.
        """
        self.verbose = verbose
        self.progress = progress

    def reader(self, stream, context):
        """
        Read lines from a subprocess' output stream and either pass to a
        progress callable (if specified) or write progress information to
        sys.stderr. The stream is closed once it is exhausted.
        """
        report = self.progress
        echo = self.verbose
        line = stream.readline()
        while line:
            if report is not None:
                report(line, context)
            else:
                # Without a callable: echo the line when verbose, otherwise
                # just print a dot for each line.
                sys.stderr.write(line.decode('utf-8') if echo else '.')
                sys.stderr.flush()
            line = stream.readline()
        stream.close()

    def run_command(self, cmd, **kwargs):
        """
        Run a command, draining its stdout and stderr on separate threads,
        and wait for it to finish.

        :param cmd: The command, as accepted by ``subprocess.Popen``.
        :param kwargs: Further keyword arguments for ``subprocess.Popen``.
        :return: The finished ``Popen`` instance.
        """
        process = subprocess.Popen(cmd,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   **kwargs)
        pumps = []
        for pipe, label in ((process.stdout, 'stdout'),
                            (process.stderr, 'stderr')):
            pump = threading.Thread(target=self.reader, args=(pipe, label))
            pump.start()
            pumps.append(pump)
        process.wait()
        for pump in pumps:
            pump.join()
        if self.progress is not None:
            self.progress('done.', 'main')
        elif self.verbose:
            sys.stderr.write('done.\n')
        return process
def normalize_name(name):
    """Normalize a python package name a la PEP 503"""
    # https://www.python.org/dev/peps/pep-0503/#normalized-names
    # Every run of "-", "_" and "." becomes a single "-".
    parts = re.split('[-_.]+', name)
    return '-'.join(parts).lower()
# def _get_pypirc_command():
# """
# Get the distutils command for interacting with PyPI configurations.
# :return: the command.
# """
# from distutils.core import Distribution
# from distutils.config import PyPIRCCommand
# d = Distribution()
# return PyPIRCCommand(d)
class PyPIRCFile(object):
    """Read and update the access settings stored in a ``.pypirc`` file."""

    DEFAULT_REPOSITORY = 'https://upload.pypi.org/legacy/'
    DEFAULT_REALM = 'pypi'

    def __init__(self, fn=None, url=None):
        """
        :param fn: Path of the file; ``.pypirc`` in the user's home directory
                   if not given.
        :param url: The repository to look for when reading: either a
                    repository URL or the name of a server section.
        """
        if fn is None:
            fn = os.path.join(os.path.expanduser('~'), '.pypirc')
        self.filename = fn
        self.url = url

    def read(self):
        """
        Return the settings of the server matching ``self.url`` (or the
        default repository if no URL was given).

        :return: A dictionary with the keys ``server``, ``username``,
                 ``password``, ``repository`` and ``realm``, or an empty
                 dictionary if the file does not exist or nothing matches.
        """
        result = {}
        if not os.path.exists(self.filename):
            return result

        repository = self.url or self.DEFAULT_REPOSITORY
        config = configparser.RawConfigParser()
        config.read(self.filename)
        sections = config.sections()
        if 'distutils' in sections:
            # let's get the list of servers
            index_servers = config.get('distutils', 'index-servers')
            servers = [server.strip()
                       for server in index_servers.split('\n')
                       if server.strip() != '']
            if not servers and 'pypi' in sections:
                # nothing set, let's try to get the default pypi
                servers = ['pypi']
            for server in servers:
                current = {
                    'server': server,
                    'username': config.get(server, 'username'),
                }
                # optional params
                for key, default in (('repository', self.DEFAULT_REPOSITORY),
                                     ('realm', self.DEFAULT_REALM),
                                     ('password', None)):
                    if config.has_option(server, key):
                        current[key] = config.get(server, key)
                    else:
                        current[key] = default

                # Return as soon as a server matches, so that a later,
                # non-matching server cannot discard the match.

                # work around people having "repository" for the "pypi"
                # section of their config set to the HTTP (rather than
                # HTTPS) URL
                if (server == 'pypi' and
                        repository in (self.DEFAULT_REPOSITORY, 'pypi')):
                    current['repository'] = self.DEFAULT_REPOSITORY
                    return current
                if repository in (current['server'], current['repository']):
                    return current
        elif 'server-login' in sections:
            # old format
            server = 'server-login'
            if config.has_option(server, 'repository'):
                repository = config.get(server, 'repository')
            else:
                repository = self.DEFAULT_REPOSITORY
            result = {
                'username': config.get(server, 'username'),
                'password': config.get(server, 'password'),
                'repository': repository,
                'server': server,
                'realm': self.DEFAULT_REALM
            }
        return result

    def update(self, username, password):
        """
        Store a user name and password in the ``pypi`` section of the file,
        creating the section (and the file) if necessary.
        """
        config = configparser.RawConfigParser()
        fn = self.filename
        config.read(fn)
        if not config.has_section('pypi'):
            config.add_section('pypi')
        config.set('pypi', 'username', username)
        config.set('pypi', 'password', password)
        with open(fn, 'w') as f:
            config.write(f)
def _load_pypirc(index):
    """
    Read the PyPI access configuration as supported by distutils.

    :param index: An object whose ``url`` attribute selects the repository
                  to look up.
    :return: The dictionary returned by ``PyPIRCFile.read``.
    """
    rc_file = PyPIRCFile(url=index.url)
    return rc_file.read()
def _store_pypirc(index):
    """
    Save the user name and password of ``index`` in the default PyPI
    configuration file.

    :param index: An object with ``username`` and ``password`` attributes.
    """
    rc_file = PyPIRCFile()
    rc_file.update(index.username, index.password)
#
# get_platform()/get_host_platform() copied from Python 3.10.a0 source, with some minor
# tweaks
#
def get_host_platform():
"""Return a string that identifies the current platform. This is used mainly to
distinguish platform-specific build directories
and platform-specific built
distributions. Typically includes the OS name
and version
and the
architecture (
as supplied by
'os.uname()'), although the exact information
included depends on the OS; eg. on Linux, the kernel version isn
't
particularly important.
Examples of returned values:
linux-i586
linux-alpha (?)
solaris-2.6-sun4u
Windows will
return one of:
win-amd64 (64bit Windows on AMD64 (aka x86_64, Intel64, EM64T, etc)
win32 (all others - specifically, sys.platform
is returned)
For other non-POSIX platforms, currently just returns
'sys.platform'.
"""
if os.name ==
'nt':
if 'amd64' in sys.version.lower():
return 'win-amd64'
if '(arm)' in sys.version.lower():
return 'win-arm32'
if '(arm64)' in sys.version.lower():
return 'win-arm64'
return sys.platform
# Set for cross builds explicitly
if "_PYTHON_HOST_PLATFORM" in os.environ:
return os.environ[
"_PYTHON_HOST_PLATFORM"]
if os.name !=
'posix' or not hasattr(os,
'uname'):
# XXX what about the architecture? NT is Intel or Alpha,
# Mac OS is M68k or PPC, etc.
return sys.platform
# Try to distinguish various flavours of Unix
(osname, host, release, version, machine) = os.uname()
--> --------------------
--> maximum size reached
--> --------------------