# Cleanup closed is no longer needed after
# https://github.com/python/cpython/pull/118960,
# which first appeared in Python 3.12.7 and 3.13.1.
NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 7)
class _TransportPlaceholder:
    """Placeholder for BaseConnector.connect function.

    Occupies a slot in the acquired sets while the real connection
    is being established, so connection limits are respected.
    """

    __slots__ = ()

    def close(self) -> None:
        """Close the placeholder transport (no-op)."""
class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    # prevent AttributeError in __del__ if ctor was failed
    _closed = True
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0
# NOTE(review): whitespace-mangled fragment of BaseConnector.__init__ —
# the `def __init__(...)` line and several attribute initialisations
# (e.g. _limit, _conns, _acquired — referenced by other methods) appear
# to have been lost in extraction; restore from the original source
# before use. Statement boundaries below were collapsed onto single lines.
if force_close: if keepalive_timeout isnotNoneand keepalive_timeout isnot sentinel: raise ValueError( "keepalive_timeout cannot ""be set if force_close is True"
) else: if keepalive_timeout is sentinel:
# Default keep-alive window is 15 seconds when not forced to close.
keepalive_timeout = 15.0
loop = loop or asyncio.get_running_loop()
self._timeout_ceil_threshold = timeout_ceil_threshold
# In debug mode remember where the connector was created for diagnostics.
self._closed = False if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
# {host_key: FIFO list of waiters} # The FIFO is implemented with an OrderedDict with None keys because # python does not have an ordered set.
self._waiters: DefaultDict[
ConnectionKey, OrderedDict[asyncio.Future[None], None]
] = defaultdict(OrderedDict)
@property
def force_close(self) -> bool:
    """Ultimately close connection on releasing if True."""
    return self._force_close
@property
def limit(self) -> int:
    """The total number for simultaneous connections.

    If limit is 0 the connector has no limit.
    The default limit size is 100.
    """
    return self._limit
@property
def limit_per_host(self) -> int:
    """The limit for simultaneous connections to the same endpoint.

    Endpoints are the same if they have equal
    (host, port, is_ssl) triple.
    """
    return self._limit_per_host
# NOTE(review): whitespace-mangled fragment of BaseConnector._cleanup —
# the tail of the method (storing surviving connections back into
# self._conns and rescheduling the cleanup handle) appears to have been
# lost in extraction; restore from the original source before use.
def _cleanup(self) -> None: """Cleanup unused transports.""" if self._cleanup_handle:
self._cleanup_handle.cancel() # _cleanup_handle should be unset, otherwise _release() will not # recreate it ever!
self._cleanup_handle = None
now = monotonic()
timeout = self._keepalive_timeout
if self._conns:
connections = {}
deadline = now - timeout for key, conns in self._conns.items():
# Keep connections that are still open and inside the keep-alive window.
alive: List[Tuple[ResponseHandler, float]] = [] for proto, use_time in conns: if proto.is_connected() and use_time - deadline >= 0:
alive.append((proto, use_time)) continue
# Expired or broken connection: close it; SSL transports are queued
# for delayed abort unless cleanup-closed is disabled.
transport = proto.transport
proto.close() ifnot self._cleanup_closed_disabled and key.is_ssl:
self._cleanup_closed_transports.append(transport)
def close(self) -> Awaitable[None]:
    """Close all opened transports."""
    self._close()
    # Returned awaiter keeps `await connector.close()` working while
    # warning about the deprecated await.
    return _DeprecationWaiter(noop())
def _close(self) -> None:
    """Synchronously close transports, cancel handles and waiters."""
    if self._closed:
        return

    self._closed = True

    try:
        if self._loop.is_closed():
            return

        # cancel cleanup task
        if self._cleanup_handle:
            self._cleanup_handle.cancel()

        # cancel cleanup close task
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for data in self._conns.values():
            for proto, t0 in data:
                proto.close()

        for proto in self._acquired:
            proto.close()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

    finally:
        # Always drop bookkeeping state, even if the loop was closed.
        self._conns.clear()
        self._acquired.clear()
        for keyed_waiters in self._waiters.values():
            for keyed_waiter in keyed_waiters:
                keyed_waiter.cancel()
        self._waiters.clear()
        self._cleanup_handle = None
        self._cleanup_closed_transports.clear()
        self._cleanup_closed_handle = None
