# -*- coding: utf-8 -*-
"""Brand Indicators for Message Identification (BIMI) record validation"""
from __future__ import annotations
import logging
import re
from collections import OrderedDict
import dns
import requests
from pyleri import (Grammar,
Regex,
Sequence,
List
)
from checkdmarc._constants import SYNTAX_ERROR_MARKER, USER_AGENT
from checkdmarc.utils import (WSP_REGEX, HTTPS_REGEX, query_dns,
get_base_domain)
"""Copyright 2019-2023 Sean Whalen
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
BIMI_VERSION_REGEX_STRING = fr"v{WSP_REGEX}*={WSP_REGEX}*BIMI1{WSP_REGEX}*;"
BIMI_TAG_VALUE_REGEX_STRING = (
fr"([a-z]{{1,2}}){WSP_REGEX}*={WSP_REGEX}*(bimi1|{HTTPS_REGEX})?"
)
BIMI_TAG_VALUE_REGEX = re.compile(BIMI_TAG_VALUE_REGEX_STRING, re.IGNORECASE)
class _BIMIWarning(Exception):
"""Raised when a non-fatal BIMI error occurs"""
[docs]
class BIMIError(Exception):
"""Raised when a fatal BIMI error occurs"""
def __init__(self, msg: str, data: dict = None):
"""
Args:
msg (str): The error message
data (dict): A dictionary of data to include in the results
"""
self.data = data
Exception.__init__(self, msg)
[docs]
class BIMIRecordNotFound(BIMIError):
"""Raised when a BIMI record could not be found"""
def __init__(self, error):
if isinstance(error, dns.exception.Timeout):
error.kwargs["timeout"] = round(error.kwargs["timeout"], 1)
[docs]
class BIMISyntaxError(BIMIError):
"""Raised when a BIMI syntax error is found"""
[docs]
class InvalidBIMITag(BIMISyntaxError):
"""Raised when an invalid BIMI tag is found"""
[docs]
class InvalidBIMITagValue(BIMISyntaxError):
"""Raised when an invalid BIMI tag value is found"""
[docs]
class InvalidBIMIIndicatorURI(InvalidBIMITagValue):
"""Raised when an invalid BIMI indicator URI is found"""
[docs]
class SPFRecordFoundWhereBIMIRecordShouldBe(UnrelatedTXTRecordFoundAtBIMI):
"""Raised when an SPF record is found where a BIMI record should be;
most likely, the ``selector_bimi`` subdomain
record does not actually exist, and the request for ``TXT`` records was
redirected to the base domain"""
[docs]
class BIMIRecordInWrongLocation(BIMIError):
"""Raised when a BIMI record is found at the root of a domain"""
[docs]
class MultipleBIMIRecords(BIMIError):
"""Raised when multiple BIMI records are found"""
class _BIMIGrammar(Grammar):
"""Defines Pyleri grammar for BIMI records"""
version_tag = Regex(BIMI_VERSION_REGEX_STRING)
tag_value = Regex(BIMI_TAG_VALUE_REGEX_STRING, re.IGNORECASE)
START = Sequence(
version_tag, List(tag_value,
delimiter=Regex(f"{WSP_REGEX}*;{WSP_REGEX}*"),
opt=True))
bimi_tags = OrderedDict(
v=OrderedDict(name="Version",
required=True,
description='Identifies the record '
'retrieved as a BIMI '
'record. It MUST have the '
'value of "BIMI1". The '
'value of this tag MUST '
'match precisely; if it '
'does not or it is absent, '
'the entire retrieved '
'record MUST be ignored. '
'It MUST be the first '
'tag in the list.'),
a=OrderedDict(name="Authority Evidence Location",
required=False,
default="",
description='If present, this tag MUST have an empty value '
'or its value MUST be a single URI. An empty '
'value for the tag is interpreted to mean the '
'Domain Owner does not wish to publish or does '
'not have authority evidence to disclose. The '
'URI, if present, MUST contain a fully '
'qualified domain name (FQDN) and MUST specify '
'HTTPS as the URI scheme ("https"). The URI '
'SHOULD specify the location of a publicly '
'retrievable BIMI Evidence Document.'
),
l=OrderedDict(name="Location",
required=False,
default="",
description='The value of this tag is either empty '
'indicating declination to publish, or a single '
'URI representing the location of a Brand '
'Indicator file. The only supported transport '
'is HTTPS.'
)
)
def _query_bimi_record(domain: str, selector: str = "default",
nameservers: list[str] = None,
resolver: dns.resolver.Resolver = None,
timeout: float = 2.0):
"""
Queries DNS for a BIMI record
Args:
domain (str): A domain name
selector: the BIMI selector
nameservers (list): A list of nameservers to query
resolver (dns.resolver.Resolver): A resolver object to use for DNS
requests
timeout (float): number of seconds to wait for a record from DNS
Returns:
str: A record string or None
"""
domain = domain.lower()
target = f"{selector}._bimi.{domain}"
txt_prefix = "v=BIMI1"
bimi_record = None
bimi_record_count = 0
unrelated_records = []
try:
records = query_dns(target, "TXT", nameservers=nameservers,
resolver=resolver, timeout=timeout)
for record in records:
if record.startswith(txt_prefix):
bimi_record_count += 1
else:
unrelated_records.append(record)
if bimi_record_count > 1:
raise MultipleBIMIRecords(
"Multiple BMI records are not permitted")
if len(unrelated_records) > 0:
ur_str = "\n\n".join(unrelated_records)
raise UnrelatedTXTRecordFoundAtBIMI(
"Unrelated TXT records were discovered. These should be "
"removed, as some receivers may not expect to find "
"unrelated TXT records "
f"at {target}\n\n{ur_str}")
bimi_record = records[0]
except dns.resolver.NoAnswer:
try:
records = query_dns(domain, "TXT",
nameservers=nameservers, resolver=resolver,
timeout=timeout)
for record in records:
if record.startswith(txt_prefix):
raise BIMIRecordInWrongLocation(
"The BIMI record must be located at "
f"{target}, not {domain}")
except dns.resolver.NoAnswer:
pass
except dns.resolver.NXDOMAIN:
raise BIMIRecordNotFound(
f"The domain {domain} does not exist")
except Exception as error:
BIMIRecordNotFound(error)
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
pass
except Exception as error:
raise BIMIRecordNotFound(error)
return bimi_record
[docs]
def query_bimi_record(domain: str, selector: str = "default",
nameservers: list[str] = None,
resolver: dns.resolver.Resolver = None,
timeout: float = 2.0) -> OrderedDict:
"""
Queries DNS for a BIMI record
Args:
domain (str): A domain name
selector (str): The BMI selector
nameservers (list): A list of nameservers to query
resolver (dns.resolver.Resolver): A resolver object to use for DNS
requests
timeout (float): number of seconds to wait for a record from DNS
Returns:
OrderedDict: An ``OrderedDict`` with the following keys:
- ``record`` - the unparsed BIMI record string
- ``location`` - the domain where the record was found
- ``warnings`` - warning conditions found
Raises:
:exc:`checkdmarc.bimi.BIMIRecordNotFound`
:exc:`checkdmarc.bimi.BIMIRecordInWrongLocation`
:exc:`checkdmarc.bimi.MultipleBIMIRecords`
"""
logging.debug(f"Checking for a BIMI record at {selector}._bimi.{domain}")
warnings = []
base_domain = get_base_domain(domain)
location = domain.lower()
record = _query_bimi_record(domain, selector=selector,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
try:
root_records = query_dns(domain, "TXT",
nameservers=nameservers, resolver=resolver,
timeout=timeout)
for root_record in root_records:
if root_record.startswith("v=BIMI1"):
warnings.append(f"BIMI record at root of {domain} "
"has no effect")
except dns.resolver.NXDOMAIN:
raise BIMIRecordNotFound(
f"The domain {domain} does not exist")
except dns.exception.DNSException:
pass
if record is None and domain != base_domain and selector != "default":
record = _query_bimi_record(base_domain,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
location = base_domain
if record is None:
raise BIMIRecordNotFound(
f"A BIMI record does not exist at the {selector} selector for "
f"this domain or its base domain")
return OrderedDict([("record", record), ("location", location),
("warnings", warnings)])
[docs]
def parse_bimi_record(
record: str,
include_tag_descriptions: bool = False,
syntax_error_marker: str = SYNTAX_ERROR_MARKER) -> OrderedDict:
"""
Parses a BIMI record
Args:
record (str): A BIMI record
include_tag_descriptions (bool): Include descriptions in parsed results
syntax_error_marker (str): The maker for pointing out syntax errors
Returns:
OrderedDict: An ``OrderedDict`` with the following keys:
- ``tags`` - An ``OrderedDict`` of BIMI tags
- ``value`` - The BIMI tag value
- ``description`` - A description of the tag/value
- ``warnings`` - A ``list`` of warnings
.. note::
This will attempt to download the files at the URLs provided in
the BIMI record and will include a warning if the downloads fail,
but the file content is not currently analyzed.
.. note::
``description`` is only included if
``include_tag_descriptions`` is set to ``True``
Raises:
:exc:`checkdmarc.bimi.BIMISyntaxError`
:exc:`checkdmarc.bimi.InvalidBIMITag`
:exc:`checkdmarc.bimi.InvalidBIMITagValue`
:exc:`checkdmarc.bimi.SPFRecordFoundWhereBIMIRecordShouldBe`
"""
logging.debug("Parsing the BIMI record")
session = requests.Session()
session.headers = {"User-Agent": USER_AGENT}
spf_in_dmarc_error_msg = "Found a SPF record where a BIMI record " \
"should be; most likely, the _bimi " \
"subdomain record does not actually exist, " \
"and the request for TXT records was " \
"redirected to the base domain"
warnings = []
record = record.strip('"')
if record.lower().startswith("v=spf1"):
raise SPFRecordFoundWhereBIMIRecordShouldBe(spf_in_dmarc_error_msg)
bimi_syntax_checker = _BIMIGrammar()
parsed_record = bimi_syntax_checker.parse(record)
if not parsed_record.is_valid:
expecting = list(
map(lambda x: str(x).strip('"'), list(parsed_record.expecting)))
marked_record = (record[:parsed_record.pos] + syntax_error_marker +
record[parsed_record.pos:])
expecting = " or ".join(expecting)
raise BIMISyntaxError(f"Error: Expected {expecting} at position "
f"{parsed_record.pos} "
f"(marked with {syntax_error_marker}) in: "
f"{marked_record}")
pairs = BIMI_TAG_VALUE_REGEX.findall(record)
tags = OrderedDict()
for pair in pairs:
tag = pair[0].lower().strip()
tag_value = str(pair[1].strip())
if tag not in bimi_tags:
raise InvalidBIMITag(f"{tag} is not a valid BIMI record tag")
tags[tag] = OrderedDict(value=tag_value)
if include_tag_descriptions:
tags[tag]["name"] = bimi_tags[tag]["name"]
tags[tag]["description"] = bimi_tags[tag]["description"]
if tag == "a" and tag_value != "":
try:
response = session.get(tag_value)
response.raise_for_status()
except Exception as e:
warnings.append(f"Unable to download Authority Evidence at "
f"{tag_value} - {str(e)}")
elif tag == "e" and tag_value != "":
try:
response = session.get(tag_value)
response.raise_for_status()
except Exception as e:
warnings.append(f"Unable to download "
f"{tag_value} - {str(e)}")
return OrderedDict(tags=tags, warnings=warnings)
[docs]
def check_bimi(domain: str, selector: str = "default",
include_tag_descriptions: bool = False,
nameservers: list[str] = None,
resolver: dns.resolver.Resolver = None,
timeout: float = 2.0) -> OrderedDict:
"""
Returns a dictionary with a parsed BIMI record or an error.
.. note::
This will attempt to download the files at the URLs provided in
the BIMI record and will include a warning if the downloads fail,
but the file content is not currently analyzed.
Args:
domain (str): A domain name
selector (str): The BIMI selector
include_tag_descriptions (bool): Include descriptions in parsed results
nameservers (list): A list of nameservers to query
resolver (dns.resolver.Resolver): A resolver object to use for DNS
requests
timeout (float): number of seconds to wait for an answer from DNS
Returns:
OrderedDict: An ``OrderedDict`` with the following keys:
- ``record`` - The BIMI record string
- ``parsed`` - The parsed BIMI record
- ``valid`` - True
- ``warnings`` - A ``list`` of warnings
If a DNS error occurs, the dictionary will have the
following keys:
- ``error`` - Tne error message
- ``valid`` - False
"""
bimi_results = OrderedDict([("record", None), ("valid", True)])
selector = selector.lower()
try:
bimi_query = query_bimi_record(
domain,
selector=selector,
nameservers=nameservers, resolver=resolver,
timeout=timeout)
bimi_results["selector"] = selector
bimi_results["record"] = bimi_query["record"]
parsed_bimi = parse_bimi_record(
bimi_results["record"],
include_tag_descriptions=include_tag_descriptions)
bimi_results["tags"] = parsed_bimi["tags"]
bimi_results["warnings"] = parsed_bimi["warnings"]
except BIMIError as error:
bimi_results["selector"] = selector
bimi_results["valid"] = False
bimi_results["error"] = str(error)
return bimi_results