# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

import os
import shutil
import time

from datetime import timedelta

import dns
import dns.update
import pytest

pytest.importorskip("dns", minversion="2.0.0")
import isctest
import isctest.mark
from isctest.kasp import (
    KeyProperties,
    KeyTimingMetadata,
)
from isctest.vars.algorithms import ECDSAP256SHA256, ECDSAP384SHA384

pytestmark = pytest.mark.extra_artifacts(
    [
        "K*.private",
        "K*.backup",
        "K*.cmp",
        "K*.key",
        "K*.state",
        "*.axfr",
        "*.created",
        "dig.out*",
        "keyevent.out.*",
        "keygen.out.*",
        "keys",
        "published.test*",
        "python.out.*",
        "retired.test*",
        "rndc.dnssec.*.out.*",
        "rndc.zonestatus.out.*",
        "rrsig.out.*",
        "created.key-*",
        "unused.key-*",
        "verify.out.*",
        "zone.out.*",
        "ns*/K*.key",
        "ns*/K*.offline",
        "ns*/K*.private",
        "ns*/K*.state",
        "ns*/*.db",
        "ns*/*.db.infile",
        "ns*/*.db.signed",
        "ns*/*.db.signed.tmp",
        "ns*/*.jbk",
        "ns*/*.jnl",
        "ns*/*.zsk1",
        "ns*/*.zsk2",
        "ns*/dsset-*",
        "ns*/keygen.out.*",
        "ns*/keys",
        "ns*/ksk",
        "ns*/ksk/K*",
        "ns*/zsk",
        "ns*/zsk",
        "ns*/zsk/K*",
        "ns*/named-fips.conf",
        "ns*/settime.out.*",
        "ns*/signer.out.*",
        "ns*/zones",
        "ns*/policies/*.conf",
        "ns3/legacy-keys.*",
        "ns3/dynamic-signed-inline-signing.kasp.db.signed.signed",
    ]
)


kasp_config = {
    "dnskey-ttl": timedelta(seconds=1234),
    "ds-ttl": timedelta(days=1),
    "key-directory": "{keydir}",
    "max-zone-ttl": timedelta(days=1),
    "parent-propagation-delay": timedelta(hours=1),
    "publish-safety": timedelta(hours=1),
    "retire-safety": timedelta(hours=1),
    "signatures-refresh": timedelta(days=5),
    "signatures-validity": timedelta(days=14),
    "zone-propagation-delay": timedelta(minutes=5),
}

autosign_config = {
    "dnskey-ttl": timedelta(seconds=300),
    "ds-ttl": timedelta(days=1),
    "key-directory": "{keydir}",
    "max-zone-ttl": timedelta(days=1),
    "parent-propagation-delay": timedelta(hours=1),
    "publish-safety": timedelta(hours=1),
    "retire-safety": timedelta(hours=1),
    "signatures-refresh": timedelta(days=7),
    "signatures-validity": timedelta(days=14),
    "zone-propagation-delay": timedelta(minutes=5),
}

lifetime = {
    "P10Y": int(timedelta(days=10 * 365).total_seconds()),
    "P5Y": int(timedelta(days=5 * 365).total_seconds()),
    "P2Y": int(timedelta(days=2 * 365).total_seconds()),
    "P1Y": int(timedelta(days=365).total_seconds()),
    "P30D": int(timedelta(days=30).total_seconds()),
    "P6M": int(timedelta(days=31 * 6).total_seconds()),
}

KASP_INHERIT_TSIG_SECRET = {
    "sha1": "FrSt77yPTFx6hTs4i2tKLB9LmE0=",
    "sha224": "hXfwwwiag2QGqblopofai9NuW28q/1rH4CaTnA==",
    "sha256": "R16NojROxtxH/xbDl//ehDsHm5DjWTQ2YXV+hGC2iBY=",
    "view1": "YPfMoAk6h+3iN8MDRQC004iSNHY=",
    "view2": "4xILSZQnuO1UKubXHkYUsvBRPu8=",
    "view3": "C1Azf+gGPMmxrUg/WQINP6eV9Y0=",
}


def param(*args, **kwargs):
    if "id" not in kwargs:
        kwargs["id"] = args[0]  # use first argument  as test ID
    return pytest.param(*args, **kwargs)


def autosign_properties(alg, size):
    return [
        f"ksk {lifetime['P2Y']} {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
        f"zsk {lifetime['P1Y']} {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
    ]


def rsa1_properties(alg):
    return [
        f"ksk {lifetime['P10Y']} {alg} 2048 goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
        f"zsk {lifetime['P5Y']} {alg} 2048 goal:omnipresent dnskey:rumoured zrrsig:rumoured",
        f"zsk {lifetime['P1Y']} {alg} 2000 goal:omnipresent dnskey:rumoured zrrsig:rumoured",
    ]


def fips_properties(alg, bits=None):
    sizes = [2048, 2048, 3072]
    if bits is not None:
        sizes = [bits, bits, bits]

    return [
        f"ksk {lifetime['P10Y']} {alg} {sizes[0]} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
        f"zsk {lifetime['P5Y']} {alg} {sizes[1]} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
        f"zsk {lifetime['P1Y']} {alg} {sizes[2]} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
    ]


def check_all(server, zone, policy, ksks, zsks, zsk_missing=False, tsig=None):
    isctest.kasp.check_dnssecstatus(server, zone, ksks + zsks, policy=policy)
    isctest.kasp.check_apex(
        server, zone, ksks, zsks, zsk_missing=zsk_missing, tsig=tsig
    )
    isctest.kasp.check_subdomain(server, zone, ksks, zsks, tsig=tsig)
    isctest.kasp.check_dnssec_verify(server, zone, tsig=tsig)


def set_keytimes_default_policy(kp):
    # The first key is immediately published and activated.
    kp.timing["Generated"] = kp.key.get_timing("Created")
    kp.timing["Published"] = kp.timing["Generated"]
    kp.timing["Active"] = kp.timing["Generated"]
    # The DS can be published if the DNSKEY and RRSIG records are
    # OMNIPRESENT.  This happens after max-zone-ttl (1d) plus
    # plus zone-propagation-delay (300s).
    kp.timing["PublishCDS"] = kp.timing["Published"] + timedelta(days=1, seconds=300)
    # Key lifetime is unlimited, so not setting 'Retired' nor 'Removed'.
    kp.timing["DNSKEYChange"] = kp.timing["Published"]
    kp.timing["DSChange"] = kp.timing["Published"]
    kp.timing["KRRSIGChange"] = kp.timing["Active"]
    kp.timing["ZRRSIGChange"] = kp.timing["Active"]


def cb_ixfr_is_signed(expected_updates, params, ksks=None, zsks=None):
    zone = params["zone"]
    policy = params["policy"]
    servers = params["servers"]

    isctest.log.info(f"check that the zone {zone} is correctly signed after ixfr")
    isctest.log.debug(
        f"expected updates {expected_updates} policy {policy} ksks {ksks} zsks {zsks}"
    )
    shutil.copyfile(f"ns2/{zone}.db.in2", f"ns2/{zone}.db")
    servers["ns2"].rndc(f"reload {zone}", log=False)

    def update_is_signed():
        parts = update.split()
        qname = parts[0]
        qtype = dns.rdatatype.from_text(parts[1])
        rdata = parts[2]
        return isctest.kasp.verify_update_is_signed(
            servers["ns3"], zone, qname, qtype, rdata, ksks, zsks
        )

    for update in expected_updates:
        isctest.run.retry_with_timeout(update_is_signed, timeout=5)


