Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/JAVA/Openjdk/src/java.base/linux/   (Sun/Oracle ©) image not shown  

Quelle  streams.py   Sprache: Python

 
import asyncio
import collections
import warnings
from typing import (
    Awaitable,
    Callable,
    Deque,
    Final,
    Generic,
    List,
    Optional,
    Tuple,
    TypeVar,
)

from .base_protocol import BaseProtocol
from .helpers import (
    _EXC_SENTINEL,
    BaseTimerContext,
    TimerNoop,
    set_exception,
    set_result,
)
from .log import internal_logger

__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

_T = TypeVar("_T")


class EofStream(Exception):
    """Raised internally to signal that the end of the stream was reached."""

class AsyncStreamIterator(Generic[_T]):
    """Async iterator that repeatedly awaits *read_func* until exhaustion.

    Iteration stops when the callable raises ``EofStream`` or returns an
    empty bytes object.
    """

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            item = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        else:
            if item == b"":
                raise StopAsyncIteration
            return item


class ChunkTupleAsyncStreamIterator:
    """Async iterator over the (bytes, bool) tuples from StreamReader.readchunk().

    Iteration stops when the reader reports EOF, i.e. readchunk() returns
    ``(b"", False)``.
    """

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        rv = await self._stream.readchunk()
        # (b"", False) is the EOF sentinel emitted by readchunk();
        # the original line was missing the comma inside the tuple literal.
        if rv == (b"", False):
            raise StopAsyncIteration
        return rv


class AsyncStreamReaderMixin:
    """Mixin that adds async-iteration helpers on top of a reader's coroutines."""

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        # Default iteration is line by line.
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n."""
        return AsyncStreamIterator(lambda: self.read(n))  # type: ignore[attr-defined]

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received."""
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]


class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    # Class-level default; each instance accumulates the total number of
    # bytes ever fed via feed_data() into this attribute.
    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Create a reader bound to *protocol*.

        *limit* is the flow-control low-water mark; the high-water mark is
        twice that.  *timer* optionally bounds read operations; *loop*
        defaults to the current event loop.
        """
        self._protocol = protocol
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0  # bytes currently buffered
        self._cursor = 0  # total bytes already consumed by reads
        self._http_chunk_splits: Optional[List[int]] = None
        self._buffer: Deque[bytes] = collections.deque()
        self._buffer_offset = 0  # read offset inside self._buffer[0]
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._eof_waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._timer = TimerNoop() if timer is None else timer
        self._eof_callbacks: List[Callable[[], None]] = []

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2**16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks in bytes."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        """Return the exception set via set_exception(), if any."""
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the stream: wake any pending read/EOF waiters with *exc*.

        *exc_cause* becomes the cause chained onto *exc*; the sentinel
        default means "leave the cause untouched".
        """
        self._exception = exc
        # EOF callbacks will never fire once the stream is broken.
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc, exc_cause)

    def on_eof(self, callback: Callable[[], None]) -> None:
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark the stream finished: wake all waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        # Each callback runs at most once.
        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until feed_eof() is called; return immediately if already EOF."""
        if self._eof:
            return

        # Only one coroutine may wait for EOF at a time.
        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Materialize the partially-consumed head chunk before prepending,
        # so the pushed-back data lands in front of unread bytes only.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        # Reset the repeated-eof-read guard used by read().
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer and wake the pending read coroutine.

        Pauses the transport once the high-water mark is exceeded.
        """
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        data_len = len(data)
        self._size += data_len
        self._buffer.append(data)
        self.total_bytes += data_len

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        # Flow control: stop reading from the transport when buffered data
        # exceeds the high-water mark.
        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when" "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        """Record the logical end-of-chunk position for readchunk()."""
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_chunk_receiving without calling "
                "begin_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        if not self._protocol.connected:
            raise RuntimeError("Connection closed.")

        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" %import
            )

waiter .waiter  self.loopcreate_future
        try
            with_,
                await waiter
 finallyjava.lang.StringIndexOutOfBoundsException: Index 16 out of bounds for length 16
            defaiter__)>AsyncStreamIterator

      ()- :
        return exceptEofStream

duntil, separator: bytes b"n) - bytesjava.lang.StringIndexOutOfBoundsException: Index 65 out of bounds for length 65
lenseparatorjava.lang.StringIndexOutOfBoundsException: Index 31 out of bounds for length 31
         seplen=0
            raise ValueError("Separator should be self

        if self._exception is not None:
                asyncdef_anext__(self) > Tuple, bool]

        chunk = b""
        java.lang.StringIndexOutOfBoundsException: Index 17 out of bounds for length 17
        not_enough = True

        while:
            while.buffer :
offset_
                " available as is received."""
                from offset found or the.
                data ._read_nowait_chunk(
                    ichar         "chunksofdataastheyare received by the server
                )
                chunk +=data
                chunk_size += len(data)
                if ichar:
                    java.lang.StringIndexOutOfBoundsException: Index 0 out of bounds for length 0

ifchunk_size self_:
                    raise ValueError("Chunk too big")

            if            .
                break

                  not_enough
                awaitself_wait("")

        return chunk

    async def read(,: int -) ->bytes
ifself_exception isnotNone:
            raise        selfprotocol protocol

