import importlib import inspect import logging import re import os import shutil import stat import subprocess import sys import time from datetime import datetime, timedelta from string import Template from typing import List, Optional
from configparser import ConfigParser, ExtendedInterpolation from urllib.parse import urlparse
from .certs import Credentials, HttpdTestCA, CertificateSpec from .log import HttpdErrorLog from .nghttp import Nghttp from .result import ExecResult
def _make_conf(self):
    """Populate ``<server_dir>/conf`` from the ``conf`` dirs of all source dirs.

    Any existing destination directory is removed first so that files from
    another run/test suite do not leak into this one. For every entry in a
    source ``conf`` directory:

    * ``<name>.template`` is expanded via ``self._make_template`` and
      written as ``<name>``;
    * any other regular file is copied unchanged.

    Source directories are processed in ``self._source_dirs`` order, so a
    later directory overwrites same-named files from an earlier one.
    """
    conf_dest_dir = os.path.join(self.env.server_dir, 'conf')
    # remove anything from another run/test suite
    if os.path.isdir(conf_dest_dir):
        shutil.rmtree(conf_dest_dir)
    for d in self._source_dirs:
        conf_src_dir = os.path.join(d, 'conf')
        if not os.path.isdir(conf_src_dir):
            continue
        os.makedirs(conf_dest_dir, exist_ok=True)
        for name in os.listdir(conf_src_dir):
            src_path = os.path.join(conf_src_dir, name)
            # Match the whole name with a literal ".template" suffix. The
            # previous unanchored pattern with an unescaped dot also hit
            # names such as "mytemplate" or "x.template.bak".
            m = re.fullmatch(r'(.+)\.template', name)
            if m:
                self._make_template(src_path, os.path.join(conf_dest_dir, m.group(1)))
            elif os.path.isfile(src_path):
                shutil.copy(src_path, os.path.join(conf_dest_dir, name))
def _make_template(self, src, dest):
    """Expand the ``string.Template`` file ``src`` and write it to ``dest``.

    The substitution map holds one entry per ``property`` defined directly
    on ``HttpdTestEnv``, evaluated against ``self.env``. ``substitute`` is
    strict, so a placeholder without a matching property raises
    ``KeyError`` (and a malformed placeholder ``ValueError``).

    :param src: path of the template file to read
    :param dest: path of the expanded file to write (overwritten)
    """
    var_map = {
        name: value.fget(self.env)
        for name, value in HttpdTestEnv.__dict__.items()
        if isinstance(value, property)
    }
    # read inside a context manager so the source handle is closed promptly
    with open(src) as fd:
        t = Template(fd.read())
    with open(dest, 'w') as fd:
        fd.write(t.substitute(var_map))
def _make_modules_conf(self):
    """Write ``<server_dir>/conf/modules.conf`` with the LoadModule directives.

    Names in ``self._modules`` and ``self._optional_modules`` may be given
    with or without the ``mod_`` prefix; each module is handled only once.

    Wanted modules (``self._modules``):
      * shared object present in ``libexec_dir`` -> ``LoadModule`` line;
      * no shared object but listed in ``env.dso_modules`` -> recorded as
        missing;
      * otherwise -> a ``#built static`` comment line.

    Optional modules get a ``LoadModule`` line only if their shared object
    exists and are silently skipped otherwise.

    :raises Exception: if at least one wanted module was recorded as missing
    """
    def strip_prefix(mod_name):
        # "mod_foo" -> "foo"; names without the prefix pass through
        found = re.match(r'^mod_(.+)$', mod_name)
        return found.group(1) if found else mod_name

    handled = set()
    missing_mods = []
    conf_path = os.path.join(self.env.server_dir, 'conf/modules.conf')
    with open(conf_path, 'w') as fd:
        # issue load directives for all modules we want that are shared
        for m in map(strip_prefix, self._modules):
            if m in handled:
                continue
            mod_path = os.path.join(self.env.libexec_dir, f"mod_{m}.so")
            if os.path.isfile(mod_path):
                fd.write(f"LoadModule {m}_module \"{mod_path}\"\n")
            elif m in self.env.dso_modules:
                missing_mods.append(m)
            else:
                fd.write(f"#built static: LoadModule {m}_module \"{mod_path}\"\n")
            handled.add(m)
        # optional modules are loaded only when their shared object exists
        for m in map(strip_prefix, self._optional_modules):
            if m in handled:
                continue
            mod_path = os.path.join(self.env.libexec_dir, f"mod_{m}.so")
            if not os.path.isfile(mod_path):
                continue
            fd.write(f"LoadModule {m}_module \"{mod_path}\"\n")
            handled.add(m)
    if missing_mods:
        raise Exception(f"Unable to find modules: {missing_mods} "
                        f"DSOs: {self.env.dso_modules}")
