def set_status(
    self,
    status: int,
    reason: Optional[str] = None,
) -> None:
    """Set the HTTP status code and optional reason phrase.

    Must be called before the response has been prepared (headers sent);
    afterwards the status is immutable.
    """
    assert (
        not self.prepared
    ), "Cannot change the response status code after the headers have been sent"
    self._set_status(status, reason)
def enable_chunked_encoding(self, chunk_size: Optional[int] = None) -> None:
    """Enable automatic chunked transfer encoding for the response body.

    Raises:
        RuntimeError: if a Content-Length header is already set (the two
            framing mechanisms are mutually exclusive).

    NOTE(review): the original ``def`` line was lost in transit; the
    signature and the trailing ``self._chunked = True`` are reconstructed
    from the parameters/attributes the body references — confirm upstream.
    """
    if hdrs.CONTENT_LENGTH in self._headers:
        raise RuntimeError(
            "You can't enable chunked encoding when " "a content length is set"
        )
    if chunk_size is not None:
        # chunk_size is accepted only for backwards compatibility.
        warnings.warn("Chunk size is deprecated #1615", DeprecationWarning)
    self._chunked = True
def enable_compression(
    self, force: Optional[Union[bool, ContentCoding]] = None
) -> None:
    """Enables response compression encoding."""
    # Backwards compatibility for when force was a bool <0.17.
    if isinstance(force, bool):
        force = ContentCoding.deflate if force else ContentCoding.identity
        warnings.warn(
            "Using boolean for force is deprecated #3318", DeprecationWarning
        )
    elif force is not None:
        assert isinstance(force, ContentCoding), (
            "force should one of " "None, bool or " "ContentEncoding"
        )

    self._compression = True
    self._compression_force = force
