# -*- coding: utf-8 -*-
"""DMARC record validation"""
from __future__ import annotations
import logging
import re
from collections import OrderedDict
from typing import Union
import dns
from pyleri import (Grammar,
Regex,
Sequence,
List,
)
from checkdmarc.utils import (WSP_REGEX, query_dns, get_base_domain,
MAILTO_REGEX, DNSException)
from checkdmarc.utils import get_mx_records
from checkdmarc._constants import SYNTAX_ERROR_MARKER
"""Copyright 2019-2023 Sean Whalen
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
# Pattern for the leading version tag, i.e. "v=DMARC1;". WSP_REGEX comes from
# checkdmarc.utils (presumably a whitespace pattern - confirm there) and is
# allowed zero or more times around "=" and before ";".
DMARC_VERSION_REGEX_STRING = fr"v{WSP_REGEX}*={WSP_REGEX}*DMARC1{WSP_REGEX}*;"
# Pattern for a single tag=value pair: group 1 is a tag name of one to five
# letters, group 2 is the value.
DMARC_TAG_VALUE_REGEX_STRING = (
    fr"([a-z]{{1,5}}){WSP_REGEX}*={WSP_REGEX}*([\w.:@/+!,_\- ]+)"
)
# Compiled, case-insensitive form of the tag=value pattern, used to extract
# every pair from a record.
DMARC_TAG_VALUE_REGEX = re.compile(DMARC_TAG_VALUE_REGEX_STRING,
                                   re.IGNORECASE)
class _DMARCWarning(Exception):
    """Raised when a non-fatal DMARC error occurs

    ``parse_dmarc_record`` catches these while checking report URIs and
    stores their text in its ``warnings`` list instead of failing.
    """
class _DMARCBestPracticeWarning(_DMARCWarning):
    """Raised when a DMARC record does not follow a best practice

    Within this module it is mostly instantiated only to render a warning
    string, rather than actually being raised.
    """
[docs]
class DMARCError(Exception):
    """Base class for fatal DMARC errors

    Attributes:
        data (dict): Optional extra data to include in the results
    """

    def __init__(self, msg: str, data: dict = None):
        """
        Args:
            msg (str): The error message
            data (dict): A dictionary of data to include in the results
        """
        super().__init__(msg)
        self.data = data
[docs]
class DMARCRecordNotFound(DMARCError):
    """Raised when a DMARC record could not be found"""

    def __init__(self, error):
        """
        Args:
            error: The error message, or the underlying exception that
                prevented the record from being retrieved
        """
        if isinstance(error, dns.exception.Timeout):
            # Round the reported timeout to one decimal place. A Timeout
            # created without keyword arguments has no "timeout" entry, so
            # the lookup is guarded instead of indexed directly.
            kwargs = getattr(error, "kwargs", None)
            if kwargs and kwargs.get("timeout") is not None:
                kwargs["timeout"] = round(kwargs["timeout"], 1)
        # Run the base initialiser so that ``data`` always exists on the
        # instance; ``args`` stays ``(error,)`` exactly as before.
        super().__init__(error)
[docs]
class DMARCSyntaxError(DMARCError):
    """Raised when a DMARC syntax error is found, for example when a record
    fails the grammar check or lacks the required ``p`` tag"""
[docs]
class InvalidDMARCTag(DMARCSyntaxError):
    """Raised when an invalid DMARC tag is found, i.e. a tag name that is
    not a key of ``dmarc_tags``"""
[docs]
class InvalidDMARCTagValue(DMARCSyntaxError):
    """Raised when an invalid DMARC tag value is found, such as a value
    outside a tag's allowed values or a non-integer ``pct``/``ri``"""
[docs]
class DMARCRecordStartsWithWhitespace(DMARCSyntaxError):
    """Raised when DMARC record starts with whitespace, i.e. the TXT record
    only begins with ``v=DMARC1`` after being stripped"""
[docs]
class InvalidDMARCReportURI(InvalidDMARCTagValue):
    """Raised when an invalid DMARC reporting URI is found (a ``rua`` or
    ``ruf`` entry that does not match the expected ``mailto:`` form)"""
[docs]
class UnrelatedTXTRecordFoundAtDMARC(DMARCError):
    """Raised when a TXT record unrelated to DMARC is found at a name
    where only DMARC records are expected"""


class SPFRecordFoundWhereDMARCRecordShouldBe(UnrelatedTXTRecordFoundAtDMARC):
    """Raised when an SPF record is found where a DMARC record should be;
    most likely, the ``_dmarc`` subdomain
    record does not actually exist, and the request for ``TXT`` records was
    redirected to the base domain"""
[docs]
class DMARCRecordInWrongLocation(DMARCError):
    """Raised when a DMARC record is found at the root of a domain instead
    of at its ``_dmarc`` subdomain"""
[docs]
class DMARCReportEmailAddressMissingMXRecords(_DMARCWarning):
    """Raised when an email address in a DMARC report URI is missing MX
    records, or when its MX records could not be retrieved"""
