def _reset_heartbeat(self) -> None:
    """Recompute the next heartbeat deadline and make sure a timer is armed.

    No-op when ``self._heartbeat`` is ``None``. Otherwise the pending
    pong-response callback is cancelled, a new deadline is computed with
    ``calculate_timeout_when`` and stored in ``self._heartbeat_when``, and
    ``_send_heartbeat`` is scheduled at that deadline unless a heartbeat
    callback is already registered.
    """
    if self._heartbeat is None:
        return
    self._cancel_pong_response_cb()
    loop = self._loop
    assert loop is not None
    conn = self._conn
    # Use the connector's ceil threshold when a connection is attached;
    # fall back to 5 otherwise.
    timeout_ceil_threshold = (
        conn._connector._timeout_ceil_threshold if conn is not None else 5
    )
    now = loop.time()
    when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
    self._heartbeat_when = when
    if self._heartbeat_cb is None:
        # We do not cancel the previous heartbeat_cb here because
        # it generates a significant amount of TimerHandle churn
        # which causes asyncio to rebuild the heap frequently.
        # Instead _send_heartbeat() will reschedule the next
        # heartbeat if it fires too early.
        self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)
def _send_heartbeat(self) -> None:
    """Timer callback: re-arm if fired early, otherwise start a ping.

    Clears ``self._heartbeat_cb`` first. If the loop clock is still before
    ``self._heartbeat_when``, the callback is rescheduled for that deadline
    and nothing is sent. Otherwise a task running ``self._writer.ping()``
    is created on the loop.
    """
    self._heartbeat_cb = None
    loop = self._loop
    now = loop.time()
    if now < self._heartbeat_when:
        # Heartbeat fired too early, reschedule
        self._heartbeat_cb = loop.call_at(
            self._heartbeat_when, self._send_heartbeat
        )
        return

    if sys.version_info >= (3, 12):
        # Optimization for Python 3.12, try to send the ping
        # immediately to avoid having to schedule
        # the task on the event loop.
        ping_task = asyncio.Task(self._writer.ping(), loop=loop, eager_start=True)
    else:
        ping_task = loop.create_task(self._writer.ping())
    # NOTE(review): `ping_task` is not referenced again in the visible text.
    # Whatever keeps a reference to the task (the event loop only holds weak
    # references to tasks) and handles the pong timeout is presumably elided
    # from this excerpt -- confirm against the full file.
def get_extra_info(self, name: str, default: Any = None) -> Any:
    """Return extra info from the connection transport.

    Looks up ``name`` on the transport of ``self._response.connection``.

    Returns ``default`` when the response has no connection, when the
    connection has no transport, or (as delegated to the transport's own
    ``get_extra_info``) when the transport does not know ``name``.
    """
    conn = self._response.connection
    if conn is None:
        return default
    transport = conn.transport
    if transport is None:
        return default
    return transport.get_extra_info(name, default)
async def close(self, *, code: int = WSCloseCode.OK, message: bytes = b"") -> bool:
    """Begin closing the websocket, first unblocking a pending reader.

    If another call is currently waiting for a message (``self._waiting``)
    and closing has not started yet, a future is created in
    ``self._close_wait``, the socket is marked as closing,
    ``WS_CLOSING_MESSAGE`` is fed to the reader, and this coroutine waits
    on that future before going further.

    :param code: close code for the closing handshake (keyword-only).
    :param message: close payload (keyword-only).
    """
    # we need to break `receive()` cycle first,
    # `close()` may be called from different task
    if self._waiting and not self._closing:
        assert self._loop is not None
        self._close_wait = self._loop.create_future()
        self._set_closing()
        self._reader.feed_data(WS_CLOSING_MESSAGE, 0)
        await self._close_wait
    # NOTE(review): the visible text of this method stops here. `code` and
    # `message` are never used and the annotated `bool` result is never
    # produced, so the remainder appears to be missing from this excerpt --
    # restore it from the full file rather than guessing.
whileTrue: if self._waiting: raise RuntimeError("Concurrent call to receive() is not allowed")
if self._closed: return WS_CLOSED_MESSAGE elif self._closing:
await self.close() return WS_CLOSED_MESSAGE
try:
self._waiting = True try: if receive_timeout: # Entering the context manager and creating # Timeout() object can take almost 50% of the # run time in this loop so we avoid it if # there is no read timeout.
async with async_timeout.timeout(receive_timeout):
msg = await self._reader.read() else:
msg = await self._reader.read()
self._reset_heartbeat() finally:
self._waiting = False if self._close_wait:
set_result(self._close_wait, None) except (asyncio.CancelledError, asyncio.TimeoutError):
self._close_code = WSCloseCode.ABNORMAL_CLOSURE raise except EofStream:
self._close_code = WSCloseCode.OK
await self.close() return WSMessage(WSMsgType.CLOSED, None, None) except ClientError: # Likely ServerDisconnectedError when connection is lost
self._set_closed()
self._close_code = WSCloseCode.ABNORMAL_CLOSURE return WS_CLOSED_MESSAGE except WebSocketError as exc:
self._close_code = exc.code
await self.close(code=exc.code) return WSMessage(WSMsgType.ERROR, exc, None) except Exception as exc:
self._exception = exc
self._set_closing()
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
await self.close() return WSMessage(WSMsgType.ERROR, exc, None)
if msg.type is WSMsgType.CLOSE:
self._set_closing()
self._close_code = msg.data ifnot self._closed and self._autoclose:
await self.close() elif msg.type is WSMsgType.CLOSING:
self._set_closing() elif msg.type is WSMsgType.PING and self._autoping:
await self.pong(msg.data) continue elif msg.type is WSMsgType.PONG and self._autoping: continue
return msg
async def receive_str(self, *, timeout: Optional[float] = None) -> str:
    """Receive one message and return its payload as ``str``.

    :param timeout: passed through to ``receive()`` (keyword-only here).
    :raises TypeError: if the received message type is not
        ``WSMsgType.TEXT``.
    """
    msg = await self.receive(timeout)
    if msg.type is not WSMsgType.TEXT:
        raise TypeError(f"Received message {msg.type}:{msg.data!r} is not str")
    # cast() only informs the type checker; no runtime conversion happens.
    return cast(str, msg.data)
async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
    """Receive one message and return its payload as ``bytes``.

    :param timeout: passed through to ``receive()`` (keyword-only here).
    :raises TypeError: if the received message type is not
        ``WSMsgType.BINARY``.
    """
    msg = await self.receive(timeout)
    if msg.type is not WSMsgType.BINARY:
        raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
    # cast() only informs the type checker; no runtime conversion happens.
    return cast(bytes, msg.data)
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
0.21
Bemerkung:
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.