def cb_rrsig_refresh(params, ksks=None, zsks=None):
    zone = params["zone"]
    servers = params["servers"]

    isctest.log.info(f"check that the zone {zone} refreshes expired signatures")

    def rrsig_is_refreshed():
        parts = query.split()
        qname = parts[0]
        qtype = dns.rdatatype.from_text(parts[1])
        return isctest.kasp.verify_rrsig_is_refreshed(
            servers["ns3"], zone, f"ns3/{zone}.db.signed", qname, qtype, ksks, zsks
        )

    queries = [
        f"{zone} DNSKEY",
        f"{zone} SOA",
        f"{zone} NS",
        f"{zone} NSEC",
        f"a.{zone} A",
        f"a.{zone} NSEC",
        f"b.{zone} A",
        f"b.{zone} NSEC",
        f"c.{zone} A",
        f"c.{zone} NSEC",
        f"ns3.{zone} A",
        f"ns3.{zone} NSEC",
    ]

    for query in queries:
        isctest.run.retry_with_timeout(rrsig_is_refreshed, timeout=5)


def cb_rrsig_reuse(params, ksks=None, zsks=None):
    zone = params["zone"]
    servers = params["servers"]

    isctest.log.info(f"check that the zone {zone} reuses fresh signatures")

    def rrsig_is_reused():
        parts = query.split()
        qname = parts[0]
        qtype = dns.rdatatype.from_text(parts[1])
        return isctest.kasp.verify_rrsig_is_reused(
            servers["ns3"], zone, f"ns3/{zone}.db.signed", qname, qtype, ksks, zsks
        )

    queries = [
        f"{zone} NS",
        f"{zone} NSEC",
        f"a.{zone} A",
        f"a.{zone} NSEC",
        f"b.{zone} A",
        f"b.{zone} NSEC",
        f"c.{zone} A",
        f"c.{zone} NSEC",
        f"ns3.{zone} A",
        f"ns3.{zone} NSEC",
    ]

    for query in queries:
        rrsig_is_reused()


def cb_legacy_keys(params, ksks=None, zsks=None):
    zone = params["zone"]
    keydir = params["config"]["key-directory"]

    isctest.log.info(f"check that the zone {zone} uses correct legacy keys")

    assert len(ksks) == 1
    assert len(zsks) == 1

    # This assumes the zone has a policy that dictates one KSK and one ZSK.
    # The right keys to be used are stored in "{zone}.ksk" and "{zone}.zsk".
    with open(f"{keydir}/{zone}.ksk", "r", encoding="utf-8") as file:
        kskfile = file.read()
    with open(f"{keydir}/{zone}.zsk", "r", encoding="utf-8") as file:
        zskfile = file.read()

    assert f"{keydir}/{kskfile}".strip() == ksks[0].path
    assert f"{keydir}/{zskfile}".strip() == zsks[0].path


def cb_remove_keyfiles(params, ksks=None, zsks=None):
    zone = params["zone"]
    servers = params["servers"]
    keydir = params["config"]["key-directory"]

    isctest.log.info(
        "check that removing key files does not create new keys to be generated"
    )

    for k in ksks + zsks:
        os.remove(k.keyfile)
        os.remove(k.privatefile)
        os.remove(k.statefile)

    with servers["ns3"].watch_log_from_here() as watcher:
        servers["ns3"].rndc(f"loadkeys {zone}", log=False)
        watcher.wait_for_line(
            f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
        )

    # Check keys again, make sure no new keys are created.
    keys = isctest.kasp.keydir_to_keylist(zone, keydir)
    isctest.kasp.check_keys(zone, keys, [])
    # Zone is still signed correctly.
    isctest.kasp.check_dnssec_verify(servers["ns3"], zone)