def _make_htdocs(self):
    """Rebuild ``<server_dir>/htdocs`` from the ``htdocs`` dirs of all sources.

    Steps:
      1. remove a left-over ``htdocs`` tree from another run/test suite;
      2. make sure ``env.server_docs_dir`` exists;
      3. copy every source ``htdocs`` tree into the destination, later
         source dirs overwriting earlier ones;
      4. add the owner-execute bit to every ``*.py`` file below the
         destination.
    """
    dest_dir = os.path.join(self.env.server_dir, 'htdocs')
    # remove anything from another run/test suite
    if os.path.isdir(dest_dir):
        shutil.rmtree(dest_dir)
    # Create the docs dir only AFTER the cleanup: if it is (or lives below)
    # dest_dir, creating it first would just have it deleted again.
    # NOTE(review): the relation between server_docs_dir and dest_dir is
    # not visible here; this order is correct either way.
    os.makedirs(self.env.server_docs_dir, exist_ok=True)
    for d in self._source_dirs:
        srcdocs = os.path.join(d, 'htdocs')
        if os.path.isdir(srcdocs):
            shutil.copytree(srcdocs, dest_dir, dirs_exist_ok=True)
    # make all contained .py scripts executable
    for dirpath, _dirnames, filenames in os.walk(dest_dir):
        for fname in filenames:
            # fullmatch: only names ending in ".py", not "x.pyc"/"x.py.bak"
            if re.fullmatch(r'.+\.py', fname):
                py_file = os.path.join(dirpath, fname)
                st = os.stat(py_file)
                os.chmod(py_file, st.st_mode | stat.S_IEXEC)
modules_conf = os.path.join(self.env.server_dir, 'conf/modules.conf') with open(modules_conf, 'a') as fd: # load our test module which is not installed
fd.write(f"LoadModule aptest_module \"{local_dir}/mod_aptest/.libs/mod_aptest.so\"\n")
def _build_clients(self):
    """Run ``make`` in the ``clients`` directory of the test suite.

    The directory is located two levels above the file that defines
    ``HttpdTestSetup``. Output of ``make`` is captured rather than echoed.

    :raises Exception: if ``make`` exits with a non-zero status; the
        captured stderr is logged and included in the message
    """
    setup_file = inspect.getfile(HttpdTestSetup)
    suite_root = os.path.dirname(os.path.dirname(setup_file))
    p = subprocess.run(['make'], capture_output=True,
                       cwd=os.path.join(suite_root, 'clients'))
    if p.returncode == 0:
        return
    failure = f"compiling test clients failed: {p.stderr}"
    log.error(failure)
    raise Exception(failure)
