import asyncio from contextlib import suppress from typing import Any, Optional, Tuple
from .base_protocol import BaseProtocol from .client_exceptions import (
ClientOSError,
ClientPayloadError,
ServerDisconnectedError,
SocketTimeoutError,
) from .helpers import (
_EXC_SENTINEL,
BaseTimerContext,
set_exception,
status_code_must_be_empty_body,
) from .http import HttpResponseParser, RawResponseMessage from .http_exceptions import HttpProcessingError from .streams import EMPTY_PAYLOAD, DataQueue, StreamReader
class ResponseHandler(BaseProtocol, DataQueue[Tuple[RawResponseMessage, StreamReader]]): """Helper class to adapt between Protocol and StreamReader."""
@property def should_close(self) -> bool: return (
self._should_close or (self._payload isnotNoneandnot self._payload.is_eof()) or self._upgraded or self._exception isnotNone or self._payload_parser isnotNone or bool(self._buffer) or bool(self._tail)
)
if self._payload_parser isnotNone: with suppress(Exception): # FIXME: log this somehow?
self._payload_parser.feed_eof()
uncompleted = None if self._parser isnotNone: try:
uncompleted = self._parser.feed_eof() except Exception as underlying_exc: if self._payload isnotNone:
client_payload_exc_msg = (
f"Response payload is not completed: {underlying_exc !r}"
) ifnot connection_closed_cleanly:
client_payload_exc_msg = (
f"{client_payload_exc_msg !s}. "
f"{original_connection_error !r}"
)
set_exception(
self._payload,
ClientPayloadError(client_payload_exc_msg),
underlying_exc,
)
ifnot self.is_eof(): if isinstance(original_connection_error, OSError):
reraised_exc = ClientOSError(*original_connection_error.args) if connection_closed_cleanly:
reraised_exc = ServerDisconnectedError(uncompleted) # assigns self._should_close to True as side effect, # we do it anyway below
underlying_non_eof_exc = (
_EXC_SENTINEL if connection_closed_cleanly else original_connection_error
) assert underlying_non_eof_exc isnotNone assert reraised_exc isnotNone
self.set_exception(reraised_exc, underlying_non_eof_exc)
def set_parser(self, parser: Any, payload: Any) -> None: # TODO: actual types are: # parser: WebSocketReader # payload: FlowControlDataQueue # but they are not generi enough # Need an ABC for both types
self._payload = payload
self._payload_parser = parser
self._drop_timeout()
if self._tail:
data, self._tail = self._tail, b""
self.data_received(data)
def _on_read_timeout(self) -> None:
exc = SocketTimeoutError("Timeout on reading data from socket")
self.set_exception(exc) if self._payload isnotNone:
set_exception(self._payload, exc)
# custom payload parser if self._payload_parser isnotNone:
eof, tail = self._payload_parser.feed_data(data) if eof:
self._payload = None
self._payload_parser = None
if tail:
self.data_received(tail) return else: if self._upgraded or self._parser isNone: # i.e. websocket connection, websocket parser is not set yet
self._tail += data else: # parse http messages try:
messages, upgraded, tail = self._parser.feed_data(data) except BaseException as underlying_exc: if self.transport isnotNone: # connection.release() could be called BEFORE # data_received(), the transport is already # closed in this case
self.transport.close() # should_close is True after the call if isinstance(underlying_exc, HttpProcessingError):
exc = HttpProcessingError(
code=underlying_exc.code,
message=underlying_exc.message,
headers=underlying_exc.headers,
) else:
exc = HttpProcessingError()
self.set_exception(exc, underlying_exc) return
self._upgraded = upgraded
payload: Optional[StreamReader] = None for message, payload in messages: if message.should_close:
self._should_close = True
self._payload = payload
if self._skip_payload or status_code_must_be_empty_body(
message.code
):
self.feed_data((message, EMPTY_PAYLOAD), 0) else:
self.feed_data((message, payload), 0) if payload isnotNone: # new message(s) was processed # register timeout handler unsubscribing # either on end-of-stream or immediately for # EMPTY_PAYLOAD if payload isnot EMPTY_PAYLOAD:
payload.on_eof(self._drop_timeout) else:
self._drop_timeout()
if tail: if upgraded:
self.data_received(tail) else:
self._tail = tail
¤ Dauer der Verarbeitung: 0.15 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.