import abc import argparse import importlib import json import logging import multiprocessing import os import platform import subprocess import sys import threading import time import traceback import urllib import uuid from collections import defaultdict, OrderedDict from io import IOBase from itertools import chain, product from html5lib import html5parser from typing import ClassVar, List, Optional, Set, Tuple
from localpaths import repo_root # type: ignore
from manifest.sourcefile import read_script_metadata, js_meta_re, parse_variants # type: ignore from wptserve import server as wptserve, handlers from wptserve import stash from wptserve import config from wptserve.handlers import filesystem_path, wrap_pipeline from wptserve.response import ResponseHeaders from wptserve.utils import get_port, HTTPException, http2_compatible from pywebsocket3 import standalone as pywebsocket
def replace_end(s, old, new):
    """Return *s* with its trailing occurrence of *old* swapped for *new*.

    *s* must end with *old*; this is enforced with an assertion.
    """
    assert s.endswith(old)
    trimmed = s[:-len(old)]
    return trimmed + new
def inject_script(html, script_tag):
    """Insert *script_tag* into *html* immediately after the document prologue.

    Tokenizes the document and finds the position after any doctype, <html>
    and <head> start tags, but before any other content; *script_tag* is
    spliced in at that byte offset.  If the structure cannot be determined
    safely (a parse error, or the insertion point falls outside the
    tokenizer's first chunk), *html* is returned unmodified.
    """
    # Tokenize and find the position of the first content (e.g. after the
    # doctype, html, and head opening tags if present but before any other tags).
    token_types = html5parser.tokenTypes
    after_tags = {"html", "head"}
    before_tokens = {token_types["EndTag"], token_types["EmptyTag"],
                     token_types["Characters"]}
    error_tokens = {token_types["ParseError"]}
    tokenizer = html5parser._tokenizer.HTMLTokenizer(html)
    stream = tokenizer.stream
    offset = 0
    error = False
    for item in tokenizer:
        if item["type"] == token_types["StartTag"]:
            # Any start tag other than <html>/<head> means real content began.
            if not item["name"].lower() in after_tags:
                break
        elif item["type"] in before_tokens:
            break
        elif item["type"] in error_tokens:
            error = True
            break
        # Remember how far into the stream the prologue extends so far.
        offset = stream.chunkOffset
    else:
        # Exhausted the tokens without hitting content: nothing to anchor on.
        error = True
    # NOTE(review): this condition binds as
    # `(not error and stream.prevNumCols) or stream.prevNumLines`; the net
    # effect on `error` is the same as the likely-intended
    # `not error and (prevNumCols or prevNumLines)`, since re-assigning True
    # when `error` is already True is a no-op.
    if not error and stream.prevNumCols or stream.prevNumLines:
        # We're outside the first chunk, so we don't know what to do
        error = True
    if error:
        return html
    else:
        return html[:offset] + script_tag + html[offset:]
def _get_path(self, path, resource_path):
    """Map an incoming request path onto an "unwrapped" path.

    :param path: Path from the HTTP request.
    :param resource_path: When True, return the path for the resource that
                          this wrapper will load; when False, return the
                          path of the associated file on disk.  These are
                          usually the same but differ with multiple layers
                          of wrapping, e.g. for a .any.worker.html input the
                          underlying disk file is .any.js but the top level
                          html file loads a resource with a .any.worker.js
                          extension, which itself loads the .any.js file.
    """
    for rule in self.path_replace:
        if len(rule) == 2:
            suffix, target = rule
        else:
            assert len(rule) == 3
            suffix = rule[0]
            target = rule[2] if resource_path else rule[1]
        if path.endswith(suffix):
            path = replace_end(path, suffix, target)
    return path
def _get_filesystem_path(self, request):
    """Return the path of the underlying resource file on disk."""
    disk_path = filesystem_path(self.base_path, request, self.url_base)
    return self._get_path(disk_path, False)