@pytest.mark.parametrize(
    "params",
    [
        pytest.param(
            {
                "zone": "rsasha1.kasp",
                "policy": "rsasha1",
                "config": kasp_config,
                "key-properties": rsa1_properties(5),
            },
            id="rsasha1.kasp",
            marks=isctest.mark.with_algorithm("RSASHA1"),
        ),
        pytest.param(
            {
                "zone": "rsasha1-nsec3.kasp",
                "policy": "rsasha1",
                "config": kasp_config,
                "key-properties": rsa1_properties(7),
            },
            id="rsasha1-nsec3.kasp",
            marks=isctest.mark.with_algorithm("RSASHA1"),
        ),
        pytest.param(
            {
                "zone": "dnskey-ttl-mismatch.autosign",
                "policy": "autosign",
                "config": autosign_config,
                "offset": -timedelta(days=30 * 6),
                "key-properties": autosign_properties(
                    os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
                ),
            },
            id="dnskey-ttl-mismatch.autosign",
        ),
        pytest.param(
            {
                "zone": "expired-sigs.autosign",
                "policy": "autosign",
                "config": autosign_config,
                "offset": -timedelta(days=30 * 6),
                "key-properties": autosign_properties(
                    os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
                ),
                "additional-tests": [
                    {
                        "callback": cb_rrsig_refresh,
                        "arguments": [],
                    },
                ],
            },
            id="expired-sigs.autosign",
        ),
        pytest.param(
            {
                "zone": "fresh-sigs.autosign",
                "policy": "autosign",
                "config": autosign_config,
                "offset": -timedelta(days=30 * 6),
                "key-properties": autosign_properties(
                    os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
                ),
                "additional-tests": [
                    {
                        "callback": cb_rrsig_reuse,
                        "arguments": [],
                    },
                ],
            },
            id="fresh-sigs.autosign",
        ),
        pytest.param(
            {
                "zone": "unfresh-sigs.autosign",
                "policy": "autosign",
                "config": autosign_config,
                "offset": -timedelta(days=30 * 6),
                "key-properties": autosign_properties(
                    os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
                ),
                "additional-tests": [
                    {
                        "callback": cb_rrsig_refresh,
                        "arguments": [],
                    },
                ],
            },
            id="unfresh-sigs.autosign",
        ),
        pytest.param(
            {
                "zone": "keyfiles-missing.autosign",
                "policy": "autosign",
                "config": autosign_config,
                "offset": -timedelta(days=30 * 6),
                "key-properties": autosign_properties(
                    os.environ["DEFAULT_ALGORITHM_NUMBER"], os.environ["DEFAULT_BITS"]
                ),
                "additional-tests": [
                    {
                        "callback": cb_remove_keyfiles,
                        "arguments": [],
                    },
                ],
            },
            id="keyfiles-missing.autosign",
        ),
        pytest.param(
            {
                "zone": "ksk-missing.autosign",
                "policy": "autosign",
                "config": autosign_config,
                "offset": -timedelta(days=30 * 6),
                "key-properties": [
                    f"ksk 63072000 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent missing",
                    f"zsk 31536000 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
                ],
            },
            id="ksk-missing.autosign",
        ),
        pytest.param(
            {
                "zone": "zsk-missing.autosign",
                "policy": "autosign",
                "config": autosign_config,
                "offset": -timedelta(days=30 * 6),
                "key-properties": [
                    f"ksk 63072000 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
                    f"zsk 31536000 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent missing",
                ],
            },
            id="zsk-missing.autosign",
        ),
        pytest.param(
            {
                "zone": "dnssec-keygen.kasp",
                "policy": "rsasha256",
                "config": kasp_config,
                "key-properties": fips_properties(8),
            },
            id="dnssec-keygen.kasp",
        ),
        pytest.param(
            {
                "zone": "ecdsa256.kasp",
                "policy": "ecdsa256",
                "config": kasp_config,
                "key-properties": fips_properties(13, bits=256),
            },
            id="ecdsa256.kasp",
        ),
        pytest.param(
            {
                "zone": "ecdsa384.kasp",
                "policy": "ecdsa384",
                "config": kasp_config,
                "key-properties": fips_properties(14, bits=384),
            },
            id="ecdsa384.kasp",
        ),
        pytest.param(
            {
                "zone": "inherit.kasp",
                "policy": "rsasha256",
                "config": kasp_config,
                "key-properties": fips_properties(8),
            },
            id="inherit.kasp",
        ),
        pytest.param(
            {
                "zone": "keystore.kasp",
                "policy": "keystore",
                "config": {
                    "dnskey-ttl": timedelta(seconds=303),
                    "ds-ttl": timedelta(days=1),
                    "key-directory": "{keydir}",
                    "max-zone-ttl": timedelta(days=1),
                    "parent-propagation-delay": timedelta(hours=1),
                    "publish-safety": timedelta(hours=1),
                    "retire-safety": timedelta(hours=1),
                    "signatures-refresh": timedelta(days=5),
                    "signatures-validity": timedelta(days=14),
                    "zone-propagation-delay": timedelta(minutes=5),
                },
                "key-directories": ["{keydir}/ksk", "{keydir}/zsk"],
                "key-properties": [
                    f"ksk unlimited {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
                    f"zsk unlimited {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
                ],
            },
            id="keystore.kasp",
        ),
        pytest.param(
            {
                "zone": "legacy-keys.kasp",
                "policy": "migrate-to-dnssec-policy",
                "config": kasp_config,
                "pregenerated": True,
                "key-properties": [
                    "ksk 16070400 8 2048 goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
                    "zsk 16070400 8 2048 goal:omnipresent dnskey:rumoured zrrsig:rumoured",
                ],
                "additional-tests": [
                    {
                        "callback": cb_legacy_keys,
                        "arguments": [],
                    },
                ],
            },
            id="legacy-keys.kasp",
        ),
        pytest.param(
            {
                "zone": "pregenerated.kasp",
                "policy": "rsasha256",
                "config": kasp_config,
                "pregenerated": True,
                "key-properties": fips_properties(8),
            },
            id="pregenerated.kasp",
        ),
        pytest.param(
            {
                "zone": "rsasha256.kasp",
                "policy": "rsasha256",
                "config": kasp_config,
                "key-properties": fips_properties(8),
            },
            id="rsasha256.kasp",
        ),
        pytest.param(
            {
                "zone": "rsasha512.kasp",
                "policy": "rsasha512",
                "config": kasp_config,
                "key-properties": fips_properties(10),
            },
            id="rsasha512.kasp",
        ),
        pytest.param(
            {
                "zone": "rumoured.kasp",
                "policy": "rsasha256",
                "config": kasp_config,
                "rumoured": True,
                "key-properties": fips_properties(8),
            },
            id="rumoured.kasp",
        ),
        pytest.param(
            {
                "zone": "secondary.kasp",
                "policy": "rsasha256",
                "config": kasp_config,
                "key-properties": fips_properties(8),
                "additional-tests": [
                    {
                        "callback": cb_ixfr_is_signed,
                        "arguments": [
                            [
                                "a.secondary.kasp. A 10.0.0.11",
                                "d.secondary.kasp. A 10.0.0.4",
                            ],
                        ],
                    },
                ],
            },
            id="secondary.kasp",
        ),
        pytest.param(
            {
                "zone": "some-keys.kasp",
                "policy": "rsasha256",
                "config": kasp_config,
                "pregenerated": True,
                "key-properties": fips_properties(8),
            },
            id="some-keys.kasp",
        ),
        pytest.param(
            {
                "zone": "unlimited.kasp",
                "policy": "unlimited",
                "config": kasp_config,
                "key-properties": [
                    f"csk 0 {os.environ['DEFAULT_ALGORITHM_NUMBER']} {os.environ['DEFAULT_BITS']} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
                ],
            },
            id="unlimited.kasp",
        ),
        pytest.param(
            {
                "zone": "ed25519.kasp",
                "policy": "ed25519",
                "config": kasp_config,
                "key-properties": fips_properties(15, bits=256),
            },
            id="ed25519.kasp",
            marks=isctest.mark.with_algorithm("ED25519"),
        ),
        pytest.param(
            {
                "zone": "ed448.kasp",
                "policy": "ed448",
                "config": kasp_config,
                "key-properties": fips_properties(16, bits=456),
            },
            id="ed448.kasp",
            marks=isctest.mark.with_algorithm("ED448"),
        ),
    ],
)
def test_kasp_case(servers, params):
    # Test many different configurations and expected keys and states after
    # initial startup.
    server = servers["ns3"]
    keydir = server.identifier

    # Get test parameters.
    zone = params["zone"]
    policy = params["policy"]

    params["config"]["key-directory"] = params["config"]["key-directory"].replace(
        "{keydir}", keydir
    )
    if "key-directories" in params:
        for i, val in enumerate(params["key-directories"]):
            params["key-directories"][i] = val.replace("{keydir}", keydir)

    ttl = int(params["config"]["dnskey-ttl"].total_seconds())
    pregenerated = False
    if params.get("pregenerated"):
        pregenerated = params["pregenerated"]
    zsk_missing = zone == "zsk-missing.autosign"

    # Test case.
    isctest.log.info(f"check test case zone {zone} policy {policy}")

    # First make sure the zone is signed.
    isctest.kasp.check_zone_is_signed(server, zone)

    # Key properties.
    expected = isctest.kasp.policy_to_properties(ttl=ttl, keys=params["key-properties"])
    # Key files.
    if "key-directories" in params:
        kdir = params["key-directories"][0]
        ksks = isctest.kasp.keydir_to_keylist(zone, kdir, in_use=pregenerated)
        kdir = params["key-directories"][1]
        zsks = isctest.kasp.keydir_to_keylist(zone, kdir, in_use=pregenerated)
        keys = ksks + zsks
    else:
        keys = isctest.kasp.keydir_to_keylist(
            zone, params["config"]["key-directory"], in_use=pregenerated
        )
        ksks = [k for k in keys if k.is_ksk()]
        zsks = [k for k in keys if not k.is_ksk()]

    isctest.kasp.check_keys(zone, keys, expected)

    offset = params["offset"] if "offset" in params else None

    for kp in expected:
        kp.set_expected_keytimes(
            params["config"], offset=offset, pregenerated=pregenerated
        )

    if "rumoured" not in params:
        isctest.kasp.check_keytimes(keys, expected)

    check_all(server, zone, policy, ksks, zsks, zsk_missing=zsk_missing)

    if "additional-tests" in params:
        params["servers"] = servers
        for additional_test in params["additional-tests"]:
            callback = additional_test["callback"]
            arguments = additional_test["arguments"]
            callback(*arguments, params=params, ksks=ksks, zsks=zsks)