[docs]
class UnverifiedDMARCURIDestination(_DMARCWarning):
    """Raised when the destination of a DMARC report URI does not indicate
    that it accepts reports for the domain (no usable authorization record
    was found under the destination's ``_report._dmarc`` name)"""
[docs]
class MultipleDMARCRecords(DMARCError):
    """Raised when multiple DMARC records are found, in violation of
    RFC 7489, section 6.6.3"""
class _DMARCGrammar(Grammar):
    """Defines Pyleri grammar for DMARC records"""
    # The mandatory leading version tag, including its trailing ";"
    version_tag = Regex(DMARC_VERSION_REGEX_STRING, re.IGNORECASE)
    # A single tag=value pair
    tag_value = Regex(DMARC_TAG_VALUE_REGEX_STRING, re.IGNORECASE)
    # A record is the version tag followed by a list of tag=value pairs
    # delimited by ";" (with optional surrounding whitespace).
    # NOTE(review): opt=True is handed to pyleri's List; presumably it
    # permits a trailing delimiter - confirm against the pyleri docs.
    START = Sequence(
        version_tag,
        List(
            tag_value,
            delimiter=Regex(f"{WSP_REGEX}*;{WSP_REGEX}*"),
            opt=True))
# Catalogue of the DMARC tags known to this module, keyed by tag name.
# Every entry carries "name", "required" and "description"; "default" is
# present only for tags with a default value, and "values" (a mapping of
# allowed value -> description) only for tags with a closed set of values.
# The "required" key of the "p" and "v" entries used to be misspelled
# "reqired", which left the two mandatory tags without a "required" entry.
dmarc_tags = OrderedDict(
    adkim=OrderedDict(
        name="DKIM Alignment Mode",
        required=False,
        default="r",
        description=(
            'In relaxed mode, the Organizational Domains of both the '
            'DKIM-authenticated signing domain (taken from the value of '
            'the "d=" tag in the signature) and that of the RFC 5322 '
            'From domain must be equal if the identifiers are to be '
            'considered aligned.'
        ),
    ),
    aspf=OrderedDict(
        name="SPF alignment mode",
        required=False,
        default="r",
        description=(
            'In relaxed mode, the SPF-authenticated domain and RFC5322 '
            'From domain must have the same Organizational Domain. In '
            'strict mode, only an exact DNS domain match is considered '
            'to produce Identifier Alignment.'
        ),
    ),
    fo=OrderedDict(
        name="Failure Reporting Options",
        required=False,
        default="0",
        description=(
            'Provides requested options for generation of failure '
            'reports. Report generators MAY choose to adhere to the '
            'requested options. This tag\'s content MUST be ignored if '
            'a "ruf" tag (below) is not also specified. The value of '
            'this tag is a colon-separated list of characters that '
            'indicate failure reporting options.'
        ),
        values={
            "0": (
                'Generate a DMARC failure report if all underlying '
                'authentication mechanisms fail to produce an aligned '
                '"pass" result.'
            ),
            "1": (
                'Generate a DMARC failure report if any underlying '
                'authentication mechanism produced something other '
                'than an aligned "pass" result.'
            ),
            "d": (
                'Generate a DKIM failure report if the message had a '
                'signature that failed evaluation, regardless of its '
                'alignment. DKIM-specific reporting is described in '
                'AFRF-DKIM.'
            ),
            "s": (
                'Generate an SPF failure report if the message failed '
                'SPF evaluation, regardless of its alignment. '
                'SPF-specific reporting is described in AFRF-SPF'
            ),
        },
    ),
    p=OrderedDict(
        name="Requested Mail Receiver Policy",
        required=True,
        description=(
            'Specifies the policy to be enacted by the Receiver at the '
            'request of the Domain Owner. The policy applies to the '
            'domain and to its subdomains, unless subdomain policy is '
            'explicitly described using the "sp" tag.'
        ),
        values={
            "none": (
                'The Domain Owner requests no specific action be '
                'taken regarding delivery of messages.'
            ),
            "quarantine": (
                'The Domain Owner wishes to have email that fails the '
                'DMARC mechanism check be treated by Mail Receivers as '
                'suspicious. Depending on the capabilities of the '
                'MailReceiver, this can mean "place into spam folder", '
                '"scrutinize with additional intensity", and/or '
                '"flag as suspicious".'
            ),
            "reject": (
                'The Domain Owner wishes for Mail Receivers to reject '
                'email that fails the DMARC mechanism check. Rejection '
                'SHOULD occur during the SMTP transaction.'
            ),
        },
    ),
    pct=OrderedDict(
        name="Percentage",
        required=False,
        default=100,
        description=(
            'Integer percentage of messages from the Domain Owner\'s '
            'mail stream to which the DMARC policy is to be applied. '
            'However, this MUST NOT be applied to the DMARC-generated '
            'reports, all of which must be sent and received '
            'unhindered. The purpose of the "pct" tag is to allow '
            'Domain Owners to enact a slow rollout of enforcement of '
            'the DMARC mechanism.'
        ),
    ),
    rf=OrderedDict(
        name="Report Format",
        required=False,
        default="afrf",
        description=(
            'A list separated by colons of one or more report formats '
            'as requested by the Domain Owner to be used when a '
            'message fails both SPF and DKIM tests to report details '
            'of the individual failure. Only "afrf" (the auth-failure '
            'report type) is currently supported in the DMARC '
            'standard.'
        ),
        values={
            "afrf": (
                ' "Authentication Failure Reporting Using the Abuse '
                'Reporting Format", RFC 6591, April 2012,'
                '<https://www.rfc-editor.org/info/rfc6591>'
            ),
        },
    ),
    ri=OrderedDict(
        name="Report Interval",
        required=False,
        default=86400,
        description=(
            'Indicates a request to Receivers to generate aggregate '
            'reports separated by no more than the requested number of '
            'seconds. DMARC implementations MUST be able to provide '
            'daily reports and SHOULD be able to provide hourly '
            'reports when requested. However, anything other than a '
            'daily report is understood to be accommodated on a '
            'best-effort basis.'
        ),
    ),
    rua=OrderedDict(
        name="Aggregate Feedback Addresses",
        required=False,
        description=(
            ' A comma-separated list of DMARC URIs to which aggregate '
            'feedback is to be sent.'
        ),
    ),
    ruf=OrderedDict(
        name="Forensic Feedback Addresses",
        required=False,
        description=(
            ' A comma-separated list of DMARC URIs to which forensic '
            'feedback is to be sent.'
        ),
    ),
    sp=OrderedDict(
        name="Subdomain Policy",
        required=False,
        description=(
            'Indicates the policy to be enacted by the Receiver at the '
            'request of the Domain Owner. It applies only to '
            'subdomains of the domain queried, and not to the domain '
            'itself. Its syntax is identical to that of the "p" tag '
            'defined above. If absent, the policy specified by the "p" '
            'tag MUST be applied for subdomains.'
        ),
    ),
    v=OrderedDict(
        name="Version",
        required=True,
        description=(
            'Identifies the record retrieved as a DMARC record. It '
            'MUST have the value of "DMARC1". The value of this tag '
            'MUST match precisely; if it does not or it is absent, the '
            'entire retrieved record MUST be ignored. It MUST be the '
            'first tag in the list.'
        ),
    ),
)
def _query_dmarc_record(domain: str, nameservers: list[str] = None,
                        resolver: dns.resolver.Resolver = None,
                        timeout: float = 2.0,
                        ignore_unrelated_records: bool = False
                        ) -> Union[str, None]:
    """
    Queries DNS for a DMARC record

    Args:
        domain (str): A domain name
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for a record from DNS
        ignore_unrelated_records (bool): Do not raise an error when TXT
                                         records unrelated to DMARC are
                                         found next to the DMARC record

    Returns:
        str: A record string or None

    Raises:
        DMARCRecordStartsWithWhitespace: a record only starts with
            ``v=DMARC1`` after stripping whitespace
        MultipleDMARCRecords: more than one DMARC record was found
        UnrelatedTXTRecordFoundAtDMARC: unrelated TXT records were found
            and ``ignore_unrelated_records`` is not set
        DMARCRecordInWrongLocation: the record sits at the domain itself
            rather than at ``_dmarc.<domain>``
        DMARCRecordNotFound: the domain does not exist or the lookup
            failed for another reason
    """
    domain = domain.lower()
    target = f"_dmarc.{domain}"
    txt_prefix = "v=DMARC1"
    dmarc_record = None
    dmarc_records = []
    unrelated_records = []

    try:
        records = query_dns(target, "TXT", nameservers=nameservers,
                            resolver=resolver, timeout=timeout)
        for record in records:
            if record.startswith(txt_prefix):
                dmarc_records.append(record)
            elif record.strip().startswith(txt_prefix):
                raise DMARCRecordStartsWithWhitespace(
                    "Found a DMARC record that starts with whitespace. "
                    "Please remove the whitespace, as some implementations "
                    "may not process it correctly."
                )
            else:
                unrelated_records.append(record)

        if len(dmarc_records) > 1:
            raise MultipleDMARCRecords(
                "Multiple DMARC policy records are not permitted - "
                "https://tools.ietf.org/html/rfc7489#section-6.6.3")
        if unrelated_records and not ignore_unrelated_records:
            ur_str = "\n\n".join(unrelated_records)
            raise UnrelatedTXTRecordFoundAtDMARC(
                "Unrelated TXT records were discovered. These should be "
                "removed, as some receivers may not expect to find "
                f"unrelated TXT records at {target}\n\n{ur_str}")
        # Only pick a record when one exists; indexing an empty list here
        # used to surface as a "list index out of range" error.
        if dmarc_records:
            dmarc_record = dmarc_records[0]
    except dns.resolver.NoAnswer:
        # Nothing at _dmarc.<domain>: check whether the record was
        # published at the domain itself by mistake.
        try:
            records = query_dns(domain, "TXT",
                                nameservers=nameservers, resolver=resolver,
                                timeout=timeout)
            for record in records:
                if record.startswith(txt_prefix):
                    raise DMARCRecordInWrongLocation(
                        "The DMARC record must be located at "
                        f"{target}, not {domain}")
        except dns.resolver.NoAnswer:
            pass
        except dns.resolver.NXDOMAIN:
            raise DMARCRecordNotFound(
                f"The domain {domain} does not exist")
        except DMARCError:
            # Keep the specific error type instead of re-wrapping it
            raise
        except Exception as error:
            raise DMARCRecordNotFound(error) from error
    except dns.resolver.NXDOMAIN:
        # A missing _dmarc name simply means there is no record here
        pass
    except DMARCError:
        # Errors raised deliberately above propagate with their own type
        raise
    except Exception as error:
        raise DMARCRecordNotFound(error) from error

    return dmarc_record
