Source code for checkdmarc.dnssec

# -*- coding: utf-8 -*-
"""DNSSEC tests"""

from __future__ import annotations

import logging

import dns
import dns.dnssec

from expiringdict import ExpiringDict

from checkdmarc.utils import get_base_domain


"""Copyright 2019-2023 Sean Whalen

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""

DNSSEC_CACHE = ExpiringDict(max_len=200000, max_age_seconds=1800)
DNSKEY_CACHE = ExpiringDict(max_len=200000, max_age_seconds=1800)
TLSA_CACHE = ExpiringDict(max_len=200000, max_age_seconds=1800)


[docs] def get_dnskey(domain: str, nameservers: list[str] = None, timeout: float = 2.0, cache: ExpiringDict = None): """ Get a DNSKEY RRSet on the given domain Args: domain (str): The domain to check nameservers (list): A list of nameservers to query timeout (float): Timeout in seconds cache (ExpiringDict): A cache Returns: A DNSKEY dictionary """ if nameservers is None: nameservers = dns.resolver.Resolver().nameservers if cache is None: cache = DNSKEY_CACHE domain = domain.lower() if domain in cache: return cache[domain] logging.debug(f"Checking for DNSKEY records at {domain}") request = dns.message.make_query(domain, dns.rdatatype.DNSKEY, want_dnssec=True) for nameserver in nameservers: try: response = dns.query.udp(request, nameserver, timeout=timeout) if response is not None: answer = response.answer if len(answer) == 0: logging.debug(f"No DNSKEY records found at {domain}") base_domain = get_base_domain(domain) if domain != base_domain: return get_dnskey(base_domain, nameservers=nameservers, timeout=timeout) cache[domain] = None return None rrset = answer[0] name = dns.name.from_text(f'{domain}.') key = {name: rrset} cache[domain] = key return key except Exception as e: cache[domain] = None logging.debug(f"DNSKEY query error: {e}")
[docs] def test_dnssec(domain: str, nameservers: list[str] = None, timeout: float = 2.0, cache: ExpiringDict = None) -> bool: """ Check for DNSSEC on the given domain Args: domain (str): The domain to check nameservers (list): A list of nameservers to query timeout (float): Timeout in seconds cache (ExpiringDict): Cache Returns: bool: DNSSEC status """ if nameservers is None: nameservers = dns.resolver.Resolver().nameservers if cache is None: cache = DNSSEC_CACHE if domain in cache: return DNSSEC_CACHE[cache] key = get_dnskey(domain, nameservers=nameservers, timeout=timeout) if key is None: return False rdatatypes = [dns.rdatatype.DNSKEY, dns.rdatatype.MX, dns.rdatatype.A, dns.rdatatype.NS, dns.rdatatype.CNAME] for rdatatype in rdatatypes: request = dns.message.make_query(domain, rdatatype, want_dnssec=True) for nameserver in nameservers: try: response = dns.query.udp(request, nameserver, timeout=timeout) if response is not None: answer = response.answer if len(answer) != 2: continue rrset = answer[0] rrsig = answer[1] dns.dnssec.validate(rrset, rrsig, key) logging.debug(f"Found a signed {rdatatype.name} record") cache[domain] = True return True except Exception as e: logging.debug(f"DNSSEC query error: {e}") cache[domain] = False return False
[docs] def get_tlsa_records(hostname: str, nameservers: list[str] = None, timeout: float = 2.0, port: int = 25, protocol: str = "tcp", cache: ExpiringDict = None) -> list[str]: """ Checks for TLSA records on the given hostname Args: hostname (str): The domain to check nameservers (list): A list of nameservers to query timeout (float): Timeout in seconds port (int): The port protocol (str): The protocol cache (ExpiringDict): A cache Returns: list: A list of TLSA records """ if nameservers is None: nameservers = dns.resolver.Resolver().nameservers protocol = protocol.lower() if cache is None: cache = TLSA_CACHE query_hostname = f"_{port}._{protocol}.{hostname}" if query_hostname in TLSA_CACHE: return TLSA_CACHE[query_hostname] tlsa_records = [] logging.debug(f"Checking for TLSA records at {query_hostname}") request = dns.message.make_query(query_hostname, dns.rdatatype.TLSA, want_dnssec=True) for nameserver in nameservers: try: response = dns.query.udp(request, nameserver, timeout=timeout) if response is not None: answer = response.answer if len(answer) != 2: return tlsa_records dnskey = get_dnskey( domain=hostname, nameservers=nameservers, timeout=timeout ) if dnskey is None: logging.debug(f"Found TLSA records at {hostname} but not " f"a DNSKEY record to verify them") return tlsa_records rrset = answer[0] rrsig = answer[1] dns.dnssec.validate(rrset, rrsig, dnskey) tlsa_records = list(map(lambda x: str(x), list(rrset.items.keys()))) cache[query_hostname] = tlsa_records return tlsa_records except Exception as e: logging.debug(f"TLSA query error: {e}") return tlsa_records