def _get_metadata(self, request):
    """Yield (key, value) script metadata parsed from // META comments in
    the associated js file.

    :param request: The Request being processed.
    :raises HTTPException: 404 when the backing file cannot be read.
    """
    source_path = self._get_filesystem_path(request)
    try:
        with open(source_path, "rb") as source:
            yield from read_script_metadata(source, js_meta_re)
    except OSError:
        raise HTTPException(404)
def _get_meta(self, request): """Get an iterator over strings to inject into the wrapper document
based on // META comments in the associated js file.
:param request: The Request being processed. """ for key, value in self._get_metadata(request):
replacement = self._meta_replacement(key, value) if replacement: yield replacement
def _get_script(self, request): """Get an iterator over strings to inject into the wrapper document
based on // META comments in the associated js file.
:param request: The Request being processed. """ for key, value in self._get_metadata(request):
replacement = self._script_replacement(key, value) if replacement: yield replacement
@property
@abc.abstractmethod
def path_replace(self):
    """Suffix-replacement rules mapping request paths to files.

    A list containing a mix of 2-item tuples with
    (input suffix, output suffix) and 3-item tuples with
    (input suffix, filesystem suffix, resource suffix) for the case where
    we want a different path in the generated resource to the actual path
    on the filesystem (e.g. when there is another handler that will wrap
    the file).
    """
    # abc.abstractproperty is deprecated since Python 3.3; stacking
    # @property over @abc.abstractmethod is the supported spelling.
    return None
@property
@abc.abstractmethod
def wrapper(self):
    """String template with variables ``path`` and ``meta`` for the wrapper document."""
    # abc.abstractproperty is deprecated since Python 3.3; stacking
    # @property over @abc.abstractmethod is the supported spelling.
    return None
@abc.abstractmethod
def _meta_replacement(self, key, value):
    # Get the string to insert into the wrapper document, given
    # a specific metadata key: value pair.  Implementations return None
    # for keys that produce no injected content.
    pass
@abc.abstractmethod
def check_exposure(self, request):
    # Raise an exception if this handler shouldn't be exposed after all.
    pass
class HtmlWrapperHandler(WrapperHandler):
    """WrapperHandler that serves an HTML document wrapping a test resource."""

    # Global-scope variant name matched against the test's
    # ``// META: global=...`` metadata in check_exposure; None disables
    # that check.
    global_type: ClassVar[Optional[str]] = None
    headers = [('Content-Type', 'text/html')]
def check_exposure(self, request):
    """Raise HTTPException(404) when this wrapper's global type is not one
    of the variants declared by the test's ``// META: global`` comment.

    A handler with no ``global_type`` accepts every request.
    """
    if self.global_type is None:
        return
    declared = ""
    for meta_key, meta_value in self._get_metadata(request):
        if meta_key == "global":
            declared = meta_value
            break
    if self.global_type not in parse_variants(declared):
        message = "This test cannot be loaded in %s mode" % self.global_type
        raise HTTPException(404, message)
def _meta_replacement(self, key, value): if key == "timeout": if value == "long": return'' if key == "title":
value = value.replace("&", "&").replace("<", "<") return'%s' % value returnNone
@abc.abstractmethod
def _create_script_import(self, attribute):
    # Take attribute (a string URL to a JS script, already escaped for
    # embedding in a double-quoted JS string) and return JS source to
    # import the script into the worker.
    pass
def _script_replacement(self, key, value): if key == "script":
attribute = value.replace("\\", "\\\\").replace('"', '\\"') return self._create_script_import(attribute) if key == "title":
value = value.replace("\\", "\\\\").replace('"', '\\"') return'self.META_TITLE = "%s";' % value returnNone
self.extra = []
self.inject_script_data = None if inject_script isnotNone: with open(inject_script, 'rb') as f:
self.inject_script_data = f.read()
self.mountpoint_routes = OrderedDict()
self.add_mount_point("/", None)
def get_routes(self):
    """Return the full route list: forbidden routes, extras, then the
    routes of every mount point.

    Mount points are appended in reverse insertion order so that later
    additions get higher priority; this makes sense since "/" is typically
    added first.
    """
    all_routes = list(self.forbidden_override)
    all_routes += self.forbidden
    all_routes += self.extra
    for mounted in reversed(self.mountpoint_routes.values()):
        all_routes.extend(mounted)
    return all_routes