@pytest.mark.parametrize(
    "zone, server_id, tsig_kind",
    [
        param("unsigned.tld", "ns2", None),
        param("none.inherit.signed", "ns4", "sha1"),
        param("none.override.signed", "ns4", "sha224"),
        param("inherit.none.signed", "ns4", "sha256"),
        param("none.none.signed", "ns4", "sha256"),
        param("inherit.inherit.unsigned", "ns5", "sha1"),
        param("none.inherit.unsigned", "ns5", "sha1"),
        param("none.override.unsigned", "ns5", "sha224"),
        param("inherit.none.unsigned", "ns5", "sha256"),
        param("none.none.unsigned", "ns5", "sha256"),
    ],
)
def test_kasp_inherit_unsigned(zone, server_id, tsig_kind, servers):
    server = servers[server_id]
    tsig = (
        f"hmac-{tsig_kind}:{tsig_kind}:{KASP_INHERIT_TSIG_SECRET[tsig_kind]}"
        if tsig_kind
        else None
    )

    keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
    isctest.kasp.check_keys(zone, keys, [])
    isctest.kasp.check_dnssecstatus(server, zone, [])
    isctest.kasp.check_apex(server, zone, [], [], tsig=tsig)
    isctest.kasp.check_subdomain(server, zone, [], [], tsig=tsig)


@pytest.mark.parametrize(
    "zone, policy, server_id, alg, tsig_kind",
    [
        param("signed.tld", "default", "ns2", ECDSAP256SHA256, None),
        param("override.inherit.signed", "default", "ns4", ECDSAP256SHA256, "sha1"),
        param("inherit.override.signed", "default", "ns4", ECDSAP256SHA256, "sha224"),
        param("override.inherit.unsigned", "default", "ns5", ECDSAP256SHA256, "sha1"),
        param("inherit.override.unsigned", "default", "ns5", ECDSAP256SHA256, "sha224"),
        param("inherit.inherit.signed", "test", "ns4", ECDSAP384SHA384, "sha1"),
        param("override.override.signed", "test", "ns4", ECDSAP384SHA384, "sha224"),
        param("override.none.signed", "test", "ns4", ECDSAP384SHA384, "sha256"),
        param("override.override.unsigned", "test", "ns5", ECDSAP384SHA384, "sha224"),
        param("override.none.unsigned", "test", "ns5", ECDSAP384SHA384, "sha256"),
    ],
)
def test_kasp_inherit_signed(zone, policy, server_id, alg, tsig_kind, servers):
    server = servers[server_id]
    tsig = (
        f"hmac-{tsig_kind}:{tsig_kind}:{KASP_INHERIT_TSIG_SECRET[tsig_kind]}"
        if tsig_kind
        else None
    )

    key1 = KeyProperties.default()
    key1.metadata["Algorithm"] = alg.number
    key1.metadata["Length"] = alg.bits
    keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)

    isctest.kasp.check_zone_is_signed(server, zone, tsig=tsig)
    isctest.kasp.check_keys(zone, keys, [key1])
    set_keytimes_default_policy(key1)
    isctest.kasp.check_keytimes(keys, [key1])
    check_all(server, zone, policy, keys, [], tsig=tsig)


@pytest.mark.parametrize(
    "number, dynamic, inline_signing, txt_rdata",
    [
        param("1", "yes", "no", "view1"),
        param("2", "no", "yes", "view2"),
        param("3", "no", "yes", "view2"),
    ],
)
def test_kasp_inherit_view(number, dynamic, inline_signing, txt_rdata, servers):
    zone = "example.net"
    policy = "test"
    server = servers["ns4"]
    view = f"example{number}"
    tsig = f"{os.environ['DEFAULT_HMAC']}:keyforview{number}:{KASP_INHERIT_TSIG_SECRET[f'view{number}']}"

    key1 = KeyProperties.default()
    key1.metadata["Algorithm"] = ECDSAP384SHA384.number
    key1.metadata["Length"] = ECDSAP384SHA384.bits
    keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)

    isctest.kasp.check_zone_is_signed(server, zone, tsig=tsig)
    isctest.kasp.check_keys(zone, keys, [key1])
    set_keytimes_default_policy(key1)
    isctest.kasp.check_keytimes(keys, [key1])
    isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy, view=view)
    isctest.kasp.check_apex(server, zone, keys, [], tsig=tsig)
    # check zonestatus
    response = server.rndc(f"zonestatus {zone} in {view}", log=False)
    assert f"dynamic: {dynamic}" in response
    assert f"inline signing: {inline_signing}" in response
    # check subdomain
    fqdn = f"{zone}."
    qname = f"view.{zone}."
    qtype = dns.rdatatype.TXT
    rdata = txt_rdata
    query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
    tsigkey = tsig.split(":")
    keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
    query.use_tsig(keyring)
    try:
        response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
    except dns.exception.Timeout:
        isctest.log.debug(f"query timeout for query {qname} {qtype} to {server.ip}")
        response = None
    assert response.rcode() == dns.rcode.NOERROR
    match = f'{qname} 300 IN TXT "{rdata}"'
    rrsigs = []
    for rrset in response.answer:
        if rrset.match(
            dns.name.from_text(qname), dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype
        ):
            rrsigs.append(rrset)
        else:
            assert match in rrset.to_text()
    assert len(rrsigs) > 0
    isctest.kasp.check_signatures(rrsigs, qtype, fqdn, keys, [])


