# To create a timeout specific for a single request, either
# - create a completely new one to overwrite the default
# - or use http://www.attrs.org/en/stable/api.html#attr.evolve
#   to overwrite the defaults
# Convert to list of tuples if headers:
real_headers: CIMultiDict[str] = CIMultiDict(headers) else:
real_headers = CIMultiDict()
self._default_headers: CIMultiDict[str] = real_headers if skip_auto_headers isnotNone:
self._skip_auto_headers = frozenset(istr(i) for i in skip_auto_headers) else:
self._skip_auto_headers = frozenset()
# NOTE: timeout clamps existing connect and read timeouts. We cannot
# set the default to None because we need to detect if the user wants
# to use the existing timeouts by setting timeout to None.
if self.closed: raise RuntimeError("Session is closed")
if data isnotNoneand json isnotNone: raise ValueError( "data and json parameters can not be used at the same time"
) elif json isnotNone:
data = payload.JsonPayload(json, dumps=self._json_serialize)
ifnot isinstance(chunked, bool) and chunked isnotNone:
warnings.warn("Chunk size is deprecated #1615", DeprecationWarning)
redirects = 0
history: List[ClientResponse] = []
version = self._version
params = params or {}
# Merge with default headers and transform to CIMultiDict
headers = self._prepare_headers(headers)
try:
url = self._build_url(str_or_url) except ValueError as e: raise InvalidUrlClientError(str_or_url) from e
assert self._connector isnotNone if url.scheme notin self._connector.allowed_protocol_schema_set: raise NonHttpUrlClientError(url)
skip_headers = set(self._skip_auto_headers) if skip_auto_headers isnotNone: for i in skip_auto_headers:
skip_headers.add(istr(i))
if proxy isNone:
proxy_headers = None else:
proxy_headers = self._prepare_headers(proxy_headers) try:
proxy = URL(proxy) except ValueError as e: raise InvalidURL(proxy) from e
if timeout is sentinel:
real_timeout: ClientTimeout = self._timeout else: ifnot isinstance(timeout, ClientTimeout):
real_timeout = ClientTimeout(total=timeout) else:
real_timeout = timeout # timeout is cumulative for all request operations # (request, redirects, responses, data consuming)
tm = TimeoutHandle(
self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold
)
handle = tm.start()
if read_bufsize isNone:
read_bufsize = self._read_bufsize
if auto_decompress isNone:
auto_decompress = self._auto_decompress
if max_line_size isNone:
max_line_size = self._max_line_size
if max_field_size isNone:
max_field_size = self._max_field_size
traces = [
Trace(
self,
trace_config,
trace_config.trace_config_ctx(trace_request_ctx=trace_request_ctx),
) for trace_config in self._trace_configs
]
for trace in traces:
await trace.send_request_start(method, url.update_query(params), headers)
timer = tm.timer() try: with timer: # https://www.rfc-editor.org/rfc/rfc9112.html#name-retrying-requests
retry_persistent_connection = method in IDEMPOTENT_METHODS whileTrue:
url, auth_from_url = strip_auth_from_url(url) ifnot url.raw_host: # NOTE: Bail early, otherwise, causes `InvalidURL` through # NOTE: `self._request_class()` below.
err_exc_cls = (
InvalidUrlRedirectClientError if redirects else InvalidUrlClientError
) raise err_exc_cls(url) # If `auth` was passed for an already authenticated URL, # disallow only if this is the initial URL; this is to avoid issues # with sketchy redirects that are not the caller's responsibility ifnot history and (auth and auth_from_url): raise ValueError( "Cannot combine AUTH argument with " "credentials encoded in URL"
)
# Override the auth with the one from the URL only if we # have no auth, or if we got an auth from a redirect URL if auth isNoneor (history and auth_from_url isnotNone):
auth = auth_from_url if auth isNone:
auth = self._default_auth # It would be confusing if we support explicit # Authorization header with auth argument if (
headers isnotNone and auth isnotNone and hdrs.AUTHORIZATION in headers
): raise ValueError( "Cannot combine AUTHORIZATION header " "with AUTH argument or credentials " "encoded in URL"
)
if cookies isnotNone:
tmp_cookie_jar = CookieJar()
tmp_cookie_jar.update_cookies(cookies)
req_cookies = tmp_cookie_jar.filter_cookies(url) if req_cookies:
all_cookies.load(req_cookies)
if proxy isnotNone:
proxy = URL(proxy) elif self._trust_env: with suppress(LookupError):
proxy, proxy_auth = get_env_proxy_for_url(url)
if cookies := resp.cookies:
self._cookie_jar.update_cookies(cookies, resp.url)
# redirects if resp.status in (301, 302, 303, 307, 308) and allow_redirects:
for trace in traces:
await trace.send_request_redirect(
method, url.update_query(params), headers, resp
)
redirects += 1
history.append(resp) if max_redirects and redirects >= max_redirects:
resp.close() raise TooManyRedirects(
history[0].request_info, tuple(history)
)
# For 301 and 302, mimic IE, now changed in RFC # https://github.com/kennethreitz/requests/pull/269 if (resp.status == 303 and resp.method != hdrs.METH_HEAD) or (
resp.status in (301, 302) and resp.method == hdrs.METH_POST
):
method = hdrs.METH_GET
data = None if headers.get(hdrs.CONTENT_LENGTH):
headers.pop(hdrs.CONTENT_LENGTH)
r_url = resp.headers.get(hdrs.LOCATION) or resp.headers.get(
hdrs.URI
) if r_url isNone: # see github.com/aio-libs/aiohttp/issues/2022 break else: # reading from correct redirection # response is forbidden
resp.release()
try:
parsed_redirect_url = URL(
r_url, encoded=not self._requote_redirect_url
) except ValueError as e: raise InvalidUrlRedirectClientError(
r_url, "Server attempted redirecting to a location that does not look like a URL",
) from e
if protocols:
real_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = ",".join(protocols) if origin isnotNone:
real_headers[hdrs.ORIGIN] = origin if compress:
extstr = ws_ext_gen(compress=compress)
real_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = extstr
# For the sake of backward compatibility, if user passes in None, convert it to True if ssl isNone:
warnings.warn( "ssl=None is deprecated, please use ssl=True",
DeprecationWarning,
stacklevel=2,
)
ssl = True
ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
# websocket protocol
protocol = None if protocols and hdrs.SEC_WEBSOCKET_PROTOCOL in resp.headers:
resp_protocols = [
proto.strip() for proto in resp.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
]
for proto in resp_protocols: if proto in protocols:
protocol = proto break
# websocket compress
notakeover = False if compress:
compress_hdrs = resp.headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS) if compress_hdrs: try:
compress, notakeover = ws_ext_parse(compress_hdrs) except WSHandshakeError as exc: raise WSServerHandshakeError(
resp.request_info,
resp.history,
message=exc.args[0],
status=resp.status,
headers=resp.headers,
) from exc else:
compress = 0
notakeover = False
# For WS connection the read_timeout must be either receive_timeout or greater # None == no timeout, i.e. infinite timeout, so None is the max timeout possible if receive_timeout isNone: # Reset regardless
conn_proto.read_timeout = receive_timeout elif conn_proto.read_timeout isnotNone: # If read_timeout was set check which wins
conn_proto.read_timeout = max(receive_timeout, conn_proto.read_timeout)
def _prepare_headers(self, headers: Optional[LooseHeaders]) -> "CIMultiDict[str]":
    """Add default headers and transform it to CIMultiDict.

    Merges *headers* over the session's default headers.  A key that
    appears multiple times in *headers* keeps all of its values; a key
    present in the defaults is replaced by the request-level value.
    """
    # Start from the session-wide defaults; per-request values override them.
    result = CIMultiDict(self._default_headers)
    if headers:
        if not isinstance(headers, (MultiDictProxy, MultiDict)):
            # Normalize plain dicts / iterables of pairs to a multidict.
            headers = CIMultiDict(headers)
        added_names: Set[str] = set()
        for key, value in headers.items():
            if key in added_names:
                # Repeated key in the incoming headers: keep every value.
                result.add(key, value)
            else:
                # First occurrence replaces any default value for this key.
                result[key] = value
                added_names.add(key)
    return result
