# Characters accepted inside a 'token'. The '-' is deliberately kept as the
# very last character so that it is not interpreted as a range operator when
# this string is embedded in a regex character class.
_TCHAR: Final[str] = "".join(
    (string.digits, string.ascii_letters, r"!#$%&'*+.^_`|~-")
)

# Regex fragment matching one or more of the characters in _TCHAR.
_TOKEN: Final[str] = "[" + _TCHAR + "]+"

# Regex character class for 'qdtext': HTAB, SP, '!' and 0x23..0x7E, i.e. the
# double quote (0x22) and DEL (0x7F) are left out.
# qdtext includes 0x5C to escape 0x5D ('\]')
# qdtext excludes obs-text (because obsoleted, and encoding not specified)
_QDTEXT: Final[str] = "[{}]".format(
    "".join(map(chr, (0x09, 0x20, 0x21, *range(0x23, 0x7F))))
)
Creates and returns a new instance of Request object. If no parameters
are given, an exact copy is returned. If a parameter isnot passed, it
will reuse the one from the current request object. """ if self._read_bytes: raise RuntimeError("Cannot clone request ""after reading its content")
dct: Dict[str, Any] = {} if method isnot sentinel:
dct["method"] = method if rel_url isnot sentinel:
new_url: URL = URL(rel_url)
dct["url"] = new_url
dct["path"] = str(new_url) if headers isnot sentinel: # a copy semantic
dct["headers"] = CIMultiDictProxy(CIMultiDict(headers))
dct["raw_headers"] = tuple(
(k.encode("utf-8"), v.encode("utf-8")) for k, v in dct["headers"].items()
)
message = self._message._replace(**dct)
kwargs = {} if scheme isnot sentinel:
kwargs["scheme"] = scheme if host isnot sentinel:
kwargs["host"] = host if remote isnot sentinel:
kwargs["remote"] = remote if client_max_size is sentinel:
client_max_size = self._client_max_size
Makes an effort to parse Forwarded headers as specified by RFC 7239:
- It adds one (immutable) dictionary per Forwarded 'field-value', ie
per proxy. The element corresponds to the data in the Forwarded
field-value added by the first proxy encountered by the client. Each
subsequent item corresponds to those added by later proxies.
- It checks that every value has valid syntax in general as specified in section 4: either a 'token'or a 'quoted-string'.
- It un-escapes found escape sequences.
- It does NOT validate 'by'and'for' contents as specified in section
6.
- It does NOT validate 'host' contents (Host ABNF).
- It does NOT validate 'proto' contents for valid URI scheme names.
Returns a tuple containing one or more immutable dicts """
elems = [] for field_value in self._message.headers.getall(hdrs.FORWARDED, ()):
length = len(field_value)
pos = 0
need_separator = False
elem: Dict[str, str] = {}
elems.append(types.MappingProxyType(elem)) while 0 <= pos < length:
match = _FORWARDED_PAIR_RE.match(field_value, pos) if match isnotNone: # got a valid forwarded-pair if need_separator: # bad syntax here, skip to next comma
pos = field_value.find(",", pos) else:
name, value, port = match.groups() if value[0] == '"': # quoted string: remove quotes and unescape
value = _QUOTED_PAIR_REPLACE_RE.sub(r"\1", value[1:-1]) if port:
value += port
elem[name.lower()] = value
pos += len(match.group(0))
need_separator = True elif field_value[pos] == ",": # next forwarded-element
need_separator = False
elem = {}
elems.append(types.MappingProxyType(elem))
pos += 1 elif field_value[pos] == ";": # next forwarded-pair
need_separator = False
pos += 1 elif field_value[pos] in" \t": # Allow whitespace even between forwarded-pairs, though # RFC 7239 doesn't. This simplifies code and is in line # with Postel's law.
pos += 1 else: # bad syntax here, skip to next comma
pos = field_value.find(",", pos) return tuple(elems)
@reify
def scheme(self) -> str:
    """A string representing the scheme of the request.

    The scheme is resolved in this order:

    - overridden value by .clone(scheme=new_scheme) call.
    - type of connection to peer: HTTPS if socket is SSL, HTTP otherwise.

    Returns either 'http' or 'https'.
    """
    # A truthy SSL context on the transport means the peer connected via TLS.
    return "https" if self._transport_sslcontext else "http"
@reify
def method(self) -> str:
    """HTTP method of the request (read-only).

    The value is an upper-cased str such as 'GET', 'POST' or 'PUT'.
    """
    http_method = self._method
    return http_method
@reify def version(self) -> HttpVersion: """Read only property for getting HTTP version of request.
@reify
def host(self) -> str:
    """Host name of the request.

    Resolution order:

    - value overridden by a .clone(host=new_host) call;
    - the HOST HTTP header;
    - the value of socket.getfqdn().

    Examples: 'example.com' or 'localhost:8080'.

    For historical reasons the port number may be part of the value.
    """
    header_host = self._message.headers.get(hdrs.HOST)
    # Fall back to the machine's FQDN only when the header is absent.
    return socket.getfqdn() if header_host is None else header_host
@reify
def remote(self) -> Optional[str]:
    """Remote IP of the client that initiated the HTTP request.

    Resolution order:

    - value overridden by a .clone(remote=new_remote) call;
    - peername of the opened socket.

    Returns None when no peername is available.
    """
    peer = self._transport_peername
    if peer is None:
        return None
    # A list/tuple peername carries the address in its first element.
    address = peer[0] if isinstance(peer, (list, tuple)) else peer
    return str(address)
@reify
def url(self) -> URL:
    """Absolute URL of the request."""
    # The host is passed as ``authority`` because it may include the port
    # number and we want yarl to parse it correctly.
    base = URL.build(scheme=self.scheme, authority=self.host)
    return base.join(self._rel_url)
@reify
def path(self) -> str:
    """Path part of the URL (*PATH INFO*), without host or scheme.

    Example: ``/app/blog``
    """
    relative = self._rel_url
    return relative.path
@reify def path_qs(self) -> str: """The URL including PATH_INFO and the query string.
@reify
def query(self) -> "MultiMapping[str]":
    """Multidict holding every variable of the query string."""
    relative = self._rel_url
    return relative.query
