else:
failed = True if is_quoted(value):
failed = False
value = unescape(value[1:-1].lstrip("\\/")) elif is_token(value):
failed = False elif parts: # maybe just ; in filename, in any case this is just # one case fix, for proper fix we need to redesign parser
_value = f"{value};{parts[0]}" if is_quoted(_value):
parts.pop(0)
value = unescape(_value[1:-1].lstrip("\\/"))
failed = False
if failed:
warnings.warn(BadContentDispositionHeader(header)) returnNone, {}
params[key] = value
return disptype.lower(), params
def content_disposition_filename(
    params: Mapping[str, str], name: str = "filename"
) -> Optional[str]:
    """Extract a filename (or other named parameter) from parsed
    Content-Disposition parameters.

    Resolution order: the extended ``name*`` form, the plain ``name`` form,
    then an RFC 2231 continuation (``name*0*``, ``name*1*``, ...) which is
    joined and, if it carries an ``encoding''value`` prefix, percent-decoded.
    Returns None when nothing matches.
    """
    name_suf = "%s*" % name
    if not params:
        return None
    elif name_suf in params:
        return params[name_suf]
    elif name in params:
        return params[name]
    else:
        parts = []
        fnparams = sorted(
            (key, value) for key, value in params.items() if key.startswith(name_suf)
        )
        for num, (key, value) in enumerate(fnparams):
            _, tail = key.split("*", 1)
            if tail.endswith("*"):
                tail = tail[:-1]
            # Continuation segments must be numbered consecutively from 0.
            if tail == str(num):
                parts.append(value)
            else:
                break
        if not parts:
            return None
        value = "".join(parts)
        if "'" in value:
            # Extended value: encoding'language'percent-encoded-data
            encoding, _, value = value.split("'", 2)
            encoding = encoding or "utf-8"
            return unquote(value, encoding, "strict")
        return value
class MultipartResponseWrapper:
    """Wrapper around the MultipartReader.

    It takes care about underlying connection and close it when it needs in.
    """

    async def __anext__(
        self,
    ) -> Union["MultipartReader", "BodyPartReader"]:
        # Delegate to next(); translate "no more parts" into the
        # async-iteration stop signal.
        part = await self.next()
        if part is None:
            raise StopAsyncIteration
        return part

    def at_eof(self) -> bool:
        """Returns True when all response data had been read."""
        return self.resp.content.at_eof()
async def read(self, *, decode: bool = False) -> bytes:
    """Reads body part data.

    decode: Decodes data following by encoding
            method from Content-Encoding header. If it missed
            data remains untouched
    """
    if self._at_eof:
        return b""
    data = bytearray()
    # Accumulate chunks until the part's boundary has been reached.
    while not self._at_eof:
        data.extend(await self.read_chunk(self.chunk_size))
    if decode:
        return self.decode(data)
    # NOTE(review): returns a bytearray here despite the -> bytes
    # annotation; preserved as-is since callers may rely on it.
    return data
async def read_chunk(self, size: int = chunk_size) -> bytes:
    """Reads body part content chunk of the specified size.

    size: chunk size
    """
    if self._at_eof:
        return b""
    if self._length:
        # Content-Length is known: read up to the remaining byte count.
        chunk = await self._read_chunk_from_length(size)
    else:
        # Unknown length: scan the stream for the boundary marker.
        chunk = await self._read_chunk_from_stream(size)

    # For the case of base64 data, we must read a fragment of size with a
    # remainder of 0 by dividing by 4 for string without symbols \n or \r
    encoding = self.headers.get(CONTENT_TRANSFER_ENCODING)
    if encoding and encoding.lower() == "base64":
        stripped_chunk = b"".join(chunk.split())
        remainder = len(stripped_chunk) % 4
        # NOTE(review): the realignment loop that should consume `remainder`
        # (topping the chunk up to a multiple of 4) appears truncated in the
        # extracted source — `remainder` is currently unused; confirm
        # against upstream.

    self._read_bytes += len(chunk)
    if self._read_bytes == self._length:
        self._at_eof = True
    if self._at_eof:
        # A part's payload must be terminated by CRLF before the boundary.
        clrf = await self._content.readline()
        assert (
            b"\r\n" == clrf
        ), "reader did not read all the data or it is malformed"
    return chunk
