def get(
self,
data: Any,
*args: Any,
_CHAIN: "Type[chain[_PayloadRegistryItem]]" = chain,
**kwargs: Any,
) -> "Payload": if self._first: for factory, type_ in self._first: if isinstance(data, type_): return factory(data, *args, **kwargs) # Try the fast lookup first if lookup_factory := self._normal_lookup.get(type(data)): return lookup_factory(data, *args, **kwargs) # Bail early if its already a Payload if isinstance(data, Payload): return data # Fallback to the slower linear search for factory, type_ in _CHAIN(self._normal, self._last): if isinstance(data, type_): return factory(data, *args, **kwargs) raise LookupError()
def register(
self, factory: PayloadType, type: Any, *, order: Order = Order.normal
) -> None: if order is Order.try_first:
self._first.append((factory, type)) elif order is Order.normal:
self._normal.append((factory, type)) if isinstance(type, Iterable): for t in type:
self._normal_lookup[t] = factory else:
self._normal_lookup[type] = factory elif order is Order.try_last:
self._last.append((factory, type)) else: raise ValueError(f"Unsupported order {order!r}")
if isinstance(value, memoryview):
self._size = value.nbytes elif isinstance(value, (bytes, bytearray)):
self._size = len(value) else: raise TypeError(f"value argument must be byte-ish, not {type(value)!r}")
if self._size > TOO_LARGE_BYTES_BODY:
kwargs = {"source": self}
warnings.warn( "Sending a large body directly with raw bytes might" " lock the event loop. You should probably pass an " "io.BytesIO object instead",
ResourceWarning,
**kwargs,
)
class BufferedReaderPayload(IOBasePayload):
_value: io.BufferedIOBase
@property def size(self) -> Optional[int]: try: return os.fstat(self._value.fileno()).st_size - self._value.tell() except (OSError, AttributeError): # data.fileno() is not supported, e.g. # io.BufferedReader(io.BytesIO(b'data')) # For some file-like objects (e.g. tarfile), the fileno() attribute may # not exist at all, and will instead raise an AttributeError. returnNone
async def write(self, writer: AbstractStreamWriter) -> None: if self._iter: try: # iter is not None check prevents rare cases # when the case iterable is used twice whileTrue:
chunk = await self._iter.__anext__()
await writer.write(chunk) except StopAsyncIteration:
self._iter = None
PAYLOAD_REGISTRY = PayloadRegistry()
PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview))
PAYLOAD_REGISTRY.register(StringPayload, str)
PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO)
PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase)
PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO)
PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom))
PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase)
PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader) # try_last for giving a chance to more specialized async interables like # multidict.BodyPartReaderPayload override the default
PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.0.26Bemerkung:
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.