import asyncio import asyncio.streams import sys import traceback import warnings from collections import deque from contextlib import suppress from html import escape as html_escape from http import HTTPStatus from logging import Logger from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Deque,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)
import attr import yarl
from .abc import AbstractAccessLogger, AbstractStreamWriter from .base_protocol import BaseProtocol from .helpers import ceil_timeout from .http import (
HttpProcessingError,
HttpRequestParser,
HttpVersion10,
RawRequestMessage,
StreamWriter,
) from .log import access_logger, server_logger from .streams import EMPTY_PAYLOAD, StreamReader from .tcp_helpers import tcp_keepalive from .web_exceptions import HTTPException, HTTPInternalServerError from .web_log import AccessLogger from .web_request import BaseRequest from .web_response import Response, StreamResponse
class RequestHandler(BaseProtocol): """HTTP protocol implementation.
RequestHandler handles incoming HTTP request. It reads request line,
request headers and request payload and calls handle_request() method.
By default it always returns with 404 response.
RequestHandler handles errors in incoming request, like bad
status line, bad headers or incomplete payload. If any error occurs,
connection gets closed.
keepalive_timeout -- number of seconds before closing
keep-alive connection
tcp_keepalive -- TCP keep-alive is on, default is on
debug -- enable debug mode
logger -- custom logger object
access_log_class -- custom class for access_logger
access_log -- custom logging object
access_log_format -- access log format string
loop -- Optional event loop
max_line_size -- Optional maximum header line size
max_field_size -- Optional maximum header field size
max_headers -- Optional maximum header size
timeout_ceil_threshold -- Optional value to specify
threshold to ceil() timeout
values
We need to clean up everything and stop accepting requests.
It is especially important for keep-alive connections. """
self._force_close = True
if self._keepalive_handle isnotNone:
self._keepalive_handle.cancel()
# Wait for graceful handler completion if self._request_in_progress: # The future is only created when we are shutting # down while the handler is still processing a request # to avoid creating a future for every request.
self._handler_waiter = self._loop.create_future() try:
async with ceil_timeout(timeout):
await self._handler_waiter except (asyncio.CancelledError, asyncio.TimeoutError):
self._handler_waiter = None if (
sys.version_info >= (3, 11) and (task := asyncio.current_task()) and task.cancelling()
): raise # Then cancel handler and wait try:
async with ceil_timeout(timeout): if self._current_request isnotNone:
self._current_request._cancel(asyncio.CancelledError())
if self._task_handler isnotNoneandnot self._task_handler.done():
await asyncio.shield(self._task_handler) except (asyncio.CancelledError, asyncio.TimeoutError): if (
sys.version_info >= (3, 11) and (task := asyncio.current_task()) and task.cancelling()
): raise
# force-close non-idle handler if self._task_handler isnotNone:
self._task_handler.cancel()
:param bool val: new state. """
self._keepalive = val if self._keepalive_handle:
self._keepalive_handle.cancel()
self._keepalive_handle = None
def close(self) -> None:
    """Close connection.

    Stop accepting new pipelining messages and close
    connection when handlers done processing messages.

    Sets the ``_close`` flag and, if a waiter is currently set,
    cancels it.
    """
    self._close = True
    # Cancel the pending waiter (if any) so whoever awaits it is woken up
    # and can observe the flag set above.
    if self._waiter:
        self._waiter.cancel()
def force_close(self) -> None:
    """Forcefully close connection.

    Sets the ``_force_close`` flag, cancels the current waiter if one is
    set, and closes the transport if it is still attached. The transport
    reference is cleared afterwards, so a repeated call does not close it
    twice.
    """
    self._force_close = True
    if self._waiter:
        self._waiter.cancel()
    if self.transport is not None:
        self.transport.close()
        # Drop the reference once closed; later code checks
        # ``self.transport is None`` to detect a closed connection.
        self.transport = None
It reads request line, request headers and request payload, then
calls handle_request() method. Subclass has to override
handle_request(). start() handles various exceptions in request or response handling. Connection is being closed always unless
keep_alive(True) specified. """
loop = self._loop
handler = asyncio.current_task(loop) assert handler isnotNone
manager = self._manager assert manager isnotNone
keepalive_timeout = self._keepalive_timeout
resp = None assert self._request_factory isnotNone assert self._request_handler isnotNone
whilenot self._force_close: ifnot self._messages: try: # wait for next request
self._waiter = loop.create_future()
await self._waiter finally:
self._waiter = None
message, payload = self._messages.popleft()
start = loop.time()
manager.requests_count += 1
writer = StreamWriter(self, loop) if isinstance(message, _ErrInfo): # make request_factory work
request_handler = self._make_error_handler(message)
message = ERROR else:
request_handler = self._request_handler
request = self._request_factory(message, payload, self, writer, handler) try: # a new task is used for copy context vars (#3406)
coro = self._handle_request(request, start, request_handler) if sys.version_info >= (3, 12):
task = asyncio.Task(coro, loop=loop, eager_start=True) else:
task = loop.create_task(coro) try:
resp, reset = await task except ConnectionError:
self.log_debug("Ignored premature client disconnection") break
# Drop the processed task from asyncio.Task.all_tasks() early del task if reset:
self.log_debug("Ignored premature client disconnection 2") break
# notify server about keep-alive
self._keepalive = bool(resp.keep_alive)
# check payload ifnot payload.is_eof():
lingering_time = self._lingering_time ifnot self._force_close and lingering_time:
self.log_debug( "Start lingering close timer for %s sec.", lingering_time
)
now = loop.time()
end_t = now + lingering_time
try: whilenot payload.is_eof() and now < end_t:
async with ceil_timeout(end_t - now): # read and ignore
await payload.readany()
now = loop.time() except (asyncio.CancelledError, asyncio.TimeoutError): if (
sys.version_info >= (3, 11) and (t := asyncio.current_task()) and t.cancelling()
): raise
# if payload still uncompleted ifnot payload.is_eof() andnot self._force_close:
self.log_debug("Uncompleted request.")
self.close()
payload.set_exception(_PAYLOAD_ACCESS_ERROR)
except asyncio.CancelledError:
self.log_debug("Ignored premature client disconnection") raise except Exception as exc:
self.log_exception("Unhandled exception", exc_info=exc)
self.force_close() finally: if self.transport isNoneand resp isnotNone:
self.log_debug("Ignored premature client disconnection.") elifnot self._force_close: if self._keepalive andnot self._close: # start keep-alive timer if keepalive_timeout isnotNone:
now = loop.time()
close_time = now + keepalive_timeout
self._next_keepalive_close_time = close_time if self._keepalive_handle isNone:
self._keepalive_handle = loop.call_at(
close_time, self._process_keepalive
) else: break
# remove handler, close transport if no handlers left ifnot self._force_close:
self._task_handler = None if self.transport isnotNone:
self.transport.close()
async def finish_response(
    self, request: BaseRequest, resp: StreamResponse, start_time: float
) -> Tuple[StreamResponse, bool]:
    """Prepare the response and write_eof, then log access.

    This has to be called within the context of any exception so the
    access logger can get exception information.

    :param request: request being answered; ``request._finish()`` is
                    called on it before anything else.
    :param resp: object returned by the web handler. If it has no
                 ``prepare`` attribute (for example ``None``), the problem
                 is logged and a response built from
                 ``HTTPInternalServerError`` is sent instead.
    :param start_time: forwarded unchanged to ``log_access``.
    :returns: ``(resp, reset)`` where ``resp`` is the response that was
              actually used and ``reset`` is ``True`` if a
              ``ConnectionError`` was raised while preparing or writing
              it (the client disconnected prematurely), ``False``
              otherwise.
    """
    request._finish()
    if self._request_parser is not None:
        self._request_parser.set_upgraded(False)
        self._upgrade = False
        # Feed any bytes buffered in ``_message_tail`` back to the parser
        # and clear the buffer.
        if self._message_tail:
            self._request_parser.feed_data(self._message_tail)
            self._message_tail = b""
    try:
        prepare_meth = resp.prepare
    except AttributeError:
        # The handler returned something that is not a response object.
        if resp is None:
            self.log_exception("Missing return statement on request handler")
        else:
            self.log_exception(
                "Web-handler should return a response instance, "
                "got {!r}".format(resp)
            )
        exc = HTTPInternalServerError()
        resp = Response(
            status=exc.status, reason=exc.reason, text=exc.text, headers=exc.headers
        )
        prepare_meth = resp.prepare
    try:
        await prepare_meth(request)
        await resp.write_eof()
    except ConnectionError:
        self.log_access(request, resp, start_time)
        return resp, True

    # Success path: log the access as well and report "no reset", so the
    # function always returns the tuple promised by its annotation.
    self.log_access(request, resp, start_time)
    return resp, False
Returns HTTP response with specific status code. Logs additional
information. It always closes current connection. """
self.log_exception("Error handling request", exc_info=exc)
# some data already got sent, connection is broken if request.writer.output_size > 0: raise ConnectionError( "Response is sent already, cannot send another response " "with the error message"
)
ct = "text/plain" if status == HTTPStatus.INTERNAL_SERVER_ERROR:
title = "{0.value} {0.phrase}".format(HTTPStatus.INTERNAL_SERVER_ERROR)
msg = HTTPStatus.INTERNAL_SERVER_ERROR.description
tb = None if self.debug: with suppress(Exception):
tb = traceback.format_exc()
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert. 0.42 Bemerkung:
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.