[docs]
def query_dmarc_record(domain: str, nameservers: list[str] = None,
                       resolver: dns.resolver.Resolver = None,
                       timeout: float = 2.0,
                       ignore_unrelated_records: bool = False) -> OrderedDict:
    """
    Queries DNS for a DMARC record

    The record is looked up at the given domain first and, when none is
    found there and the domain is not its own base domain, at the base
    domain.

    Args:
        domain (str): A domain name
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for a record from DNS
        ignore_unrelated_records (bool): Ignore unrelated TXT records

    Returns:
        OrderedDict: An ``OrderedDict`` with the following keys:
                     - ``record`` - the unparsed DMARC record string
                     - ``location`` - the domain where the record was found
                     - ``warnings`` - warning conditions found

    Raises:
        :exc:`checkdmarc.dmarc.DMARCRecordNotFound`
        :exc:`checkdmarc.dmarc.DMARCRecordInWrongLocation`
        :exc:`checkdmarc.dmarc.MultipleDMARCRecords`
        :exc:`checkdmarc.dmarc.SPFRecordFoundWhereDMARCRecordShouldBe`
    """
    logging.debug("Checking for a DMARC record on %s", domain)
    warnings = []
    base_domain = get_base_domain(domain)
    location = domain.lower()
    record = _query_dmarc_record(
        domain, nameservers=nameservers,
        resolver=resolver, timeout=timeout,
        ignore_unrelated_records=ignore_unrelated_records)

    # A DMARC record published at the domain itself (rather than under
    # _dmarc) is only worth a warning here.
    try:
        root_records = query_dns(domain, "TXT",
                                 nameservers=nameservers, resolver=resolver,
                                 timeout=timeout)
        for root_record in root_records:
            if root_record.startswith("v=DMARC1"):
                warnings.append(f"DMARC record at root of {domain} "
                                "has no effect")
    except dns.resolver.NXDOMAIN:
        raise DMARCRecordNotFound(
            f"The domain {domain} does not exist")
    except dns.exception.DNSException:
        # Best effort: any other DNS failure just means no warning
        pass

    # Compare case-insensitively so that a mixed-case domain which is its
    # own base domain is not queried a second time.
    if record is None and location != base_domain.lower():
        record = _query_dmarc_record(
            base_domain, nameservers=nameservers,
            resolver=resolver, timeout=timeout,
            ignore_unrelated_records=ignore_unrelated_records)
        location = base_domain
    if record is None:
        raise DMARCRecordNotFound(
            "A DMARC record does not exist for this domain or its base domain")

    return OrderedDict([("record", record), ("location", location),
                        ("warnings", warnings)])