def get_route_builder(logger, aliases, config):
    """Construct a RoutesBuilder and register each alias as a mount point.

    Aliases whose "url-path" doesn't start with "/" or whose "local-dir" is
    empty are logged as errors and skipped.  Paths ending in "/" become
    directory mount points; others become single-file mount points.
    """
    builder = RoutesBuilder(config.inject_script)
    for alias in aliases:
        url_path = alias["url-path"]
        local_dir = alias["local-dir"]
        if not url_path.startswith("/") or len(local_dir) == 0:
            logger.error("\"url-path\" value must start with '/'.")
            continue
        if url_path.endswith("/"):
            builder.add_mount_point(url_path, local_dir)
        else:
            builder.add_file_mount_point(url_path, local_dir)
    return builder
def create_daemon(self, init_func, host, port, paths, routes, bind_address,
                  config, log_handlers, env, **kwargs):
    """Build this server's daemon by calling *init_func* in a new process.

    Resets the logging module, installs the environment handed down from
    the parent, raises the open-file limit on macOS, then delegates to
    *init_func* to construct the actual server.  Failures are logged as
    critical and re-raised.
    """
    # Ensure that when we start this in a new process we have the global lock
    # in the logging module unlocked
    importlib.reload(logging)
    # Rebind os.environ wholesale to the mapping passed from the parent.
    os.environ = env
    logger = get_logger(config.logging["level"], log_handlers)
    if sys.platform == "darwin":
        # on Darwin, NOFILE starts with a very low limit (256), so bump it up a little
        # by way of comparison, Debian starts with a limit of 1024, Windows 512
        import resource  # local, as it only exists on Unix-like systems
        maxfilesperproc = int(subprocess.check_output(
            ["sysctl", "-n", "kern.maxfilesperproc"]
        ).strip())
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        # 2048 is somewhat arbitrary, but gives us some headroom for wptrunner --parallel
        # note that it's expected that 2048 will be the min here
        new_soft = min(2048, maxfilesperproc, hard)
        if soft < new_soft:
            resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
    try:
        self.daemon = init_func(logger, host, port, paths, routes, bind_address,
                                config, **kwargs)
    except OSError:
        logger.critical("Socket error on port %s" % port)
        raise
    except Exception:
        logger.critical(traceback.format_exc())
        raise
for domain in sorted(
config.domains_set, key=lambda x: tuple(reversed(x.split(".")))
):
rv.append("%s\t%s" % (host, domain))
# Windows interpets the IP address 0.0.0.0 as non-existent, making it an # appropriate alias for non-existent hosts. However, UNIX-like systems # interpret the same address to mean any IP address, which is inappropraite # for this context. These systems do not reserve any value for this # purpose, so the inavailability of the domains must be taken for granted. # # https://github.com/web-platform-tests/wpt/issues/10560 if platform.uname()[0] == "Windows": for not_domain in sorted(
config.not_domains_set, key=lambda x: tuple(reversed(x.split(".")))
):
rv.append("0.0.0.0\t%s" % not_domain)
rv.append("# End web-platform-tests hosts")
rv.append("")
# If trying to start HTTP/2.0 server, check compatibility if scheme == 'h2'andnot http2_compatible():
logger.error('Cannot start HTTP/2.0 server as the environment is not compatible. ' + 'Requires OpenSSL 1.0.2+') continue
# Skip WebTransport over HTTP/3 server unless if is enabled explicitly. if scheme == 'webtransport-h3'andnot kwargs.get("webtransport_h3"): continue
if ssl_config isnotNone:
cmd_args += ["--tls", "--private-key", ssl_config["key_path"], "--certificate", ssl_config["cert_path"]]
if (bind_address):
cmd_args = ["-H", host] + cmd_args
opts, args = pywebsocket._parse_args_and_config(cmd_args)
opts.cgi_directories = []
opts.is_executable_method = None
self.server = pywebsocket.WebSocketServer(opts) if extra_handler_paths: for path in extra_handler_paths:
self.server.websocket_server_options.dispatcher._source_handler_files_in_dir(path, path, False, None)
ports = [item[0].getsockname()[1] for item in self.server._sockets] ifnot ports: # TODO: Fix the logging configuration in WebSockets processes # see https://github.com/web-platform-tests/wpt/issues/22719
logger.critical("Failed to start websocket server on port %s, " "is something already using that port?" % port) raise OSError() assert all(item == ports[0] for item in ports)
self.port = ports[0]
self.started = False
self.server_thread = None
def start(self):
    """Start serving on a daemonized background thread."""
    self.started = True
    self.server_thread = threading.Thread(target=self.server.serve_forever)
    # Thread.setDaemon() is deprecated since Python 3.10; set the attribute
    # directly so the thread doesn't hang on exit.
    self.server_thread.daemon = True
    self.server_thread.start()
