async def compress(self, data: bytes) -> bytes:
    """Compress the data and return the compressed bytes.

    Note that flush() must be called after the last call to compress().

    If the data size is larger than the max_sync_chunk_size, the compression
    will be done in the executor. Otherwise, the compression will be done
    in the event loop.

    A max_sync_chunk_size of None disables the executor path entirely, so
    every chunk is then compressed via compress_sync().
    """
    async with self._compress_lock:
        # To ensure the stream is consistent in the event
        # there are multiple writers, we need to lock
        # the compressor so that only one writer can
        # compress at a time.
        if (
            self._max_sync_chunk_size is not None
            and len(data) > self._max_sync_chunk_size
        ):
            # Oversized chunk: hand the compressor's own compress() to the
            # executor and wait for its result while still holding the lock.
            return await asyncio.get_running_loop().run_in_executor(
                self._executor, self._compressor.compress, data
            )
        return self.compress_sync(data)
def flush(self, mode: int = zlib.Z_FINISH) -> bytes:
    """Flush the underlying compressor and return the bytes it emits.

    The mode argument is handed unchanged to the compressor's own flush();
    it defaults to zlib.Z_FINISH.
    """
    compressor = self._compressor
    return compressor.flush(mode)
async def decompress(self, data: bytes, max_length: int = 0) -> bytes:
    """Decompress the data and return the decompressed bytes.

    If the data size is larger than the max_sync_chunk_size, the decompression
    will be done in the executor. Otherwise, the decompression will be done
    in the event loop.

    max_length is passed through unchanged to whichever decompression call
    is used. A max_sync_chunk_size of None disables the executor path.
    """
    if (
        self._max_sync_chunk_size is not None
        and len(data) > self._max_sync_chunk_size
    ):
        # Oversized chunk: run the decompressor's own decompress() in the
        # executor instead of going through decompress_sync().
        return await asyncio.get_running_loop().run_in_executor(
            self._executor, self._decompressor.decompress, data, max_length
        )
    return self.decompress_sync(data, max_length)
def flush(self, length: int = 0) -> bytes:
    """Flush the underlying decompressor and return the bytes it yields.

    A positive length is forwarded to the decompressor's flush(); for any
    other value flush() is called with no argument at all.
    """
    if length > 0:
        return self._decompressor.flush(length)
    return self._decompressor.flush()
class BrotliDecompressor:
    """Holder for a ``brotli.Decompressor`` instance stored as ``_obj``."""

    # Supports both 'brotlipy' and 'Brotli' packages
    # since they share an import name. The top branches
    # are for 'brotlipy' and bottom branches for 'Brotli'
    def __init__(self) -> None:
        """Create the underlying brotli decompressor object.

        Raises:
            RuntimeError: If HAS_BROTLI is false.
        """
        if not HAS_BROTLI:
            raise RuntimeError(
                "The brotli decompression is not available. "
                "Please install `Brotli` module"
            )
        self._obj = brotli.Decompressor()
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.