def test_kasp_default(servers):
    server = servers["ns3"]

    # check the zone with default kasp policy has loaded and is signed.
    isctest.log.info("check a zone with the default policy is signed")
    zone = "default.kasp"
    policy = "default"

    # Key properties.
    # DNSKEY, RRSIG (ksk), RRSIG (zsk) are published. DS needs to wait.
    keyprops = [
        "csk 0 13 256 goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
    ]
    expected = isctest.kasp.policy_to_properties(ttl=3600, keys=keyprops)
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)
    set_keytimes_default_policy(expected[0])
    isctest.kasp.check_keytimes(keys, expected)
    check_all(server, zone, policy, keys, [])

    # Trigger a keymgr run. Make sure the key files are not touched if there
    # are no modifications to the key metadata.
    isctest.log.info(
        "check that key files are untouched if there are no metadata changes"
    )
    key = keys[0]
    privkey_stat = os.stat(key.privatefile)
    pubkey_stat = os.stat(key.keyfile)
    state_stat = os.stat(key.statefile)

    with server.watch_log_from_here() as watcher:
        server.rndc(f"loadkeys {zone}", log=False)
        watcher.wait_for_line(f"keymgr: {zone} done")

    assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
    assert pubkey_stat.st_mtime == os.stat(key.keyfile).st_mtime
    assert state_stat.st_mtime == os.stat(key.statefile).st_mtime

    # again
    with server.watch_log_from_here() as watcher:
        server.rndc(f"loadkeys {zone}", log=False)
        watcher.wait_for_line(f"keymgr: {zone} done")

    assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
    assert pubkey_stat.st_mtime == os.stat(key.keyfile).st_mtime
    assert state_stat.st_mtime == os.stat(key.statefile).st_mtime

    # modify unsigned zone file and check that new record is signed.
    isctest.log.info("check that an updated zone signs the new record")
    shutil.copyfile("ns3/template2.db.in", f"ns3/{zone}.db")
    server.rndc(f"reload {zone}", log=False)

    def update_is_signed():
        parts = update.split()
        qname = parts[0]
        qtype = dns.rdatatype.from_text(parts[1])
        rdata = parts[2]
        return isctest.kasp.verify_update_is_signed(
            server, zone, qname, qtype, rdata, keys, []
        )

    expected_updates = [f"a.{zone}. A 10.0.0.11", f"d.{zone}. A 10.0.0.44"]
    for update in expected_updates:
        isctest.run.retry_with_timeout(update_is_signed, timeout=5)

    # Move the private key file, a rekey event should not introduce
    # replacement keys.
    isctest.log.info("check that missing private key doesn't trigger rollover")
    shutil.move(f"{key.privatefile}", f"{key.path}.offline")
    expectmsg = "zone_rekey:zone_verifykeys failed: some key files are missing"
    with server.watch_log_from_here() as watcher:
        server.rndc(f"loadkeys {zone}", log=False)
        watcher.wait_for_line(f"zone {zone}/IN (signed): {expectmsg}")
    # Nothing has changed.
    expected[0].properties["private"] = False
    isctest.kasp.check_keys(zone, keys, expected)
    isctest.kasp.check_keytimes(keys, expected)
    check_all(server, zone, policy, keys, [])

    # A zone that uses inline-signing.
    isctest.log.info("check an inline-signed zone with the default policy is signed")
    zone = "inline-signing.kasp"
    # Key properties.
    key1 = KeyProperties.default()
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    expected = [key1]
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)
    set_keytimes_default_policy(key1)
    isctest.kasp.check_keytimes(keys, expected)
    check_all(server, zone, policy, keys, [])


def test_kasp_dynamic(servers):
    # Dynamic update test cases.
    server = servers["ns3"]

    # Standard dynamic zone.
    isctest.log.info("check dynamic zone is updated and signed after update")
    zone = "dynamic.kasp"
    policy = "default"
    # Key properties.
    key1 = KeyProperties.default()
    expected = [key1]
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)
    set_keytimes_default_policy(key1)
    expected = [key1]
    isctest.kasp.check_keytimes(keys, expected)
    check_all(server, zone, policy, keys, [])

    # Update zone with nsupdate.
    def nsupdate(updates):
        message = dns.update.UpdateMessage(zone)
        for update in updates:
            if update[0] == "del":
                message.delete(update[1], update[2], update[3])
            else:
                assert update[0] == "add"
                message.add(update[1], update[2], update[3], update[4])

        try:
            response = isctest.query.udp(
                message, server.ip, server.ports.dns, timeout=3
            )
            assert response.rcode() == dns.rcode.NOERROR
        except dns.exception.Timeout:
            assert False, f"update timeout for {zone}"

        isctest.log.debug(f"update of zone {zone} to server {server.ip} successful")

    def update_is_signed():
        parts = update.split()
        qname = parts[0]
        qtype = dns.rdatatype.from_text(parts[1])
        rdata = parts[2]
        return isctest.kasp.verify_update_is_signed(
            server, zone, qname, qtype, rdata, keys, []
        )

    updates = [
        ["del", f"a.{zone}.", "A", "10.0.0.1"],
        ["add", f"a.{zone}.", 300, "A", "10.0.0.101"],
        ["add", f"d.{zone}.", 300, "A", "10.0.0.4"],
    ]
    nsupdate(updates)

    expected_updates = [f"a.{zone}. A 10.0.0.101", f"d.{zone}. A 10.0.0.4"]
    for update in expected_updates:
        isctest.run.retry_with_timeout(update_is_signed, timeout=5)

    # Update zone with nsupdate (reverting the above change).
    updates = [
        ["add", f"a.{zone}.", 300, "A", "10.0.0.1"],
        ["del", f"a.{zone}.", "A", "10.0.0.101"],
        ["del", f"d.{zone}.", "A", "10.0.0.4"],
    ]
    nsupdate(updates)

    update = f"a.{zone}. A 10.0.0.1"
    isctest.run.retry_with_timeout(update_is_signed, timeout=5)

    # Update zone with freeze/thaw.
    isctest.log.info("check dynamic zone is updated and signed after freeze and thaw")
    with server.watch_log_from_here() as watcher:
        server.rndc(f"freeze {zone}", log=False)
        watcher.wait_for_line(f"freezing zone '{zone}/IN': success")

    time.sleep(1)
    with open(f"ns3/{zone}.db", "a", encoding="utf-8") as zonefile:
        zonefile.write(f"d.{zone}. 300 A 10.0.0.44\n")
    time.sleep(1)

    with server.watch_log_from_here() as watcher:
        server.rndc(f"thaw {zone}", log=False)
        watcher.wait_for_line(f"thawing zone '{zone}/IN': success")

    expected_updates = [f"a.{zone}. A 10.0.0.1", f"d.{zone}. A 10.0.0.44"]

    for update in expected_updates:
        isctest.run.retry_with_timeout(update_is_signed, timeout=5)

    # Dynamic, and inline-signing.
    zone = "dynamic-inline-signing.kasp"
    # Key properties.
    key1 = KeyProperties.default()
    expected = [key1]
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)
    set_keytimes_default_policy(key1)
    expected = [key1]
    isctest.kasp.check_keytimes(keys, expected)
    check_all(server, zone, policy, keys, [])

    # Update zone with freeze/thaw.
    isctest.log.info(
        "check dynamic inline-signed zone is updated and signed after freeze and thaw"
    )
    with server.watch_log_from_here() as watcher:
        server.rndc(f"freeze {zone}", log=False)
        watcher.wait_for_line(f"freezing zone '{zone}/IN': success")

    time.sleep(1)
    shutil.copyfile("ns3/template2.db.in", f"ns3/{zone}.db")
    time.sleep(1)

    with server.watch_log_from_here() as watcher:
        server.rndc(f"thaw {zone}", log=False)
        watcher.wait_for_line(f"thawing zone '{zone}/IN': success")

    expected_updates = [f"a.{zone}. A 10.0.0.11", f"d.{zone}. A 10.0.0.44"]
    for update in expected_updates:
        isctest.run.retry_with_timeout(update_is_signed, timeout=5)

    # Dynamic, signed, and inline-signing.
    isctest.log.info("check dynamic signed, and inline-signed zone")
    zone = "dynamic-signed-inline-signing.kasp"
    # Key properties.
    key1 = KeyProperties.default()
    # The ns3/setup.sh script sets all states to omnipresent.
    key1.metadata["DNSKEYState"] = "omnipresent"
    key1.metadata["KRRSIGState"] = "omnipresent"
    key1.metadata["ZRRSIGState"] = "omnipresent"
    key1.metadata["DSState"] = "omnipresent"
    expected = [key1]
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3/keys")
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)
    check_all(server, zone, policy, keys, [])
    # Ensure no zone_resigninc for the unsigned version of the zone is triggered.
    assert f"zone_resigninc: zone {zone}/IN (unsigned): enter" not in "ns3/named.run"


