import asyncio import functools import json import random import re import sys import zlib from enum import IntEnum from functools import partial from struct import Struct from typing import (
Any,
Callable,
Final,
List,
NamedTuple,
Optional,
Pattern,
Set,
Tuple,
Union,
cast,
)
from .base_protocol import BaseProtocol from .client_exceptions import ClientConnectionResetError from .compression_utils import ZLibCompressor, ZLibDecompressor from .helpers import NO_EXTENSIONS, set_exception from .streams import DataQueue
# Every close code enumerated by WSCloseCode, as plain ints. The frame reader
# rejects close codes below 3000 that are not in this set.
ALLOWED_CLOSE_CODES: Final[Set[int]] = set(map(int, WSCloseCode))
# Latency matters a great deal for websockets: implementations generally
# expect messages to be sent and received promptly. Payloads up to this size
# are (de)compressed inline; only larger ones are handed to the executor.
# The threshold is above the default because each executor hop adds
# significant latency and overhead when chunks are small. 5 KiB was chosen
# because it is also the threshold python-zlib-ng chooses for releasing the GIL.
WEBSOCKET_MAX_SYNC_CHUNK_SIZE = 5120  # 5 KiB (5 * 1024 bytes)
class WSMsgType(IntEnum):
    """Websocket frame opcodes as defined by RFC 6455, section 5.2."""

    # Data frame opcodes.
    CONTINUATION = 0x00
    TEXT = 0x01
    BINARY = 0x02
    # Control frame opcodes (all carry the 0x08 bit).
    PING = 0x09
    PONG = 0x0A
    CLOSE = 0x08
class WSMessage(NamedTuple):
    """A single websocket message: its type, payload and extra information.

    The frame reader fills ``data`` according to ``type``. TEXT messages
    carry a decoded ``str`` and BINARY messages carry ``bytes``. CLOSE
    messages carry the integer close code, with the close reason in ``extra``.
    """

    type: WSMsgType
    # Typed as Any: a precise type would need a tagged union keyed on `type`.
    data: Any
    extra: Optional[str]