def stop(self):
    """Shut the server down.

    If the server is not running, this method only clears the server
    reference and has no other effect.
    """
    if not self.started:
        self.server = None
        return
    try:
        self.server.shutdown()
        self.server.server_close()
        self.server_thread.join()
        self.server_thread = None
    except AttributeError:
        # Best effort: a partially-initialised server may lack some of
        # these attributes/methods.
        pass
    self.started = False
    self.server = None
def start_webtransport_h3_server(logger, host, port, paths, routes, bind_address, config, **kwargs):
    """Build a WebTransport-over-HTTP/3 server; on any failure (including a
    missing webtransport package) log critically and exit the process with
    status 0.
    """
    try:
        # TODO(bashi): Move the following import to the beginning of this file
        # once WebTransportH3Server is enabled by default.
        from webtransport.h3.webtransport_h3_server import WebTransportH3Server  # type: ignore
        server = WebTransportH3Server(host=host,
                                      port=port,
                                      doc_root=paths["doc_root"],
                                      cert_path=config.ssl_config["cert_path"],
                                      key_path=config.ssl_config["key_path"],
                                      logger=logger)
        return server
    except Exception as error:
        logger.critical(
            f"Failed to start WebTransport over HTTP/3 server: {error}")
        sys.exit(0)
def iter_servers(servers):
    """Yield each server object from a {scheme: [(port, server), ...]} mapping."""
    for port_server_pairs in servers.values():
        for _, server in port_server_pairs:
            yield server
def _make_subdomains_product(s: Set[str], depth: int = 2) -> Set[str]: return {".".join(x) for x in chain(*(product(s, repeat=i) for i in range(1, depth+1)))}
enable_http2 = kwargs.get("h2") if enable_http2 isNone:
enable_http2 = True if enable_http2:
rv._default["ports"]["h2"] = [9000]
if override_path and os.path.exists(override_path): with open(override_path) as f:
override_obj = json.load(f)
rv.update(override_obj)
if kwargs.get("config_path"):
other_path = os.path.abspath(os.path.expanduser(kwargs.get("config_path"))) if os.path.exists(other_path): with open(other_path) as f:
override_obj = json.load(f)
rv.update(override_obj) else: raise ValueError("Config path %s does not exist" % other_path)
if kwargs.get("verbose"):
rv.logging["level"] = "DEBUG"
overriding_path_args = [("doc_root", "Document root"),
("ws_doc_root", "WebSockets document root")] for key, title in overriding_path_args:
value = kwargs.get(key) if value isNone: continue
value = os.path.abspath(os.path.expanduser(value)) ifnot os.path.exists(value): raise ValueError("%s path %s does not exist" % (title, value))
setattr(rv, key, value)
return rv
def get_parser():
    """Build the command-line argument parser for the serve entry point."""
    p = argparse.ArgumentParser()
    p.add_argument("--latency", type=int,
                   help="Artificial latency to add before sending http responses, in ms")
    p.add_argument("--config", dest="config_path",
                   help="Path to external config file")
    p.add_argument("--doc_root",
                   help="Path to document root. Overrides config.")
    p.add_argument("--ws_doc_root",
                   help="Path to WebSockets document root. Overrides config.")
    p.add_argument("--ws_extra", action="append", default=[],
                   help="Path to extra directory containing ws handlers. Overrides config.")
    p.add_argument("--inject-script",
                   help="Path to script file to inject, useful for testing polyfills.")
    p.add_argument("--alias_file",
                   help="File with entries for aliases/multiple doc roots. In form of `/ALIAS_NAME/, DOC_ROOT\\n`")
    # --h2 is a hidden no-op kept for backwards compatibility; --no-h2
    # disables the HTTP/2.0 server via the same "h2" destination.
    p.add_argument("--h2", action="store_true", default=None,
                   help=argparse.SUPPRESS)
    p.add_argument("--no-h2", action="store_false", dest="h2", default=None,
                   help="Disable the HTTP/2.0 server")
    p.add_argument("--webtransport-h3", action="store_true",
                   help="Enable WebTransport over HTTP/3 server")
    p.add_argument("--exit-after-start", action="store_true",
                   help="Exit after starting servers")
    p.add_argument("--verbose", action="store_true",
                   help="Enable verbose logging")
    p.set_defaults(report=False, is_wave=False)
    return p