def test_kasp_checkds(servers):
    server = servers["ns3"]

    def wait_for_metadata():
        return isctest.util.file_contents_contain(ksk.statefile, metadata)

    # Zone: checkds-ksk.kasp.
    zone = "checkds-ksk.kasp"
    policy = "checkds-ksk"
    alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
    size = os.environ["DEFAULT_BITS"]
    policy_keys = [
        f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
        f"zsk unlimited {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
    ]
    expected = isctest.kasp.policy_to_properties(ttl=303, keys=policy_keys)
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    ksks = [k for k in keys if k.is_ksk()]
    zsks = [k for k in keys if k.is_zsk()]
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)
    check_all(server, zone, policy, ksks, zsks)

    now = KeyTimingMetadata.now()
    ksk = ksks[0]

    isctest.log.info("check if checkds -publish correctly sets DSPublish")
    server.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
    metadata = f"DSPublish: {now}"
    isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
    expected[0].metadata["DSState"] = "rumoured"
    expected[0].timing["DSPublish"] = now
    isctest.kasp.check_keys(zone, keys, expected)

    isctest.log.info("check if checkds -withdrawn correctly sets DSRemoved")
    server.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
    metadata = f"DSRemoved: {now}"
    isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
    expected[0].metadata["DSState"] = "unretentive"
    expected[0].timing["DSRemoved"] = now
    isctest.kasp.check_keys(zone, keys, expected)


def test_kasp_checkds_doubleksk(servers):
    server = servers["ns3"]

    def wait_for_metadata():
        return isctest.util.file_contents_contain(ksk.statefile, metadata)

    # Zone: checkds-doubleksk.kasp.
    zone = "checkds-doubleksk.kasp"
    policy = "checkds-doubleksk"
    alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
    size = os.environ["DEFAULT_BITS"]
    policy_keys = [
        f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
        f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
        f"zsk unlimited {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
    ]
    expected = isctest.kasp.policy_to_properties(ttl=303, keys=policy_keys)
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    ksks = [k for k in keys if k.is_ksk()]
    zsks = [k for k in keys if k.is_zsk()]
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)
    check_all(server, zone, policy, ksks, zsks)

    now = KeyTimingMetadata.now()
    ksk = ksks[0]

    badalg = os.environ["ALTERNATIVE_ALGORITHM_NUMBER"]
    isctest.log.info("check invalid checkds commands")

    def check_error():
        response = server.rndc(test["command"], log=False)
        assert test["error"] in response

    test_cases = [
        {
            "command": f"dnssec -checkds -when {now} published {zone}",
            "error": "multiple possible keys found, retry command with -key id",
        },
        {
            "command": f"dnssec -checkds -when {now} withdrawn {zone}",
            "error": "multiple possible keys found, retry command with -key id",
        },
        {
            "command": f"dnssec -checkds -when {now} -key {ksks[0].tag} -alg {badalg} published {zone}",
            "error": "Error executing checkds command: no matching key found",
        },
        {
            "command": f"dnssec -checkds -when {now} -key {ksks[0].tag} -alg {badalg} withdrawn {zone}",
            "error": "Error executing checkds command: no matching key found",
        },
    ]
    for test in test_cases:
        check_error()

    isctest.log.info("check if checkds -publish -key correctly sets DSPublish")
    server.rndc(
        f"dnssec -checkds -when {now} -key {ksk.tag} published {zone}", log=False
    )
    metadata = f"DSPublish: {now}"
    isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
    expected[0].metadata["DSState"] = "rumoured"
    expected[0].timing["DSPublish"] = now
    isctest.kasp.check_keys(zone, keys, expected)

    isctest.log.info("check if checkds -withdrawn -key correctly sets DSRemoved")
    ksk = ksks[1]
    server.rndc(
        f"dnssec -checkds -when {now} -key {ksk.tag} withdrawn {zone}", log=False
    )
    metadata = f"DSRemoved: {now}"
    isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
    expected[1].metadata["DSState"] = "unretentive"
    expected[1].timing["DSRemoved"] = now
    isctest.kasp.check_keys(zone, keys, expected)


def test_kasp_checkds_csk(servers):
    server = servers["ns3"]

    def wait_for_metadata():
        return isctest.util.file_contents_contain(ksk.statefile, metadata)

    # Zone: checkds-csk.kasp.
    zone = "checkds-csk.kasp"
    policy = "checkds-csk"
    alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
    size = os.environ["DEFAULT_BITS"]
    policy_keys = [
        f"csk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
    ]
    expected = isctest.kasp.policy_to_properties(ttl=303, keys=policy_keys)
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)
    check_all(server, zone, policy, keys, [])

    now = KeyTimingMetadata.now()
    ksk = keys[0]

    isctest.log.info("check if checkds -publish csk correctly sets DSPublish")
    server.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
    metadata = f"DSPublish: {now}"
    isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
    expected[0].metadata["DSState"] = "rumoured"
    expected[0].timing["DSPublish"] = now
    isctest.kasp.check_keys(zone, keys, expected)

    isctest.log.info("check if checkds -withdrawn csk correctly sets DSRemoved")
    server.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
    metadata = f"DSRemoved: {now}"
    isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
    expected[0].metadata["DSState"] = "unretentive"
    expected[0].timing["DSRemoved"] = now
    isctest.kasp.check_keys(zone, keys, expected)


def test_kasp_special_characters(servers):
    server = servers["ns3"]

    # A zone with special characters.
    isctest.log.info("check special characters")

    zone = r'i-am.":\;?&[]\@!\$*+,|=\.\(\)special.kasp'
    # It is non-trivial to adapt the tests to deal with all possible different
    # escaping characters, so we will just try to verify the zone.
    isctest.kasp.check_dnssec_verify(server, zone)


def test_kasp_insecure(servers):
    server = servers["ns3"]

    # Insecure zones.
    isctest.log.info("check insecure zones")

    zone = "insecure.kasp"
    expected = []
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    isctest.kasp.check_keys(zone, keys, expected)
    isctest.kasp.check_dnssecstatus(server, zone, keys, policy="insecure")
    isctest.kasp.check_apex(server, zone, keys, [])
    isctest.kasp.check_subdomain(server, zone, keys, [])

    zone = "unsigned.kasp"
    expected = []
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    isctest.kasp.check_keys(zone, keys, expected)
    isctest.kasp.check_dnssecstatus(server, zone, keys, policy=None)
    isctest.kasp.check_apex(server, zone, keys, [])
    isctest.kasp.check_subdomain(server, zone, keys, [])
    # Make sure the zone file is untouched.
    isctest.check.file_contents_equal(f"ns3/{zone}.db.infile", f"ns3/{zone}.db")