class HttpdTestEnv:
    """Helpers describing and probing the httpd test environment.

    NOTE(review): only part of this class is visible in this section. The
    methods below rely on members defined elsewhere: ``_cert_specs``,
    ``ca``, ``_h2load``, ``_h2load_version``, ``_curl``, ``_curl_version``,
    ``gen_dir``, ``_httpd_addr``, ``has_h2load()`` and ``_versiontuple()``.
    """

    LIBEXEC_DIR = None

    @classmethod
    def has_python_package(cls, name: str) -> bool:
        """Return True if the Python package ``name`` is loaded or loadable.

        A package that is not yet loaded is loaded as a side effect and
        registered in ``sys.modules``. If executing the module fails, the
        registration is undone and the error is re-raised.

        :param name: absolute (possibly dotted) module name
        :return: False if no such package can be found
        """
        if name in sys.modules:
            # already loaded
            return True
        # `import importlib` alone does not guarantee the `util` submodule
        import importlib.util
        try:
            spec = importlib.util.find_spec(name)
        except (ImportError, ValueError):
            # e.g. the parent package of a dotted name does not exist
            return False
        if spec is None:
            return False
        module = importlib.util.module_from_spec(spec)
        sys.modules[name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            # do not leave a half-initialised module behind
            sys.modules.pop(name, None)
            raise
        return True

    def get_credentials_for_name(self, dns_name) -> List['Credentials']:
        """Return the credentials of the first cert spec covering ``dns_name``.

        Specs without domains are ignored. The lookup at the CA is done with
        the first domain of the matching spec. Returns ``[]`` if no spec
        lists ``dns_name``.
        """
        for spec in self._cert_specs:
            if spec.domains is not None and dns_name in spec.domains:
                return self.ca.get_credentials_for_name(spec.domains[0])
        return []

    def h2load_is_at_least(self, minv):
        """Return True if h2load is available in version ``minv`` or newer.

        The version is taken from ``h2load --version`` on first use and
        cached in ``self._h2load_version``. Returns False if h2load is not
        available, the call fails or the output cannot be parsed.
        """
        if not self.has_h2load():
            return False
        if self._h2load_version is None:
            p = subprocess.run([self._h2load, '--version'],
                               capture_output=True, text=True)
            if p.returncode != 0:
                return False
            m = re.match(r'h2load nghttp2/(\S+)', p.stdout.strip())
            if m:
                self._h2load_version = self._versiontuple(m.group(1))
        if self._h2load_version is not None:
            return self._h2load_version >= self._versiontuple(minv)
        return False

    def _detect_curl_version(self):
        """Return the cached curl version, probing ``curl -V`` on first use.

        Returns None if curl exits non-zero or no ``curl <version>`` line is
        found; nothing is cached in that case, so a later call probes again.
        """
        if self._curl_version is None:
            p = subprocess.run([self._curl, '-V'], capture_output=True, text=True)
            if p.returncode != 0:
                return None
            for line in p.stdout.splitlines():
                m = re.match(r'curl ([0-9.]+)[- ].*', line)
                if m:
                    self._curl_version = self._versiontuple(m.group(1))
                    break
        return self._curl_version

    def curl_is_at_least(self, minv):
        """Return True if curl's version is ``minv`` or newer.

        Returns False if the curl version cannot be determined.
        """
        version = self._detect_curl_version()
        if version is None:
            return False
        return version >= self._versiontuple(minv)

    def curl_is_less_than(self, version):
        """Return True if curl's version is older than ``version``.

        Returns False if the curl version cannot be determined.
        """
        found = self._detect_curl_version()
        if found is None:
            return False
        return found < self._versiontuple(version)

    def clear_curl_headerfiles(self):
        """Delete ``curl.headers.<n>`` files in ``gen_dir`` and reset the counter."""
        for fname in os.listdir(path=self.gen_dir):
            if re.match(r'curl\.headers\.\d+', fname):
                os.remove(os.path.join(self.gen_dir, fname))
        self._curl_headerfiles_n = 0

    def curl_resolve_args(self, url, insecure=False, force_resolve=True, options=None):
        """Return curl arguments that pin the host of ``url`` to the test server.

        A ``--resolve host:port:<httpd address>`` pair is produced when
        ``force_resolve`` is set and the URL's host is a name other than
        ``localhost`` or the server address itself. Hosts starting with a
        digit, ``[`` or ``:`` are skipped (presumably IP literals).

        :param url: the URL curl is going to request; must carry a port
            whenever a ``--resolve`` entry is produced
        :param insecure: accepted for interface compatibility; not used by
            the code visible here
        :param options: accepted for interface compatibility; not used by
            the code visible here
        :return: list of command line arguments, possibly empty
        """
        u = urlparse(url)
        # was used without ever being defined (NameError on every call)
        args = []
        if force_resolve and u.hostname and u.hostname != 'localhost' \
                and u.hostname != self._httpd_addr \
                and not re.match(r'^(\d+|\[|:).*', u.hostname):
            assert u.port, f"port not in url: {url}"
            args.extend(["--resolve", f"{u.hostname}:{u.port}:{self._httpd_addr}"])
        return args

    def make_data_file(self, indir: str, fname: str, fsize: int) -> str:
        """Create a text file of exactly ``fsize`` characters and return its path.

        The file consists of 1024-character lines of the form
        ``<9-digit line index>-<1013 digits>\\n``; a final partial line
        supplies the remaining ``fsize % 1024`` characters.

        :param indir: existing directory to create the file in
        :param fname: name of the file inside ``indir``
        :param fsize: desired size in characters
        """
        fpath = os.path.join(indir, fname)
        s10 = "0123456789"
        # 1013 payload chars + 9 index digits + '-' + '\n' == 1024
        s = (101 * s10) + s10[0:3]
        with open(fpath, 'w') as fd:
            for i in range(int(fsize / 1024)):
                fd.write(f"{i:09d}-{s}\n")
            remain = int(fsize % 1024)
            if remain != 0:
                # NOTE: the index of the partial line skips one number
                # relative to the full lines; kept for identical output.
                i = int(fsize / 1024) + 1
                s = f"{i:09d}-{s}\n"
                fd.write(s[0:remain])
        return fpath
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert. 0.25 Bemerkung:
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.