import math
import re
import sys
import warnings
from collections.abc
import Mapping, Sequence
from contextlib
import suppress
from functools
import _CacheInfo, lru_cache
from ipaddress
import ip_address
from typing
import (
TYPE_CHECKING,
Any,
Iterable,
List,
SupportsInt,
Tuple,
TypedDict,
TypeVar,
Union,
overload,
)
from urllib.parse
import (
SplitResult,
parse_qsl,
quote,
urlsplit,
uses_netloc,
uses_relative,
)
import idna
from multidict
import MultiDict, MultiDictProxy, istr
from propcache.api
import under_cached_property
as cached_property
from ._quoting
import _Quoter, _Unquoter
DEFAULT_PORTS = {
"http": 80,
"https": 443,
"ws": 80,
"wss": 443,
"ftp": 21}
USES_AUTHORITY = frozenset(uses_netloc)
USES_RELATIVE = frozenset(uses_relative)
# Special schemes https://url.spec.whatwg.org/#special-scheme
# are not allowed to have an empty host https://url.spec.whatwg.org/#url-representation
SCHEME_REQUIRES_HOST = frozenset((
"http",
"https",
"ws",
"wss",
"ftp"))
sentinel = object()
# reg-name: unreserved / pct-encoded / sub-delims
# this pattern matches anything that is *not* in those classes. and is only used
# on lower-cased ASCII values.
_not_reg_name = re.compile(
r
"""
# any character not in the unreserved or sub-delims sets, plus %
# (validated with the additional check for pct-encoded sequences below)
[^a-z0-9\-._~!$&
'()*+,;=%]
|
# % only allowed if it is part of a pct-encoded
# sequence of 2 hex digits.
%(?![0-9a-f]{2})
""",
re.VERBOSE,
)
SimpleQuery = Union[str, int, float]
QueryVariable = Union[SimpleQuery,
"Sequence[SimpleQuery]"]
Query = Union[
None, str,
"Mapping[str, QueryVariable]",
"Sequence[Tuple[str, QueryVariable]]"
]
_T = TypeVar(
"_T")
if sys.version_info >= (3, 11):
from typing
import Self
else:
Self = Any
class CacheInfo(TypedDict):
"""Host encoding cache."""
idna_encode: _CacheInfo
idna_decode: _CacheInfo
ip_address: _CacheInfo
host_validate: _CacheInfo
class _SplitResultDict(TypedDict, total=
False):
scheme: str
netloc: str
path: str
query: str
fragment: str
class _InternalURLCache(TypedDict, total=
False):
_origin:
"URL"
absolute: bool
scheme: str
raw_authority: str
_default_port: Union[int,
None]
authority: str
raw_user: Union[str,
None]
user: Union[str,
None]
raw_password: Union[str,
None]
password: Union[str,
None]
raw_host: Union[str,
None]
host: Union[str,
None]
host_subcomponent: Union[str,
None]
port: Union[int,
None]
explicit_port: Union[int,
None]
raw_path: str
path: str
_parsed_query: List[Tuple[str, str]]
query:
"MultiDictProxy[str]"
raw_query_string: str
query_string: str
path_qs: str
raw_path_qs: str
raw_fragment: str
fragment: str
raw_parts: Tuple[str, ...]
parts: Tuple[str, ...]
parent:
"URL"
raw_name: str
name: str
raw_suffix: str
suffix: str
raw_suffixes: Tuple[str, ...]
suffixes: Tuple[str, ...]
def rewrite_module(obj: _T) -> _T:
obj.__module__ =
"yarl"
return obj
def _normalize_path_segments(segments:
"Sequence[str]") -> List[str]:
"""Drop '.' and '..' from a sequence of str segments"""
resolved_path: List[str] = []
for seg
in segments:
if seg ==
"..":
# ignore any .. segments that would otherwise cause an
# IndexError when popped from resolved_path if
# resolving for rfc3986
with suppress(IndexError):
resolved_path.pop()
elif seg !=
".":
resolved_path.append(seg)
if segments
and segments[-1]
in (
".",
".."):
# do some post-processing here.
# if the last segment was a relative dir,
# then we need to append the trailing '/'
resolved_path.append(
"")
return resolved_path
@rewrite_module
class URL:
# Don't derive from str
# follow pathlib.Path design
# probably URL will not suffer from pathlib problems:
# it's intended for libraries like aiohttp,
# not to be passed into standard library functions like os.open etc.
# URL grammar (RFC 3986)
# pct-encoded = "%" HEXDIG HEXDIG
# reserved = gen-delims / sub-delims
# gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
# sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
# / "*" / "+" / "," / ";" / "="
# unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
# URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
# hier-part = "//" authority path-abempty
# / path-absolute
# / path-rootless
# / path-empty
# scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
# authority = [ userinfo "@" ] host [ ":" port ]
# userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
# host = IP-literal / IPv4address / reg-name
# IP-literal = "[" ( IPv6address / IPvFuture ) "]"
# IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
# IPv6address = 6( h16 ":" ) ls32
# / "::" 5( h16 ":" ) ls32
# / [ h16 ] "::" 4( h16 ":" ) ls32
# / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
# / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
# / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
# / [ *4( h16 ":" ) h16 ] "::" ls32
# / [ *5( h16 ":" ) h16 ] "::" h16
# / [ *6( h16 ":" ) h16 ] "::"
# ls32 = ( h16 ":" h16 ) / IPv4address
# ; least-significant 32 bits of address
# h16 = 1*4HEXDIG
# ; 16 bits of address represented in hexadecimal
# IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
# dec-octet = DIGIT ; 0-9
# / %x31-39 DIGIT ; 10-99
# / "1" 2DIGIT ; 100-199
# / "2" %x30-34 DIGIT ; 200-249
# / "25" %x30-35 ; 250-255
# reg-name = *( unreserved / pct-encoded / sub-delims )
# port = *DIGIT
# path = path-abempty ; begins with "/" or is empty
# / path-absolute ; begins with "/" but not "//"
# / path-noscheme ; begins with a non-colon segment
# / path-rootless ; begins with a segment
# / path-empty ; zero characters
# path-abempty = *( "/" segment )
# path-absolute = "/" [ segment-nz *( "/" segment ) ]
# path-noscheme = segment-nz-nc *( "/" segment )
# path-rootless = segment-nz *( "/" segment )
# path-empty = 0<pchar>
# segment = *pchar
# segment-nz = 1*pchar
# segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
# ; non-zero-length segment without any colon ":"
# pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
# query = *( pchar / "/" / "?" )
# fragment = *( pchar / "/" / "?" )
# URI-reference = URI / relative-ref
# relative-ref = relative-part [ "?" query ] [ "#" fragment ]
# relative-part = "//" authority path-abempty
# / path-absolute
# / path-noscheme
# / path-empty
# absolute-URI = scheme ":" hier-part [ "?" query ]
__slots__ = (
"_cache",
"_val")
_QUOTER = _Quoter(requote=
False)
_REQUOTER = _Quoter()
_PATH_QUOTER = _Quoter(safe=
"@:", protected=
"/+", requote=
False)
_PATH_REQUOTER = _Quoter(safe=
"@:", protected=
"/+")
_QUERY_QUOTER = _Quoter(safe=
"?/:@", protected=
"=+&;", qs=
True, requote=
False)
_QUERY_REQUOTER = _Quoter(safe=
"?/:@", protected=
"=+&;", qs=
True)
_QUERY_PART_QUOTER = _Quoter(safe=
"?/:@", qs=
True, requote=
False)
_FRAGMENT_QUOTER = _Quoter(safe=
"?/:@", requote=
False)
_FRAGMENT_REQUOTER = _Quoter(safe=
"?/:@")
_UNQUOTER = _Unquoter()
_PATH_UNQUOTER = _Unquoter(unsafe=
"+")
_PATH_SAFE_UNQUOTER = _Unquoter(ignore=
"/%", unsafe=
"+")
_QS_UNQUOTER = _Unquoter(qs=
True)
_val: SplitResult
def __new__(
cls,
val: Union[str, SplitResult,
"URL"] =
"",
*,
encoded: bool =
False,
strict: Union[bool,
None] =
None,
) -> Self:
if strict
is not None:
# pragma: no cover
warnings.warn(
"strict parameter is ignored")
if type(val)
is cls:
return val
if type(val)
is str:
val = urlsplit(val)
elif type(val)
is SplitResult:
if not encoded:
raise ValueError(
"Cannot apply decoding to SplitResult")
elif isinstance(val, str):
val = urlsplit(str(val))
else:
raise TypeError(
"Constructor parameter should be str")
cache: _InternalURLCache = {}
if not encoded:
host: Union[str,
None]
scheme, netloc, path, query, fragment = val
if not netloc:
# netloc
host =
""
else:
username, password, host, port = cls._split_netloc(val[1])
if host
is None:
if scheme
in SCHEME_REQUIRES_HOST:
msg = (
"Invalid URL: host is required for "
f
"absolute urls with the {scheme} scheme"
)
raise ValueError(msg)
else:
host =
""
host = cls._encode_host(host, validate_host=
False)
raw_user =
None if username
is None else cls._REQUOTER(username)
raw_password =
None if password
is None else cls._REQUOTER(password)
netloc = cls._make_netloc(raw_user, raw_password, host, port)
# Remove brackets as host encoder adds back brackets for IPv6 addresses
cache[
"raw_host"] = host[1:-1]
if "[" in host
else host
cache[
"raw_user"] = raw_user
cache[
"raw_password"] = raw_password
cache[
"explicit_port"] = port
if path:
path = cls._PATH_REQUOTER(path)
if netloc:
if "." in path:
path = cls._normalize_path(path)
cls._validate_authority_uri_abs_path(host, path)
query = cls._QUERY_REQUOTER(query)
if query
else query
fragment = cls._FRAGMENT_REQUOTER(fragment)
if fragment
else fragment
cache[
"scheme"] = scheme
cache[
"raw_query_string"] = query
cache[
"raw_fragment"] = fragment
# There is a good chance that the SplitResult is already normalized
# so we can avoid the extra work of creating a new SplitResult
# if the input SplitResult is already normalized
if (
val.netloc != netloc
or val.path != path
or val.query != query
or val.fragment != fragment
):
val = SplitResult(scheme, netloc, path, query, fragment)
self = object.__new__(cls)
self._val = val
self._cache = cache
return self
@classmethod
def build(
cls,
*,
scheme: str =
"",
authority: str =
"",
user: Union[str,
None] =
None,
password: Union[str,
None] =
None,
host: str =
"",
port: Union[int,
None] =
None,
path: str =
"",
query: Union[Query,
None] =
None,
query_string: str =
"",
fragment: str =
"",
encoded: bool =
False,
) ->
"URL":
"""Creates and returns a new URL"""
if authority
and (user
or password
or host
or port):
raise ValueError(
'Can\'t mix
"authority" with "user",
"password",
"host" or "port".
'
)
if port
is not None and not isinstance(port, int):
raise TypeError(
"The port is required to be int.")
if port
and not host:
raise ValueError(
'Can\'t build URL
with "port" but without
"host".
')
if query
and query_string:
raise ValueError(
'Only one of "query" or "query_string" should be passed')
if (
scheme
is None
or authority
is None
or host
is None
or path
is None
or query_string
is None
or fragment
is None
):
raise TypeError(
'NoneType is illegal for "scheme", "authority", "host", "path", '
'"query_string", and "fragment" args, use empty string instead.'
)
if encoded:
if authority:
netloc = authority
elif host:
if port
is not None:
port =
None if port == DEFAULT_PORTS.get(scheme)
else port
netloc = cls._make_netloc(user, password, host, port)
else:
netloc =
""
else:
# not encoded
_host: Union[str,
None] =
None
if authority:
user, password, _host, port = cls._split_netloc(authority)
_host = cls._encode_host(_host, validate_host=
False)
if _host
else ""
elif host:
_host = cls._encode_host(host)
else:
netloc =
""
if _host
is not None:
if port
is not None:
port =
None if port == DEFAULT_PORTS.get(scheme)
else port
netloc = cls._make_netloc(user, password, _host, port,
True)
path = cls._PATH_QUOTER(path)
if path
else path
if path
and netloc:
if "." in path:
path = cls._normalize_path(path)
cls._validate_authority_uri_abs_path(host, path)
query_string = (
cls._QUERY_QUOTER(query_string)
if query_string
else query_string
)
fragment = cls._FRAGMENT_QUOTER(fragment)
if fragment
else fragment
url = cls._from_val(SplitResult(scheme, netloc, path, query_string, fragment))
if query:
return url.with_query(query)
return url
@classmethod
def _from_val(cls, val: SplitResult) ->
"URL":
"""Create a new URL from a SplitResult."""
self = object.__new__(cls)
self._val = val
self._cache = {}
return self
def __init_subclass__(cls):
raise TypeError(f
"Inheriting a class {cls!r} from URL is forbidden")
def __str__(self) -> str:
val = self._val
scheme, netloc, path, query, fragment = val
if not val.path
and val.netloc
and (val.query
or val.fragment):
path =
"/"
if (port := self.explicit_port)
is not None and port == self._default_port:
# port normalization - using None for default ports to remove from rendering
# https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3
host = self.host_subcomponent
netloc = self._make_netloc(self.raw_user, self.raw_password, host,
None)
return self._unsplit_result(scheme, netloc, path, query, fragment)
@staticmethod
def _unsplit_result(
scheme: str, netloc: str, url: str, query: str, fragment: str
) -> str:
"""Unsplit a URL without any normalization."""
if netloc
or (scheme
and scheme
in USES_AUTHORITY)
or url[:2] ==
"//":
if url
and url[:1] !=
"/":
url = f
"//{netloc or ''}/{url}"
else:
url = f
"//{netloc or ''}{url}"
if scheme:
url = f
"{scheme}:{url}"
if query:
url = f
"{url}?{query}"
return f
"{url}#{fragment}" if fragment else url
def __repr__(self) -> str:
return f
"{self.__class__.__name__}('{str(self)}')"
def __bytes__(self) -> bytes:
return str(self).encode(
"ascii")
def __eq__(self, other: object) -> bool:
if type(other)
is not URL:
return NotImplemented
val1 = self._val
if not val1.path
and val1.netloc:
val1 = val1._replace(path=
"/")
val2 = other._val
if not val2.path
and val2.netloc:
val2 = val2._replace(path=
"/")
return val1 == val2
def __hash__(self) -> int:
ret = self._cache.get(
"hash")
if ret
is None:
val = self._val
if not val.path
and val.netloc:
val = val._replace(path=
"/")
ret = self._cache[
"hash"] = hash(val)
return ret
def __le__(self, other: object) -> bool:
if type(other)
is not URL:
return NotImplemented
return self._val <= other._val
def __lt__(self, other: object) -> bool:
if type(other)
is not URL:
return NotImplemented
return self._val < other._val
def __ge__(self, other: object) -> bool:
if type(other)
is not URL:
return NotImplemented
return self._val >= other._val
def __gt__(self, other: object) -> bool:
if type(other)
is not URL:
return NotImplemented
return self._val > other._val
def __truediv__(self, name: str) ->
"URL":
if not isinstance(name, str):
return NotImplemented
return self._make_child((str(name),))
def __mod__(self, query: Query) ->
"URL":
return self.update_query(query)
def __bool__(self) -> bool:
val = self._val
return bool(val.netloc
or val.path
or val.query
or val.fragment)
def __getstate__(self) -> Tuple[SplitResult]:
return (self._val,)
def __setstate__(self, state):
if state[0]
is None and isinstance(state[1], dict):
# default style pickle
self._val = state[1][
"_val"]
else:
self._val, *unused = state
self._cache = {}
def _cache_netloc(self) ->
None:
"""Cache the netloc parts of the URL."""
cache = self._cache
(
cache[
"raw_user"],
cache[
"raw_password"],
cache[
"raw_host"],
cache[
"explicit_port"],
) = self._split_netloc(self._val.netloc)
def is_absolute(self) -> bool:
"""A check for absolute URLs.
Return True for absolute ones (having scheme
or starting
with //),
False otherwise.
Is is preferred to call the .absolute property instead
as it
is cached.
"""
return self.absolute
def is_default_port(self) -> bool:
"""A check for default port.
Return True if port
is default
for specified scheme,
e.g.
'http://python.org' or 'http://python.org:80',
False
otherwise.
Return False for relative URLs.
"""
if (explicit := self.explicit_port)
is None:
# If the explicit port is None, then the URL must be
# using the default port unless its a relative URL
# which does not have an implicit port / default port
return self._val.netloc !=
""
return explicit == self._default_port
def origin(self) ->
"URL":
"""Return an URL with scheme, host and port parts only.
user, password, path, query
and fragment are removed.
"""
# TODO: add a keyword-only option for keeping user/pass maybe?
return self._origin
@cached_property
def _origin(self) ->
"URL":
"""Return an URL with scheme, host and port parts only.
user, password, path, query
and fragment are removed.
"""
v = self._val
if not v.netloc:
raise ValueError(
"URL should be absolute")
if not v.scheme:
raise ValueError(
"URL should have scheme")
if "@" not in v.netloc:
val = v._replace(path=
"", query=
"", fragment=
"")
else:
encoded_host = self._encode_host(v.hostname)
if v.hostname
else ""
netloc = self._make_netloc(
None,
None, encoded_host, v.port)
val = v._replace(netloc=netloc, path=
"", query=
"", fragment=
"")
return self._from_val(val)
def relative(self) ->
"URL":
"""Return a relative part of the URL.
scheme, user, password, host
and port are removed.
"""
if not self._val.netloc:
raise ValueError(
"URL should be absolute")
val = self._val._replace(scheme=
"", netloc=
"")
return self._from_val(val)
@cached_property
def absolute(self) -> bool:
"""A check for absolute URLs.
Return True for absolute ones (having scheme
or starting
with //),
False otherwise.
"""
# `netloc`` is an empty string for relative URLs
# Checking `netloc` is faster than checking `hostname`
# because `hostname` is a property that does some extra work
# to parse the host from the `netloc`
return self._val.netloc !=
""
@cached_property
def scheme(self) -> str:
"""Scheme for absolute URLs.
Empty string
for relative URLs
or URLs starting
with //
"""
return self._val.scheme
@cached_property
def raw_authority(self) -> str:
"""Encoded authority part of URL.
Empty string
for relative URLs.
"""
return self._val.netloc
@cached_property
def _default_port(self) -> Union[int,
None]:
"""Default port for the scheme or None if not known."""
return DEFAULT_PORTS.get(self._val.scheme)
@cached_property
def authority(self) -> str:
"""Decoded authority part of URL.
Empty string
for relative URLs.
"""
return self._make_netloc(self.user, self.password, self.host, self.port)
@cached_property
def raw_user(self) -> Union[str,
None]:
"""Encoded user part of URL.
None if user
is missing.
"""
# not .username
self._cache_netloc()
return self._cache[
"raw_user"]
@cached_property
def user(self) -> Union[str,
None]:
"""Decoded user part of URL.
None if user
is missing.
"""
raw_user = self.raw_user
if raw_user
is None:
return None
return self._UNQUOTER(raw_user)
@cached_property
def raw_password(self) -> Union[str,
None]:
"""Encoded password part of URL.
None if password
is missing.
"""
self._cache_netloc()
return self._cache[
"raw_password"]
@cached_property
def password(self) -> Union[str,
None]:
"""Decoded password part of URL.
None if password
is missing.
"""
raw_password = self.raw_password
if raw_password
is None:
return None
return self._UNQUOTER(raw_password)
@cached_property
def raw_host(self) -> Union[str,
None]:
"""Encoded host part of URL.
None for relative URLs.
When working
with IPv6 addresses, use the `host_subcomponent` property instead
as it will
return the host subcomponent
with brackets.
"""
# Use host instead of hostname for sake of shortness
# May add .hostname prop later
self._cache_netloc()
return self._cache[
"raw_host"]
@cached_property
def host(self) -> Union[str,
None]:
"""Decoded host part of URL.
None for relative URLs.
"""
if (raw := self.raw_host)
is None:
return None
if raw
and raw[-1].isdigit()
or ":" in raw:
# IP addresses are never IDNA encoded
return raw
return _idna_decode(raw)
@cached_property
def host_subcomponent(self) -> Union[str,
None]:
"""Return the host subcomponent part of URL.
None for relative URLs.
https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
`IP-literal =
"[" ( IPv6address / IPvFuture )
"]"`
Examples:
- `
http://example.com:8080` -> `example.com`
- `
http://example.com:80` -> `example.com`
- `
https://127.0.0.1:8443` -> `127.0.0.1`
- `
https://[::1]:8443` -> `[::1]`
- `
http://[::1]` -> `[::1]`
"""
if (raw := self.raw_host)
is None:
return None
return f
"[{raw}]" if ":" in raw
else raw
@cached_property
def port(self) -> Union[int,
None]:
"""Port part of URL, with scheme-based fallback.
None for relative URLs
or URLs without explicit port
and
scheme without default port substitution.
"""
return self.explicit_port
or self._default_port
@cached_property
def explicit_port(self) -> Union[int,
None]:
"""Port part of URL, without scheme-based fallback.
None for relative URLs
or URLs without explicit port.
"""
self._cache_netloc()
return self._cache[
"explicit_port"]
@cached_property
def raw_path(self) -> str:
"""Encoded path of URL.
/
for absolute URLs without path part.
"""
ret = self._val.path
if not ret
and self._val.netloc:
ret =
"/"
return ret
@cached_property
def path(self) -> str:
"""Decoded path of URL.
/
for absolute URLs without path part.
"""
return self._PATH_UNQUOTER(self.raw_path)
@cached_property
def path_safe(self) -> str:
"""Decoded path of URL.
/
for absolute URLs without path part.
/ (%2F)
and % (%25) are
not decoded
"""
return self._PATH_SAFE_UNQUOTER(self.raw_path)
@cached_property
def _parsed_query(self) -> List[Tuple[str, str]]:
"""Parse query part of URL."""
return parse_qsl(self._val.query, keep_blank_values=
True)
@cached_property
def query(self) ->
"MultiDictProxy[str]":
"""A MultiDictProxy representing parsed query parameters in decoded
representation.
Empty value
if URL has no query part.
"""
return MultiDictProxy(MultiDict(self._parsed_query))
@cached_property
def raw_query_string(self) -> str:
"""Encoded query part of URL.
Empty string
if query
is missing.
"""
return self._val.query
@cached_property
def query_string(self) -> str:
"""Decoded query part of URL.
Empty string
if query
is missing.
"""
return self._QS_UNQUOTER(self._val.query)
@cached_property
def path_qs(self) -> str:
"""Decoded path of URL with query."""
if not self.query_string:
return self.path
return f
"{self.path}?{self.query_string}"
@cached_property
def raw_path_qs(self) -> str:
"""Encoded path of URL with query."""
if not self._val.query:
return self.raw_path
return f
"{self.raw_path}?{self._val.query}"
@cached_property
def raw_fragment(self) -> str:
"""Encoded fragment part of URL.
Empty string
if fragment
is missing.
"""
return self._val.fragment
@cached_property
def fragment(self) -> str:
"""Decoded fragment part of URL.
Empty string
if fragment
is missing.
"""
return self._UNQUOTER(self._val.fragment)
@cached_property
def raw_parts(self) -> Tuple[str, ...]:
"""A tuple containing encoded *path* parts.
(
'/',)
for absolute URLs
if *path*
is missing.
"""
path = self._val.path
if self._val.netloc:
return (
"/", *path[1:].split(
"/"))
if path
else (
"/",)
if path
and path[0] ==
"/":
return (
"/", *path[1:].split(
"/"))
return tuple(path.split(
"/"))
@cached_property
def parts(self) -> Tuple[str, ...]:
"""A tuple containing decoded *path* parts.
(
'/',)
for absolute URLs
if *path*
is missing.
"""
return tuple(self._UNQUOTER(part)
for part
in self.raw_parts)
@cached_property
def parent(self) ->
"URL":
"""A new URL with last part of path removed and cleaned up query and
fragment.
"""
path = self.raw_path
if not path
or path ==
"/":
if self._val.fragment
or self._val.query:
return self._from_val(self._val._replace(query=
"", fragment=
""))
return self
parts = path.split(
"/")
val = self._val._replace(path=
"/".join(parts[:-1]), query=
"", fragment=
"")
return self._from_val(val)
@cached_property
def raw_name(self) -> str:
"""The last part of raw_parts."""
parts = self.raw_parts
if self._val.netloc:
parts = parts[1:]
if not parts:
return ""
else:
return parts[-1]
else:
return parts[-1]
@cached_property
def name(self) -> str:
"""The last part of parts."""
return self._UNQUOTER(self.raw_name)
@cached_property
def raw_suffix(self) -> str:
name = self.raw_name
i = name.rfind(
".")
if 0 < i < len(name) - 1:
return name[i:]
else:
return ""
@cached_property
def suffix(self) -> str:
return self._UNQUOTER(self.raw_suffix)
@cached_property
def raw_suffixes(self) -> Tuple[str, ...]:
name = self.raw_name
if name.endswith(
"."):
return ()
name = name.lstrip(
".")
return tuple(
"." + suffix
for suffix
in name.split(
".")[1:])
@cached_property
def suffixes(self) -> Tuple[str, ...]:
return tuple(self._UNQUOTER(suffix)
for suffix
in self.raw_suffixes)
@staticmethod
def _validate_authority_uri_abs_path(host: str, path: str) ->
None:
"""Ensure that path in URL with authority starts with a leading slash.
Raise ValueError
if not.
"""
if host
and path
and path[0] !=
"/":
raise ValueError(
"Path in a URL with authority should start with a slash ('/') if set"
)
def _make_child(self, paths:
"Sequence[str]", encoded: bool =
False) ->
"URL":
"""
add paths to self._val.path, accounting
for absolute vs relative paths,
keep existing, but do
not create new, empty segments
"""
parsed: List[str] = []
needs_normalize: bool =
False
for idx, path
in enumerate(reversed(paths)):
# empty segment of last is not removed
last = idx == 0
if path
and path[0] ==
"/":
raise ValueError(
f
"Appending path {path!r} starting from slash is forbidden"
)
path = path
if encoded
else self._PATH_QUOTER(path)
needs_normalize |=
"." in path
segments = path.split(
"/")
segments.reverse()
# remove trailing empty segment for all but the last path
segment_slice_start = int(
not last
and segments[0] ==
"")
parsed += segments[segment_slice_start:]
parsed.reverse()
if self._val.path
and (old_path_segments := self._val.path.split(
"/")):
old_path_cutoff = -1
if old_path_segments[-1] ==
"" else None
parsed = [*old_path_segments[:old_path_cutoff], *parsed]
if self._val.netloc:
parsed = _normalize_path_segments(parsed)
if needs_normalize
else parsed
if parsed
and parsed[0] !=
"":
# inject a leading slash when adding a path to an absolute URL
# where there was none before
parsed = [
"", *parsed]
new_path =
"/".join(parsed)
return self._from_val(self._val._replace(path=new_path, query=
"", fragment=
""))
@classmethod
def _normalize_path(cls, path: str) -> str:
# Drop '.' and '..' from str path
prefix =
""
if path
and path[0] ==
"/":
# preserve the "/" root element of absolute paths, copying it to the
# normalised output as per sections 5.2.4 and 6.2.2.3 of rfc3986.
prefix =
"/"
path = path[1:]
segments = path.split(
"/")
return prefix +
"/".join(_normalize_path_segments(segments))
@classmethod
@lru_cache
# match the same size as urlsplit
def _parse_host(
cls, host: str
) -> Tuple[bool, str, Union[bool,
None], str, str, str]:
"""Parse host into parts
Returns a tuple of:
-
True if the host looks like an IP address,
False otherwise.
- Lowercased host
-
True if the host
is ASCII-only,
False otherwise.
- Raw IP address
- Separator between IP address
and zone
- Zone part of the IP address
"""
lower_host = host.lower()
is_ascii = host.isascii()
# If the host ends with a digit or contains a colon, its likely
# an IP address.
if host
and (host[-1].isdigit()
or ":" in host):
if "%" in host:
return True, lower_host, is_ascii, *host.partition(
"%")
return True, lower_host, is_ascii, host,
"",
""
return False, lower_host, is_ascii,
"",
"",
""
@classmethod
def _encode_host(
cls, host: str, human: bool =
False, validate_host: bool =
True
) -> str:
"""Encode host part of URL."""
looks_like_ip, lower_host, is_ascii, raw_ip, sep, zone = cls._parse_host(host)
if looks_like_ip:
# If it looks like an IP, we check with _ip_compressed_version
# and fall-through if its not an IP address. This is a performance
# optimization to avoid parsing IP addresses as much as possible
# because it is orders of magnitude slower than almost any other
# operation this library does.
# Might be an IP address, check it
#
# IP Addresses can look like:
# https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
# - 127.0.0.1 (last character is a digit)
# - 2001:db8::ff00:42:8329 (contains a colon)
# - 2001:db8::ff00:42:8329%eth0 (contains a colon)
# - [2001:db8::ff00:42:8329] (contains a colon -- brackets should
# have been removed before it gets here)
# Rare IP Address formats are not supported per:
# https://datatracker.ietf.org/doc/html/rfc3986#section-7.4
#
# IP parsing is slow, so its wrapped in an LRU
try:
ip_compressed_version = _ip_compressed_version(raw_ip)
except ValueError:
pass
else:
# These checks should not happen in the
# LRU to keep the cache size small
host, version = ip_compressed_version
if version == 6:
return f
"[{host}%{zone}]" if sep
else f
"[{host}]"
return f
"{host}%{zone}" if sep
else host
if human:
return lower_host
# IDNA encoding is slow,
# skip it for ASCII-only strings
# Don't move the check into _idna_encode() helper
# to reduce the cache size
if is_ascii:
# Check for invalid characters explicitly; _idna_encode() does this
# for non-ascii host names.
if validate_host:
_host_validate(lower_host)
return lower_host
return _idna_encode(lower_host)
@classmethod
@lru_cache
# match the same size as urlsplit
def _make_netloc(
cls,
user: Union[str,
None],
password: Union[str,
None],
host: Union[str,
None],
port: Union[int,
None],
encode: bool =
False,
) -> str:
"""Make netloc from parts.
The user
and password are encoded
if encode
is True.
The host must already be encoded
with _encode_host.
"""
if host
is None:
return ""
ret = host
if port
is not None:
ret = f
"{ret}:{port}"
if user
is None and password
is None:
return ret
if password
is not None:
if not user:
user =
""
elif encode:
user = cls._QUOTER(user)
if encode:
password = cls._QUOTER(password)
user = f
"{user}:{password}"
elif user
and encode:
user = cls._QUOTER(user)
return f
"{user}@{ret}" if user
else ret
@classmethod
@lru_cache
# match the same size as urlsplit
def _split_netloc(
cls,
netloc: str,
) -> Tuple[Union[str,
None], Union[str,
None], Union[str,
None], Union[int,
None]]:
"""Split netloc into username, password, host and port."""
if "@" not in netloc:
username: Union[str,
None] =
None
password: Union[str,
None] =
None
hostinfo = netloc
else:
userinfo, _, hostinfo = netloc.rpartition(
"@")
username, have_password, password = userinfo.partition(
":")
if not have_password:
password =
None
if "[" in hostinfo:
_, _, bracketed = hostinfo.partition(
"[")
hostname, _, port_str = bracketed.partition(
"]")
_, _, port_str = port_str.partition(
":")
else:
hostname, _, port_str = hostinfo.partition(
":")
if not port_str:
return username
or None, password, hostname
or None,
None
try:
port = int(port_str)
except ValueError:
raise ValueError(
"Invalid URL: port can't be converted to integer")
if not (0 <= port <= 65535):
raise ValueError(
"Port out of range 0-65535")
return username
or None, password, hostname
or None, port
def with_scheme(self, scheme: str) ->
"URL":
"""Return a new URL with scheme replaced."""
# N.B. doesn't cleanup query/fragment
if not isinstance(scheme, str):
raise TypeError(
"Invalid scheme type")
lower_scheme = scheme.lower()
if not self._val.netloc
and lower_scheme
in SCHEME_REQUIRES_HOST:
msg = (
"scheme replacement is not allowed for "
f
"relative URLs for the {lower_scheme} scheme"
)
raise ValueError(msg)
return self._from_val(self._val._replace(scheme=lower_scheme))
def with_user(self, user: Union[str,
None]) ->
"URL":
"""Return a new URL with user replaced.
Autoencode user
if needed.
Clear user/password
if user
is None.
"""
# N.B. doesn't cleanup query/fragment
val = self._val
if user
is None:
password =
None
elif isinstance(user, str):
user = self._QUOTER(user)
password = self.raw_password
else:
raise TypeError(
"Invalid user type")
if not val.netloc:
raise ValueError(
"user replacement is not allowed for relative URLs")
encoded_host = self.host_subcomponent
or ""
netloc = self._make_netloc(user, password, encoded_host, self.explicit_port)
return self._from_val(val._replace(netloc=netloc))
def with_password(self, password: Union[str,
None]) ->
"URL":
"""Return a new URL with password replaced.
Autoencode password
if needed.
Clear password
if argument
is None.
"""
# N.B. doesn't cleanup query/fragment
if password
is None:
pass
elif isinstance(password, str):
password = self._QUOTER(password)
else:
raise TypeError(
"Invalid password type")
if not self._val.netloc:
raise ValueError(
"password replacement is not allowed for relative URLs")
encoded_host = self.host_subcomponent
or ""
port = self.explicit_port
netloc = self._make_netloc(self.raw_user, password, encoded_host, port)
return self._from_val(self._val._replace(netloc=netloc))
def with_host(self, host: str) ->
"URL":
"""Return a new URL with host replaced.
Autoencode host
if needed.
Changing host
for relative URLs
is not allowed, use .join()
instead.
"""
# N.B. doesn't cleanup query/fragment
if not isinstance(host, str):
raise TypeError(
"Invalid host type")
val = self._val
if not val.netloc:
raise ValueError(
"host replacement is not allowed for relative URLs")
if not host:
raise ValueError(
"host removing is not allowed")
encoded_host = self._encode_host(host)
if host
else ""
port = self.explicit_port
netloc = self._make_netloc(self.raw_user, self.raw_password, encoded_host, port)
return self._from_val(val._replace(netloc=netloc))
def with_port(self, port: Union[int,
None]) ->
"URL":
"""Return a new URL with port replaced.
Clear port to default
if None is passed.
"""
# N.B. doesn't cleanup query/fragment
if port
is not None:
if isinstance(port, bool)
or not isinstance(port, int):
raise TypeError(f
"port should be int or None, got {type(port)}")
if not (0 <= port <= 65535):
raise ValueError(f
"port must be between 0 and 65535, got {port}")
val = self._val
if not val.netloc:
raise ValueError(
"port replacement is not allowed for relative URLs")
encoded_host = self.host_subcomponent
or ""
netloc = self._make_netloc(self.raw_user, self.raw_password, encoded_host, port)
return self._from_val(val._replace(netloc=netloc))
def with_path(self, path: str, *, encoded: bool =
False) ->
"URL":
"""Return a new URL with path replaced."""
if not encoded:
path = self._PATH_QUOTER(path)
if self._val.netloc:
path = self._normalize_path(path)
if "." in path
else path
if len(path) > 0
and path[0] !=
"/":
path =
"/" + path
return self._from_val(self._val._replace(path=path, query=
"", fragment=
""))
def _get_str_query_from_sequence_iterable(
self,
items: Iterable[Tuple[Union[str, istr], QueryVariable]],
) -> str:
"""Return a query string from a sequence of (key, value) pairs.
value
is a single value
or a sequence of values
for the key
The sequence of values must be a list
or tuple.
"""
quoter = self._QUERY_PART_QUOTER
pairs = [
f
"{quoter(k)}={quoter(v if type(v) is str else self._query_var(v))}"
for k, val
in items
for v
in (
val
if type(val)
is not str
and isinstance(val, (list, tuple))
else (val,)
)
]
return "&".join(pairs)
@staticmethod
def _query_var(v: QueryVariable) -> str:
cls = type(v)
if issubclass(cls, str):
if TYPE_CHECKING:
assert isinstance(v, str)
return v
if cls
is int:
# Fast path for non-subclassed int
return str(v)
if issubclass(cls, float):
if TYPE_CHECKING:
assert isinstance(v, float)
if math.isinf(v):
raise ValueError(
"float('inf') is not supported")
if math.isnan(v):
raise ValueError(
"float('nan') is not supported")
return str(float(v))
if cls
is not bool
and isinstance(cls, SupportsInt):
return str(int(v))
raise TypeError(
"Invalid variable type: value "
"should be str, int or float, got {!r} "
"of type {}".format(v, cls)
)
def _get_str_query_from_iterable(
self, items: Iterable[Tuple[Union[str, istr], str]]
) -> str:
"""Return a query string from an iterable.
The iterable must contain (key, value) pairs.
The values are
not allowed to be sequences, only single values are
allowed.
For sequences, use `_get_str_query_from_sequence_iterable`.
"""
quoter = self._QUERY_PART_QUOTER
# A listcomp is used since listcomps are inlined on CPython 3.12+ and
# they are a bit faster than a generator expression.
pairs = [
f
"{quoter(k)}={quoter(v if type(v) is str else self._query_var(v))}"
for k, v
in items
]
return "&".join(pairs)
def _get_str_query(self, *args: Any, **kwargs: Any) -> Union[str,
None]:
query: Union[str, Mapping[str, QueryVariable],
None]
if kwargs:
if len(args) > 0:
raise ValueError(
"Either kwargs or single query parameter must be present"
)
query = kwargs
elif len(args) == 1:
query = args[0]
else:
raise ValueError(
"Either kwargs or single query parameter must be present")
if query
is None:
return None
if isinstance(query, Mapping):
return self._get_str_query_from_sequence_iterable(query.items())
if isinstance(query, str):
return self._QUERY_QUOTER(query)
if isinstance(query, (bytes, bytearray, memoryview)):
raise TypeError(
"Invalid query type: bytes, bytearray and memoryview are forbidden"
)
if isinstance(query, Sequence):
# We don't expect sequence values if we're given a list of pairs
# already; only mappings like builtin `dict` which can't have the
# same key pointing to multiple values are allowed to use
# `_query_seq_pairs`.
return self._get_str_query_from_iterable(query)
raise TypeError(
"Invalid query type: only str, mapping or "
"sequence of (key, value) pairs is allowed"
)
@overload
def with_query(self, query: Query) ->
"URL": ...
@overload
def with_query(self, **kwargs: QueryVariable) ->
"URL": ...
def with_query(self, *args: Any, **kwargs: Any) ->
"URL":
"""Return a new URL with query part replaced.
Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
or str, autoencode the argument
if needed.
A sequence of (key, value) pairs
is supported
as well.
It also can take an arbitrary number of keyword arguments.
Clear query
if None is passed.
"""
# N.B. doesn't cleanup query/fragment
new_query = self._get_str_query(*args, **kwargs)
or ""
return self._from_val(self._val._replace(query=new_query))
@overload
def extend_query(self, query: Query) ->
"URL": ...
@overload
def extend_query(self, **kwargs: QueryVariable) ->
"URL": ...
def extend_query(self, *args: Any, **kwargs: Any) ->
"URL":
"""Return a new URL with query part combined with the existing.
This method will
not remove existing query parameters.
Example:
>>> url = URL(
'http://example.com/?a=1&b=2')
>>> url.extend_query(a=3, c=4)
URL(
'http://example.com/?a=1&b=2&a=3&c=4')
"""
new_query_string = self._get_str_query(*args, **kwargs)
if not new_query_string:
return self
if new_query := self._val.query:
# both strings are already encoded so we can use a simple
# string join
if new_query[-1] ==
"&":
new_query += new_query_string
else:
new_query += f
"&{new_query_string}"
else:
new_query = new_query_string
return self._from_val(self._val._replace(query=new_query))
@overload
def update_query(self, query: Query) ->
"URL": ...
@overload
def update_query(self, **kwargs: QueryVariable) ->
"URL": ...
def update_query(self, *args: Any, **kwargs: Any) ->
"URL":
"""Return a new URL with query part updated.
This method will overwrite existing query parameters.
Example:
>>> url = URL(
'http://example.com/?a=1&b=2')
>>> url.update_query(a=3, c=4)
URL(
'http://example.com/?a=3&b=2&c=4')
"""
s = self._get_str_query(*args, **kwargs)
if s
is None:
return self._from_val(self._val._replace(query=
""))
query = MultiDict(self._parsed_query)
query.update(parse_qsl(s, keep_blank_values=
True))
new_str = self._get_str_query_from_iterable(query.items())
return self._from_val(self._val._replace(query=new_str))
def without_query_params(self, *query_params: str) ->
"URL":
"""Remove some keys from query part and return new URL."""
params_to_remove = set(query_params) & self.query.keys()
if not params_to_remove:
return self
return self.with_query(
tuple(
(name, value)
for name, value
in self.query.items()
if name
not in params_to_remove
)
)
def with_fragment(self, fragment: Union[str,
None]) ->
"URL":
"""Return a new URL with fragment replaced.
Autoencode fragment
if needed.
Clear fragment to default
if None is passed.
"""
# N.B. doesn't cleanup query/fragment
if fragment
is None:
raw_fragment =
""
elif not isinstance(fragment, str):
raise TypeError(
"Invalid fragment type")
else:
raw_fragment = self._FRAGMENT_QUOTER(fragment)
if self._val.fragment == raw_fragment:
return self
return self._from_val(self._val._replace(fragment=raw_fragment))
def with_name(self, name: str) ->
"URL":
"""Return a new URL with name (last part of path) replaced.
Query
and fragment parts are cleaned up.
Name
is encoded
if needed.
"""
# N.B. DOES cleanup query/fragment
if not isinstance(name, str):
raise TypeError(
"Invalid name type")
if "/" in name:
raise ValueError(
"Slash in name is not allowed")
name = self._PATH_QUOTER(name)
if name
in (
".",
".."):
raise ValueError(
". and .. values are forbidden")
parts = list(self.raw_parts)
if self._val.netloc:
if len(parts) == 1:
parts.append(name)
else:
parts[-1] = name
parts[0] =
"" # replace leading '/'
else:
parts[-1] = name
if parts[0] ==
"/":
parts[0] =
"" # replace leading '/'
return self._from_val(
self._val._replace(path=
"/".join(parts), query=
"", fragment=
"")
)
def with_suffix(self, suffix: str) ->
"URL":
"""Return a new URL with suffix (file extension of name) replaced.
Query
and fragment parts are cleaned up.
suffix
is encoded
if needed.
"""
if not isinstance(suffix, str):
raise TypeError(
"Invalid suffix type")
if suffix
and not suffix[0] ==
"." or suffix ==
".":
raise ValueError(f
"Invalid suffix {suffix!r}")
name = self.raw_name
if not name:
raise ValueError(f
"{self!r} has an empty name")
old_suffix = self.raw_suffix
name = name + suffix
if not old_suffix
else name[: -len(old_suffix)] + suffix
return self.with_name(name)
def join(self, url:
"URL") ->
"URL":
"""Join URLs
Construct a full (“absolute”) URL by combining a “base URL”
(self)
with another URL (url).
Informally, this uses components of the base URL,
in
particular the addressing scheme, the network location
and
(part of) the path, to provide missing components
in the
relative URL.
"""
if type(url)
is not URL:
raise TypeError(
"url should be URL")
val = self._val
other_val = url._val
scheme = other_val.scheme
or val.scheme
if scheme != val.scheme
or scheme
not in USES_RELATIVE:
return url
# scheme is in uses_authority as uses_authority is a superset of uses_relative
if other_val.netloc
and scheme
in USES_AUTHORITY:
return self._from_val(other_val._replace(scheme=scheme))
parts: _SplitResultDict = {
"scheme": scheme}
if other_val.path
or other_val.fragment:
parts[
"fragment"] = other_val.fragment
if other_val.path
or other_val.query:
parts[
"query"] = other_val.query
if not other_val.path:
return self._from_val(val._replace(**parts))
if other_val.path[0] ==
"/":
path = other_val.path
elif not val.path:
path = f
"/{other_val.path}"
elif val.path[-1] ==
"/":
path = f
"{val.path}{other_val.path}"
else:
# …
# and relativizing ".."
# parts[0] is / for absolute urls, this join will add a double slash there
path =
"/".join([*self.parts[:-1],
""])
path += other_val.path
# which has to be removed
if val.path[0] ==
"/":
path = path[1:]
parts[
"path"] = self._normalize_path(path)
if "." in path
else path
return self._from_val(val._replace(**parts))
def joinpath(self, *other: str, encoded: bool =
False) ->
"URL":
"""Return a new URL with the elements in other appended to the path."""
return self._make_child(other, encoded=encoded)
def human_repr(self) -> str:
"""Return decoded human readable string for URL representation."""
user = _human_quote(self.user,
"#/:?@[]")
password = _human_quote(self.password,
"#/:?@[]")
host = self.host
if host:
host = self._encode_host(host, human=
True)
path = _human_quote(self.path,
"#?")
if TYPE_CHECKING:
assert path
is not None
query_string =
"&".join(
"{}={}".format(_human_quote(k,
"#&+;="), _human_quote(v, "#&+;="))
for k, v
in self.query.items()
)
fragment = _human_quote(self.fragment,
"")
if TYPE_CHECKING:
assert fragment
is not None
netloc = self._make_netloc(user, password, host, self.explicit_port)
scheme = self._val.scheme
return self._unsplit_result(scheme, netloc, path, query_string, fragment)
def _human_quote(s: Union[str,
None], unsafe: str) -> Union[str,
None]:
if not s:
return s
for c
in "%" + unsafe:
if c
in s:
s = s.replace(c, f
"%{ord(c):02X}")
if s.isprintable():
return s
return "".join(c
if c.isprintable()
else quote(c)
for c
in s)
_MAXCACHE = 256
@lru_cache(_MAXCACHE)
def _idna_decode(raw: str) -> str:
try:
return idna.decode(raw.encode(
"ascii"))
except UnicodeError:
# e.g. '::1'
return raw.encode(
"ascii").decode(
"idna")
@lru_cache(_MAXCACHE)
def _idna_encode(host: str) -> str:
try:
return idna.encode(host, uts46=
True).decode(
"ascii")
except UnicodeError:
return host.encode(
"idna").decode(
"ascii")
@lru_cache(_MAXCACHE)
def _ip_compressed_version(raw_ip: str) -> Tuple[str, int]:
"""Return compressed version of IP address and its version."""
ip = ip_address(raw_ip)
return ip.compressed, ip.version
@lru_cache(_MAXCACHE)
def _host_validate(host: str) ->
None:
"""Validate an ascii host name."""
invalid = _not_reg_name.search(host)
if invalid
is None:
return
value, pos, extra = invalid.group(), invalid.start(),
""
if value ==
"@" or (value ==
":" and "@" in host[pos:]):
# this looks like an authority string
extra = (
", if the value includes a username or password, "
"use 'authority' instead of 'host'"
)
raise ValueError(
f
"Host {host!r} cannot contain {value!r} (at position " f
"{pos}){extra}"
)
from None
@rewrite_module
def cache_clear() ->
None:
"""Clear all LRU caches."""
_idna_decode.cache_clear()
_idna_encode.cache_clear()
_ip_compressed_version.cache_clear()
_host_validate.cache_clear()
@rewrite_module
def cache_info() -> CacheInfo:
"""Report cache statistics."""
return {
"idna_encode": _idna_encode.cache_info(),
"idna_decode": _idna_decode.cache_info(),
"ip_address": _ip_compressed_version.cache_info(),
"host_validate": _host_validate.cache_info(),
}
@rewrite_module
def cache_configure(
*,
idna_encode_size: Union[int,
None] = _MAXCACHE,
idna_decode_size: Union[int,
None] = _MAXCACHE,
ip_address_size: Union[int,
None] = _MAXCACHE,
host_validate_size: Union[int,
None] = _MAXCACHE,
) ->
None:
"""Configure LRU cache sizes."""
global _idna_decode, _idna_encode, _ip_compressed_version, _host_validate
_idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)
_idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__)
_ip_compressed_version = lru_cache(ip_address_size)(
_ip_compressed_version.__wrapped__
)
_host_validate = lru_cache(host_validate_size)(_host_validate.__wrapped__)