def test_kasp_bad_maxzonettl(servers):
    server = servers["ns3"]

    # check that max-zone-ttl rejects zones with too high TTL.
    isctest.log.info("check max-zone-ttl rejects zones with too high TTL")
    zone = "max-zone-ttl.kasp"
    assert f"loading from master file {zone}.db failed: out of range" in server.log


def test_kasp_dnssec_keygen():
    def keygen(zone, policy, keydir=None):
        if keydir is None:
            keydir = "."

        keygen_command = [
            os.environ.get("KEYGEN"),
            "-K",
            keydir,
            "-k",
            policy,
            "-l",
            "kasp.conf",
            zone,
        ]

        return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8")

    # check that 'dnssec-keygen -k' (configured policy) creates valid files.
    keyprops = [
        f"csk {lifetime['P1Y']} 13 256",
        f"ksk {lifetime['P1Y']} 8 2048",
        f"zsk {lifetime['P30D']} 8 2048",
        f"zsk {lifetime['P6M']} 8 3072",
    ]
    keydir = "keys"
    out = keygen("kasp", "kasp", keydir)
    keys = isctest.kasp.keystr_to_keylist(out, keydir)
    expected = isctest.kasp.policy_to_properties(ttl=200, keys=keyprops)
    isctest.kasp.check_keys("kasp", keys, expected)

    # check that 'dnssec-keygen -k' (default policy) creates valid files.
    keyprops = ["csk 0 13 256"]
    out = keygen("kasp", "default")
    keys = isctest.kasp.keystr_to_keylist(out)
    expected = isctest.kasp.policy_to_properties(ttl=3600, keys=keyprops)
    isctest.kasp.check_keys("kasp", keys, expected)

    # check that 'dnssec-settime' by default does not edit key state file.
    key = keys[0]
    shutil.copyfile(key.privatefile, f"{key.privatefile}.backup")
    shutil.copyfile(key.keyfile, f"{key.keyfile}.backup")
    shutil.copyfile(key.statefile, f"{key.statefile}.backup")

    created = key.get_timing("Created")
    publish = key.get_timing("Publish") + timedelta(hours=1)
    settime = [
        os.environ.get("SETTIME"),
        "-P",
        str(publish),
        key.path,
    ]
    out = isctest.run.cmd(settime, log_stdout=True).stdout.decode("utf-8")

    isctest.check.file_contents_equal(f"{key.statefile}", f"{key.statefile}.backup")
    assert key.get_metadata("Publish", file=key.privatefile) == str(publish)
    assert key.get_metadata("Publish", file=key.keyfile, comment=True) == str(publish)

    # check that 'dnssec-settime -s' also sets publish time metadata and
    # states in key state file.
    now = KeyTimingMetadata.now()
    goal = "omnipresent"
    dnskey = "rumoured"
    krrsig = "rumoured"
    zrrsig = "omnipresent"
    ds = "hidden"
    keyprops = [
        f"csk 0 13 256 goal:{goal} dnskey:{dnskey} krrsig:{krrsig} zrrsig:{zrrsig} ds:{ds}",
    ]
    expected = isctest.kasp.policy_to_properties(ttl=3600, keys=keyprops)
    expected[0].timing = {
        "Generated": created,
        "Published": now,
        "Active": created,
        "DNSKEYChange": now,
        "KRRSIGChange": now,
        "ZRRSIGChange": now,
        "DSChange": now,
    }

    settime = [
        os.environ.get("SETTIME"),
        "-s",
        "-P",
        str(now),
        "-g",
        goal,
        "-k",
        dnskey,
        str(now),
        "-r",
        krrsig,
        str(now),
        "-z",
        zrrsig,
        str(now),
        "-d",
        ds,
        str(now),
        key.path,
    ]
    out = isctest.run.cmd(settime, log_stdout=True).stdout.decode("utf-8")
    isctest.kasp.check_keys("kasp", keys, expected)
    isctest.kasp.check_keytimes(keys, expected)

    # check that 'dnssec-settime -s' also unsets publish time metadata and
    # states in key state file.
    now = KeyTimingMetadata.now()
    keyprops = ["csk 0 13 256"]
    expected = isctest.kasp.policy_to_properties(ttl=3600, keys=keyprops)
    expected[0].timing = {
        "Generated": created,
        "Active": created,
    }

    settime = [
        os.environ.get("SETTIME"),
        "-s",
        "-P",
        "none",
        "-g",
        "none",
        "-k",
        "none",
        str(now),
        "-z",
        "none",
        str(now),
        "-r",
        "none",
        str(now),
        "-d",
        "none",
        str(now),
        key.path,
    ]
    out = isctest.run.cmd(settime, log_stdout=True).stdout.decode("utf-8")
    isctest.kasp.check_keys("kasp", keys, expected)
    isctest.kasp.check_keytimes(keys, expected)

    # check that 'dnssec-settime -s' also sets active time metadata and states in key state file (uppercase)
    soon = now + timedelta(hours=2)
    goal = "hidden"
    dnskey = "unretentive"
    krrsig = "omnipresent"
    zrrsig = "unretentive"
    ds = "omnipresent"
    keyprops = [
        f"csk 0 13 256 goal:{goal} dnskey:{dnskey} krrsig:{krrsig} zrrsig:{zrrsig} ds:{ds}",
    ]
    expected = isctest.kasp.policy_to_properties(ttl=3600, keys=keyprops)
    expected[0].timing = {
        "Generated": created,
        "Active": soon,
        "DNSKEYChange": soon,
        "KRRSIGChange": soon,
        "ZRRSIGChange": soon,
        "DSChange": soon,
    }

    settime = [
        os.environ.get("SETTIME"),
        "-s",
        "-A",
        str(soon),
        "-g",
        "HIDDEN",
        "-k",
        "UNRETENTIVE",
        str(soon),
        "-z",
        "UNRETENTIVE",
        str(soon),
        "-r",
        "OMNIPRESENT",
        str(soon),
        "-d",
        "OMNIPRESENT",
        str(soon),
        key.path,
    ]
    out = isctest.run.cmd(settime, log_stdout=True).stdout.decode("utf-8")
    isctest.kasp.check_keys("kasp", keys, expected)
    isctest.kasp.check_keytimes(keys, expected)


