import asyncio import zlib from typing import Any, Awaitable, Callable, NamedTuple, Optional, Union # noqa
from multidict import CIMultiDict
from .abc import AbstractStreamWriter from .base_protocol import BaseProtocol from .client_exceptions import ClientConnectionResetError from .compression_utils import ZLibCompressor from .helpers import NO_EXTENSIONS
def _write(self, chunk: bytes) -> None:
    """Hand ``chunk`` to the protocol's transport and update the byte counters.

    ``len(chunk)`` is added to both ``self.buffer_size`` and
    ``self.output_size`` *before* the transport is checked, so the counters
    grow even when the write is subsequently rejected.

    Args:
        chunk: The data passed to ``transport.write()``.

    Raises:
        ClientConnectionResetError: If the protocol has no transport or the
            transport reports that it is closing.
    """
    size = len(chunk)
    self.buffer_size += size
    self.output_size += size
    transport = self._protocol.transport
    # Refuse to write when the transport is gone or already shutting down.
    if transport is None or transport.is_closing():
        raise ClientConnectionResetError("Cannot write to closing transport")
    transport.write(chunk)
async def write(
    self, chunk: bytes, *, drain: bool = True, LIMIT: int = 0x10000
) -> None:
    """Write a chunk of data to the stream.

    ``write_eof()`` indicates the end of the stream; the writer must not be
    used after ``write_eof()`` has been called.

    Args:
        chunk: The data to write. A ``memoryview`` whose ``nbytes`` differs
            from its ``len()`` is recast to a view of single bytes first.
        drain: Not used in the visible portion of this method; presumably
            consumed by code beyond this excerpt (TODO confirm).
        LIMIT: Not used in the visible portion of this method; presumably
            consumed by code beyond this excerpt (TODO confirm).
    """
    # Notify the optional per-chunk callback with the chunk as received,
    # before any reshaping or compression.
    if self._on_chunk_sent is not None:
        await self._on_chunk_sent(chunk)

    if isinstance(chunk, memoryview):
        if chunk.nbytes != len(chunk):
            # just reshape it so that len(chunk) equals the byte count
            chunk = chunk.cast("c")

    if self._compress is not None:
        chunk = await self._compress.compress(chunk)
        # The compressor produced no output for this input; nothing to send.
        if not chunk:
            return
    # NOTE(review): the visible source ends here; the original method likely
    # continues past this point (nothing is written yet) -- confirm against
    # the complete file before relying on this block.
Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt.
Es wird jedoch weder Vollständigkeit noch Richtigkeit noch Qualität
der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.