Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Apache/test/modules/md/   (Apache Software Stiftung Version 2.4.65©)  Datei vom 5.1.2025 mit Größe 24 kB image not shown  

Quelle  test_502_acmev2_drive.py   Sprache: Python

 
# test driving the ACMEv2 protocol

import base64
import json
import os.path
import re
from datetime import timedelta

import pytest
from pyhttpd.certs import CertificateSpec

from .md_conf import MDConf
from .md_cert_util import MDCertUtil
from .md_env import MDTestEnv


@pytest.mark.skipif(condition=not MDTestEnv.has_a2md(), reason="no a2md available")
@pytest.mark.skipif(condition=not MDTestEnv.has_acme_server(),
                    reason="no ACME test server configured")
class TestDrivev2:

    @pytest.fixture(autouse=True, scope='class')
    def _class_scope(self, env, acme):
        acme.start(config='default')
        env.check_acme()
        env.APACHE_CONF_SRC = "data/test_drive"
        MDConf(env).install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'

    @pytest.fixture(autouse=True, scope='function')
    def _method_scope(self, env, request):
        env.clear_store()
        MDConf(env).install()
        self.test_domain = env.get_request_domain(request)

    # --------- invalid precondition ---------

    def test_md_502_000(self, env):
        # test case: md without contact info
        domain = self.test_domain
        name = "www." + domain
        assert env.a2md(["add", name]).exit_code == 0
        r = env.a2md(["drive", name])
        assert r.exit_code == 1
        assert re.search("No contact information", r.stderr)

    def test_md_502_001(self, env):
        # test case: md with contact, but without TOS
        domain = self.test_domain
        name = "www." + domain
        assert env.a2md(["add", name]).exit_code == 0
        assert env.a2md( 
            ["update", name, "contacts""admin@test1.not-forbidden.org"]
            ).exit_code == 0
        r = env.a2md(["drive", name])
        assert r.exit_code == 1
        assert re.search("the CA requires you to accept the terms-of-service as specified in ", r.stderr)

    # test_102 removed, was based on false assumption
    def test_md_502_003(self, env):
        # test case: md with unknown protocol FOO
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name])
        assert env.a2md(
            ["update", name, "ca", env.acme_url, "FOO"]
            ).exit_code == 0
        r = env.a2md(["drive", name])
        assert r.exit_code == 1
        assert re.search("Unknown CA protocol", r.stderr)

    # --------- driving OK ---------

    def test_md_502_100(self, env):
        # test case: md with one domain
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name])
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # drive
        prev_md = env.a2md(["list", name]).json['output'][0]
        r = env.a2md(["-vv""drive""-c""http-01", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        env.check_md_credentials([name])
        self._check_account_key(env, name)

        # check archive content
        store_md = json.loads(open(env.store_archived_file(name, 1, 'md.json')).read())
        for f in ['name''ca''domains''contacts''renew-mode''renew-window''must-staple']:
            assert store_md[f] == prev_md[f]
        
        # check file system permissions:
        env.check_file_permissions(name)
        # check: challenges removed
        env.check_dir_empty(env.store_challenges())
        # check how the challenge resources are answered in sevceral combinations 
        r = env.get_meta(domain, "/.well-known/acme-challenge"False)
        assert r.exit_code == 0
        assert r.response['status'] == 404
        r = env.get_meta(domain, "/.well-known/acme-challenge/"False)
        assert r.exit_code == 0
        assert r.response['status'] == 404
        r = env.get_meta(domain, "/.well-known/acme-challenge/123"False)
        assert r.exit_code == 0
        assert r.response['status'] == 404
        assert r.exit_code == 0
        cdir = os.path.join(env.store_challenges(), domain)
        os.makedirs(cdir)
        open(os.path.join(cdir, 'acme-http-01.txt'), "w").write("content-of-123")
        r = env.get_meta(domain, "/.well-known/acme-challenge/123"False)
        assert r.exit_code == 0
        assert r.response['status'] == 200
        assert r.response['header']['content-length'] == '14'

    def test_md_502_101(self, env):
        # test case: md with 2 domains
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name, "test." + domain])
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # drive
        r = env.a2md(["-vv""drive""-c""http-01", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        env.check_md_credentials([name, "test." + domain])

    # test_502_102 removed, as accounts without ToS are not allowed in ACMEv2

    def test_md_502_103(self, env):
        # test case: md with one domain, ACME account and TOS agreement on server
        # setup: create md
        domain = self.test_domain
        name = "www." + domain
        assert env.a2md(["add", name]).exit_code == 0
        assert env.a2md(["update", name, "contacts""admin@" + domain]).exit_code == 0
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # setup: create account on server
        r = env.a2md(["-t""accepted""acme""newreg""admin@" + domain], raw=True)
        assert r.exit_code == 0
        acct = re.match("registered: (.*)$", r.stdout).group(1)
        # setup: link md to account
        assert env.a2md(["update", name, "account", acct]).exit_code == 0
        # drive
        r = env.a2md(["-vv""drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        env.check_md_credentials([name])

    # test_502_104 removed, order are created differently in ACMEv2

    def test_md_502_105(self, env):
        # test case: md with one domain, local TOS agreement and ACME account that is deleted (!) on server
        # setup: create md
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name])
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # setup: create account on server
        r = env.a2md(["-t""accepted""acme""newreg""test@" + domain], raw=True)
        assert r.exit_code == 0
        acct = re.match("registered: (.*)$", r.stdout).group(1)
        # setup: link md to account
        assert env.a2md(["update", name, "account", acct]).exit_code == 0
        # setup: delete account on server
        assert env.a2md(["acme""delreg", acct]).exit_code == 0
        # drive
        r = env.a2md(["drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        env.check_md_credentials([name])

    def test_md_502_107(self, env):
        # test case: drive again on COMPLETE md, then drive --force
        # setup: prepare md in store
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name])
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # drive
        r = env.a2md(["-vv""drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        env.check_md_credentials([name])
        orig_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))

        # drive again
        assert env.a2md(["-vv""drive", name]).exit_code == 0
        env.check_md_credentials([name])
        cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        # check: cert not changed
        assert cert.same_serial_as(orig_cert)

        # drive --force
        assert env.a2md(["-vv""drive""--force", name]).exit_code == 0
        env.check_md_credentials([name])
        cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        # check: cert not changed
        assert not cert.same_serial_as(orig_cert)
        # check: previous cert was archived
        cert = MDCertUtil(env.store_archived_file(name, 2, 'pubcert.pem'))
        assert cert.same_serial_as(orig_cert)

    def test_md_502_108(self, env):
        # test case: drive via HTTP proxy
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name])
        conf = MDConf(env, proxy=True)
        conf.add('LogLevel proxy:trace8')
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'

        # drive it, with wrong proxy url -> FAIL
        r = env.a2md(["-p""http://localhost:1""drive", name])
        assert r.exit_code == 1
        assert "Connection refused" in r.stderr

        # drive it, working proxy url -> SUCCESS
        proxy_url = f"http://localhost:{env.proxy_port}"
        r = env.a2md(["-vv""-p", proxy_url, "drive", name])
        assert 0 == r.exit_code, "a2md failed: {0}".format(r.stderr)
        env.check_md_credentials([name])

    def test_md_502_109(self, env):
        # test case: redirect on SSL-only domain
        # setup: prepare config
        domain = self.test_domain
        name = "www." + domain
        conf = MDConf(env, admin="admin@" + domain)
        conf.add_drive_mode("manual")
        conf.add_md([name])
        conf.add_vhost(name, port=env.http_port, doc_root="htdocs/test")
        conf.add_vhost(name, doc_root="htdocs/test")
        conf.install()
        # setup: create resource files
        self._write_res_file(os.path.join(env.server_docs_dir, "test"), "name.txt", name)
        self._write_res_file(os.path.join(env.server_docs_dir), "name.txt""not-forbidden.org")
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'

        # drive it
        assert env.a2md(["drive", name]).exit_code == 0
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # test HTTP access - no redirect
        jdata = env.get_json_content(f"test1.{env.http_tld}""/alive.json", use_https=False)
        assert jdata['host']== "test1"
        assert env.get_content(name, "/name.txt", use_https=False) == name
        r = env.get_meta(name, "/name.txt", use_https=False)
        assert int(r.response['header']['content-length']) == len(name)
        assert "Location" not in r.response['header']
        # test HTTPS access
        assert env.get_content(name, "/name.txt", use_https=True) == name

        # test HTTP access again -> redirect to default HTTPS port
        conf.add("MDRequireHttps temporary")
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        r = env.get_meta(name, "/name.txt", use_https=False)
        assert r.response['status'] == 302
        exp_location = "https://%s/name.txt" % name
        assert r.response['header']['location'] == exp_location
        # should not see this
        assert 'strict-transport-security' not in r.response['header']
        # test default HTTP vhost -> still no redirect
        jdata = env.get_json_content(f"test1.{env.http_tld}""/alive.json", use_https=False)
        assert jdata['host']== "test1"
        r = env.get_meta(name, "/name.txt", use_https=True)
        # also not for this
        assert 'strict-transport-security' not in r.response['header']

        # test HTTP access again -> redirect permanent
        conf.add("MDRequireHttps permanent")
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        r = env.get_meta(name, "/name.txt", use_https=False)
        assert r.response['status'] == 301
        exp_location = "https://%s/name.txt" % name
        assert r.response['header']['location'] == exp_location
        assert 'strict-transport-security' not in r.response['header']
        # should see this
        r = env.get_meta(name, "/name.txt", use_https=True)
        assert r.response['header']['strict-transport-security'] == 'max-age=15768000'

    def test_md_502_110(self, env):
        # test case: SSL-only domain, override headers generated by mod_md 
        # setup: prepare config
        domain = self.test_domain
        name = "www." + domain
        conf = MDConf(env, admin="admin@" + domain)
        conf.add_drive_mode("manual")
        conf.add("MDRequireHttps permanent")
        conf.add_md([name])
        conf.add_vhost(name, port=env.http_port)
        conf.add_vhost(name)
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # drive it
        assert env.a2md(["drive", name]).exit_code == 0
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'

        # test override HSTS header
        conf.add('Header set Strict-Transport-Security "max-age=10886400; includeSubDomains; preload"')
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        r = env.get_meta(name, "/name.txt", use_https=True)
        assert 'strict-transport-security' in r.response['header'], r.response['header']
        assert r.response['header']['strict-transport-security'] == \
               'max-age=10886400; includeSubDomains; preload'

        # test override Location header
        conf.add(' Redirect /a /name.txt')
        conf.add(' Redirect seeother /b /name.txt')
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # check: default redirect by mod_md still works
        exp_location = "https://%s/name.txt" % name
        r = env.get_meta(name, "/name.txt", use_https=False)
        assert r.response['status'] == 301
        assert r.response['header']['location'] == exp_location
        # check: redirect as given by mod_alias
        exp_location = "https://%s/a" % name
        r = env.get_meta(name, "/a", use_https=False)
        assert r.response['status'] == 301    # FAIL: mod_alias generates Location header instead of mod_md
        assert r.response['header']['location'] == exp_location

    def test_md_502_111(self, env):
        # test case: vhost with parallel HTTP/HTTPS, check mod_alias redirects
        # setup: prepare config
        domain = self.test_domain
        name = "www." + domain
        conf = MDConf(env, admin="admin@" + domain)
        conf.add_drive_mode("manual")
        conf.add_md([name])
        conf.add(" LogLevel alias:debug")
        conf.add_vhost(name, port=env.http_port)
        conf.add_vhost(name)
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # drive it
        r = env.a2md(["-v""drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'

        # setup: place redirect rules
        conf.add(' Redirect /a /name.txt')
        conf.add(' Redirect seeother /b /name.txt')
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # check: redirects on HTTP
        exp_location = "http://%s:%s/name.txt" % (name, env.http_port)
        r = env.get_meta(name, "/a", use_https=False)
        assert r.response['status'] == 302
        assert r.response['header']['location'] == exp_location
        r = env.get_meta(name, "/b", use_https=False)
        assert r.response['status'] == 303
        assert r.response['header']['location'] == exp_location
        # check: redirects on HTTPS
        exp_location = "https://%s:%s/name.txt" % (name, env.https_port)
        r = env.get_meta(name, "/a", use_https=True)
        assert r.response['status'] == 302
        assert r.response['header']['location'] == exp_location     # FAIL: expected 'https://...' but found 'http://...'
        r = env.get_meta(name, "/b", use_https=True)
        assert r.response['status'] == 303
        assert r.response['header']['location'] == exp_location

    def test_md_502_120(self, env):
        # test case: NP dereference reported by Daniel Caminada <daniel.caminada@ergon.ch>
        domain = self.test_domain
        name = "www." + domain
        conf = MDConf(env, admin="admin@" + domain)
        conf.add_drive_mode("manual")
        conf.add_md([name])
        conf.add_vhost(name)
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        env.run(["openssl""s_client",
                 f"-connect""localhost:{env.https_port}",
                 "-servername""example.com""-crlf"
                 ], intext="GET https:// HTTP/1.1\nHost: example.com\n\n")
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'

    # --------- critical state change -> drive again ---------

    def test_md_502_200(self, env):
        # test case: add dns name on existing valid md
        # setup: create md in store
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name])
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # setup: drive it
        r = env.a2md(["drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        old_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        # setup: add second domain
        assert env.a2md(["update", name, "domains", name, "test." + domain]).exit_code == 0
        # drive
        r = env.a2md(["-vv""drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        # check new cert
        env.check_md_credentials([name, "test." + domain])
        new_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        assert not old_cert.same_serial_as(new_cert.get_serial)

    @pytest.mark.parametrize("renew_window,test_data_list", [
        ("14d", [
            {"valid": {"notBefore": -5,   "notAfter": 180}, "renew"False},
            {"valid": {"notBefore": -200, "notAfter": 15}, "renew"False},
            {"valid": {"notBefore": -200, "notAfter": 13}, "renew"True},
        ]),
        ("30%", [
            {"valid": {"notBefore": -0,   "notAfter": 180}, "renew"False},
            {"valid": {"notBefore": -120, "notAfter": 60}, "renew"False},
            {"valid": {"notBefore": -126, "notAfter": 53}, "renew"True},
        ])
    ])
    def test_md_502_201(self, env, renew_window, test_data_list):
        # test case: trigger cert renew when entering renew window 
        # setup: prepare COMPLETE md
        domain = self.test_domain
        name = "www." + domain
        conf = MDConf(env, admin="admin@" + domain)
        conf.add_drive_mode("manual")
        conf.add_renew_window(renew_window)
        conf.add_md([name])
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        assert env.a2md(["list", name]).json['output'][0]['state'] == env.MD_S_INCOMPLETE
        # setup: drive it
        r = env.a2md(["drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        cert1 = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        assert env.a2md(["list", name]).json['output'][0]['state'] == env.MD_S_COMPLETE

        # replace cert by self-signed one -> check md status
        print("TRACE: start testing renew window: %s" % renew_window)
        for tc in test_data_list:
            print("TRACE: create self-signed cert: %s" % tc["valid"])
            creds = env.create_self_signed_cert(CertificateSpec(domains=[name]),
                                                valid_from=timedelta(days=tc["valid"]["notBefore"]),
                                                valid_to=timedelta(days=tc["valid"]["notAfter"]))
            assert creds.certificate.serial_number != cert1.get_serial_number()
            # copy it over, assess status again
            creds.save_cert_pem(env.store_domain_file(name, 'pubcert.pem'))
            md = env.a2md(["list", name]).json['output'][0]
            assert md["renew"] == tc["renew"], \
                "Expected renew == {} indicator in {}, test case {}".format(tc["renew"], md, tc)

    @pytest.mark.parametrize("key_type,key_params,exp_key_length", [
        ("RSA", [2048], 2048),
        ("RSA", [3072], 3072),
        ("RSA", [4096], 4096),
        ("Default", [], 2048)
    ])
    def test_md_502_202(self, env, key_type, key_params, exp_key_length):
        # test case: specify RSA key length and verify resulting cert key 
        # setup: prepare md
        domain = self.test_domain
        name = "www." + domain
        conf = MDConf(env, admin="admin@" + domain)
        conf.add_drive_mode("manual")
        conf.add_private_key(key_type, key_params)
        conf.add_md([name])
        conf.install()
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        assert env.a2md(["list", name]).json['output'][0]['state'] == env.MD_S_INCOMPLETE
        # setup: drive it
        r = env.a2md(["-vv""drive", name])
        assert r.exit_code == 0, "drive for MDPrivateKeys {} {}: {}".format(key_type, key_params, r.stderr)
        assert env.a2md(["list", name]).json['output'][0]['state'] == env.MD_S_COMPLETE
        # check cert key length
        cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        assert cert.get_key_length() == exp_key_length

    # test_502_203 removed, as ToS agreement is not really checked in ACMEv2

    # --------- non-critical state change -> keep data ---------

    def test_md_502_300(self, env):
        # test case: remove one domain name from existing valid md
        # setup: create md in store
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name, "test." + domain, "xxx." + domain])
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # setup: drive it
        r = env.a2md(["drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        old_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        # setup: remove one domain
        assert env.a2md(["update", name, "domains"] + [name, "test." + domain]).exit_code == 0
        # drive
        assert env.a2md(["-vv""drive", name]).exit_code == 0
        # compare cert serial
        new_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        assert old_cert.same_serial_as(new_cert)

    def test_md_502_301(self, env):
        # test case: change contact info on existing valid md
        # setup: create md in store
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name])
        assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
        # setup: drive it
        r = env.a2md(["drive", name])
        assert r.exit_code == 0, "a2md drive failed: {0}".format(r.stderr)
        old_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        # setup: add second domain
        assert env.a2md(["update", name, "contacts""test@" + domain]).exit_code == 0
        # drive
        assert env.a2md(["drive", name]).exit_code == 0
        # compare cert serial
        new_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
        assert old_cert.same_serial_as(new_cert)

    # --------- network problems ---------

    def test_md_502_400(self, env):
        # test case: server not reachable
        domain = self.test_domain
        name = "www." + domain
        self._prepare_md(env, [name])
        assert env.a2md(
            ["update", name, "ca""http://localhost:4711/directory"]
            ).exit_code == 0
        # drive
        r = env.a2md(["drive", name])
        assert r.exit_code == 1
        assert r.json['status'] != 0
        assert r.json['description'] == 'Connection refused'

    # --------- _utils_ ---------

    def _prepare_md(self, env, domains):
        assert env.a2md(["add"] + domains).exit_code == 0
        assert env.a2md(
            ["update", domains[0], "contacts""admin@" + domains[0]]
            ).exit_code == 0
        assert env.a2md( 
            ["update", domains[0], "agreement", env.acme_tos]
            ).exit_code == 0

    def _write_res_file(self, doc_root, name, content):
        if not os.path.exists(doc_root):
            os.makedirs(doc_root)
        open(os.path.join(doc_root, name), "w").write(content)

    RE_MSG_OPENSSL_BAD_DECRYPT = re.compile('.*\'bad decrypt\'.*')

    def _check_account_key(self, env, name):
        # read encryption key
        md_store = json.loads(open(env.path_store_json(), 'r').read())
        encrypt_key = base64.urlsafe_b64decode(str(md_store['key']))
        # check: key file is encrypted PEM
        md = env.a2md(["list", name]).json['output'][0]
        acc = md['ca']['account']
        MDCertUtil.validate_privkey(env.path_account_key(acc), lambda *args: encrypt_key)

Messung V0.5
C=90 H=100 G=95

¤ Dauer der Verarbeitung: 0.16 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.