# -*- coding: utf-8 -*-
"""Validates and parses email-related DNS records"""
from __future__ import annotations
import logging
import dns
import json
from collections import OrderedDict
from typing import Union
from time import sleep
from io import StringIO
from csv import DictWriter
import checkdmarc._constants
from checkdmarc.utils import get_base_domain, get_nameservers, DNSException
from checkdmarc.dnssec import test_dnssec
from checkdmarc.mta_sts import check_mta_sts
from checkdmarc.smtp import check_mx
from checkdmarc.spf import check_spf
from checkdmarc.dmarc import check_dmarc
from checkdmarc.bimi import check_bimi
from checkdmarc.smtp_tls_reporting import check_smtp_tls_reporting
"""Copyright 2019-2023 Sean Whalen
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
__version__ = checkdmarc._constants.__version__
def check_domains(domains: list[str], parked: bool = False,
                  approved_nameservers: list[str] = None,
                  approved_mx_hostnames: list[str] = None,
                  skip_tls: bool = False,
                  bimi_selector: str = None,
                  include_tag_descriptions: bool = False,
                  nameservers: list[str] = None,
                  resolver: dns.resolver.Resolver = None,
                  timeout: float = 2.0,
                  wait: float = 0.0) -> Union[OrderedDict, list[OrderedDict]]:
    """
    Check the given domains for email-related DNS records, parse them, and
    return the results

    Each input entry is reduced to its first comma-separated field, trimmed
    of surrounding whitespace and trailing dots, and lowercased. Entries that
    do not contain a ``.`` afterwards are ignored, duplicates are removed,
    and the remaining domains are processed in sorted order.

    Args:
        domains (list): A list of domains to check
        parked (bool): Indicates that the domains are parked
        approved_nameservers (list): A list of approved nameservers
        approved_mx_hostnames (list): A list of approved MX hostname
        skip_tls (bool): Skip STARTTLS testing
        bimi_selector (str): The BIMI selector to test
        include_tag_descriptions (bool): Include descriptions of
                                         tags and/or tag values in the
                                         results
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for an answer from DNS
        wait (float): number of seconds to wait between processing domains

    Returns:
       An ``OrderedDict`` (when exactly one domain was checked) or a ``list``
       of ``OrderedDict`` (otherwise, including an empty list when no usable
       domain was given) with the following keys

       - ``domain`` - The domain name
       - ``base_domain`` The base domain
       - ``dnssec`` - The output of :func:`checkdmarc.dnssec.test_dnssec`
       - ``ns`` - The output of :func:`check_ns`
       - ``mx`` - See :func:`checkdmarc.smtp.get_mx_hosts`
       - ``mta_sts`` - The output of
         :func:`checkdmarc.mta_sts.check_mta_sts`
       - ``spf`` -  A ``valid`` flag, plus the output of
         :func:`checkdmarc.spf.parse_spf_record` or an ``error``
       - ``dmarc`` - A ``valid`` flag, plus the output of
         :func:`checkdmarc.dmarc.parse_dmarc_record` or an ``error``
       - ``smtp_tls_reporting`` - The output of
         :func:`checkdmarc.smtp_tls_reporting.check_smtp_tls_reporting`
       - ``bimi`` - The output of :func:`checkdmarc.bimi.check_bimi`
         (only present when ``bimi_selector`` is not ``None``)
    """
    # Take the first comma-separated field *before* trimming so that trailing
    # dots/whitespace directly in front of a comma are removed as well.
    normalized = (
        entry.split(",")[0].strip().rstrip(".\r\n").strip().lower()
        for entry in domains
    )
    # Anything without a dot (including empty strings) is not a domain name.
    domains = sorted({name for name in normalized if "." in name})

    results = []
    for domain in domains:
        logging.debug(f"Checking: {domain}")
        # Placeholders fix the key order of the result before the individual
        # checks fill them in.
        domain_results = OrderedDict(
            [("domain", domain), ("base_domain", get_base_domain(domain)),
             ("dnssec", None), ("ns", []), ("mx", [])])
        domain_results["dnssec"] = test_dnssec(
            domain,
            nameservers=nameservers,
            timeout=timeout
        )
        domain_results["ns"] = check_ns(
            domain,
            approved_nameservers=approved_nameservers,
            nameservers=nameservers,
            resolver=resolver, timeout=timeout
        )

        # MTA-STS is checked before MX so that a valid policy's MX patterns
        # can be handed to the MX check.
        mta_sts_mx_patterns = None
        domain_results["mta_sts"] = check_mta_sts(domain,
                                                  nameservers=nameservers,
                                                  resolver=resolver,
                                                  timeout=timeout)
        if domain_results["mta_sts"]["valid"]:
            mta_sts_mx_patterns = domain_results["mta_sts"]["policy"]["mx"]
        domain_results["mx"] = check_mx(
            domain,
            approved_mx_hostnames=approved_mx_hostnames,
            mta_sts_mx_patterns=mta_sts_mx_patterns,
            skip_tls=skip_tls,
            nameservers=nameservers,
            resolver=resolver,
            timeout=timeout
        )
        domain_results["spf"] = check_spf(
            domain,
            parked=parked,
            nameservers=nameservers,
            resolver=resolver,
            timeout=timeout
        )
        domain_results["dmarc"] = check_dmarc(
            domain,
            parked=parked,
            include_dmarc_tag_descriptions=include_tag_descriptions,
            nameservers=nameservers,
            resolver=resolver,
            timeout=timeout
        )
        domain_results["smtp_tls_reporting"] = check_smtp_tls_reporting(
            domain,
            nameservers=nameservers,
            resolver=resolver,
            timeout=timeout
        )
        if bimi_selector is not None:
            domain_results["bimi"] = check_bimi(
                domain,
                selector=bimi_selector,
                include_tag_descriptions=include_tag_descriptions,
                nameservers=nameservers,
                resolver=resolver,
                timeout=timeout)

        results.append(domain_results)
        if wait > 0.0:
            logging.debug(f"Sleeping for {wait} seconds")
            sleep(wait)

    # A single checked domain is returned bare rather than in a list.
    if len(results) == 1:
        return results[0]
    return results
def check_ns(domain: str,
             approved_nameservers: list[str] = None,
             nameservers: list[str] = None,
             resolver: dns.resolver.Resolver = None,
             timeout: float = 2.0) -> OrderedDict:
    """
    Looks up the nameservers of a domain, turning a DNS failure into an
    error entry instead of an exception.

    Args:
        domain (str): A domain name
        approved_nameservers (list): A list of approved nameserver substrings
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for a record from DNS

    Returns:
        OrderedDict: A dictionary with the following keys:

                     - ``hostnames`` - A list of nameserver hostnames
                     - ``warnings``  - A list of warnings

                    If a DNS error occurs, the dictionary will have the
                    following keys:

                      - ``hostnames`` - An empty list
                      - ``error``  - An error message
    """
    try:
        return get_nameservers(
            domain,
            approved_nameservers=approved_nameservers,
            nameservers=nameservers,
            resolver=resolver,
            timeout=timeout,
        )
    except DNSException as dns_error:
        # Report the failure in-band so callers always get a "hostnames" key.
        failure = OrderedDict()
        failure["hostnames"] = []
        failure["error"] = str(dns_error)
        return failure
def results_to_json(results: Union[dict, list[dict]]) -> str:
    """
    Serializes a results dictionary, or a list of them, as JSON text

    Args:
        results (dict): A dictionary of results

    Returns:
        str: Results in JSON format, indented by two spaces, with non-ASCII
        characters written as-is rather than escaped
    """
    dump_options = {"ensure_ascii": False, "indent": 2}
    serialized = json.dumps(results, **dump_options)
    return serialized
def _all_hosts_pass(hosts: list[dict], key: str) -> Union[bool, None]:
    """Summarize a per-host boolean flag: ``None`` if there are no hosts or
    any host lacks the flag, otherwise whether every host passed."""
    try:
        flags = [host[key] for host in hosts]
    except KeyError:
        # The user might opt to skip the STARTTLS test
        return None
    if not flags:
        return None
    return all(flags)


def _join_report_uris(uris: list[dict]) -> str:
    """Render parsed report URIs as ``scheme:address`` joined by ``|``."""
    return "|".join(f"{uri['scheme']}:{uri['address']}" for uri in uris)


def results_to_csv_rows(results: Union[dict, list[dict]]) -> list[dict]:
    """
    Converts a results dictionary or list of dictionaries and returns a
    list of CSV row dictionaries

    List-valued fields are flattened into ``|``-separated strings (``:`` for
    the DMARC ``fo`` and ``rf`` tags). The ``tls`` and ``starttls`` columns
    are ``True`` only if every MX host passed the respective test, ``False``
    if any host failed it, and ``None`` if there are no hosts or the test
    results are absent.

    Args:
        results (dict): A dictionary of results, or a list of them

    Returns:
        list: A list of CSV row dictionaries
    """
    # Accept any mapping (plain dict as well as OrderedDict) as one result.
    if isinstance(results, dict):
        results = [results]

    rows = []
    for result in results:
        # Look up every mandatory section first so that a malformed result
        # fails before any row data is produced.
        ns = result["ns"]
        mx = result["mx"]
        mta_sts = result["mta_sts"]
        spf = result["spf"]
        dmarc = result["dmarc"]
        smtp_tls_reporting = result["smtp_tls_reporting"]

        row = {
            "domain": result["domain"],
            "base_domain": result["base_domain"],
            "dnssec": result["dnssec"],
            "ns": "|".join(ns["hostnames"]),
        }
        if "error" in ns:
            row["ns_error"] = ns["error"]
        else:
            row["ns_warnings"] = "|".join(ns["warnings"])

        if "error" in mta_sts:
            row["mta_sts_error"] = mta_sts["error"]
        else:
            policy = mta_sts["policy"]
            row["mta_sts_id"] = mta_sts["id"]
            row["mta_sts_mode"] = policy["mode"]
            row["mta_sts_max_age"] = policy["max_age"]
            row["mta_sts_mx"] = "|".join(policy["mx"])
            row["mta_sts_warnings"] = "|".join(mta_sts["warnings"])

        if "bimi" in result:
            bimi = result["bimi"]
            # A failed BIMI check may carry neither warnings nor tags.
            row["bimi_warnings"] = "|".join(bimi.get("warnings", []))
            row["bimi_selector"] = bimi["selector"]
            if "error" in bimi:
                row["bimi_error"] = bimi["error"]
            bimi_tags = bimi.get("tags", {})
            for tag in ("l", "a"):
                if tag in bimi_tags:
                    row[f"bimi_{tag}"] = bimi_tags[tag]["value"]

        hosts = mx["hosts"]
        row["mx"] = "|".join(
            f"{host['preference']}, {host['hostname']}" for host in hosts)
        row["tls"] = _all_hosts_pass(hosts, "tls")
        row["starttls"] = _all_hosts_pass(hosts, "starttls")
        if "error" in mx:
            row["mx_error"] = mx["error"]
        else:
            row["mx_warnings"] = "|".join(mx["warnings"])

        row["spf_record"] = spf["record"]
        row["spf_valid"] = spf["valid"]
        if "error" in spf:
            row["spf_error"] = spf["error"]
        else:
            row["spf_warnings"] = "|".join(spf["warnings"])

        row["dmarc_record"] = dmarc["record"]
        row["dmarc_record_location"] = dmarc["location"]
        row["dmarc_valid"] = dmarc["valid"]
        if "error" in dmarc:
            row["dmarc_error"] = dmarc["error"]
        else:
            tags = dmarc["tags"]
            row["dmarc_adkim"] = tags["adkim"]["value"]
            row["dmarc_aspf"] = tags["aspf"]["value"]
            row["dmarc_fo"] = ":".join(tags["fo"]["value"])
            row["dmarc_p"] = tags["p"]["value"]
            row["dmarc_pct"] = tags["pct"]["value"]
            row["dmarc_rf"] = ":".join(tags["rf"]["value"])
            row["dmarc_ri"] = tags["ri"]["value"]
            row["dmarc_sp"] = tags["sp"]["value"]
            if "rua" in tags:
                row["dmarc_rua"] = _join_report_uris(tags["rua"]["value"])
            if "ruf" in tags:
                row["dmarc_ruf"] = _join_report_uris(tags["ruf"]["value"])
            row["dmarc_warnings"] = "|".join(dmarc["warnings"])

        if "error" in smtp_tls_reporting:
            row["smtp_tls_reporting_valid"] = False
            row["smtp_tls_reporting_error"] = smtp_tls_reporting["error"]
        else:
            row["smtp_tls_reporting_valid"] = True
            row["smtp_tls_reporting_rua"] = "|".join(
                smtp_tls_reporting["tags"]["rua"]["value"])
            # Joined like every other warnings column so the CSV cell does
            # not contain a Python list repr.
            row["smtp_tls_reporting_warnings"] = "|".join(
                smtp_tls_reporting["warnings"])

        rows.append(row)
    return rows
def results_to_csv(results: Union[dict, list[dict]]) -> str:
    """
    Converts a dictionary of results, or a list of them, to CSV

    The standard columns are always written in a fixed order. Any additional
    columns produced by :func:`results_to_csv_rows` (for example the
    ``bimi_*`` columns when a BIMI selector was checked) are appended after
    them in first-seen order, so the header is unchanged when no such columns
    are present.

    Args:
        results (dict): A dictionary of results, or a list of them

    Returns:
        str: A CSV of results
    """
    fields = ["domain", "base_domain", "dnssec", "spf_valid", "dmarc_valid",
              "dmarc_adkim", "dmarc_aspf",
              "dmarc_fo", "dmarc_p", "dmarc_pct", "dmarc_rf", "dmarc_ri",
              "dmarc_rua", "dmarc_ruf", "dmarc_sp",
              "tls", "starttls", "spf_record", "dmarc_record",
              "dmarc_record_location", "mx", "mx_error", "mx_warnings",
              "mta_sts_id", "mta_sts_mode", "mta_sts_max_age",
              "smtp_tls_reporting_valid", "smtp_tls_reporting_rua",
              "mta_sts_mx", "mta_sts_error", "mta_sts_warnings", "spf_error",
              "spf_warnings", "dmarc_error", "dmarc_warnings",
              "ns", "ns_error", "ns_warnings",
              "smtp_tls_reporting_error", "smtp_tls_reporting_warnings"]
    rows = results_to_csv_rows(results)

    # DictWriter raises ValueError for row keys missing from fieldnames, so
    # optional columns must be registered before writing.
    known_fields = set(fields)
    for row in rows:
        for column in row:
            if column not in known_fields:
                known_fields.add(column)
                fields.append(column)

    output = StringIO(newline="\n")
    writer = DictWriter(output, fieldnames=fields)
    writer.writeheader()
    writer.writerows(rows)
    return output.getvalue()
def output_to_file(path: str, content: str) -> None:
    """
    Saves text to a file, replacing any existing file at that path

    The file is written as UTF-8 with ``\\n`` line endings; characters that
    cannot be encoded are dropped rather than raising an error.

    Args:
        path (str): A file path
        content (str): JSON or CSV text
    """
    open_options = dict(mode="w", newline="\n", encoding="utf-8",
                        errors="ignore")
    with open(path, **open_options) as handle:
        handle.write(content)