# Types accepted for the `ssl` request parameter.
# NOTE(review): `ssl` here is presumably the stdlib ssl module imported
# with a try/except at the top of the file — when it is unavailable an
# SSLContext can never be supplied, so only bool/None remain valid.
if ssl is not None:
    SSL_ALLOWED_TYPES = (ssl.SSLContext, bool, Fingerprint, type(None))
else:  # pragma: no cover
    SSL_ALLOWED_TYPES = (bool, type(None))
def _merge_ssl_params(
    ssl: Union["SSLContext", bool, Fingerprint],
    verify_ssl: Optional[bool],
    ssl_context: Optional["SSLContext"],
    fingerprint: Optional[bytes],
) -> Union["SSLContext", bool, Fingerprint]:
    """Fold the deprecated verify_ssl/ssl_context/fingerprint args into ``ssl``.

    Each legacy argument emits a DeprecationWarning and is rejected with a
    ValueError when combined with a non-default ``ssl`` value.
    """
    exclusive_msg = (
        "verify_ssl, ssl_context, fingerprint and ssl "
        "parameters are mutually exclusive"
    )
    if ssl is None:
        # Backwards compatibility: treat None like the default (True).
        ssl = True
    if verify_ssl is not None and not verify_ssl:
        warnings.warn(
            "verify_ssl is deprecated, use ssl=False instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(exclusive_msg)
        ssl = False
    if ssl_context is not None:
        warnings.warn(
            "ssl_context is deprecated, use ssl=context instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(exclusive_msg)
        ssl = ssl_context
    if fingerprint is not None:
        warnings.warn(
            "fingerprint is deprecated, " "use ssl=Fingerprint(fingerprint) instead",
            DeprecationWarning,
            stacklevel=3,
        )
        if ssl is not True:
            raise ValueError(exclusive_msg)
        ssl = Fingerprint(fingerprint)
    if not isinstance(ssl, SSL_ALLOWED_TYPES):
        raise TypeError(
            f"ssl should be SSLContext, bool, Fingerprint or None, "
            f"got {ssl!r} instead."
        )
    return ssl
@attr.s(auto_attribs=True, slots=True, frozen=True, cache_hash=True)
class ConnectionKey:
    # The key contains information about the used proxy / TLS
    # to prevent reusing wrong connections from a pool.
    host: str
    port: Optional[int]
    is_ssl: bool
    ssl: Union[SSLContext, bool, Fingerprint]
    proxy: Optional[URL]
    proxy_auth: Optional[BasicAuth]
    proxy_headers_hash: Optional[int]  # hash(CIMultiDict)
def _is_expected_content_type(
response_content_type: str, expected_content_type: str
) -> bool: if expected_content_type == "application/json": return json_re.match(response_content_type) isnotNone return expected_content_type in response_content_type
# --- fragment: ClientRequest class-level defaults (class header not in this chunk) ---
# Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
body: Any = b""

# Overridden per-instance by __init__/update_* methods.
auth = None
response = None

__writer = None  # async task for streaming data
_continue = None  # waiter future for '100 Continue' response

# N.B.
# Adding __del__ method with self._writer closing doesn't make sense
# because _writer is instance method, thus it keeps a reference to self.
# Until writer has finished finalizer will not be called.
def __init__(
    self,
    method: str,
    url: URL,
    *,
    params: Query = None,
    headers: Optional[LooseHeaders] = None,
    skip_auto_headers: Optional[Iterable[str]] = None,
    data: Any = None,
    cookies: Optional[LooseCookies] = None,
    auth: Optional[BasicAuth] = None,
    version: http.HttpVersion = http.HttpVersion11,
    compress: Union[str, bool, None] = None,
    chunked: Optional[bool] = None,
    expect100: bool = False,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    response_class: Optional[Type["ClientResponse"]] = None,
    proxy: Optional[URL] = None,
    proxy_auth: Optional[BasicAuth] = None,
    timer: Optional[BaseTimerContext] = None,
    session: Optional["ClientSession"] = None,
    ssl: Union[SSLContext, bool, Fingerprint] = True,
    proxy_headers: Optional[LooseHeaders] = None,
    traces: Optional[List["Trace"]] = None,
    trust_env: bool = False,
    server_hostname: Optional[str] = None,
):
    """Initialize a client request (visible portion sets up core state)."""
    if loop is None:
        loop = asyncio.get_event_loop()
    # Reject methods containing control characters up front.
    if match := _CONTAINS_CONTROL_CHAR_RE.search(method):
        raise ValueError(
            f"Method cannot contain non-token characters {method!r} "
            f"(found at least {match.group()!r})"
        )
    # URL forbids subclasses, so a simple type check is enough.
    assert type(url) is URL, url
    if proxy is not None:
        assert type(proxy) is URL, proxy
    # FIXME: session is None in tests only, need to fix tests
    # assert session is not None
    if TYPE_CHECKING:
        assert session is not None
    self._session = session
    if params:
        url = url.extend_query(params)
    self.original_url = url
    # Drop any fragment; it is not part of the request target.
    self.url = url.with_fragment(None) if url.raw_fragment else url
    self.method = method.upper()
    self.chunked = chunked
    self.compress = compress
    self.loop = loop
    self.length = None
    if response_class is None:
        real_response_class = ClientResponse
    else:
        real_response_class = response_class
    self.response_class: Type[ClientResponse] = real_response_class
    self._timer = timer if timer is not None else TimerNoop()
    # `None` is normalized to `True` here.
    self._ssl = ssl if ssl is not None else True
    self.server_hostname = server_hostname

    if loop.get_debug():
        # Remember where the request was created, to aid debugging.
        self._source_traceback = traceback.extract_stack(sys._getframe(1))
    # NOTE(review): __init__ appears to continue beyond this excerpt
    # (several keyword arguments are unused above) — confirm against
    # the full source.
def update_host(self, url: URL) -> None:
    """Update destination host, port and connection type (ssl)."""
    if not url.raw_host:
        # A URL without a host part cannot be requested.
        raise InvalidURL(url)
    # Pick up inline credentials (scheme://user:pass@host/) as basic auth.
    if url.raw_user or url.raw_password:
        username = url.user or ""
        password = url.password or ""
        self.auth = helpers.BasicAuth(username, password)
def update_version(self, version: Union["http.HttpVersion", str]) -> None:
    """Convert request version to two elements tuple.

    parser HTTP version '1.1' => (1, 1)

    Raises ValueError for a malformed version string.
    """
    if isinstance(version, str):
        v = [part.strip() for part in version.split(".", 1)]
        try:
            # Parse the integers first: a string without a dot ("1")
            # produces an IndexError which, like ValueError, must be
            # reported as a malformed version instead of leaking out.
            major, minor = int(v[0]), int(v[1])
        except (ValueError, IndexError):
            raise ValueError(f"Can not parse http version number: {version}") from None
        version = http.HttpVersion(major, minor)
    self.version = version
# --- fragment: interior of ClientRequest.update_headers() (def line not in this chunk) ---
# Build the host header
if _YARL_SUPPORTS_HOST_SUBCOMPONENT:
    host = self.url.host_subcomponent
    # host_subcomponent is None when the URL is a relative URL.
    # but we know we do not have a relative URL here.
    assert host is not None
else:
    host = cast(str, self.url.raw_host)
    if helpers.is_ipv6_address(host):
        # Bracket IPv6 literals as required for the Host header.
        host = f"[{host}]"
if host[-1] == ".":
    # Remove all trailing dots from the netloc as while
    # they are valid FQDNs in DNS, TLS validation fails.
    # See https://github.com/aio-libs/aiohttp/issues/3636.
    # To avoid string manipulation we only call rstrip if
    # the last character is a dot.
    host = host.rstrip(".")

# If explicit port is not None, it means that the port was
# explicitly specified in the URL. In this case we check
# if its not the default port for the scheme and add it to
# the host header. We check explicit_port first because
# yarl caches explicit_port and its likely to already be
# in the cache and non-default port URLs are far less common.
explicit_port = self.url.explicit_port
if explicit_port is not None and not self.url.is_default_port():
    host = f"{host}:{explicit_port}"
self.headers[hdrs.HOST] = host

if not headers:
    return
if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
    headers = headers.items()
for key, value in headers:  # type: ignore[misc]
    # A special case for Host header
    if key in self._HOST_STRINGS:
        self.headers[key] = value
    else:
        self.headers.add(key, value)
def update_auto_headers(self, skip_auto_headers: Optional[Iterable[str]]) -> None:
    """Fill in default headers unless already present or explicitly skipped."""
    if skip_auto_headers is None:
        # Fast path when there are no headers to skip,
        # which is the most common case.
        self.skip_auto_headers = CIMultiDict()
        used_headers = self.headers
    else:
        self.skip_auto_headers = CIMultiDict(
            (hdr, None) for hdr in sorted(skip_auto_headers)
        )
        used_headers = self.headers.copy()
        used_headers.extend(self.skip_auto_headers)  # type: ignore[arg-type]

    for hdr, val in self.DEFAULT_HEADERS.items():
        if hdr not in used_headers:
            self.headers.add(hdr, val)

    if hdrs.USER_AGENT not in used_headers:
        self.headers[hdrs.USER_AGENT] = SERVER_SOFTWARE
# --- fragment: tail of a content-encoding update method (def line not in this chunk) ---
if self.headers.get(hdrs.CONTENT_ENCODING):
    # An explicit Content-Encoding header conflicts with `compress`.
    if self.compress:
        raise ValueError(
            "compress can not be set " "if Content-Encoding header is set"
        )
elif self.compress:
    if not isinstance(self.compress, str):
        # A truthy non-string value selects the default codec.
        self.compress = "deflate"
    self.headers[hdrs.CONTENT_ENCODING] = self.compress
    self.chunked = True  # enable chunked, no need to deal with length
def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
    """Set basic auth from the argument, instance default, or netrc."""
    if auth is None:
        auth = self.auth
    if auth is None and trust_env and self.url.host is not None:
        # Fall back to ~/.netrc credentials when the caller opted in.
        with contextlib.suppress(LookupError):
            auth = basicauth_from_netrc(netrc_from_env(), self.url.host)
    if auth is None:
        return
    if not isinstance(auth, helpers.BasicAuth):
        raise TypeError("BasicAuth() tuple is required instead")
    self.headers[hdrs.AUTHORIZATION] = auth.encode()
def update_body_from_data(self, body: Any) -> None:
    """Register *body* as the request payload and sync framing headers.

    The body is normalized through PAYLOAD_REGISTRY (materializing
    FormData first), then either Content-Length or chunked transfer
    encoding is selected, and payload headers are merged in.
    """
    if body is None:
        return

    # FormData must be materialized into a payload object first.
    if isinstance(body, FormData):
        body = body()

    try:
        body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
    except payload.LookupError:
        body = FormData(body)()

    self.body = body

    # Enable chunked encoding if needed.
    if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
        size = body.size
        if size is None:
            self.chunked = True
        else:
            # Fix: the original re-checked CONTENT_LENGTH here although the
            # outer condition already guarantees the header is absent.
            self.headers[hdrs.CONTENT_LENGTH] = str(size)

    # copy payload headers
    assert body.headers
    for key, value in body.headers.items():
        if key in self.headers or key in self.skip_auto_headers:
            continue
        self.headers[key] = value
def update_expect_continue(self, expect: bool = False) -> None:
    """Set up the Expect: 100-continue handshake when requested or present."""
    if expect:
        self.headers[hdrs.EXPECT] = "100-continue"
    else:
        # An already-present Expect: 100-continue header also arms the waiter.
        expect = self.headers.get(hdrs.EXPECT, "").lower() == "100-continue"
    if expect:
        self._continue = self.loop.create_future()
# --- fragment: tail of a proxy update method (likely update_proxy; def line not in this chunk) ---
# Validate that proxy credentials, when given, are a BasicAuth tuple.
if proxy_auth and not isinstance(proxy_auth, helpers.BasicAuth):
    raise ValueError("proxy_auth must be None or BasicAuth() tuple")
self.proxy_auth = proxy_auth
def keep_alive(self) -> bool:
    # Decide whether the connection may be reused after this request.
    if self.version < HttpVersion10:
        # keep alive not supported at all
        return False
    if self.version == HttpVersion10:
        # no headers means we close for Http 1.0
        return self.headers.get(hdrs.CONNECTION) == "keep-alive"
    elif self.headers.get(hdrs.CONNECTION) == "close":
        return False
    # NOTE(review): the fall-through `return True` (HTTP/1.1 default) appears
    # to be cut off at this point of the excerpt — confirm against full source.
# --- fragment: interior of ClientRequest.send() (enclosing `async def` not in this chunk) ---
if self.compress:
    writer.enable_compression(self.compress)  # type: ignore[arg-type]

if self.chunked is not None:
    writer.enable_chunking()

# set default content-type
if (
    self.method in self.POST_METHODS
    and hdrs.CONTENT_TYPE not in self.skip_auto_headers
    and hdrs.CONTENT_TYPE not in self.headers
):
    self.headers[hdrs.CONTENT_TYPE] = "application/octet-stream"

# set the connection header
connection = self.headers.get(hdrs.CONNECTION)
if not connection:
    if self.keep_alive():
        if self.version == HttpVersion10:
            # HTTP/1.0 closes by default; opt in to reuse explicitly.
            connection = "keep-alive"
    else:
        if self.version == HttpVersion11:
            # HTTP/1.1 keeps alive by default; opt out explicitly.
            connection = "close"

if connection is not None:
    self.headers[hdrs.CONNECTION] = connection

# status + headers
v = self.version
status_line = f"{self.method} {path} HTTP/{v.major}.{v.minor}"
await writer.write_headers(status_line, self.headers)
coro = self.write_bytes(writer, conn)

task: Optional["asyncio.Task[None]"]
if sys.version_info >= (3, 12):
    # Optimization for Python 3.12, try to write
    # bytes immediately to avoid having to schedule
    # the task on the event loop.
    task = asyncio.Task(coro, loop=self.loop, eager_start=True)
else:
    task = self.loop.create_task(coro)
if task.done():
    task = None
else:
    self._writer = task
# --- fragment: ClientResponse class-level defaults (class header not in this chunk) ---
# Some of these attributes are None when created,
# but will be set by the start() method.
# As the end user will likely never see the None values, we cheat the types below.
# from the Status-Line of the response
version: Optional[HttpVersion] = None  # HTTP-Version
status: int = None  # type: ignore[assignment]  # Status-Code
reason: Optional[str] = None  # Reason-Phrase
# NOTE(review): fragment — closing lines of the `_writer` property getter;
# the decorator, def line and docstring opening are outside this excerpt.
    _writer is only provided for backwards compatibility for subclasses that may need to access it.
    """
    return self.__writer
@_writer.setter
def _writer(self, writer: Optional["asyncio.Task[None]"]) -> None:
    """Set the writer task for streaming data.

    Detaches the previous task's completion callback, then either tracks
    the new task or clears the slot when there is nothing left to wait on.
    """
    if self.__writer is not None:
        self.__writer.remove_done_callback(self.__reset_writer)
    if writer is not None and writer.done():
        # Already finished: nothing to track.
        self.__writer = None
        return
    self.__writer = writer
    if writer is not None:
        writer.add_done_callback(self.__reset_writer)
# --- fragment: interior of the Link-header parser (def line not in this chunk;
# `links_str` is presumably the raw Link header value — confirm upstream) ---
# Split on commas that precede a new "<url>" group.
for val in re.split(r",(?=\s*<)", links_str):
    match = re.match(r"\s*<(.*)>(.*)", val)
    if match is None:  # pragma: no cover
        # the check exists to suppress mypy error
        continue
    url, params_str = match.groups()
    # Everything after the first ";" is the parameter list.
    params = params_str.split(";")[1:]

    link: MultiDict[Union[str, URL]] = MultiDict()
    for param in params:
        # key=value with optional single/double quoting around the value.
        match = re.match(r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$", param, re.M)
        if match is None:  # pragma: no cover
            # the check exists to suppress mypy error
            continue
        key, _, value, _ = match.groups()
# --- fragment: tail of ClientResponse.start() (def line not in this chunk) ---
# headers
self._headers = message.headers  # type is CIMultiDictProxy
self._raw_headers = message.raw_headers  # type is Tuple[bytes, bytes]

# payload
self.content = payload

# cookies
for hdr in self.headers.getall(hdrs.SET_COOKIE, ()):
    try:
        self.cookies.load(hdr)
    except CookieError as exc:
        # Malformed cookies are logged and skipped, not fatal.
        client_logger.warning("Can not load response cookies: %s", exc)
return self
def _response_eof(self) -> None:
    # Called when the response body has been fully received.
    if self._closed:
        return

    # protocol could be None because connection could be detached
    protocol = self._connection and self._connection.protocol
    if protocol is not None and protocol.upgraded:
        return
    # NOTE(review): the method appears to continue past this excerpt —
    # remainder not visible in this chunk.
async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """Leave the async context: release the response and await its close.

    Unlike _RequestContextManager there is no exception inspection here:
    the response object closes its own connection when its state is broken.
    """
    self._in_context = False
    self.release()
    await self.wait_for_close()
# Processing time: 0.36 seconds (preprocessed)
# The information on this page was compiled carefully to the best of our
# knowledge. However, neither completeness, nor correctness, nor quality
# of the information provided is guaranteed.
# Note: the colored syntax display is still experimental.