class HeadersParser:
    """Parser for HTTP header lines, configured with size limits."""

    def __init__(
        self,
        max_line_size: int = 8190,
        max_headers: int = 32768,
        max_field_size: int = 8190,
        lax: bool = False,
    ) -> None:
        """Record the limits and leniency flag used while parsing.

        Args:
            max_line_size: Stored as ``self.max_line_size``.
                Presumably the longest accepted header line -- confirm
                against ``parse_headers``.
            max_headers: Stored as ``self.max_headers``.
            max_field_size: Stored as ``self.max_field_size``.
                Presumably the longest accepted field name/value --
                confirm against ``parse_headers``.
            lax: Stored privately as ``self._lax``.
        """
        # The three limits are public attributes; only the leniency flag
        # is kept private.
        self.max_line_size = max_line_size
        self.max_headers = max_headers
        self.max_field_size = max_field_size
        self._lax = lax
def parse_headers(
self, lines: List[bytes]
) -> Tuple["CIMultiDictProxy[str]", RawHeaders]:
headers: CIMultiDict[str] = CIMultiDict() # note: "raw" does not mean inclusion of OWS before/after the field value
raw_headers = []
lines_idx = 1
line = lines[1]
line_count = len(lines)
while line: # Parse initial header name : value pair. try:
bname, bvalue = line.split(b":", 1) except ValueError: raise InvalidHeader(line) fromNone
def _is_supported_upgrade(headers: CIMultiDictProxy[str]) -> bool:
    """Check if the upgrade header is supported.

    The value stored under ``hdrs.UPGRADE`` is lower-cased before the
    comparison, so the match is case-insensitive. A missing header is
    read as the empty string and therefore reported as unsupported.

    Args:
        headers: Parsed message headers.

    Returns:
        ``True`` when the lower-cased value is exactly ``"tcp"`` or
        ``"websocket"``, ``False`` otherwise.
    """
    requested = headers.get(hdrs.UPGRADE, "").lower()
    return requested in {"tcp", "websocket"}