def test_kasp_zsk_retired(servers):
    server = servers["ns3"]

    config = {
        "dnskey-ttl": timedelta(seconds=300),
        "ds-ttl": timedelta(days=1),
        "max-zone-ttl": timedelta(days=1),
        "parent-propagation-delay": timedelta(hours=1),
        "publish-safety": timedelta(hours=1),
        "retire-safety": timedelta(hours=1),
        "signatures-refresh": timedelta(days=7),
        "signatures-validity": timedelta(days=14),
        "zone-propagation-delay": timedelta(minutes=5),
    }

    zone = "zsk-retired.autosign"
    policy = "autosign"
    alg = os.environ["DEFAULT_ALGORITHM_NUMBER"]
    size = os.environ["DEFAULT_BITS"]
    key_properties = [
        f"ksk 63072000 {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
        # zsk predecessor
        f"zsk 31536000 {alg} {size} goal:hidden dnskey:omnipresent zrrsig:omnipresent",
        # zsk successor
        f"zsk 31536000 {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:hidden",
    ]
    expected = isctest.kasp.policy_to_properties(300, key_properties)
    keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
    ksks = [k for k in keys if k.is_ksk()]
    zsks = [k for k in keys if not k.is_ksk()]
    isctest.kasp.check_zone_is_signed(server, zone)
    isctest.kasp.check_keys(zone, keys, expected)

    offset = -timedelta(days=30 * 6)
    sign_delay = config["signatures-validity"] - config["signatures-refresh"]

    def sumvars(variables):
        result = timedelta(0)
        for var in variables:
            result = result + config[var]
        return result

    # KSK Key Timings:
    # IpubC = DprpC + TTLkey
    # Note: Also need to wait until the signatures are omnipresent.
    # That's why we use max-zone-ttl instead of dnskey-ttl here.
    Ipub_KSK = sumvars(["zone-propagation-delay", "max-zone-ttl"])
    # Iret = DprpP + TTLds
    Iret_KSK = sumvars(["parent-propagation-delay", "retire-safety", "ds-ttl"])

    # ZSK Key Timings:
    # Ipub = Dprp + TTLkey
    Ipub_ZSK = sumvars(["zone-propagation-delay", "publish-safety", "dnskey-ttl"])
    # Iret = Dsgn + Dprp + TTLsig
    Iret_ZSK = sumvars(["zone-propagation-delay", "retire-safety", "max-zone-ttl"])
    Iret_ZSK = Iret_ZSK + sign_delay

    # KSK
    expected[0].timing["Generated"] = expected[0].key.get_timing("Created")
    expected[0].timing["Published"] = expected[0].timing["Generated"]
    expected[0].timing["Published"] = expected[0].timing["Published"] + offset
    expected[0].timing["Active"] = expected[0].timing["Published"]
    expected[0].timing["Retired"] = expected[0].timing["Published"] + int(
        expected[0].metadata["Lifetime"]
    )
    # Trdy(N) = Tpub(N) + IpubC
    expected[0].timing["PublishCDS"] = expected[0].timing["Published"] + Ipub_KSK
    # Tdea(N) = Tret(N) + Iret
    expected[0].timing["Removed"] = expected[0].timing["Retired"] + Iret_KSK
    expected[0].timing["DNSKEYChange"] = None
    expected[0].timing["DSChange"] = None
    expected[0].timing["KRRSIGChange"] = None

    # ZSK (predecessor)
    expected[1].timing["Generated"] = expected[1].key.get_timing("Created")
    expected[1].timing["Published"] = expected[1].timing["Generated"] + offset
    expected[1].timing["Active"] = expected[1].timing["Published"]
    expected[1].timing["Retired"] = expected[1].timing["Generated"]
    # Tdea(N) = Tret(N) + Iret
    expected[1].timing["Removed"] = expected[1].timing["Retired"] + Iret_ZSK
    expected[1].timing["DNSKEYChange"] = None
    expected[1].timing["ZRRSIGChange"] = None

    # ZSK (successor)
    expected[2].timing["Generated"] = expected[2].key.get_timing("Created")
    expected[2].timing["Published"] = expected[2].timing["Generated"]
    # Trdy(N) = Tpub(N) + Ipub
    expected[2].timing["Active"] = expected[2].timing["Published"] + Ipub_ZSK
    # Tret(N) = Tact(N) + Lzsk
    expected[2].timing["Retired"] = expected[2].timing["Active"] + int(
        expected[2].metadata["Lifetime"]
    )
    # Tdea(N) = Tret(N) + Iret
    expected[2].timing["Removed"] = expected[2].timing["Retired"] + Iret_ZSK
    expected[2].timing["DNSKEYChange"] = None
    expected[2].timing["ZRRSIGChange"] = None

    isctest.kasp.check_keytimes(keys, expected)
    check_all(server, zone, policy, ksks, zsks)

    queries = [
        f"{zone} DNSKEY",
        f"{zone} SOA",
        f"{zone} NS",
        f"{zone} NSEC",
        f"a.{zone} A",
        f"a.{zone} NSEC",
        f"b.{zone} A",
        f"b.{zone} NSEC",
        f"c.{zone} A",
        f"c.{zone} NSEC",
        f"ns3.{zone} A",
        f"ns3.{zone} NSEC",
    ]

    def rrsig_is_refreshed():
        parts = query.split()
        qname = parts[0]
        qtype = dns.rdatatype.from_text(parts[1])
        return isctest.kasp.verify_rrsig_is_refreshed(
            server, zone, f"ns3/{zone}.db.signed", qname, qtype, ksks, zsks
        )

    for query in queries:
        isctest.run.retry_with_timeout(rrsig_is_refreshed, timeout=5)

    # Load again, make sure the purged key is not an issue when verifying keys.
    with server.watch_log_from_here() as watcher:
        server.rndc(f"loadkeys {zone}", log=False)
        watcher.wait_for_line(f"keymgr: {zone} done")

    msg = f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
    server.log.prohibit(msg)


def test_kasp_reload_restart(servers):
    server = servers["ns6"]
    zone = "example"

    def query_soa(qname):
        fqdn = dns.name.from_text(qname)
        qtype = dns.rdatatype.SOA
        query = dns.message.make_query(fqdn, qtype, use_edns=True, want_dnssec=True)
        try:
            response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
        except dns.exception.Timeout:
            isctest.log.debug(f"query timeout for query {qname} SOA to {server.ip}")
            return 0, 0

        assert response.rcode() == dns.rcode.NOERROR

        for rr in response.answer:
            if rr.match(fqdn, dns.rdataclass.IN, dns.rdatatype.RRSIG, qtype):
                continue

            assert rr.match(fqdn, dns.rdataclass.IN, qtype, dns.rdatatype.NONE)
            assert len(rr) == 1
            return rr[0].serial, rr.ttl

        return 0, 0

    def check_soa_ttl():
        soa2, ttl2 = query_soa(zone)
        return soa1 < soa2 and ttl2 == newttl

    # Check that the SOA SERIAL increases and check the TTLs (should be 300 as
    # defined in ns6/example2.db.in).
    soa1, ttl1 = query_soa(zone)
    assert ttl1 == 300

    shutil.copyfile(f"ns6/{zone}2.db.in", f"ns6/{zone}.db")
    with server.watch_log_from_here() as watcher:
        server.rndc("reload", log=False)
        watcher.wait_for_line("all zones loaded")

    newttl = 300
    isctest.run.retry_with_timeout(check_soa_ttl, timeout=10)

    # Check that the SOA SERIAL increases and check the TTLs (should be changed
    # from 300 to 400 as defined in ns6/example3.db.in).
    soa1, ttl1 = query_soa(zone)
    assert ttl1 == 300

    server.stop()
    shutil.copyfile(f"ns6/{zone}3.db.in", f"ns6/{zone}.db")
    os.unlink(f"ns6/{zone}.db.jnl")
    with server.watch_log_from_here() as watcher:
        server.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
        watcher.wait_for_line("all zones loaded")

    newttl = 400
    isctest.run.retry_with_timeout(check_soa_ttl, timeout=10)