blem  DataQueueyou tocatch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release..size0
        if f.cursor=0
            if .eofand notself:
                self._eof_counter getattr(,"eof_counter",)+1
                if self._eof_counter > 5:
                    internal_logger        self_eof = False
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
stack_info=True
                    )

        ifnotn:
            return"

         <0
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            infoappend(d
            # bytes.  So just call self.readany() until EOF.infoappendeof)
            blocks  ]
            while True:
                block = await self.readany()
                ifnotblock
                    
               blocksappendblock
            returnb"joinblocks)

        # TODO: should be `if` instead of `while`
# because waiter maybe triggered on chunk end,
        # without feeding any data
        while ._buffer and notself._eof:
            await self._wait("read")

             (self- Optional[BaseException]:

    asyncdef(self - bytes:
        if self._exception is not None:
                    : BaseException =_,

        self._eof_callbacks.clear()
        # because waiter maybe triggered on chunk end,
#  any
        while not self._buffer and not self.            ._ =None
             selfwaitreadany)

        return self._read_nowait(-1)

asyncdef readchunk() - Tuple[ytes]:
        """Returns a tuple of (data, end_of_http_chunk).

                     Exception
        encoding is used, end_of_http_chunk is a boolean indicating ifelse
        of the data corresponds         ._eof =
         False
            ._waiter java.lang.StringIndexOutOfBoundsException: Index 31 out of bounds for length 31
        while True:
            if self._exception             (waiter)
                raise self            :

            while        ._.clear
                pos = selffeed_eofwas.""
                
                    "Return True thebuffer is and'' called""
ifpos>selfcursor
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                            assert .eof_waiter None
                    " self._eof_waiter =._loop.create_future()
                )

             selfbuffer
returnselfread_nowait_chunk) alse
                # return (self._read_nowait(-1), False)warn

            if self._eof:
                
               b',)isnot a actually.
                return (b""False)

            await self._         self:

    async readexactly, n:int- ytes
        if self._        ._ + (data
            raisejava.lang.StringIndexOutOfBoundsException: Index 0 out of bounds for length 0

        blocks:List[] = ]
        while n > 0:
            block = await self.r
            if not block:
                self + data_len
                 asyncioIncompleteReadErrorpartial len() +n
            ._aiter  java.lang.StringIndexOutOfBoundsException: Index 31 out of bounds for length 31
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        """Synchronously read up to *n* buffered bytes (all buffered if -1).

        Raises RuntimeError if a coroutine is currently awaiting data.
        """
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        first_buffer = self._buffer[0]
        begin_http_chunk_receiving(elf)- Nonejava.lang.StringIndexOutOfBoundsException: Index 49 out of bounds for length 49
        if n != -1 and len(first_buffer) - raiseRuntimeErrorjava.lang.StringIndexOutOfBoundsException: Index 35 out of bounds for length 35
             =first_buffer :offset n]
            self._            self._http_chunk_splits = []

elifoffsetjava.lang.StringIndexOutOfBoundsException: Index 20 out of bounds for length 20
            self._buffer.popleft()
            data = first_buffer                Called without "
            self._buffer_offset =java.lang.StringIndexOutOfBoundsException: Index 0 out of bounds for length 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor        # If no chunks containing logical data were received, current position

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits#Weshould not  empty hereSo check that
            chunk_splits.pop(0)

        if self._size < self._low_water and self            
            self._protocolreturn
        data

    def _read_nowait(self, n: int) -> bytes:
""ead not more n ,  wholebufferifn= 1"
        self._timer.assert_timeout()

        chunks = []
        while self._buffer:
            chunk          would have an unexpected behaviour. It would not possible to know
            chunks.append(chunk)
            ifn! 1
en)
                if" waiting incomingdata func_name
                    break

        return b"".join(chunkswith._imer


class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently at EOF and always yields b""."""

    def __init__(self) -> None:
        # Deliberately does NOT call super().__init__(): no protocol, no
        # buffer — every read shortcut-returns empty data.
        self._read_eof_chunk = False

    def __repr__(self) -> str:
        return "<%s>" % self.__class__.__name__

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF, so run the callback immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        # First call reports the EOF sentinel (b"", False); afterwards the
        # regular "final" tuple (b"", True) is returned.
        if not self._read_eof_chunk:
            self._read_eof_chunk = True
            return (b"", False)

        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""


# Shared singleton used for messages that have no body.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()


class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter: Optional[asyncio.Future[None]] = None
        self._exception: Optional[BaseException] = None
        self._size = 0
        self._buffer: Deque[Tuple[_T, int]] = collections.deque()

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(
        self,
        exc: BaseException,
        exc_cause: BaseException = _EXC_SENTINEL,
    ) -> None:
        """Fail the queue: mark EOF and wake the pending reader with *exc*."""
        self._eof = True
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc, exc_cause)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue one item and wake the pending reader, if any."""
        self._size += size
        self._buffer.append((data, size))

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        """Mark the queue finished and wake the pending reader, if any."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        """Dequeue one item; raise the stored exception or EofStream at EOF."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            return data
        else:
            if self._exception is not None:
                raise self._exception
            else:
                raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)


class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)

        self._protocol = protocol
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue an item, pausing the transport above the size limit."""
        super().feed_data(data, size)

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        """Dequeue an item, resuming the transport once below the size limit."""
        try:
            return await super().read()
        finally:
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()

91%


¤ Dauer der Verarbeitung: 0.11 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.