def _available_connections(self, key: "ConnectionKey") -> int:
    """
    Return number of available connections.

    The limit, limit_per_host and the connection key are taken into account.

    If it returns less than 1 means that there are no connections
    available.
    """
    # check total available connections
    # If there are no limits, this will always return 1
    total_remain = 1

    if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
        return total_remain

    # check limit per host
    if host_remain := self._limit_per_host:
        if acquired := self._acquired_per_host.get(key):
            host_remain -= len(acquired)
        if total_remain > host_remain:
            return host_remain

    return total_remain
# NOTE(review): whitespace-mangled fragment of BaseConnector.connect —
# the creation of `placeholder` (a _TransportPlaceholder registered in
# self._acquired / self._acquired_per_host before awaiting the real
# connection) appears to have been lost in extraction; `placeholder`
# is referenced below. Restore from the original source before use.
async def connect(
self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
) -> Connection: """Get from pool or create new connection."""
key = req.connection_key if (conn := await self._get(key, traces)) isnotNone: # If we do not have to wait and we can get a connection from the pool # we can avoid the timeout ceil logic and directly return the connection return conn
async with ceil_timeout(timeout.connect, timeout.ceil_threshold): if self._available_connections(key) <= 0:
await self._wait_for_available_connection(key, traces) if (conn := await self._get(key, traces)) isnotNone: return conn
try: # Traces are done inside the try block to ensure that the # that the placeholder is still cleaned up if an exception # is raised. if traces: for trace in traces:
await trace.send_connection_create_start()
proto = await self._create_connection(req, traces, timeout) if traces: for trace in traces:
await trace.send_connection_create_end() except BaseException:
self._release_acquired(key, placeholder) raise else: if self._closed:
proto.close() raise ClientConnectionError("Connector is closed.")
# The connection was successfully created, drop the placeholder # and add the real connection to the acquired set. There should # be no awaits after the proto is added to the acquired set # to ensure that the connection is not left in the acquired set # on cancellation.
acquired_per_host = self._acquired_per_host[key]
self._acquired.remove(placeholder)
acquired_per_host.remove(placeholder)
self._acquired.add(proto)
acquired_per_host.add(proto) return Connection(self, key, proto, self._loop)
async def _wait_for_available_connection(
    self, key: "ConnectionKey", traces: List["Trace"]
) -> None:
    """Wait for an available connection slot."""
    # We loop here because there is a race between
    # the connection limit check and the connection
    # being acquired.  If the connection is acquired
    # between the check and the await statement, we
    # need to loop again to check if the connection
    # slot is still available.
    attempts = 0
    while True:
        fut: asyncio.Future[None] = self._loop.create_future()
        keyed_waiters = self._waiters[key]
        keyed_waiters[fut] = None
        if attempts:
            # If we have waited before, we need to move the waiter
            # to the front of the queue as otherwise we might get
            # starved and hit the timeout.
            keyed_waiters.move_to_end(fut, last=False)

        try:
            # Traces happen in the try block to ensure that the
            # waiter is still cleaned up if an exception is raised.
            if traces:
                for trace in traces:
                    await trace.send_connection_queued_start()
            await fut
            if traces:
                for trace in traces:
                    await trace.send_connection_queued_end()
        finally:
            # pop the waiter from the queue if its still
            # there and not already removed by _release_waiter
            keyed_waiters.pop(fut, None)
            if not self._waiters.get(key, True):
                del self._waiters[key]

        if self._available_connections(key) > 0:
            break
        attempts += 1
