def _reset_heartbeat(self) -> None:
    """Recompute the next heartbeat deadline and make sure a timer is pending.

    Returns immediately when ``self._heartbeat`` is ``None``. Otherwise it
    calls ``self._cancel_pong_response_cb()``, computes a new deadline with
    ``calculate_timeout_when`` from the loop clock, stores it in
    ``self._heartbeat_when`` and, only if no heartbeat timer handle is
    currently stored, schedules ``self._send_heartbeat`` at that deadline.
    """
    if self._heartbeat is None:
        return
    self._cancel_pong_response_cb()
    loop = self._loop
    assert loop is not None
    conn = self._conn
    # Without a connection there is no connector to ask; fall back to 5.
    timeout_ceil_threshold = (
        conn._connector._timeout_ceil_threshold if conn is not None else 5
    )
    now = loop.time()
    when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
    self._heartbeat_when = when
    if self._heartbeat_cb is None:
        # We do not cancel the previous heartbeat_cb here because
        # it generates a significant amount of TimerHandle churn
        # which causes asyncio to rebuild the heap frequently.
        # Instead _send_heartbeat() will reschedule the next
        # heartbeat if it fires too early.
        self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)
def _send_heartbeat(self) -> None:
    """Timer callback: send a ping, or reschedule if the timer fired early.

    Clears the stored timer handle first. If the loop clock is still before
    ``self._heartbeat_when`` the callback re-arms itself for that deadline
    and returns; otherwise it starts ``self._writer.ping()`` as a task.
    """
    self._heartbeat_cb = None
    loop = self._loop
    now = loop.time()
    if now < self._heartbeat_when:
        # Heartbeat fired too early, reschedule
        self._heartbeat_cb = loop.call_at(
            self._heartbeat_when, self._send_heartbeat
        )
        return

    if sys.version_info >= (3, 12):
        # Optimization for Python 3.12, try to send the ping
        # immediately to avoid having to schedule
        # the task on the event loop.
        ping_task = asyncio.Task(self._writer.ping(), loop=loop, eager_start=True)
    else:
        ping_task = loop.create_task(self._writer.ping())
    # NOTE(review): nothing visible here keeps a reference to ping_task or
    # handles its result; the remainder of this method appears to be missing
    # from this copy of the file -- confirm against the full source.
def get_extra_info(self, name: str, default: Any = None) -> Any:
    """Return extra info from the connection transport.

    Looks up *name* on the transport of ``self._response.connection``.
    *default* is returned when the response has no connection or the
    connection has no transport; otherwise the lookup (including the
    fallback to *default*) is delegated to ``transport.get_extra_info``.
    """
    conn = self._response.connection
    if conn is None:
        return default
    transport = conn.transport
    if transport is None:
        return default
    return transport.get_extra_info(name, default)
async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool: # we need to break `receive()` cycle first, # `close()` may be called from different task if self._waiting andnot self._closing: assert self._loop isnotNone
self._close_wait = self._loop.create_future()
self._set_closing()
self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
await self._close_wait
whileTrue: if self._waiting: raise RuntimeError("Concurrent call to receive() is not allowed")
if self._closed: return WS_CLOSED_MESSAGE elif self._closing:
await self.close() return WS_CLOSED_MESSAGE
try:
self._waiting = True try: if receive_timeout: # Entering the context manager and creating # Timeout() object can take almost 50% of the # run time in this loop so we avoid it if # there is no read timeout.
async with async_timeout.timeout(receive_timeout):
msg = await self._reader.read() else:
msg = await self._reader.read()
self._reset_heartbeat() finally:
self._waiting = False if self._close_wait:
set_result(self._close_wait, None) except (asyncio.CancelledError, asyncio.TimeoutError):
self._close_code = WSCloseCode.ABNORMAL_CLOSURE raise except EofStream:
self._close_code = WSCloseCode.OK
await self.close() return WSMessage(WSMsgType.CLOSED, None, None) except ClientError: # Likely ServerDisconnectedError when connection is lost
self._set_closed()
self._close_code = WSCloseCode.ABNORMAL_CLOSURE return WS_CLOSED_MESSAGE except WebSocketError as exc:
self._close_code = exc.code
await self.close(code=exc.code) return WSMessage(WSMsgType.ERROR, exc, None) except Exception as exc:
self._exception = exc
self._set_closing()
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
await self.close() return WSMessage(WSMsgType.ERROR, exc, None)
if msg.type is WSMsgType.CLOSE:
self._set_closing()
self._close_code = msg.data ifnot self._closed and self._autoclose:
await self.close() elif msg.type is WSMsgType.CLOSING:
self._set_closing() elif msg.type is WSMsgType.PING and self._autoping:
await self.pong(msg.data) continue elif msg.type is WSMsgType.PONG and self._autoping: continue
return msg
async def receive_str(self, *, timeout: Optional[float] = None) -> str:
    """Receive one message and return its payload as ``str``.

    Awaits ``self.receive(timeout)``.

    Raises:
        TypeError: if the received message type is not ``WSMsgType.TEXT``.
    """
    msg = await self.receive(timeout)
    if msg.type is not WSMsgType.TEXT:
        raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
    return cast(str, msg.data)
async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
    """Receive one message and return its payload as ``bytes``.

    Awaits ``self.receive(timeout)``.

    Raises:
        TypeError: if the received message type is not ``WSMsgType.BINARY``.
    """
    msg = await self.receive(timeout)
    if msg.type is not WSMsgType.BINARY:
        raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
    return cast(bytes, msg.data)
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.