class MpContext:
    """Fallback multiprocessing context that simply delegates every
    attribute lookup to the ``multiprocessing`` module itself (used when
    ``multiprocessing.get_context`` is unavailable).
    """

    def __getattr__(self, attr):
        return getattr(multiprocessing, attr)
def get_logger(log_level, log_handlers):
    """Return the root logger configured to log at *log_level*.

    If the root logger already has handlers, *log_handlers* is ignored.
    Otherwise the handlers in *log_handlers* are attached; if none are
    passed, a stdout stream handler is attached instead.  Typically called
    once per process to set up logging in that process.

    :param log_level: A string representing a log level e.g. "info".
    :param log_handlers: Optional list of Handler objects.
    """
    root = logging.getLogger()
    root.setLevel(getattr(logging, log_level.upper()))
    if root.hasHandlers():
        return root
    if log_handlers is not None:
        for configured in log_handlers:
            root.addHandler(configured)
        return root
    fallback = logging.StreamHandler(sys.stdout)
    fallback.setFormatter(
        logging.Formatter("[%(asctime)s %(processName)s] %(levelname)s - %(message)s"))
    root.addHandler(fallback)
    return root
if mp_context isNone: if hasattr(multiprocessing, "get_context"):
mp_context = multiprocessing.get_context() else:
mp_context = MpContext()
with build_config(logger,
os.path.join(repo_root, "config.json"),
config_cls=config_cls,
**kwargs) as config: # This sets the right log level
logger = get_logger(config.logging["level"], log_handlers)
bind_address = config["bind_address"]
if kwargs.get("alias_file"): with open(kwargs["alias_file"]) as alias_file: for line in alias_file:
alias, doc_root = (x.strip() for x in line.split(','))
config["aliases"].append({ 'url-path': alias, 'local-dir': doc_root,
})
if config["check_subdomains"]:
check_subdomains(logger, config, routes, mp_context, log_handlers)
stash_address = None if bind_address:
stash_address = (config.server_host, get_port(""))
logger.debug("Going to use port %d for stash" % stash_address[1])
ifnot kwargs.get("exit_after_start"): try: # Periodically check if all the servers are alive
server_process_exited = False whilenot server_process_exited: for server in iter_servers(servers):
server.proc.join(1) ifnot server.proc.is_alive():
server_process_exited = True break except KeyboardInterrupt: pass
failed_subproc = 0 for server in iter_servers(servers):
logger.info('Status of subprocess "%s": running', server.proc.name)
server.request_shutdown()
for server in iter_servers(servers):
server.wait(timeout=1) if server.proc.exitcode == 0:
logger.info('Status of subprocess "%s": exited correctly', server.proc.name) elif server.proc.exitcode isNone:
logger.warning( 'Status of subprocess "%s": shutdown timed out',
server.proc.name)
failed_subproc += 1 else:
subproc = server.proc
logger.warning('Status of subprocess "%s": failed. Exit with non-zero status: %d',
subproc.name, subproc.exitcode)
failed_subproc += 1 return failed_subproc
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.