async def _read_chunk_from_length(self, size: int) -> bytes:
    # Reads body part content chunk of the specified size.
    # The body part must have a Content-Length header with a proper value.
    assert self._length is not None, "Content-Length required for chunked read"
    # Never read past the declared part length.
    chunk_size = min(size, self._length - self._read_bytes)
    chunk = await self._content.read(chunk_size)
    if self._content.at_eof():
        self._at_eof = True
    return chunk
async def _read_chunk_from_stream(self, size: int) -> bytes:
    # Reads content chunk of body part with unknown length.
    # The Content-Length header for body part is not necessary.
    assert (
        size >= self._boundary_len
    ), "Chunk size must be greater or equal than boundary length + 2"
    first_chunk = self._prev_chunk is None
    if first_chunk:
        self._prev_chunk = await self._content.read(size)

    chunk = b""
    # content.read() may return less than size, so we need to loop to ensure
    # we have enough data to detect the boundary.
    while len(chunk) < self._boundary_len:
        chunk += await self._content.read(size)
        self._content_eof += int(self._content.at_eof())
        assert self._content_eof < 3, "Reading after EOF"
        if self._content_eof:
            break
    if len(chunk) > size:
        # Keep only `size` bytes; push the surplus back onto the stream.
        self._content.unread_data(chunk[size:])
        chunk = chunk[:size]

    assert self._prev_chunk is not None
    # Search for the boundary across the seam of the previous and current
    # chunks, since it may straddle the two reads.
    window = self._prev_chunk + chunk
    sub = b"\r\n" + self._boundary
    if first_chunk:
        idx = window.find(sub)
    else:
        idx = window.find(sub, max(0, len(self._prev_chunk) - len(sub)))
    if idx >= 0:
        # pushing boundary back to content
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=DeprecationWarning)
            self._content.unread_data(window[idx:])
        if size > idx:
            self._prev_chunk = self._prev_chunk[:idx]
        chunk = window[len(self._prev_chunk) : idx]
        if not chunk:
            self._at_eof = True
    # Emit the buffered previous chunk; keep the fresh one for next call.
    result = self._prev_chunk
    self._prev_chunk = chunk
    return result
async def readline(self) -> bytes:
    """Reads body part by line by line."""
    if self._at_eof:
        return b""

    if self._unread:
        line = self._unread.popleft()
    else:
        line = await self._content.readline()

    if line.startswith(self._boundary):
        # the very last boundary may not come with \r\n,
        # so set single rules for everyone
        sline = line.rstrip(b"\r\n")
        boundary = self._boundary
        last_boundary = self._boundary + b"--"
        # ensure that we read exactly the boundary, not something alike
        if sline == boundary or sline == last_boundary:
            self._at_eof = True
            self._unread.append(line)
            return b""
    else:
        # Peek the next line: if it is the boundary, the CRLF terminating
        # the current line belongs to the protocol, not the payload.
        next_line = await self._content.readline()
        if next_line.startswith(self._boundary):
            line = line[:-2]  # strip CRLF but only once
        self._unread.append(next_line)

    return line
async def release(self) -> None:
    """Like read(), but reads all the data to the void."""
    if self._at_eof:
        return
    while not self._at_eof:
        await self.read_chunk(self.chunk_size)
def decode(self, data: bytes) -> bytes:
    """Decodes data.

    Decoding is done according the specified Content-Encoding
    or Content-Transfer-Encoding headers value.
    """
    # NOTE(review): the `def` line was lost in extraction; the signature is
    # reconstructed from the `self.decode(data)` call site in read() —
    # confirm against upstream.
    if CONTENT_TRANSFER_ENCODING in self.headers:
        data = self._decode_content_transfer(data)
    # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
    if not self._is_form_data and CONTENT_ENCODING in self.headers:
        return self._decode_content(data)
    return data
def _decode_content(self, data: bytes) -> bytes:
    # Decompress `data` per the Content-Encoding header.
    # NOTE(review): in the extracted source this method and
    # _decode_content_transfer were fused into one body; they are split here
    # per the two distinct call sites in decode() — confirm against upstream.
    encoding = self.headers.get(CONTENT_ENCODING, "").lower()
    if encoding == "identity":
        return data
    if encoding in {"deflate", "gzip"}:
        return ZLibDecompressor(
            encoding=encoding,
            suppress_deflate_header=True,
        ).decompress_sync(data)
    raise RuntimeError(f"unknown content encoding: {encoding}")

