@property def addresses(self) -> List[Any]:
ret: List[Any] = [] for site in self._sites:
server = site._server if server isnotNone:
sockets = server.sockets # type: ignore[attr-defined] if sockets isnotNone: for sock in sockets:
ret.append(sock.getsockname()) return ret
if self._handle_signals: try:
loop.add_signal_handler(signal.SIGINT, _raise_graceful_exit)
loop.add_signal_handler(signal.SIGTERM, _raise_graceful_exit) except NotImplementedError: # pragma: no cover # add_signal_handler is not implemented on Windows pass
self._server = await self._make_server()
@abstractmethod
async def shutdown(self) -> None: """Call any shutdown hooks to help server close gracefully."""
async def cleanup(self) -> None: # The loop over sites is intentional, an exception on gather() # leaves self._sites in unpredictable state. # The loop guaranties that a site is either deleted on success or # still present on failure for site in list(self._sites):
await site.stop()
if self._server: # If setup succeeded # Yield to event loop to ensure incoming requests prior to stopping the sites # have all started to be handled before we proceed to close idle connections.
await asyncio.sleep(0)
self._server.pre_shutdown()
await self.shutdown()
await self._server.shutdown(self._shutdown_timeout)
await self._cleanup_server()
self._server = None if self._handle_signals:
loop = asyncio.get_running_loop() try:
loop.remove_signal_handler(signal.SIGINT)
loop.remove_signal_handler(signal.SIGTERM) except NotImplementedError: # pragma: no cover # remove_signal_handler is not implemented on Windows pass
@abstractmethod
async def _make_server(self) -> Server: pass# pragma: no cover
@abstractmethod
async def _cleanup_server(self) -> None: pass# pragma: no cover
def _reg_site(self, site: BaseSite) -> None: if site in self._sites: raise RuntimeError(f"Site {site} is already registered in runner {self}")
self._sites.append(site)
def _check_site(self, site: BaseSite) -> None: if site notin self._sites: raise RuntimeError(f"Site {site} is not registered in runner {self}")
def _unreg_site(self, site: BaseSite) -> None: if site notin self._sites: raise RuntimeError(f"Site {site} is not registered in runner {self}")
self._sites.remove(site)
class ServerRunner(BaseRunner): """Low-level web server runner"""
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.