class HttpParser(abc.ABC, Generic[_MsgT]):
lax: ClassVar[bool] = False
# read HTTP message (request/response line + headers), \r\n\r\n # and split by lines if self._payload_parser isNoneandnot self._upgraded:
pos = data.find(SEP, start_pos) # consume \r\n if pos == start_pos andnot self._lines:
start_pos = pos + len(SEP) continue
if pos >= start_pos: if should_close: raise BadHttpMessage("Data after `Connection: close`")
# line found
line = data[start_pos:pos] if SEP == b"\n": # For lax response parsing
line = line.rstrip(b"\r")
self._lines.append(line)
start_pos = pos + len(SEP)
# \r\n\r\n found if self._lines[-1] == EMPTY: try:
msg: _MsgT = self.parse_message(self._lines) finally:
self._lines.clear()
# Shouldn't allow +/- or other number formats. # https://www.rfc-editor.org/rfc/rfc9110#section-8.6-2 # msg.headers is already stripped of leading/trailing wsp ifnot DIGITS.fullmatch(length_hdr): raise InvalidHeader(CONTENT_LENGTH)
return int(length_hdr)
length = get_content_length() # do not support old websocket spec if SEC_WEBSOCKET_KEY1 in msg.headers: raise InvalidHeader(SEC_WEBSOCKET_KEY1)
self._upgraded = msg.upgrade and _is_supported_upgrade(
msg.headers
)
method = getattr(msg, "method", self.method) # code is only present on responses
code = getattr(msg, "code", 0)
Line continuations are supported. Returns list of header name and value pairs. Header name is in upper case. """
headers, raw_headers = self._headers_parser.parse_headers(lines)
close_conn = None
encoding = None
upgrade = False
chunked = False
# https://www.rfc-editor.org/rfc/rfc9110.html#section-5.5-6 # https://www.rfc-editor.org/rfc/rfc9110.html#name-collected-abnf
singletons = (
hdrs.CONTENT_LENGTH,
hdrs.CONTENT_LOCATION,
hdrs.CONTENT_RANGE,
hdrs.CONTENT_TYPE,
hdrs.ETAG,
hdrs.HOST,
hdrs.MAX_FORWARDS,
hdrs.SERVER,
hdrs.TRANSFER_ENCODING,
hdrs.USER_AGENT,
)
bad_hdr = next((h for h in singletons if len(headers.getall(h, ())) > 1), None) if bad_hdr isnotNone: raise BadHttpMessage(f"Duplicate '{bad_hdr}' header found.")
# keep-alive
conn = headers.get(hdrs.CONNECTION) if conn:
v = conn.lower() if v == "close":
close_conn = True elif v == "keep-alive":
close_conn = False # https://www.rfc-editor.org/rfc/rfc9110.html#name-101-switching-protocols elif v == "upgrade"and headers.get(hdrs.UPGRADE):
upgrade = True
# encoding
enc = headers.get(hdrs.CONTENT_ENCODING) if enc:
enc = enc.lower() if enc in ("gzip", "deflate", "br"):
encoding = enc
# chunking
te = headers.get(hdrs.TRANSFER_ENCODING) if te isnotNone: if self._is_chunked_te(te):
chunked = True
if hdrs.CONTENT_LENGTH in headers: raise BadHttpMessage( "Transfer-Encoding can't be present with Content-Length",
)
# version
match = VERSRE.fullmatch(version) if match isNone: raise BadStatusLine(line)
version_o = HttpVersion(int(match.group(1)), int(match.group(2)))
if close isNone: # then the headers weren't set in the request if version_o <= HttpVersion10: # HTTP 1.0 must asks to not close
close = True else: # HTTP 1.1 must ask to close.
close = False
if len(reason) > self.max_line_size: raise LineTooLong( "Status line is too long", str(self.max_line_size), str(len(reason))
)
# version
match = VERSRE.fullmatch(version) if match isNone: raise BadStatusLine(line)
version_o = HttpVersion(int(match.group(1)), int(match.group(2)))
# The status code is a three-digit ASCII number, no padding if len(status) != 3 ornot DIGITS.fullmatch(status): raise BadStatusLine(line)
status_i = int(status)
# toss the CRLF at the end of the chunk if self._chunk == ChunkState.PARSE_CHUNKED_CHUNK_EOF: if self._lax and chunk.startswith(b"\r"):
chunk = chunk[1:] if chunk[: len(SEP)] == SEP:
chunk = chunk[len(SEP) :]
self._chunk = ChunkState.PARSE_CHUNKED_SIZE else:
self._chunk_tail = chunk returnFalse, b""
# if stream does not contain trailer, after 0\r\n # we should get another \r\n otherwise # trailers needs to be skipped until \r\n\r\n if self._chunk == ChunkState.PARSE_MAYBE_TRAILERS:
head = chunk[: len(SEP)] if head == SEP: # end of stream
self.payload.feed_eof() returnTrue, chunk[len(SEP) :] # Both CR and LF, or only LF may not be received yet. It is # expected that CRLF or LF will be shown at the very first # byte next time, otherwise trailers should come. The last # CRLF which marks the end of response might not be # contained in the same TCP segment which delivered the # size indicator. ifnot head: returnFalse, b"" if head == SEP[:1]:
self._chunk_tail = head returnFalse, b""
self._chunk = ChunkState.PARSE_TRAILERS
# read and discard trailer up to the CRLF terminator if self._chunk == ChunkState.PARSE_TRAILERS:
pos = chunk.find(SEP) if pos >= 0:
chunk = chunk[pos + len(SEP) :]
self._chunk = ChunkState.PARSE_MAYBE_TRAILERS else:
self._chunk_tail = chunk returnFalse, b""
# Read all bytes until eof elif self._type == ParseState.PARSE_UNTIL_EOF:
self.payload.feed_data(chunk, len(chunk))
returnFalse, b""
class DeflateBuffer: """DeflateStream decompress stream and feed data into specified stream."""
# RFC1950 # bits 0..3 = CM = 0b1000 = 8 = "deflate" # bits 4..7 = CINFO = 1..7 = windows size. if ( not self._started_decoding and self.encoding == "deflate" and chunk[0] & 0xF != 8
): # Change the decoder to decompress incorrectly compressed data # Actually we should issue a warning about non-RFC-compliant data.
self.decompressor = ZLibDecompressor(
encoding=self.encoding, suppress_deflate_header=True
)
if chunk or self.size > 0:
self.out.feed_data(chunk, len(chunk)) if self.encoding == "deflate"andnot self.decompressor.eof: raise ContentEncodingError("deflate")
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.