Sets new cookie or updates existent with new value.
Also updates only those params which are notNone. """
old = self._cookies.get(name) if old isnotNoneand old.coded_value == "": # deleted cookie
self._cookies.pop(name, None)
self._cookies[name] = value
c = self._cookies[name]
if expires isnotNone:
c["expires"] = expires elif c.get("expires") == "Thu, 01 Jan 1970 00:00:00 GMT": del c["expires"]
if domain isnotNone:
c["domain"] = domain
if max_age isnotNone:
c["max-age"] = str(max_age) elif"max-age"in c: del c["max-age"]
c["path"] = path
if secure isnotNone:
c["secure"] = secure if httponly isnotNone:
c["httponly"] = httponly if version isnotNone:
c["version"] = version if samesite isnotNone:
c["samesite"] = samesite
def del_cookie(
    self, name: str, *, domain: Optional[str] = None, path: str = "/"
) -> None:
    """Deletes cookie.

    Creates new empty expired cookie.

    NOTE(review): the original ``def`` line was lost in transit; the
    signature is reconstructed from the arguments the body forwards —
    confirm against upstream.
    """
    # TODO: do we need domain/path here?
    self._cookies.pop(name, None)
    self.set_cookie(
        name,
        "",
        max_age=0,
        expires="Thu, 01 Jan 1970 00:00:00 GMT",
        domain=domain,
        path=path,
    )
@property
def content_length(self) -> Optional[int]:
    # Just a placeholder for adding setter
    return super().content_length
@content_length.setter
def content_length(self, value: Optional[int]) -> None:
    """Set (or, with None, remove) the Content-Length header.

    Raises:
        RuntimeError: if chunked encoding is enabled — the two framing
            mechanisms are mutually exclusive.
    """
    if value is not None:
        value = int(value)
        if self._chunked:
            raise RuntimeError(
                "You can't set content length when " "chunked encoding is enable"
            )
        self._headers[hdrs.CONTENT_LENGTH] = str(value)
    else:
        self._headers.pop(hdrs.CONTENT_LENGTH, None)
@property
def content_type(self) -> str:
    # Just a placeholder for adding setter
    return super().content_type
async def _do_start_compression(self, coding: ContentCoding) -> None:
    """Turn on streaming compression for the given content coding."""
    if coding is ContentCoding.identity:
        # identity means "no transformation" — nothing to enable
        return
    assert self._payload_writer is not None
    self._headers[hdrs.CONTENT_ENCODING] = coding.value
    self._payload_writer.enable_compression(coding.value)
    # Compressed payload may have different content length,
    # remove the header
    self._headers.popall(hdrs.CONTENT_LENGTH, None)
async def _start_compression(self, request: "BaseRequest") -> None:
    """Pick a content coding (forced or negotiated) and start compression."""
    if self._compression_force:
        await self._do_start_compression(self._compression_force)
        return
    # Encoding comparisons should be case-insensitive
    # https://www.rfc-editor.org/rfc/rfc9110#section-8.4.1
    accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()
    for value, coding in CONTENT_CODINGS.items():
        if value in accept_encoding:
            await self._do_start_compression(coding)
            return
async def _prepare_headers(self) -> None:
    """Finalize response headers: cookies, compression, framing, connection.

    NOTE(review): the original ``async def`` line and the preamble binding
    ``request``, ``writer``, ``version`` and ``keep_alive`` were lost in
    transit; they are reconstructed here from the attributes the body uses.
    The mangled source also read ``elif version == HttpVersion11`` inside the
    keep-alive branch, which would send "close" on a kept-alive HTTP/1.1
    connection; restored as the ``else`` branch — confirm against upstream.
    """
    request = self._req
    assert request is not None
    writer = self._payload_writer
    assert writer is not None
    keep_alive = self._keep_alive
    if keep_alive is None:
        keep_alive = request.keep_alive
    self._keep_alive = keep_alive

    version = request.version

    headers = self._headers
    if self._cookies:
        for cookie in self._cookies.values():
            value = cookie.output(header="")[1:]
            headers.add(hdrs.SET_COOKIE, value)

    if self._compression:
        await self._start_compression(request)

    if self._chunked:
        if version != HttpVersion11:
            raise RuntimeError(
                "Using chunked encoding is forbidden "
                "for HTTP/{0.major}.{0.minor}".format(request.version)
            )
        if not self._must_be_empty_body:
            writer.enable_chunking()
            headers[hdrs.TRANSFER_ENCODING] = "chunked"
            if hdrs.CONTENT_LENGTH in headers:
                del headers[hdrs.CONTENT_LENGTH]
    elif self._length_check:  # Disabled for WebSockets
        writer.length = self.content_length
        if writer.length is None:
            if version >= HttpVersion11:
                if not self._must_be_empty_body:
                    writer.enable_chunking()
                    headers[hdrs.TRANSFER_ENCODING] = "chunked"
            elif not self._must_be_empty_body:
                # HTTP/1.0 cannot chunk: close the connection to mark EOF
                keep_alive = False

    # connection header
    if hdrs.CONNECTION not in headers:
        if keep_alive:
            if version == HttpVersion10:
                headers[hdrs.CONNECTION] = "keep-alive"
        else:
            if version == HttpVersion11:
                headers[hdrs.CONNECTION] = "close"
async def write(self, data: bytes) -> None:
    """Write a chunk of the response body payload.

    Raises:
        RuntimeError: if called after write_eof() or before prepare().

    NOTE(review): the original ``async def`` line was lost in transit; the
    signature is reconstructed — confirm against upstream.
    """
    if self._eof_sent:
        raise RuntimeError("Cannot call write() after write_eof()")
    if self._payload_writer is None:
        raise RuntimeError("Cannot call write() before prepare()")
    await self._payload_writer.write(data)
async def drain(self) -> None:
    """Deprecated: flush the payload writer; use ``await resp.write()``."""
    assert not self._eof_sent, "EOF has already been sent"
    assert self._payload_writer is not None, "Response has not been started"
    warnings.warn(
        "drain method is deprecated, use await resp.write()",
        DeprecationWarning,
        stacklevel=2,
    )
    await self._payload_writer.drain()
def __init__(
    self,
    *,
    body: Any = None,
    status: int = 200,
    reason: Optional[str] = None,
    text: Optional[str] = None,
    headers: Optional[LooseHeaders] = None,
    content_type: Optional[str] = None,
    charset: Optional[str] = None,
    zlib_executor_size: Optional[int] = None,
    zlib_executor: Optional[Executor] = None,
) -> None:
    """Initialize a full-body Response.

    NOTE(review): only the Content-Type/text negotiation logic survived
    transit; the signature, the headers normalization preamble and the
    trailing attribute initialization are reconstructed — confirm against
    upstream before relying on them.
    """
    if body is not None and text is not None:
        raise ValueError("body and text are not allowed together")

    if headers is None:
        real_headers: CIMultiDict[str] = CIMultiDict()
    elif not isinstance(headers, CIMultiDict):
        real_headers = CIMultiDict(headers)
    else:
        real_headers = headers

    if content_type is not None and "charset" in content_type:
        raise ValueError("charset must not be in content_type " "argument")

    if text is not None:
        if hdrs.CONTENT_TYPE in real_headers:
            if content_type or charset:
                raise ValueError(
                    "passing both Content-Type header and "
                    "content_type or charset params "
                    "is forbidden"
                )
        else:
            # fast path for filling headers
            if not isinstance(text, str):
                raise TypeError("text argument must be str (%r)" % type(text))
            if content_type is None:
                content_type = "text/plain"
            if charset is None:
                charset = "utf-8"
            real_headers[hdrs.CONTENT_TYPE] = content_type + "; charset=" + charset
            body = text.encode(charset)
            text = None
    elif hdrs.CONTENT_TYPE in real_headers:
        if content_type is not None or charset is not None:
            raise ValueError(
                "passing both Content-Type header and "
                "content_type or charset params "
                "is forbidden"
            )
    elif content_type is not None:
        if charset is not None:
            content_type += "; charset=" + charset
        real_headers[hdrs.CONTENT_TYPE] = content_type

    super().__init__(status=status, reason=reason, headers=real_headers)

    if text is not None:
        self.text = text
    else:
        self.body = body

    # _compressed_body caches the body after whole-body compression
    self._compressed_body: Optional[bytes] = None
    self._zlib_executor_size = zlib_executor_size
    self._zlib_executor = zlib_executor
@property
def content_length(self) -> Optional[int]:
    """Effective body length, or None when it cannot be known up front."""
    if self._chunked:
        return None
    if hdrs.CONTENT_LENGTH in self._headers:
        return int(self._headers[hdrs.CONTENT_LENGTH])
    if self._compressed_body is not None:
        # Return length of the compressed body
        return len(self._compressed_body)
    elif isinstance(self._body, Payload):
        # A payload without content length, or a compressed payload
        return None
    elif self._body is not None:
        return len(self._body)
    else:
        return 0
@content_length.setter
def content_length(self, value: Optional[int]) -> None:
    # Response computes Content-Length from its body; manual override is an error.
    raise RuntimeError("Content length is set automatically")
async def write_eof(self, data: bytes = b"") -> None:
    """Send the (possibly compressed) body and finish the response."""
    if self._eof_sent:
        return
    if self._compressed_body is None:
        body: Optional[Union[bytes, Payload]] = self._body
    else:
        # whole-body compression already produced the bytes to send
        body = self._compressed_body
    assert not data, f"data arg is not supported, got {data!r}"
    assert self._req is not None
    assert self._payload_writer is not None
    if body is None or self._must_be_empty_body:
        await super().write_eof()
    elif isinstance(self._body, Payload):
        await self._body.write(self._payload_writer)
        await super().write_eof()
    else:
        await super().write_eof(cast(bytes, body))
async def _start(self, request: "BaseRequest") -> AbstractStreamWriter:
    """Fix up Content-Length before delegating to the stream start."""
    if hdrs.CONTENT_LENGTH in self._headers:
        if should_remove_content_length(request.method, self.status):
            del self._headers[hdrs.CONTENT_LENGTH]
    elif not self._chunked:
        if isinstance(self._body, Payload):
            if self._body.size is not None:
                self._headers[hdrs.CONTENT_LENGTH] = str(self._body.size)
        else:
            body_len = len(self._body) if self._body else "0"
            # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.6-7
            if body_len != "0" or (
                self.status != 304 and request.method.upper() != hdrs.METH_HEAD
            ):
                self._headers[hdrs.CONTENT_LENGTH] = str(body_len)

    return await super()._start(request)
async def _do_start_compression(self, coding: ContentCoding) -> None:
    """Compress the whole in-memory body up front (non-chunked, non-Payload)."""
    if self._chunked or isinstance(self._body, Payload):
        # Streaming cases fall back to the streaming compressor.
        return await super()._do_start_compression(coding)
    if coding is ContentCoding.identity:
        return
    # Instead of using _payload_writer.enable_compression,
    # compress the whole body
    compressor = ZLibCompressor(
        encoding=coding.value,
        max_sync_chunk_size=self._zlib_executor_size,
        executor=self._zlib_executor,
    )
    assert self._body is not None
    if self._zlib_executor_size is None and len(self._body) > LARGE_BODY_SIZE:
        warnings.warn(
            "Synchronous compression of large response bodies "
            f"({len(self._body)} bytes) might block the async event loop. "
            "Consider providing a custom value to zlib_executor_size/"
            "zlib_executor response properties or disabling compression on it."
        )
    self._compressed_body = (
        await compressor.compress(self._body) + compressor.flush()
    )
    # Length changed after compression: advertise the new coding and size.
    self._headers[hdrs.CONTENT_ENCODING] = coding.value
    self._headers[hdrs.CONTENT_LENGTH] = str(len(self._compressed_body))
def json_response(
    data: Any = sentinel,
    *,
    text: Optional[str] = None,
    body: Optional[bytes] = None,
    status: int = 200,
    reason: Optional[str] = None,
    headers: Optional[LooseHeaders] = None,
    content_type: str = "application/json",
    dumps: JSONEncoder = json.dumps,
) -> Response:
    """Return a Response whose body is *data* serialized with *dumps*.

    Exactly one of data, text or body may be given; data is encoded with
    *dumps* (json.dumps by default).
    """
    if data is not sentinel:
        if text or body:
            raise ValueError("only one of data, text, or body should be specified")
        else:
            text = dumps(data)
    return Response(
        text=text,
        body=body,
        status=status,
        reason=reason,
        headers=headers,
        content_type=content_type,
    )
# --- Non-code residue: footer of the HTML page this file was scraped from ---
# Processing time: 0.21 seconds (preprocessed)
# The information on this web page was compiled carefully and to the best of
# our knowledge. However, neither completeness, nor correctness, nor quality
# of the provided information is guaranteed.
# Note: the colored syntax highlighting is still experimental.