async def _get(
    self, key: "ConnectionKey", traces: List["Trace"]
) -> Optional[Connection]:
    """Get next reusable connection for the key or None.

    The connection will be marked as acquired.
    """
    try:
        conns = self._conns[key]
    except KeyError:
        return None

    t1 = monotonic()
    while conns:
        proto, t0 = conns.pop()
        # We will reuse the connection if it is connected and
        # the keepalive timeout has not been exceeded
        if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
            if not conns:
                # The very last connection was reclaimed: drop the key
                del self._conns[key]
            self._acquired.add(proto)
            self._acquired_per_host[key].add(proto)
            if traces:
                for trace in traces:
                    try:
                        await trace.send_connection_reuseconn()
                    except BaseException:
                        self._release_acquired(key, proto)
                        raise
            return Connection(self, key, proto, self._loop)

        # Connection cannot be reused, close it
        transport = proto.transport
        proto.close()
        # only for SSL transports
        if not self._cleanup_closed_disabled and key.is_ssl:
            self._cleanup_closed_transports.append(transport)

    # No more connections: drop the key
    del self._conns[key]
    return None
# NOTE(review): truncated fragment of BaseConnector._release_waiter —
# the loop body that pops a pending waiter and resolves it appears to
# have been lost in extraction; only the skip-full-hosts guard remains.
def _release_waiter(self) -> None: """
Iterates over all waiters until one to be released is found.
The one to be released isnot finished and
belongs to a host that has available connections. """ ifnot self._waiters: return
# Having the dict keys ordered this avoids to iterate # at the same order at each call.
queues = list(self._waiters)
random.shuffle(queues)
for key in queues: if self._available_connections(key) < 1: continue
def next_addrs(self, key: Tuple[str, int]) -> "List[ResolveResult]":
    """Return the cached addresses for *key*, rotated round-robin.

    `_addrs_rr` maps key -> (itertools.cycle iterator, number of
    addresses); each call shifts the rotation by one so successive
    callers get the addresses in a different order.
    """
    loop, length = self._addrs_rr[key]
    addrs = list(islice(loop, length))
    # Consume one more element to shift internal state of `cycle`
    next(loop)
    return addrs
# NOTE(review): the `def` line of this helper was lost in extraction; the
# signature below is restored from the call sites _make_ssl_context(True)
# / _make_ssl_context(False) further down — confirm against the original.
def _make_ssl_context(verified: bool) -> Optional[SSLContext]:
    """Create an SSL context.

    This method is not async-friendly and should be called from a thread
    because it will load certificates from disk and do other blocking I/O.
    """
    if ssl is None:
        # No ssl support
        return None
    if verified:
        return ssl.create_default_context()
    # Unverified context: accept any certificate but keep modern TLS only.
    sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    sslcontext.options |= ssl.OP_NO_SSLv2
    sslcontext.options |= ssl.OP_NO_SSLv3
    sslcontext.check_hostname = False
    sslcontext.verify_mode = ssl.CERT_NONE
    sslcontext.options |= ssl.OP_NO_COMPRESSION
    sslcontext.set_default_verify_paths()
    return sslcontext
# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
class TCPConnector(BaseConnector):
    """TCP connector.

    verify_ssl - Set to True to check ssl certifications.
    fingerprint - Pass the binary sha256
        digest of the expected certificate in DER format to verify
        that the certificate the server presents matches. See also
        https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
    resolver - Enable DNS lookups and use this
        resolver
    use_dns_cache - Use memory cache for DNS lookups.
    ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
    family - socket address family
    local_addr - local tuple of (host, port) to bind socket to
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    happy_eyeballs_delay - This is the "Connection Attempt Delay"
        as defined in RFC 8305. To disable
        the happy eyeballs algorithm, set to None.
    interleave - "First Address Family Count" as defined in RFC 8305
    loop - Optional event loop.
    """
def close(self) -> Awaitable[None]:
    """Close all ongoing DNS calls."""
    for fut in chain.from_iterable(self._throttle_dns_futures.values()):
        fut.cancel()

    for t in self._resolve_host_tasks:
        t.cancel()

    return super().close()
@property
def family(self) -> int:
    """Socket family like AF_INET."""
    return self._family
@property
def use_dns_cache(self) -> bool:
    """True if local DNS caching is enabled."""
    return self._use_dns_cache
def clear_dns_cache(
    self, host: Optional[str] = None, port: Optional[int] = None
) -> None:
    """Remove specified host/port or clear all dns local cache."""
    if host is not None and port is not None:
        self._cached_hosts.remove((host, port))
    elif host is not None or port is not None:
        # Refuse ambiguous half-specified keys.
        raise ValueError("either both host and port or none of them are allowed")
    else:
        self._cached_hosts.clear()