def _decode_content_transfer(self, data: bytes) -> bytes:
    # Decode `data` per the Content-Transfer-Encoding header.
    encoding = self.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
    if encoding == "base64":
        return base64.b64decode(data)
    elif encoding == "quoted-printable":
        return binascii.a2b_qp(data)
    elif encoding in ("binary", "8bit", "7bit"):
        return data
    else:
        raise RuntimeError(
            "unknown content transfer encoding: {}" "".format(encoding)
        )
def get_charset(self, default: str) -> str:
    """Returns charset parameter from Content-Type header or default."""
    ctype = self.headers.get(CONTENT_TYPE, "")
    mimetype = parse_mimetype(ctype)
    # A form-level "_charset_" part (self._default_charset) takes
    # precedence over the caller-supplied default.
    return mimetype.parameters.get("charset", self._default_charset or default)
@reify
def name(self) -> Optional[str]:
    """Returns name specified in Content-Disposition header.

    If the header is missing or malformed, returns None.
    """
    _, params = parse_content_disposition(self.headers.get(CONTENT_DISPOSITION))
    return content_disposition_filename(params, "name")
class MultipartReader:
    """Multipart body reader."""

    #: Response wrapper, used when multipart readers constructs from response.
    response_wrapper_cls = MultipartResponseWrapper
    #: Multipart reader class, used to handle multipart/* body parts.
    #: None points to type(self)
    multipart_reader_cls: Optional[Type["MultipartReader"]] = None
    #: Body part reader class for non multipart/* content types.
    part_reader_cls = BodyPartReader
async def __anext__(
    self,
) -> Optional[Union["MultipartReader", BodyPartReader]]:
    # Delegate to next(); translate "no more parts" into the
    # async-iteration stop signal.
    part = await self.next()
    if part is None:
        raise StopAsyncIteration
    return part
def at_eof(self) -> bool:
    """Returns True if the final boundary was reached, false otherwise."""
    return self._at_eof
async def next(
    self,
) -> Optional[Union["MultipartReader", BodyPartReader]]:
    """Emits the next multipart body part."""
    # So, if we're at BOF, we need to skip till the boundary.
    if self._at_eof:
        return None
    await self._maybe_release_last_part()
    if self._at_bof:
        await self._read_until_first_boundary()
        self._at_bof = False
    else:
        await self._read_boundary()
    if self._at_eof:  # we just read the last boundary, nothing to do there
        return None

    part = await self.fetch_next_part()
    # https://datatracker.ietf.org/doc/html/rfc7578#section-4.6
    if (
        self._last_part is None
        and self._mimetype.subtype == "form-data"
        and isinstance(part, BodyPartReader)
    ):
        _, params = parse_content_disposition(part.headers.get(CONTENT_DISPOSITION))
        if params.get("name") == "_charset_":
            # Longest encoding in https://encoding.spec.whatwg.org/encodings.json
            # is 19 characters, so 32 should be more than enough for any valid encoding.
            charset = await part.read_chunk(32)
            if len(charset) > 31:
                raise RuntimeError("Invalid default charset")

            # Remember the form's default charset and move on to the first
            # real body part.
            self._default_charset = charset.strip().decode()
            part = await self.fetch_next_part()

    self._last_part = part
    return self._last_part
async def release(self) -> None:
    """Reads all the body parts to the void till the final boundary."""
    while not self._at_eof:
        item = await self.next()
        if item is None:
            break
        await item.release()
async def fetch_next_part(
    self,
) -> Union["MultipartReader", BodyPartReader]:
    """Returns the next body part reader."""
    headers = await self._read_headers()
    return self._get_part_reader(headers)