[docs]
def get_dmarc_tag_description(
        tag: str,
        value: Union[str, list[str]] = None) -> OrderedDict:
    """
    Get the name, default value, and description for a DMARC tag, and/or a
    description for a tag value

    Args:
        tag (str): A DMARC tag
        value: An optional value

    Returns:
        OrderedDict: An ``OrderedDict`` with the following keys:
                     - ``name`` - the tag name
                     - ``default``- the tag's default value
                     - ``description`` - A description of the tag or value

    Raises:
        KeyError: ``tag`` is not a key of ``dmarc_tags``
    """
    tag_details = dmarc_tags[tag]
    name = tag_details["name"]
    description = tag_details["description"]
    default = tag_details.get("default")
    # Value descriptions exist only for tags with a "values" mapping.
    # This lookup used to be missing, which left the mapping permanently
    # empty and the value descriptions unreachable.
    allowed_values = tag_details.get("values", {})

    if isinstance(value, str):
        if value in allowed_values:
            description = allowed_values[value]
    elif isinstance(value, list) and allowed_values:
        value_descriptions = [
            f"{sub_value}: {allowed_values[sub_value]}"
            for sub_value in value
            if sub_value in allowed_values
        ]
        new_description = "\n\n".join(value_descriptions).strip()
        if new_description != "":
            description = new_description

    return OrderedDict(
        [("name", name), ("default", default), ("description", description)])