# NOTE(review): whitespace-mangled fragment of TCPConnector._resolve_host —
# the `async def _resolve_host(...)` line, the preceding DNS-cache bypass
# guard, and the tail that awaits the (shielded) `resolved_host_task`
# are missing; restore from the original source before use.
if traces: for trace in traces:
await trace.send_dns_resolvehost_start(host)
res = await self._resolver.resolve(host, port, family=self._family)
if traces: for trace in traces:
await trace.send_dns_resolvehost_end(host)
return res
# Fast path: serve an unexpired cache entry without awaiting a resolver.
key = (host, port) if key in self._cached_hosts andnot self._cached_hosts.expired(key): # get result early, before any await (#4014)
result = self._cached_hosts.next_addrs(key)
if traces: for trace in traces:
await trace.send_dns_cache_hit(host) return result
futures: Set["asyncio.Future[None]"] # # If multiple connectors are resolving the same host, we wait # for the first one to resolve and then use the result for all of them. # We use a throttle to ensure that we only resolve the host once # and then use the result for all the waiters. # if key in self._throttle_dns_futures: # get futures early, before any await (#4014)
futures = self._throttle_dns_futures[key]
future: asyncio.Future[None] = self._loop.create_future()
futures.add(future) if traces: for trace in traces:
await trace.send_dns_cache_hit(host) try:
await future finally:
futures.discard(future) return self._cached_hosts.next_addrs(key)
# update dict early, before any await (#4014)
self._throttle_dns_futures[key] = futures = set() # In this case we need to create a task to ensure that we can shield # the task from cancellation as cancelling this lookup should not cancel # the underlying lookup or else the cancel event will get broadcast to # all the waiters across all connections. #
coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
loop = asyncio.get_running_loop() if sys.version_info >= (3, 12): # Optimization for Python 3.12, try to send immediately
resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True) else:
resolved_host_task = loop.create_task(coro)
# NOTE(review): whitespace-mangled; a trailing `return addrs` that hands
# the resolved addresses back to the awaiting task appears to have been
# lost in extraction — confirm against the original source.
async def _resolve_host_with_throttle(
self,
key: Tuple[str, int],
host: str,
port: int,
futures: Set["asyncio.Future[None]"],
traces: Optional[Sequence["Trace"]],
) -> List[ResolveResult]: """Resolve host and set result for all waiters.
This method must be run in a task and shielded from cancellation
to avoid cancelling the underlying lookup. """ if traces: for trace in traces:
await trace.send_dns_cache_miss(host) try: if traces: for trace in traces:
await trace.send_dns_resolvehost_start(host)
addrs = await self._resolver.resolve(host, port, family=self._family) if traces: for trace in traces:
await trace.send_dns_resolvehost_end(host)
# Cache the result and wake every waiter that piggy-backed on this lookup.
self._cached_hosts.add(key, addrs) for fut in futures:
set_result(fut, None) except BaseException as e: # any DNS exception is set for the waiters to raise the same exception. # This coro is always run in task that is shielded from cancellation so # we should never be propagating cancellation here. for fut in futures:
set_exception(fut, e) raise finally:
self._throttle_dns_futures.pop(key)
# NOTE(review): truncated fragment of TCPConnector._create_connection —
# the `async def _create_connection(...)` line and the start of its
# docstring are missing; restore from the original source before use.
Has same keyword arguments as BaseEventLoop.create_connection. """ if req.proxy:
_, proto = await self._create_proxy_connection(req, traces, timeout) else:
_, proto = await self._create_direct_connection(req, traces, timeout)
return proto
def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
    """Logic to get the correct SSL context

    0. if req.ssl is false, return None
    1. if ssl_context is specified in req, use it
    2. if _ssl_context is specified in self, use it
    3. otherwise:
        1. if verify_ssl is not specified in req, use self.ssl_context
           (will generate a default context according to self.verify_ssl)
        2. if verify_ssl is True in req, generate a default SSL context
        3. if verify_ssl is False in req, generate a SSL context that
           won't verify
    """
    if not req.is_ssl():
        return None

    if ssl is None:  # pragma: no cover
        raise RuntimeError("SSL is not supported.")
    sslcontext = req.ssl
    if isinstance(sslcontext, ssl.SSLContext):
        return sslcontext
    if sslcontext is not True:
        # not verified or fingerprinted
        return _SSL_CONTEXT_UNVERIFIED
    sslcontext = self._ssl
    if isinstance(sslcontext, ssl.SSLContext):
        return sslcontext
    if sslcontext is not True:
        # not verified or fingerprinted
        return _SSL_CONTEXT_UNVERIFIED
    return _SSL_CONTEXT_VERIFIED
def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
    """Return the request-level or connector-level fingerprint, if any.

    The request setting takes precedence over the connector default.
    """
    ret = req.ssl
    if isinstance(ret, Fingerprint):
        return ret
    ret = self._ssl
    if isinstance(ret, Fingerprint):
        return ret
    return None
async def _wrap_create_connection(
    self,
    *args: Any,
    addr_infos: List[aiohappyeyeballs.AddrInfoType],
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: Type[Exception] = ClientConnectorError,
    **kwargs: Any,
) -> Tuple[asyncio.Transport, ResponseHandler]:
    """Open a TCP connection via happy-eyeballs and wrap it in a protocol.

    SSL/certificate errors and OSErrors are translated into the
    corresponding client connector exceptions; timeouts are re-raised.
    """
    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            sock = await aiohappyeyeballs.start_connection(
                addr_infos=addr_infos,
                local_addr_infos=self._local_addr_infos,
                happy_eyeballs_delay=self._happy_eyeballs_delay,
                interleave=self._interleave,
                loop=self._loop,
            )
            return await self._loop.create_connection(*args, **kwargs, sock=sock)
    except cert_errors as exc:
        raise ClientConnectorCertificateError(req.connection_key, exc) from exc
    except ssl_errors as exc:
        raise ClientConnectorSSLError(req.connection_key, exc) from exc
    except OSError as exc:
        # An errno-less OSError that is also a TimeoutError comes from the
        # ceil_timeout above — propagate it unchanged.
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        raise client_error(req.connection_key, exc) from exc
async def _wrap_existing_connection(
    self,
    *args: Any,
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: Type[Exception] = ClientConnectorError,
    **kwargs: Any,
) -> Tuple[asyncio.Transport, ResponseHandler]:
    """Wrap an already-connected socket (passed via kwargs) in a protocol.

    Error translation mirrors _wrap_create_connection.
    """
    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            return await self._loop.create_connection(*args, **kwargs)
    except cert_errors as exc:
        raise ClientConnectorCertificateError(req.connection_key, exc) from exc
    except ssl_errors as exc:
        raise ClientConnectorSSLError(req.connection_key, exc) from exc
    except OSError as exc:
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        raise client_error(req.connection_key, exc) from exc
# NOTE(review): truncated fragment — the rest of the docstring and the
# method body (the actual check/raise) are missing; restore from the
# original source before use.
def _fail_on_no_start_tls(self, req: "ClientRequest") -> None: """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.
It is necessary for TLS-in-TLS so that it is possible to
send HTTPS queries through HTTPS proxies.
# NOTE(review): truncated fragment — the body that actually emits the
# TLS-in-TLS warning is missing; only the non-HTTPS early return remains.
def _warn_about_tls_in_tls(
self,
underlying_transport: asyncio.Transport,
req: ClientRequest,
) -> None: """Issue a warning if the requested URL has HTTPS scheme.""" if req.request_info.url.scheme != "https": return
async def _start_tls_connection(
    self,
    underlying_transport: asyncio.Transport,
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: Type[Exception] = ClientConnectorError,
) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
    """Wrap the raw TCP transport with TLS."""
    tls_proto = self._factory()  # Create a brand new proto for TLS
    sslcontext = self._get_ssl_context(req)
    if TYPE_CHECKING:
        # _start_tls_connection is unreachable in the current code path
        # if sslcontext is None.
        assert sslcontext is not None

    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            try:
                tls_transport = await self._loop.start_tls(
                    underlying_transport,
                    tls_proto,
                    sslcontext,
                    server_hostname=req.server_hostname or req.host,
                    ssl_handshake_timeout=timeout.total,
                )
            except BaseException:
                # We need to close the underlying transport since
                # `start_tls()` probably failed before it had a
                # chance to do this:
                underlying_transport.close()
                raise
    except cert_errors as exc:
        raise ClientConnectorCertificateError(req.connection_key, exc) from exc
    except ssl_errors as exc:
        raise ClientConnectorSSLError(req.connection_key, exc) from exc
    except OSError as exc:
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        raise client_error(req.connection_key, exc) from exc
    except TypeError as type_err:
        # Example cause looks like this:
        # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
        # object at 0x7f760615e460> is not supported by start_tls()
        raise ClientConnectionError(
            "Cannot initialize a TLS-in-TLS connection to host "
            f"{req.host!s}:{req.port:d} through an underlying connection "
            f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
            f"[{type_err!s}]"
        ) from type_err
    else:
        if tls_transport is None:
            msg = "Failed to start TLS (possibly caused by closing transport)"
            raise client_error(req.connection_key, OSError(msg))
        # Kick the state machine of the new TLS protocol
        tls_proto.connection_made(tls_transport)

    return tls_transport, tls_proto