@property
def connector_owner(self) -> bool:
    """Should connector be closed on session closing"""
    return self._connector_owner
@property
def raise_for_status(
    self,
) -> Union[bool, Callable[[ClientResponse], Awaitable[None]]]:
    """Should `ClientResponse.raise_for_status()` be called for each response."""
    return self._raise_for_status
@property
def auto_decompress(self) -> bool:
    """Should the body response be automatically decompressed."""
    return self._auto_decompress
@property
def trust_env(self) -> bool:
    """
    Should proxies information from environment or netrc be trusted.

    Information is from HTTP_PROXY / HTTPS_PROXY environment variables
    or ~/.netrc file if present.
    """
    return self._trust_env
@property
def trace_configs(self) -> List[TraceConfig]:
    """A list of TraceConfig instances used for client tracing"""
    return self._trace_configs
def detach(self) -> None:
    """Detach connector from session without closing the former.

    Session is switched to closed state anyway.
    """
    # Dropping the reference is enough: the connector is NOT closed here,
    # but a None connector makes the session report itself as closed.
    self._connector = None
def __enter__(self) -> None:
    # The session only supports the asynchronous context-manager protocol;
    # fail loudly on a plain `with` statement.
    raise TypeError("Use async with instead")
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    # __exit__ should exist in pair with __enter__ but never executed
    pass  # pragma: no cover
Returns response object.
method - HTTP method
url - request url
params - (optional) Dictionary or bytes to be sent in the query
string of the new request
data - (optional) Dictionary, bytes, or file-like object to
send in the body of the request
json - (optional) Any json compatible python object
headers - (optional) Dictionary of HTTP Headers to send with
the request
cookies - (optional) Dict object to send with the request
auth - (optional) BasicAuth named tuple represent HTTP Basic Auth
auth - aiohttp.helpers.BasicAuth
allow_redirects - (optional) If set to False, do not follow
redirects
version - Request HTTP version.
compress - Set to True if request has to be compressed with deflate encoding.
chunked - Set to chunk size for chunked transfer encoding.
expect100 - Expect 100-continue response from server.
connector - BaseConnector sub-class instance to support
connection pooling.
read_until_eof - Read response until eof if response
does not have Content-Length header.
loop - Optional event loop.
timeout - Optional ClientTimeout settings structure, 5min
total timeout by default.
Usage::
>>> import aiohttp
>>> resp = await aiohttp.request('GET', 'http://python.org/')
>>> resp
<ClientResponse(python.org/) [200]>
>>> data = await resp.read() """
connector_owner = False if connector isNone:
connector_owner = True
connector = TCPConnector(loop=loop, force_close=True)
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.