@reify
def query_string(self) -> str:
    """Query string portion of the URL.

    Example: id=10
    """
    relative = self._rel_url
    return relative.query_string
@reify
def headers(self) -> CIMultiDictProxy[str]:
    """All request headers as a case-insensitive multidict proxy."""
    header_proxy = self._headers
    return header_proxy
@reify
def raw_headers(self) -> RawHeaders:
    """All request headers as a sequence of pairs."""
    message = self._message
    return message.raw_headers
@reify
def if_modified_since(self) -> Optional[datetime.datetime]:
    """Value of the If-Modified-Since HTTP header, or None.

    The header is exposed as a `datetime` object.
    """
    raw_date = self.headers.get(hdrs.IF_MODIFIED_SINCE)
    return parse_http_date(raw_date)
@reify
def if_unmodified_since(self) -> Optional[datetime.datetime]:
    """Value of the If-Unmodified-Since HTTP header, or None.

    The header is exposed as a `datetime` object.
    """
    raw_date = self.headers.get(hdrs.IF_UNMODIFIED_SINCE)
    return parse_http_date(raw_date)
@staticmethod def _etag_values(etag_header: str) -> Iterator[ETag]: """Extract `ETag` objects from raw header.""" if etag_header == ETAG_ANY: yield ETag(
is_weak=False,
value=ETAG_ANY,
) else: for match in LIST_QUOTED_ETAG_RE.finditer(etag_header):
is_weak, value, garbage = match.group(2, 3, 4) # Any symbol captured by 4th group means # that the following sequence is invalid. if garbage: break
A read-only dictionary-like object. """
raw = self.headers.get(hdrs.COOKIE, "")
parsed = SimpleCookie(raw) return MappingProxyType({key: val.value for key, val in parsed.items()})
@reify def http_range(self) -> slice: """The content of Range HTTP header.
Return a slice instance.
"""
rng = self._headers.get(hdrs.RANGE)
start, end = None, None if rng isnotNone: try:
pattern = r"^bytes=(\d*)-(\d*)$"
start, end = re.findall(pattern, rng)[0] except IndexError: # pattern was not found in header raise ValueError("range not in acceptable format")
end = int(end) if end elseNone
start = int(start) if start elseNone
if start isNoneand end isnotNone: # end with no start is to return tail of content
start = -end
end = None
if start isnotNoneand end isnotNone: # end is inclusive in range header, exclusive for slice
end += 1
if start >= end: raise ValueError("start cannot be after end")
if start is end isNone: # No valid range supplied raise ValueError("No start or end of range specified")
Eat unread part of HTTP BODY if present. """ whilenot self._payload.at_eof():
await self._payload.readany()
async def read(self) -> bytes:
    """Read request body if present.

    The payload is drained chunk by chunk until an empty chunk is
    received. The result is cached on the instance, so repeated calls
    return the same bytes without touching the payload again.

    Returns:
        bytes object with the full request content.

    Raises:
        HTTPRequestEntityTooLarge: if a non-zero client max size is
            configured and the accumulated body exceeds it. A body that is
            exactly as large as the limit is accepted.
    """
    if self._read_bytes is not None:
        return self._read_bytes

    limit = self._client_max_size
    body = bytearray()
    while True:
        chunk = await self._payload.readany()
        body.extend(chunk)
        # A falsy limit (0 / None) disables the size check entirely.
        if limit:
            body_size = len(body)
            # Strictly greater: only bodies that *exceed* the limit are
            # rejected; hitting the limit exactly is still allowed.
            if body_size > limit:
                raise HTTPRequestEntityTooLarge(
                    max_size=limit, actual_size=body_size
                )
        # An empty chunk signals the end of the payload.
        if not chunk:
            break

    self._read_bytes = bytes(body)
    return self._read_bytes
async def text(self) -> str:
    """Return the request body decoded to text.

    The encoding is taken from ``.charset``; when that is empty or None,
    "utf-8" is used instead.
    """
    raw = await self.read()
    return raw.decode(self.charset or "utf-8")
async def json(self, *, loads: JSONDecoder = DEFAULT_JSON_DECODER) -> Any:
    """Return the request body decoded as JSON.

    The body is first obtained as text and then handed to *loads*.
    """
    return loads(await self.text())
async def multipart(self) -> MultipartReader:
    """Return an async iterator for processing the body as multipart."""
    reader = MultipartReader(self._headers, self._payload)
    return reader
async def post(self) -> "MultiDictProxy[Union[str, bytes, FileField]]": """Return POST parameters.""" if self._post isnotNone: return self._post if self._method notin self.POST_METHODS:
self._post = MultiDictProxy(MultiDict()) return self._post
# NOTE: Release file descriptors for the # NOTE: `tempfile.Temporaryfile`-created `_io.BufferedRandom` # NOTE: instances of files sent within multipart request body # NOTE: via HTTP POST request. for file_name, file_field_object in self._post.items(): if isinstance(file_field_object, FileField):
file_field_object.file.close()
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.