def _convert_hosts_to_addr_infos(
    self, hosts: List[ResolveResult]
) -> List[aiohappyeyeballs.AddrInfoType]:
    """Converts the list of hosts to a list of addr_infos.

    The list of hosts is the result of a DNS lookup. The list of
    addr_infos is the result of a call to `socket.getaddrinfo()`.
    """
    addr_infos: List[aiohappyeyeballs.AddrInfoType] = []
    for hinfo in hosts:
        host = hinfo["host"]
        # A colon in the host string implies an IPv6 literal.
        is_ipv6 = ":" in host
        family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
        if self._family and self._family != family:
            # Skip addresses that do not match the configured family.
            continue
        addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
        addr_infos.append(
            (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
        )
    return addr_infos
# NOTE(review): whitespace-mangled fragment of
# TCPConnector._create_direct_connection — the `async def` line, the
# sslcontext/fingerprint setup, and the per-address connection attempt
# that produces `transp`/`proto` are missing; restore from the original
# source before use.
host = req.url.raw_host assert host isnotNone # Replace multiple trailing dots with a single one. # A trailing dot is only present for fully-qualified domain names. # See https://github.com/aio-libs/aiohttp/pull/7364. if host.endswith(".."):
host = host.rstrip(".") + "."
port = req.port assert port isnotNone try: # Cancelling this lookup should not cancel the underlying lookup # or else the cancel event will get broadcast to all the waiters # across all connections.
hosts = await self._resolve_host(host, port, traces=traces) except OSError as exc: if exc.errno isNoneand isinstance(exc, asyncio.TimeoutError): raise # in case of proxy it is not ClientProxyConnectionError # it is problem of resolving proxy ip itself raise ClientConnectorDNSError(req.connection_key, exc) from exc
last_exc: Optional[Exception] = None
addr_infos = self._convert_hosts_to_addr_infos(hosts) while addr_infos: # Strip trailing dots, certificates contain FQDN without dots. # See https://github.com/aio-libs/aiohttp/issues/3636
server_hostname = (
(req.server_hostname or host).rstrip(".") if sslcontext elseNone
)
# Verify the server certificate's fingerprint and, on mismatch, drop the
# bad peer from addr_infos and retry with the next address.
if req.is_ssl() and fingerprint: try:
fingerprint.check(transp) except ServerFingerprintMismatch as exc:
transp.close() ifnot self._cleanup_closed_disabled:
self._cleanup_closed_transports.append(transp)
last_exc = exc # Remove the bad peer from the list of addr_infos
sock: socket.socket = transp.get_extra_info("socket")
bad_peer = sock.getpeername()
aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer) continue
return transp, proto else: assert last_exc isnotNone raise last_exc
# NOTE(review): whitespace-mangled fragment of
# TCPConnector._create_proxy_connection — the `async def` line and the
# setup that creates `proxy_req`, `transport` and `proto` are missing;
# restore from the original source before use.
if req.is_ssl(): if runtime_has_start_tls:
self._warn_about_tls_in_tls(transport, req)
# For HTTPS requests over HTTP proxy # we must notify proxy to tunnel connection # so we send CONNECT command: # CONNECT www.python.org:443 HTTP/1.1 # Host: www.python.org # # next we must do TLS handshake and so on # to do this we must wrap raw socket into secure one # asyncio handles this perfectly
proxy_req.method = hdrs.METH_CONNECT
proxy_req.url = req.url
key = attr.evolve(
req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
)
conn = Connection(self, key, proto, self._loop)
proxy_resp = await proxy_req.send(conn) try:
protocol = conn._protocol assert protocol isnotNone
# read_until_eof=True will ensure the connection isn't closed # once the response is received and processed allowing # START_TLS to work on the connection below.
protocol.set_response_params(
read_until_eof=runtime_has_start_tls,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)
resp = await proxy_resp.start(conn) except BaseException:
proxy_resp.close()
conn.close() raise else:
conn._protocol = None try: if resp.status != 200:
message = resp.reason if message isNone:
message = HTTPStatus(resp.status).phrase raise ClientHttpProxyError(
proxy_resp.request_info,
resp.history,
status=resp.status,
message=message,
headers=resp.headers,
) ifnot runtime_has_start_tls:
rawsock = transport.get_extra_info("socket", default=None) if rawsock isNone: raise RuntimeError( "Transport does not expose socket instance"
) # Duplicate the socket, so now we can close proxy transport
rawsock = rawsock.dup() except BaseException: # It shouldn't be closed in `finally` because it's fed to # `loop.start_tls()` and the docs say not to touch it after # passing there.
transport.close() raise finally: ifnot runtime_has_start_tls:
transport.close()
ifnot runtime_has_start_tls: # HTTP proxy with support for upgrade to HTTPS
sslcontext = self._get_ssl_context(req) return await self._wrap_existing_connection(
self._factory,
timeout=timeout,
ssl=sslcontext,
sock=rawsock,
server_hostname=req.host,
req=req,
)
return await self._start_tls_connection( # Access the old transport for the last time before it's # closed and forgotten forever:
transport,
req=req,
timeout=timeout,
) finally:
proxy_resp.close()
return transport, proto
class UnixConnector(BaseConnector):
    """Unix socket connector.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """
path - Windows named pipe path.
keepalive_timeout - (optional) Keep-alive timeout.
force_close - Set to True to force close and do reconnect
after each request (and between redirects).
limit - The total number of simultaneous connections.
limit_per_host - Number of simultaneous connections to one host.
loop - Optional event loop. """
def __init__(
    self,
    path: str,
    force_close: bool = False,
    keepalive_timeout: Union[object, float, None] = sentinel,
    limit: int = 100,
    limit_per_host: int = 0,
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
    """Initialize the named-pipe connector for the given pipe *path*.

    Raises RuntimeError unless running on a proactor event loop,
    which is the only loop that supports Windows named pipes.
    """
    super().__init__(
        force_close=force_close,
        keepalive_timeout=keepalive_timeout,
        limit=limit,
        limit_per_host=limit_per_host,
        loop=loop,
    )
    if not isinstance(
        self._loop, asyncio.ProactorEventLoop  # type: ignore[attr-defined]
    ):
        raise RuntimeError(
            "Named Pipes only available in proactor loop under windows"
        )
    self._path = path
@property
def path(self) -> str:
    """Path to the named pipe."""
    return self._path
async def _create_connection(
    self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
    """Open the named pipe and return the connected protocol."""
    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                self._factory, self._path
            )
            # the drain is required so that the connection_made is called
            # and transport is set otherwise it is not set before the
            # `assert conn.transport is not None`
            # in client.py's _request method
            await asyncio.sleep(0)
            # other option is to manually set transport like
            # `proto.transport = trans`
    except OSError as exc:
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        raise ClientConnectorError(req.connection_key, exc) from exc

    return cast(ResponseHandler, proto)
# --- extraction artifact: web-page footer, not part of this module ---
# Dauer der Verarbeitung: 0.27 Sekunden (vorverarbeitet)
# Die Informationen auf dieser Webseite wurden nach bestem Wissen
# sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit,
# noch Richtigkeit, noch Qualität der bereit gestellten Informationen
# zugesichert.
# Bemerkung: Die farbliche Syntaxdarstellung ist noch experimentell.