[docs]
def parse_dmarc_report_uri(uri: str) -> OrderedDict:
    """
    Splits a DMARC reporting URI (as used in ``rua``/``ruf``) into parts

    .. note::
        ``mailto`` is the only reporting URI scheme supported in DMARC1

    Args:
        uri: A DMARC URI

    Returns:
        OrderedDict: The components of the URI:
                     - ``scheme`` - the lowercased scheme
                     - ``address`` - the email address
                     - ``size_limit`` - the size limit without its ``!``
                       prefix, or ``None`` when absent

    Raises:
        :exc:`checkdmarc.dmarc.InvalidDMARCReportURI`
    """
    uri = uri.strip()
    found = MAILTO_REGEX.findall(uri)
    if len(found) != 1:
        # Add a hint only when the scheme prefix itself is missing
        hint = ""
        if not uri.startswith("mailto:"):
            hint = (" - please make sure that the URI begins with "
                    "a schema such as mailto:")
        raise InvalidDMARCReportURI(
            f"{uri} is not a valid DMARC report URI" + hint)

    groups = found[0]
    parsed = OrderedDict()
    parsed["scheme"] = groups[0].lower()
    parsed["address"] = groups[1]
    # An empty size limit is reported as None
    parsed["size_limit"] = groups[2].lstrip("!") or None
    return parsed
[docs]
def check_wildcard_dmarc_report_authorization(
        domain: str,
        nameservers: list[str] = None,
        resolver: dns.resolver.Resolver = None,
        timeout: float = 2.0) -> bool:
    """
    Looks for a wildcard DMARC report authorization record, which is
    queried at ``*._report._dmarc.<domain>``, e.g.:

    ::

      *._report._dmarc.example.com IN TXT "v=DMARC1"

    Args:
        domain (str): The domain to check
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for an answer from DNS

    Returns:
        bool: ``True`` when at least one ``v=DMARC1`` record exists at the
        wildcard name, ``False`` when none does or the DNS query fails

    Raises:
        UnrelatedTXTRecordFoundAtDMARC: a TXT record at the wildcard name
            does not start with ``v=DMARC1``
    """
    wildcard_target = f"*._report._dmarc.{domain}"
    try:
        answers = query_dns(wildcard_target, "TXT",
                            nameservers=nameservers, resolver=resolver,
                            timeout=timeout)
        authorizations = 0
        strays = []
        for txt in answers:
            if txt.startswith("v=DMARC1"):
                authorizations += 1
            else:
                strays.append(txt)

        if strays:
            ur_str = "\n\n".join(strays)
            raise UnrelatedTXTRecordFoundAtDMARC(
                "Unrelated TXT records were discovered. "
                "These should be removed, as some "
                "receivers may not expect to find unrelated TXT records "
                f"at {wildcard_target}\n\n{ur_str}")
        return authorizations >= 1
    except dns.exception.DNSException:
        return False
[docs]
def verify_dmarc_report_destination(source_domain: str,
                                    destination_domain: str,
                                    nameservers: list[str] = None,
                                    resolver: dns.resolver.Resolver = None,
                                    timeout: float = 2.0) -> bool:
    """
    Checks if the report destination accepts reports for the source domain
    per RFC 7489, section 7.1

    Args:
        source_domain (str): The source domain
        destination_domain (str): The destination domain
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for an answer from DNS

    Returns:
        bool: Indicates if the report domain accepts reports from the given
        domain

    Raises:
        :exc:`checkdmarc.dmarc.UnverifiedDMARCURIDestination`
        :exc:`checkdmarc.dmarc.UnrelatedTXTRecordFoundAtDMARC`
    """
    source_domain = source_domain.lower()
    destination_domain = destination_domain.lower()

    # No authorization record is looked up when both domains share a base
    # domain.
    if get_base_domain(source_domain) == get_base_domain(destination_domain):
        return True

    # A wildcard record authorizes every source domain. The timeout is
    # forwarded so this lookup honours the caller's setting too.
    if check_wildcard_dmarc_report_authorization(destination_domain,
                                                 nameservers=nameservers,
                                                 resolver=resolver,
                                                 timeout=timeout):
        return True

    target = f"{source_domain}._report._dmarc.{destination_domain}"
    # The record text used to be built with a backslash continuation
    # inside the string literal, which embedded a stray quote and the
    # following line's whitespace in the message.
    message = (f"{destination_domain} does not indicate that it accepts "
               f"DMARC reports about {source_domain} - "
               "Authorization record not found: "
               f'{target} IN TXT "v=DMARC1"')
    dmarc_record_count = 0
    unrelated_records = []
    try:
        records = query_dns(target, "TXT",
                            nameservers=nameservers, resolver=resolver,
                            timeout=timeout)
        for record in records:
            if record.startswith("v=DMARC1"):
                dmarc_record_count += 1
            else:
                unrelated_records.append(record)

        if len(unrelated_records) > 0:
            ur_str = "\n\n".join(unrelated_records)
            raise UnrelatedTXTRecordFoundAtDMARC(
                "Unrelated TXT records were discovered. "
                "These should be removed, as some "
                "receivers may not expect to find unrelated TXT records "
                f"at {target}\n\n{ur_str}")
        if dmarc_record_count < 1:
            return False
    except Exception as error:
        # Any failure, including the unrelated-record error raised just
        # above, is reported as an unverified destination.
        raise UnverifiedDMARCURIDestination(message) from error

    return True
