def _reset_heartbeat(self) -> None: if self._heartbeat isNone: return
self._cancel_pong_response_cb()
req = self._req
timeout_ceil_threshold = (
req._protocol._timeout_ceil_threshold if req isnotNoneelse 5
)
loop = self._loop assert loop isnotNone
now = loop.time()
when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
self._heartbeat_when = when if self._heartbeat_cb isNone: # We do not cancel the previous heartbeat_cb here because # it generates a significant amount of TimerHandle churn # which causes asyncio to rebuild the heap frequently. # Instead _send_heartbeat() will reschedule the next # heartbeat if it fires too early.
self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)
def _send_heartbeat(self) -> None:
self._heartbeat_cb = None
loop = self._loop assert loop isnotNoneand self._writer isnotNone
now = loop.time() if now < self._heartbeat_when: # Heartbeat fired too early, reschedule
self._heartbeat_cb = loop.call_at(
self._heartbeat_when, self._send_heartbeat
) return
if sys.version_info >= (3, 12): # Optimization for Python 3.12, try to send the ping # immediately to avoid having to schedule # the task on the event loop.
ping_task = asyncio.Task(self._writer.ping(), loop=loop, eager_start=True) else:
ping_task = loop.create_task(self._writer.ping())
def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" ifnot task.cancelled() and (exc := task.exception()):
self._handle_ping_pong_exception(exc)
self._ping_task = None
def _pong_not_received(self) -> None: if self._req isnotNoneand self._req.transport isnotNone:
self._handle_ping_pong_exception(asyncio.TimeoutError())
def _handle_ping_pong_exception(self, exc: BaseException) -> None: """Handle exceptions raised during ping/pong processing.""" if self._closed: return
self._set_closed()
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
self._exception = exc if self._waiting andnot self._closing and self._reader isnotNone:
self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None))
def _set_closed(self) -> None: """Set the connection to closed.
Cancel any heartbeat timers and set the closed flag. """
self._closed = True
self._cancel_heartbeat()
async def prepare(self, request: BaseRequest) -> AbstractStreamWriter: # make pre-check to don't hide it by do_handshake() exceptions if self._payload_writer isnotNone: return self._payload_writer
# find common sub-protocol between client and server
protocol = None if hdrs.SEC_WEBSOCKET_PROTOCOL in headers:
req_protocols = [
str(proto.strip()) for proto in headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
]
for proto in req_protocols: if proto in self._protocols:
protocol = proto break else: # No overlap found: Return no protocol as per spec
ws_logger.warning( "Client protocols %r don’t overlap server-known ones %r",
req_protocols,
self._protocols,
)
# check supported version
version = headers.get(hdrs.SEC_WEBSOCKET_VERSION, "") if version notin ("13", "8", "7"): raise HTTPBadRequest(text=f"Unsupported version: {version}")
notakeover = False
compress = 0 if self._compress:
extensions = headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS) # Server side always get return with no exception. # If something happened, just drop compress extension
compress, notakeover = ws_ext_parse(extensions, isserver=True) if compress:
enabledext = ws_ext_gen(
compress=compress, isserver=True, server_notakeover=notakeover
)
response_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = enabledext
def get_extra_info(self, name: str, default: Any = None) -> Any: """Get optional transport information.
If no value associated with ``name`` is found, ``default`` is returned. """
writer = self._writer if writer isNone: return default
transport = writer.transport if transport isNone: return default return transport.get_extra_info(name, default)
async def write_eof(self) -> None: # type: ignore[override] if self._eof_sent: return if self._payload_writer isNone: raise RuntimeError("Response has not been started")
reader = self._reader assert reader isnotNone # we need to break `receive()` cycle before we can call # `reader.read()` as `close()` may be called from different task if self._waiting: assert self._loop isnotNone assert self._close_wait isNone
self._close_wait = self._loop.create_future()
reader.feed_data(WS_CLOSING_MESSAGE)
await self._close_wait
if self._closing:
self._close_transport() returnTrue
try:
async with async_timeout.timeout(self._timeout): whileTrue:
msg = await reader.read() if msg.type is WSMsgType.CLOSE:
self._set_code_close_transport(msg.data) returnTrue except asyncio.CancelledError:
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE) raise except Exception as exc:
self._exception = exc
self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE) returnTrue
def _set_closing(self, code: WSCloseCode) -> None: """Set the close code and mark the connection as closing."""
self._closing = True
self._close_code = code
self._cancel_heartbeat()
def _set_code_close_transport(self, code: WSCloseCode) -> None: """Set the close code and close the transport."""
self._close_code = code
self._close_transport()
def _close_transport(self) -> None: """Close the transport.""" if self._req isnotNoneand self._req.transport isnotNone:
self._req.transport.close()
receive_timeout = timeout or self._receive_timeout whileTrue: if self._waiting: raise RuntimeError("Concurrent call to receive() is not allowed")
if self._closed:
self._conn_lost += 1 if self._conn_lost >= THRESHOLD_CONNLOST_ACCESS: raise RuntimeError("WebSocket connection is closed.") return WS_CLOSED_MESSAGE elif self._closing: return WS_CLOSING_MESSAGE
try:
self._waiting = True try: if receive_timeout: # Entering the context manager and creating # Timeout() object can take almost 50% of the # run time in this loop so we avoid it if # there is no read timeout.
async with async_timeout.timeout(receive_timeout):
msg = await self._reader.read() else:
msg = await self._reader.read()
self._reset_heartbeat() finally:
self._waiting = False if self._close_wait:
set_result(self._close_wait, None) except asyncio.TimeoutError: raise except EofStream:
self._close_code = WSCloseCode.OK
await self.close() return WSMessage(WSMsgType.CLOSED, None, None) except WebSocketError as exc:
self._close_code = exc.code
await self.close(code=exc.code) return WSMessage(WSMsgType.ERROR, exc, None) except Exception as exc:
self._exception = exc
self._set_closing(WSCloseCode.ABNORMAL_CLOSURE)
await self.close() return WSMessage(WSMsgType.ERROR, exc, None)
if msg.type is WSMsgType.CLOSE:
self._set_closing(msg.data) # Could be closed while awaiting reader. ifnot self._closed and self._autoclose: # The client is likely going to close the # connection out from under us so we do not # want to drain any pending writes as it will # likely result writing to a broken pipe.
await self.close(drain=False) elif msg.type is WSMsgType.CLOSING:
self._set_closing(WSCloseCode.OK) elif msg.type is WSMsgType.PING and self._autoping:
await self.pong(msg.data) continue elif msg.type is WSMsgType.PONG and self._autoping: continue
async def __anext__(self) -> WSMessage:
msg = await self.receive() if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED): raise StopAsyncIteration return msg
def _cancel(self, exc: BaseException) -> None: # web_protocol calls this from connection_lost # or when the server is shutting down.
self._closing = True
self._cancel_heartbeat() if self._reader isnotNone:
set_exception(self._reader, exc)
¤ Dauer der Verarbeitung: 0.19 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.