def _get_part_reader(
self,
headers: "CIMultiDictProxy[str]",
) -> Union["MultipartReader", BodyPartReader]: """Dispatches the response by the `Content-Type` header.
# the epilogue is expected and then either the end of input or the # parent multipart boundary, if the parent boundary is found then # it should be marked as unread and handed to the parent for # processing if next_line[:2] == b"--":
self._unread.append(next_line) # otherwise the request is likely missing an epilogue and both # lines should be passed to the parent for processing # (this handles the old behavior gracefully) else:
self._unread.extend([next_line, epilogue]) else: raise ValueError(f"Invalid boundary {chunk!r}, expected {self._boundary!r}")
async def _maybe_release_last_part(self) -> None:
    """Ensures that the last read body part is read completely."""
    if self._last_part is not None:
        if not self._last_part.at_eof():
            # Drain any unread payload so the stream is positioned at the
            # next boundary.
            await self._last_part.release()
        self._unread.extend(self._last_part._unread)
        self._last_part = None
# Internal type alias: a queued body part payload together with its
# Content-Encoding and Content-Transfer-Encoding values.
_Part = Tuple[Payload, str, str]
class MultipartWriter(Payload):
    """Multipart body writer."""

    # Declared without a value: the multipart writer has no single payload
    # value of its own.
    _value: None

    def __init__(self, subtype: str = "mixed", boundary: Optional[str] = None) -> None:
        boundary = boundary if boundary is not None else uuid.uuid4().hex
        # The underlying Payload API demands a str (utf-8), not bytes,
        # so we need to ensure we don't lose anything during conversion.
        # As a result, require the boundary to be ASCII only.
        # In both situations.
        try:
            self._boundary = boundary.encode("ascii")
        except UnicodeEncodeError:
            raise ValueError("boundary should contain ASCII only chars") from None

        ctype = f"multipart/{subtype}; boundary={self._boundary_value}"
        # NOTE(review): the remainder of __init__ (presumably passing
        # `ctype` to the superclass and initializing the parts list) appears
        # truncated in the extracted source — `ctype` is otherwise unused;
        # confirm against upstream.
def append_payload(self, payload: Payload) -> Payload:
    """Adds a new body part to multipart writer."""
    encoding: Optional[str] = None
    te_encoding: Optional[str] = None
    if self._is_form_data:
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.7
        # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8
        assert (
            not {CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TRANSFER_ENCODING}
            & payload.headers.keys()
        )
        # Set default Content-Disposition in case user doesn't create one
        if CONTENT_DISPOSITION not in payload.headers:
            name = f"section-{len(self._parts)}"
            payload.set_content_disposition("form-data", name=name)
    else:
        # compression
        encoding = payload.headers.get(CONTENT_ENCODING, "").lower()
        if encoding and encoding not in ("deflate", "gzip", "identity"):
            raise RuntimeError(f"unknown content encoding: {encoding}")
        if encoding == "identity":
            encoding = None

        # te encoding
        te_encoding = payload.headers.get(CONTENT_TRANSFER_ENCODING, "").lower()
        if te_encoding not in ("", "base64", "quoted-printable", "binary"):
            raise RuntimeError(f"unknown content transfer encoding: {te_encoding}")
        if te_encoding == "binary":
            te_encoding = None

        # size
        size = payload.size
        if size is not None and not (encoding or te_encoding):
            payload.headers[CONTENT_LENGTH] = str(size)

    # NOTE(review): the tail of this method was truncated in the extracted
    # source; the registration and return below are reconstructed from the
    # declared `-> Payload` return type and the `(part, encoding,
    # te_encoding)` triples unpacked by the `size` property — confirm
    # against upstream.
    self._parts.append((payload, encoding, te_encoding))
    return payload
@property
def size(self) -> Optional[int]:
    """Size of the payload."""
    total = 0
    for part, encoding, te_encoding in self._parts:
        # Encoded parts have an unknown final size; so do parts whose own
        # size is unknown.
        if encoding or te_encoding or part.size is None:
            return None
        # NOTE(review): the accumulation was truncated in the extracted
        # source (`total` was initialized but never updated); reconstructed
        # as the part's serialized headers plus its body size — confirm
        # against upstream.
        total += int(len(part._binary_headers) + part.size)
    return total
if encoding or te_encoding:
w = MultipartPayloadWriter(writer) if encoding:
w.enable_compression(encoding) if te_encoding:
w.enable_encoding(te_encoding)
await part.write(w) # type: ignore[arg-type]
await w.write_eof() else:
await part.write(writer)
await writer.write(b"\r\n")
if close_boundary:
await writer.write(b"--" + self._boundary + b"--\r\n")
# ---------------------------------------------------------------------------
# NOTE(review): the following German website disclaimer is an extraction
# artifact and not part of this module; it is commented out to keep the file
# syntactically valid. Translation: "The information on this website has been
# carefully compiled to the best of our knowledge. However, neither
# completeness, nor correctness, nor quality of the provided information is
# guaranteed. Note: the colored syntax highlighting is still experimental."
#
# Original text:
#   Die Informationen auf dieser Webseite wurden
#   nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder
#   Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten
#   Informationen zugesichert.
#   Bemerkung: Die farbliche Syntaxdarstellung ist noch experimentell.
# ---------------------------------------------------------------------------