[docs]
def _parse_dmarc_report_uris(tag: str, value: str, domain: str,
                             nameservers: list[str] = None,
                             resolver: dns.resolver.Resolver = None,
                             timeout: float = 2.0) -> tuple:
    """
    Parses and checks every URI in the value of a ``rua`` or ``ruf`` tag

    Args:
        tag (str): The tag being processed (``rua`` or ``ruf``); only used
                   in warning messages
        value (str): The raw, comma-separated tag value
        domain (str): The domain where the DMARC record was found
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for an answer from DNS

    Returns:
        tuple: ``(parsed_uris, warnings)`` - the parsed URIs and the
        warning messages collected while checking them

    Raises:
        :exc:`checkdmarc.dmarc.InvalidDMARCReportURI`
    """
    parsed_uris = []
    warnings = []
    for uri in value.split(","):
        try:
            uri = parse_dmarc_report_uri(uri)
            parsed_uris.append(uri)
            email_address = uri["address"]
            email_domain = email_address.split("@")[-1]
            # Reports sent to another domain need that domain's consent
            if email_domain.lower() != domain.lower():
                verify_dmarc_report_destination(domain, email_domain,
                                                nameservers=nameservers,
                                                resolver=resolver,
                                                timeout=timeout)
            try:
                hosts = get_mx_records(email_domain,
                                       nameservers=nameservers,
                                       resolver=resolver,
                                       timeout=timeout)
                if len(hosts) == 0:
                    raise DMARCReportEmailAddressMissingMXRecords(
                        f"The domain for {tag} email address "
                        f"{email_address} has no MX records"
                    )
            except DNSException as warning:
                raise DMARCReportEmailAddressMissingMXRecords(
                    "Failed to retrieve MX records for the domain of "
                    f"{tag} email address "
                    f"{email_address} - {warning}")
        except _DMARCWarning as warning:
            # Non-fatal problems are reported, not raised
            warnings.append(str(warning))

    if len(parsed_uris) > 2:
        warnings.append(str(_DMARCBestPracticeWarning(
            f"Some DMARC reporters might not send to more than two {tag} URIs"
        )))

    return parsed_uris, warnings


