Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/python/aiohttp/examples/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 1 kB image not shown  

Quelle  web_ws.py   Sprache: Python

 
#!/usr/bin/env python3
"""Example for aiohttp.web websocket server."""

# The extra strict mypy settings are here to help test that `Application[AppKey()]`
# syntax is working correctly. A regression will cause mypy to raise an error.
# mypy: disallow-any-expr, disallow-any-unimported, disallow-subclassing-any

import os
from typing import List

from aiohttp import web

WS_FILE = os.path.join(os.path.dirname(__file__), "websocket.html")
sockets = web.AppKey("sockets", List[web.WebSocketResponse])


async def wshandler(request: web.Request) -> web.StreamResponse:
    resp = web.WebSocketResponse()
    available = resp.can_prepare(request)
    if not available:
        with open(WS_FILE, "rb"as fp:
            return web.Response(body=fp.read(), content_type="text/html")

    await resp.prepare(request)

    await resp.send_str("Welcome!!!")

    try:
        print("Someone joined.")
        for ws in request.app[sockets]:
            await ws.send_str("Someone joined")
        request.app[sockets].append(resp)

        async for msg in resp:  # type: ignore[misc]
            if msg.type == web.WSMsgType.TEXT:  # type: ignore[misc]
                for ws in request.app[sockets]:
                    if ws is not resp:
                        await ws.send_str(msg.data)  # type: ignore[misc]
            else:
                return resp
        return resp

    finally:
        request.app[sockets].remove(resp)
        print("Someone disconnected.")
        for ws in request.app[sockets]:
            await ws.send_str("Someone disconnected.")


async def on_shutdown(app: web.Application) -> None:
    for ws in app[sockets]:
        await ws.close()


def init() -> web.Application:
    app = web.Application()
    l: List[web.WebSocketResponse] = []
    app[sockets] = l
    app.router.add_get("/", wshandler)
    app.on_shutdown.append(on_shutdown)
    return app


web.run_app(init())

71%


¤ Dauer der Verarbeitung: 0.5 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.