`mask` is a `bytes` object of length 4; `data` is a `bytearray`
object of any length. The contents of `data` are masked with `mask`, as specified in section 5.3 of RFC 6455.
Note that this function mutates the `data` argument.
This pure-python implementation may be replaced by an optimized
version when available.
""" assert isinstance(data, bytearray), data assert len(mask) == 4, mask
if data:
_XOR_TABLE = _xor_table()
a, b, c, d = (_XOR_TABLE[n] for n in mask)
data[::4] = data[::4].translate(a)
data[1::4] = data[1::4].translate(b)
data[2::4] = data[2::4].translate(c)
data[3::4] = data[3::4].translate(d)
if NO_EXTENSIONS: # pragma: no cover
_websocket_mask = _websocket_mask_python else: try: from ._websocket import _websocket_mask_cython # type: ignore[import-not-found]
compress = 0
notakeover = False for ext in _WS_EXT_RE_SPLIT.finditer(extstr):
defext = ext.group(1) # Return compress = 15 when get `permessage-deflate` ifnot defext:
compress = 15 break
match = _WS_EXT_RE.match(defext) if match:
compress = 15 if isserver: # Server never fail to detect compress handshake. # Server does not need to send max wbit to client if match.group(4):
compress = int(match.group(4)) # Group3 must match if group4 matches # Compress wbit 8 does not support in zlib # If compress level not support, # CONTINUE to next extension if compress > 15 or compress < 9:
compress = 0 continue if match.group(1):
notakeover = True # Ignore regex group 5 & 6 for client_max_window_bits break else: if match.group(6):
compress = int(match.group(6)) # Group5 must match if group6 matches # Compress wbit 8 does not support in zlib # If compress level not support, # FAIL the parse progress if compress > 15 or compress < 9: raise WSHandshakeError("Invalid window size") if match.group(2):
notakeover = True # Ignore regex group 5 & 6 for client_max_window_bits break # Return Fail if client side and not match elifnot isserver: raise WSHandshakeError("Extension for deflate not supported" + ext.group(1))
return compress, notakeover
def ws_ext_gen(
    compress: int = 15, isserver: bool = False, server_notakeover: bool = False
) -> str:
    """Build a ``Sec-WebSocket-Extensions`` value offering permessage-deflate.

    :param compress: deflate window size in bits, 9..15. Values below 15 are
        advertised as ``server_max_window_bits=<compress>``.
    :param isserver: when False (client side), a bare
        ``client_max_window_bits`` parameter is also offered.
    :param server_notakeover: when True, ``server_no_context_takeover`` is added.
    :raises ValueError: if ``compress`` is outside 9..15 (zlib cannot use a
        window of 8 bits).
    :return: the extension parameters joined with ``"; "``.
    """
    if compress < 9 or compress > 15:
        raise ValueError(
            "Compress wbits must between 9 and 15, zlib does not support wbits=8"
        )

    params = ["permessage-deflate"]
    if not isserver:
        params.append("client_max_window_bits")
    if compress < 15:
        params.append(f"server_max_window_bits={compress!s}")
    if server_notakeover:
        params.append("server_no_context_takeover")
    # client_no_context_takeover is never offered; there is no parameter for it.
    return "; ".join(params)
def _feed_data(self, data: bytes) -> None: for fin, opcode, payload, compressed in self.parse_frame(data): if opcode in MESSAGE_TYPES_WITH_CONTENT: # load text/binary
is_continuation = opcode == WSMsgType.CONTINUATION ifnot fin: # got partial frame payload ifnot is_continuation:
self._opcode = opcode
self._partial += payload if self._max_msg_size and len(self._partial) >= self._max_msg_size: raise WebSocketError(
WSCloseCode.MESSAGE_TOO_BIG, "Message size {} exceeds limit {}".format(
len(self._partial), self._max_msg_size
),
) continue
has_partial = bool(self._partial) if is_continuation: if self._opcode isNone: raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR, "Continuation frame for non started message",
)
opcode = self._opcode
self._opcode = None # previous frame was non finished # we should get continuation opcode elif has_partial: raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR, "The opcode in non-fin frame is expected " "to be zero, got {!r}".format(opcode),
)
if self._max_msg_size and len(assembled_payload) >= self._max_msg_size: raise WebSocketError(
WSCloseCode.MESSAGE_TOO_BIG, "Message size {} exceeds limit {}".format(
len(assembled_payload), self._max_msg_size
),
)
# Decompress process must to be done after all packets # received. if compressed: ifnot self._decompressobj:
self._decompressobj = ZLibDecompressor(
suppress_deflate_header=True
)
payload_merged = self._decompressobj.decompress_sync(
assembled_payload + _WS_DEFLATE_TRAILING, self._max_msg_size
) if self._decompressobj.unconsumed_tail:
left = len(self._decompressobj.unconsumed_tail) raise WebSocketError(
WSCloseCode.MESSAGE_TOO_BIG, "Decompressed message size {} exceeds limit {}".format(
self._max_msg_size + left, self._max_msg_size
),
) else:
payload_merged = bytes(assembled_payload)
if opcode == WSMsgType.TEXT: try:
text = payload_merged.decode("utf-8") except UnicodeDecodeError as exc: raise WebSocketError(
WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
) from exc
# tuple.__new__ is used to avoid the overhead of the lambda
msg = tuple.__new__(WSMessage, (WSMsgType.TEXT, text, ""))
self.queue.feed_data(msg, len(payload_merged)) continue
# tuple.__new__ is used to avoid the overhead of the lambda
msg = tuple.__new__(WSMessage, (WSMsgType.BINARY, payload_merged, ""))
self.queue.feed_data(msg, len(payload_merged)) elif opcode == WSMsgType.CLOSE: if len(payload) >= 2:
close_code = UNPACK_CLOSE_CODE(payload[:2])[0] if close_code < 3000 and close_code notin ALLOWED_CLOSE_CODES: raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
f"Invalid close code: {close_code}",
) try:
close_message = payload[2:].decode("utf-8") except UnicodeDecodeError as exc: raise WebSocketError(
WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
) from exc
msg = tuple.__new__(
WSMessage, (WSMsgType.CLOSE, close_code, close_message)
) elif payload: raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR,
f"Invalid close frame: {fin} {opcode} {payload!r}",
) else:
msg = tuple.__new__(WSMessage, (WSMsgType.CLOSE, 0, ""))
# frame-fin = %x0 ; more frames of this message follow # / %x1 ; final frame of this message # frame-rsv1 = %x0 ; # 1 bit, MUST be 0 unless negotiated otherwise # frame-rsv2 = %x0 ; # 1 bit, MUST be 0 unless negotiated otherwise # frame-rsv3 = %x0 ; # 1 bit, MUST be 0 unless negotiated otherwise # # Remove rsv1 from this test for deflate development if rsv2 or rsv3 or (rsv1 andnot self._compress): raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR, "Received frame with non-zero reserved bits",
)
if opcode > 0x7 and fin == 0: raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR, "Received fragmented control frame",
)
# Control frames MUST have a payload # length of 125 bytes or less if opcode > 0x7 and length > 125: raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR, "Control frame payload cannot be ""larger than 125 bytes",
)
# Set compress status if last package is FIN # OR set compress status if this is first fragment # Raise error if not first fragment with rsv1 = 0x1 if self._frame_fin or self._compressed isNone:
self._compressed = Trueif rsv1 elseFalse elif rsv1: raise WebSocketError(
WSCloseCode.PROTOCOL_ERROR, "Received frame with non-zero reserved bits",
)
async def _send_frame(
    self, message: bytes, opcode: int, compress: Optional[int] = None
) -> None:
    """Write one websocket frame, with FIN set, whose payload is ``message``.

    Data opcodes (< 8) are deflate-compressed when ``compress`` or
    ``self.compress`` is truthy. A per-call ``compress`` builds a throwaway
    compressor; otherwise the writer's cached ``self._compressobj`` is created
    lazily and reused. When ``self.use_mask`` is set, the payload is masked
    with a random 4-byte key before writing.

    :raises ClientConnectionResetError: if ``self._closing`` is set and
        ``opcode`` lacks the 0x8 bit (i.e. it is not a control opcode), or if
        the transport is already closing when the frame is about to be written.
    """
    if self._closing and not (opcode & WSMsgType.CLOSE):
        raise ClientConnectionResetError("Cannot write to closing transport")

    # Reserved header bits (RFC 6455 section 5.2). There is no minimum-size
    # threshold: small data frames are compressed as well.
    rsv = 0
    if (compress or self.compress) and opcode < 8:
        # RSV1 flags a compressed frame (RFC 7692 section 7.2.3.1).
        rsv = 0x40
        if compress:
            # One-off compressor so a per-frame window size does not replace
            # the writer's shared compressor.
            deflater = self._make_compress_obj(compress)
        else:
            if not self._compressobj:
                self._compressobj = self._make_compress_obj(self.compress)
            deflater = self._compressobj

        compressed = await deflater.compress(message)
        # From here until the data is written, do not await. Yielding
        # between compress() and flush() could let another coroutine
        # interleave its own compressed data into the same stream.
        flush_mode = zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH
        compressed += deflater.flush(flush_mode)
        # Drop the trailing empty deflate block, as RFC 7692 section 7.2.1 requires.
        if compressed.endswith(_WS_DEFLATE_TRAILING):
            compressed = compressed[:-4]
        message = compressed

    payload_len = len(message)
    use_mask = self.use_mask
    mask_bit = 0x80 if use_mask else 0

    # Byte 0 is FIN | RSV bits | opcode. The length field takes 7, 7+16 or
    # 7+64 bits depending on the payload size.
    first_byte = 0x80 | rsv | opcode
    if payload_len < 126:
        header = PACK_LEN1(first_byte, payload_len | mask_bit)
        header_len = 2
    elif payload_len < (1 << 16):
        header = PACK_LEN2(first_byte, 126 | mask_bit, payload_len)
        header_len = 4
    else:
        header = PACK_LEN3(first_byte, 127 | mask_bit, payload_len)
        header_len = 10

    if self.transport.is_closing():
        raise ClientConnectionResetError("Cannot write to closing transport")

    if use_mask:
        # Masked frames XOR the payload with a fresh random 32-bit key
        # (RFC 6455 section 5.3). The key is sent right after the header.
        mask = PACK_RANDBITS(self.get_random_bits())
        masked = bytearray(message)
        _websocket_mask(mask, masked)
        self.transport.write(header + mask + masked)
        self._output_size += MASK_LEN
    elif payload_len > MSG_SIZE:
        # Presumably written separately to avoid copying a large payload
        # into a concatenated buffer.
        self.transport.write(header)
        self.transport.write(message)
    else:
        self.transport.write(header + message)

    self._output_size += header_len + payload_len

    # All data is written or buffered, so yielding to the event loop is safe
    # again. Flow control: once the amount written since the last drain
    # exceeds the limit, reset the counter and await the protocol's drain
    # helper.
    if self._output_size > self._limit:
        self._output_size = 0
        await self.protocol._drain_helper()
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.