def parse_dmarc_record(
        record: str, domain: str, parked: bool = False,
        include_tag_descriptions: bool = False,
        nameservers: list[str] = None,
        resolver: dns.resolver.Resolver = None,
        timeout: float = 2.0,
        syntax_error_marker: str = SYNTAX_ERROR_MARKER) -> OrderedDict:
    """
    Parses a DMARC record

    Args:
        record (str): A DMARC record
        domain (str): The domain where the record is found
        parked (bool): Indicates if a domain is parked
        include_tag_descriptions (bool): Include descriptions in parsed results
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for an answer from DNS
        syntax_error_marker (str): The marker for pointing out syntax errors

    Returns:
        OrderedDict: An ``OrderedDict`` with the following keys:
         - ``tags`` - An ``OrderedDict`` of DMARC tags

           - ``value`` - The DMARC tag value
           - ``explicit`` - ``bool``: A value is explicitly set
           - ``default`` - The tag's default value
           - ``description`` - A description of the tag/value

         - ``warnings`` - A ``list`` of warnings

         .. note::
            ``default`` and ``description`` are only included if
            ``include_tag_descriptions`` is set to ``True``

    Raises:
        :exc:`checkdmarc.dmarc.DMARCSyntaxError`
        :exc:`checkdmarc.dmarc.InvalidDMARCTag`
        :exc:`checkdmarc.dmarc.InvalidDMARCTagValue`
        :exc:`checkdmarc.dmarc.InvalidDMARCReportURI`
        :exc:`checkdmarc.dmarc.SPFRecordFoundWhereDMARCRecordShouldBe`
        :exc:`checkdmarc.dmarc.UnrelatedTXTRecordFoundAtDMARC`
    """
    logging.debug("Parsing the DMARC record for %s", domain)
    spf_in_dmarc_error_msg = "Found a SPF record where a DMARC record " \
                             "should be; most likely, the _dmarc " \
                             "subdomain record does not actually exist, " \
                             "and the request for TXT records was " \
                             "redirected to the base domain"
    warnings = []
    record = record.strip('"')
    if record.lower().startswith("v=spf1"):
        raise SPFRecordFoundWhereDMARCRecordShouldBe(spf_in_dmarc_error_msg)

    # Check the overall syntax before extracting individual tags
    parsed_record = _DMARCGrammar().parse(record)
    if not parsed_record.is_valid:
        expecting = " or ".join(
            str(element).strip('"') for element in parsed_record.expecting)
        marked_record = (record[:parsed_record.pos] + syntax_error_marker +
                         record[parsed_record.pos:])
        raise DMARCSyntaxError(f"Error: Expected {expecting} at position "
                               f"{parsed_record.pos} "
                               f"(marked with {syntax_error_marker}) in: "
                               f"{marked_record}")

    pairs = DMARC_TAG_VALUE_REGEX.findall(record)
    tags = OrderedDict()

    # Find explicit tags
    for pair in pairs:
        tags[pair[0].lower()] = OrderedDict(
            [("value", str(pair[1].strip())), ("explicit", True)])

    # Include implicit tags and their defaults
    for tag in dmarc_tags:
        if tag not in tags and "default" in dmarc_tags[tag]:
            tags[tag] = OrderedDict(
                [("value", dmarc_tags[tag]["default"]), ("explicit", False)])

    if "p" not in tags:
        raise DMARCSyntaxError(
            'The record is missing the required policy ("p") tag')
    tags["p"]["value"] = tags["p"]["value"].lower()
    # Without an explicit sp tag the subdomain policy follows p
    if "sp" not in tags:
        tags["sp"] = OrderedDict([("value", tags["p"]["value"]),
                                  ("explicit", False)])
    # Explicit tags were inserted first, in record order, so index 1 is
    # the tag that followed v in the record
    if list(tags.keys())[1] != "p":
        raise DMARCSyntaxError("the p tag must immediately follow the v tag")
    tags["v"]["value"] = tags["v"]["value"].upper()

    # Validate tag values
    for tag in tags:
        if tag not in dmarc_tags:
            raise InvalidDMARCTag(f"{tag} is not a valid DMARC tag")
        tag_value = tags[tag]["value"]
        allowed_values = None
        if "values" in dmarc_tags[tag]:
            allowed_values = dmarc_tags[tag]["values"]
        if tag == "fo":
            tag_value = tag_value.split(":")
            if "0" in tag_value and "1" in tag_value:
                warnings.append(
                    "When 1 is present in the fo tag, including 0 is "
                    "redundant"
                )
            for value in tag_value:
                if value not in allowed_values:
                    raise InvalidDMARCTagValue(
                        f"{value} is not a valid option for the DMARC fo tag")
        elif tag == "rf":
            tag_value = tag_value.lower().split(":")
            for value in tag_value:
                if value not in allowed_values:
                    raise InvalidDMARCTagValue(
                        f"{value} is not a valid option for the DMARC "
                        "rf tag")
        elif allowed_values and tag_value not in allowed_values:
            allowed_values_str = ",".join(allowed_values)
            raise InvalidDMARCTagValue(
                f"Tag {tag} must have one of the following values: "
                f"{allowed_values_str} - not {tags[tag]['value']}")

    try:
        tags["pct"]["value"] = int(tags["pct"]["value"])
    except ValueError:
        raise InvalidDMARCTagValue(
            "The value of the pct tag must be an integer")

    try:
        tags["ri"]["value"] = int(tags["ri"]["value"])
    except ValueError:
        raise InvalidDMARCTagValue(
            "The value of the ri tag must be an integer")

    if "rua" in tags:
        parsed_uris, uri_warnings = _parse_dmarc_report_uris(
            "rua", tags["rua"]["value"], domain,
            nameservers=nameservers, resolver=resolver, timeout=timeout)
        tags["rua"]["value"] = parsed_uris
        warnings += uri_warnings
    else:
        warnings.append(str(_DMARCBestPracticeWarning(
            "rua tag (destination for aggregate reports) not found")))

    if "ruf" in tags:
        parsed_uris, uri_warnings = _parse_dmarc_report_uris(
            "ruf", tags["ruf"]["value"], domain,
            nameservers=nameservers, resolver=resolver, timeout=timeout)
        tags["ruf"]["value"] = parsed_uris
        warnings += uri_warnings

    if tags["pct"]["value"] < 0 or tags["pct"]["value"] > 100:
        warnings.append(str(InvalidDMARCTagValue(
            "pct value must be an integer between 0 and 100")))
    elif tags["pct"]["value"] < 100:
        warning_msg = "pct value is less than 100. This leads to " \
                      "inconsistent and unpredictable policy " \
                      "enforcement. Consider using p=none to " \
                      "monitor results instead"
        warnings.append(str(_DMARCBestPracticeWarning(warning_msg)))

    # Compare the tag *values*: each entry of ``tags`` is an OrderedDict,
    # so comparing the entry itself to "reject" was always unequal and
    # the warning was emitted for every parked domain.
    if parked and tags["p"]["value"] != "reject":
        warning_msg = "Policy (p=) should be reject for parked domains"
        warnings.append(str(_DMARCBestPracticeWarning(warning_msg)))
    # Only p is lowercased above, so lowercase sp for the comparison
    if parked and str(tags["sp"]["value"]).lower() != "reject":
        warning_msg = "Subdomain policy (sp=) should be reject for " \
                      "parked domains"
        warnings.append(str(_DMARCBestPracticeWarning(warning_msg)))

    # Add descriptions if requested
    if include_tag_descriptions:
        for tag in tags:
            tag_value = tags[tag]["value"]
            details = get_dmarc_tag_description(tag, tag_value)
            tags[tag]["name"] = details["name"]
            if details["default"]:
                tags[tag]["default"] = details["default"]
            tags[tag]["description"] = details["description"]

    return OrderedDict([("tags", tags), ("warnings", warnings)])
[docs]
def get_dmarc_record(domain: str,
                     include_tag_descriptions: bool = False,
                     nameservers: list[str] = None,
                     resolver: dns.resolver.Resolver = None,
                     timeout: float = 2.0) -> OrderedDict:
    """
    Looks up the DMARC record of a domain and parses it

    Args:
        domain (str): A domain name
        include_tag_descriptions (bool): Include descriptions in parsed results
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for an answer from DNS

    Returns:
        OrderedDict: An ``OrderedDict`` with the following keys:
         - ``record`` - The DMARC record string
         - ``location`` -  Where the DMARC was found
         - ``parsed`` - See :meth:`checkdmarc.parse_dmarc_record`

    Raises:
        Whatever ``query_dmarc_record`` and ``parse_dmarc_record`` raise;
        no exception is handled here.
    """
    dns_options = dict(nameservers=nameservers, resolver=resolver,
                       timeout=timeout)
    lookup = query_dmarc_record(domain, **dns_options)
    record = lookup["record"]
    location = lookup["location"]
    parsed = parse_dmarc_record(
        record, location,
        include_tag_descriptions=include_tag_descriptions,
        **dns_options)

    result = OrderedDict()
    result["record"] = record
    result["location"] = location
    result["parsed"] = parsed
    return result
[docs]
def check_dmarc(domain: str, parked: bool = False,
                include_dmarc_tag_descriptions: bool = False,
                ignore_unrelated_records: bool = False,
                nameservers: list[str] = None,
                resolver: dns.resolver.Resolver = None,
                timeout: float = 2.0) -> OrderedDict:
    """
    Queries and parses a domain's DMARC record, reporting any
    ``DMARCError`` in the result instead of raising it

    Args:
        domain (str): A domain name
        parked (bool): The domain is parked
        include_dmarc_tag_descriptions (bool): Include tag descriptions
        ignore_unrelated_records (bool): Ignore unrelated TXT records
        nameservers (list): A list of nameservers to query
        resolver (dns.resolver.Resolver): A resolver object to use for DNS
                                          requests
        timeout (float): number of seconds to wait for a record from DNS

    Returns:
        OrderedDict: An ``OrderedDict`` with the following keys:
                       - ``record`` - the unparsed DMARC record string
                       - ``valid`` - ``True`` unless an error occurred
                       - ``location`` - the domain where the record was found
                       - ``warnings`` - warning conditions found
                       - ``tags`` - the parsed tags

                     If a ``DMARCError`` occurs, ``warnings`` and ``tags``
                     are absent, and the dictionary also has the following
                     keys:

                       - ``error`` - An error message
                       - ``valid`` - False

                     plus every key of the error's ``data``, if it has any
    """
    dmarc_results = OrderedDict()
    dmarc_results["record"] = None
    dmarc_results["valid"] = True
    dmarc_results["location"] = None
    dns_options = dict(nameservers=nameservers, resolver=resolver,
                       timeout=timeout)
    try:
        lookup = query_dmarc_record(
            domain,
            ignore_unrelated_records=ignore_unrelated_records,
            **dns_options)
        dmarc_results["record"] = lookup["record"]
        dmarc_results["location"] = lookup["location"]
        parsed = parse_dmarc_record(
            lookup["record"],
            lookup["location"],
            parked=parked,
            include_tag_descriptions=include_dmarc_tag_descriptions,
            **dns_options)
        # Lookup warnings come first, parser warnings are appended to them
        dmarc_results["warnings"] = lookup["warnings"]
        dmarc_results["tags"] = parsed["tags"]
        dmarc_results["warnings"] += parsed["warnings"]
    except DMARCError as error:
        dmarc_results["error"] = str(error)
        dmarc_results["valid"] = False
        # Not every error instance is guaranteed to carry ``data``
        extra = getattr(error, "data", None)
        if extra:
            for key in extra:
                dmarc_results[key] = extra[key]

    return dmarc_results