Source code for parsedmarc.utils

"""Utility functions that might be useful for other projects"""

import logging
import os
from datetime import datetime
from datetime import timezone
from datetime import timedelta
from collections import OrderedDict
import tempfile
import subprocess
import shutil
import mailparser
import json
import hashlib
import base64
import atexit
import mailbox
import re
import csv
import io

    import importlib.resources as pkg_resources
except ImportError:
    # Try backported to PY<37 `importlib_resources`
    import importlib_resources as pkg_resources

from dateutil.parser import parse as parse_date
import dns.reversename
import dns.resolver
import dns.exception
import geoip2.database
import geoip2.errors
import publicsuffixlist
import requests

from parsedmarc.log import logger
import parsedmarc.resources.dbip
import parsedmarc.resources.maps

parenthesis_regex = re.compile(r'\s*\(.*\)\s*')

null_file = open(os.devnull, "w")
mailparser_logger = logging.getLogger("mailparser")

tempdir = tempfile.mkdtemp()

def _cleanup():
    """Remove temporary files"""


[docs]class EmailParserError(RuntimeError): """Raised when an error parsing the email occurs"""
[docs]class DownloadError(RuntimeError): """Raised when an error occurs when downloading a file"""
[docs]def decode_base64(data): """ Decodes a base64 string, with padding being optional Args: data: A base64 encoded string Returns: bytes: The decoded bytes """ data = bytes(data, encoding="ascii") missing_padding = len(data) % 4 if missing_padding != 0: data += b'=' * (4 - missing_padding) return base64.b64decode(data)
[docs]def get_base_domain(domain): """ Gets the base domain name for the given domain .. note:: Results are based on a list of public domain suffixes at Args: domain (str): A domain or subdomain Returns: str: The base domain of the given domain """ psl = publicsuffixlist.PublicSuffixList() return psl.privatesuffix(domain)
[docs]def query_dns(domain, record_type, cache=None, nameservers=None, timeout=2.0): """ Queries DNS Args: domain (str): The domain or subdomain to query about record_type (str): The record type to query for cache (ExpiringDict): Cache storage nameservers (list): A list of one or more nameservers to use (Cloudflare's public DNS resolvers by default) timeout (float): Sets the DNS timeout in seconds Returns: list: A list of answers """ domain = str(domain).lower() record_type = record_type.upper() cache_key = "{0}_{1}".format(domain, record_type) if cache: records = cache.get(cache_key, None) if records: return records resolver = dns.resolver.Resolver() timeout = float(timeout) if nameservers is None: nameservers = ["", "", "2606:4700:4700::1111", "2606:4700:4700::1001", ] resolver.nameservers = nameservers resolver.timeout = timeout resolver.lifetime = timeout if record_type == "TXT": resource_records = list(map( lambda r: r.strings, resolver.resolve(domain, record_type, lifetime=timeout))) _resource_record = [ resource_record[0][:0].join(resource_record) for resource_record in resource_records if resource_record] records = [r.decode() for r in _resource_record] else: records = list(map( lambda r: r.to_text().replace('"', '').rstrip("."), resolver.resolve(domain, record_type, lifetime=timeout))) if cache: cache[cache_key] = records return records
[docs]def get_reverse_dns(ip_address, cache=None, nameservers=None, timeout=2.0): """ Resolves an IP address to a hostname using a reverse DNS query Args: ip_address (str): The IP address to resolve cache (ExpiringDict): Cache storage nameservers (list): A list of one or more nameservers to use (Cloudflare's public DNS resolvers by default) timeout (float): Sets the DNS query timeout in seconds Returns: str: The reverse DNS hostname (if any) """ hostname = None try: address = dns.reversename.from_address(ip_address) hostname = query_dns(address, "PTR", cache=cache, nameservers=nameservers, timeout=timeout)[0] except dns.exception.DNSException as e: logger.warning(f"get_reverse_dns({ip_address}) exception: {e}") pass return hostname
[docs]def timestamp_to_datetime(timestamp): """ Converts a UNIX/DMARC timestamp to a Python ``datetime`` object Args: timestamp (int): The timestamp Returns: datetime: The converted timestamp as a Python ``datetime`` object """ return datetime.fromtimestamp(int(timestamp))
[docs]def timestamp_to_human(timestamp): """ Converts a UNIX/DMARC timestamp to a human-readable string Args: timestamp: The timestamp Returns: str: The converted timestamp in ``YYYY-MM-DD HH:MM:SS`` format """ return timestamp_to_datetime(timestamp).strftime("%Y-%m-%d %H:%M:%S")
[docs]def human_timestamp_to_datetime(human_timestamp, to_utc=False): """ Converts a human-readable timestamp into a Python ``datetime`` object Args: human_timestamp (str): A timestamp string to_utc (bool): Convert the timestamp to UTC Returns: datetime: The converted timestamp """ human_timestamp = human_timestamp.replace("-0000", "") human_timestamp = parenthesis_regex.sub("", human_timestamp) dt = parse_date(human_timestamp) return dt.astimezone(timezone.utc) if to_utc else dt
[docs]def human_timestamp_to_unix_timestamp(human_timestamp): """ Converts a human-readable timestamp into a UNIX timestamp Args: human_timestamp (str): A timestamp in `YYYY-MM-DD HH:MM:SS`` format Returns: float: The converted timestamp """ human_timestamp = human_timestamp.replace("T", " ") return human_timestamp_to_datetime(human_timestamp).timestamp()
[docs]def get_ip_address_country(ip_address, db_path=None): """ Returns the ISO code for the country associated with the given IPv4 or IPv6 address Args: ip_address (str): The IP address to query for db_path (str): Path to a MMDB file from MaxMind or DBIP Returns: str: And ISO country code associated with the given IP address """ db_paths = [ "GeoLite2-Country.mmdb", "/usr/local/share/GeoIP/GeoLite2-Country.mmdb", "/usr/share/GeoIP/GeoLite2-Country.mmdb", "/var/lib/GeoIP/GeoLite2-Country.mmdb", "/var/local/lib/GeoIP/GeoLite2-Country.mmdb", "/usr/local/var/GeoIP/GeoLite2-Country.mmdb", "%SystemDrive%\\ProgramData\\MaxMind\\GeoIPUpdate\\GeoIP\\" "GeoLite2-Country.mmdb", "C:\\GeoIP\\GeoLite2-Country.mmdb", "dbip-country-lite.mmdb", "dbip-country.mmdb", ] if db_path is not None: if os.path.isfile(db_path) is False: db_path = None logger.warning(f"No file exists at {db_path}. Falling back to an " "included copy of the IPDB IP to Country " "Lite database.") if db_path is None: for system_path in db_paths: if os.path.exists(system_path): db_path = system_path break if db_path is None: with pkg_resources.path(parsedmarc.resources.dbip, "dbip-country-lite.mmdb") as path: db_path = path db_age = - datetime.fromtimestamp( os.stat(db_path).st_mtime) if db_age > timedelta(days=30): logger.warning("IP database is more than a month old") db_reader = geoip2.database.Reader(db_path) country = None try: country = except geoip2.errors.AddressNotFoundError: pass return country
[docs]def get_service_from_reverse_dns_base_domain(base_domain, always_use_local_file=False, local_file_path=None, url=None, offline=False, reverse_dns_map=None): """ Returns the service name of a given base domain name from reverse DNS. Args: base_domain (str): The base domain of the reverse DNS lookup always_use_local_file (bool): Always use a local map file local_file_path (str): Path to a local map file url (str): URL ro a reverse DNS map offline (bool): Use the built-in copy of the reverse DNS map reverse_dns_map (dict): A reverse DNS map Returns: dict: A dictionary containing name and type. If the service is unknown, the name will be the supplied reverse_dns_base_domain and the type will be None """ def load_csv(_csv_file): reader = csv.DictReader(_csv_file) for row in reader: key = row["base_reverse_dns"].lower().strip() reverse_dns_map[key] = dict( name=row["name"], type=row["type"]) base_domain = base_domain.lower().strip() if url is None: url = ("" "/parsedmarc/master/parsedmarc/" "resources/maps/base_reverse_dns_map.csv") if reverse_dns_map is None: reverse_dns_map = dict() csv_file = io.StringIO() if (not (offline or always_use_local_file) and len(reverse_dns_map) == 0): try: logger.debug(f"Trying to fetch " f"reverse DNS map from {url}...") csv_file.write(requests.get(url).text) load_csv(csv_file) except requests.exceptions.RequestException as e: logger.warning(f"Failed to fetch reverse DNS map: {e}") if len(reverse_dns_map) == 0:"Loading included reverse DNS map...") with pkg_resources.path(parsedmarc.resources.maps, "base_reverse_dns_map.csv") as path: if local_file_path is not None: path = local_file_path with open(path) as csv_file: load_csv(csv_file) try: service = reverse_dns_map[base_domain] except KeyError: service = dict(name=base_domain, type=None) return service
[docs]def get_ip_address_info(ip_address, ip_db_path=None, reverse_dns_map_path=None, always_use_local_files=False, reverse_dns_map_url=None, cache=None, reverse_dns_map=None, offline=False, nameservers=None, timeout=2.0): """ Returns reverse DNS and country information for the given IP address Args: ip_address (str): The IP address to check ip_db_path (str): path to a MMDB file from MaxMind or DBIP reverse_dns_map_path (str): Path to a reverse DNS map file reverse_dns_map_url (str): URL to the reverse DNS map file always_use_local_files (bool): Do not download files cache (ExpiringDict): Cache storage reverse_dns_map (dict): A reverse DNS map offline (bool): Do not make online queries for geolocation or DNS nameservers (list): A list of one or more nameservers to use (Cloudflare's public DNS resolvers by default) timeout (float): Sets the DNS timeout in seconds Returns: OrderedDict: ``ip_address``, ``reverse_dns`` """ ip_address = ip_address.lower() if cache is not None: info = cache.get(ip_address, None) if info: logger.debug(f"IP address {ip_address} was found in cache") return info info = OrderedDict() info["ip_address"] = ip_address if offline: reverse_dns = None else: reverse_dns = get_reverse_dns(ip_address, nameservers=nameservers, timeout=timeout) country = get_ip_address_country(ip_address, db_path=ip_db_path) info["country"] = country info["reverse_dns"] = reverse_dns info["base_domain"] = None info["name"] = None info["type"] = None if reverse_dns is not None: base_domain = get_base_domain(reverse_dns) service = get_service_from_reverse_dns_base_domain( base_domain, offline=offline, local_file_path=reverse_dns_map_path, url=reverse_dns_map_url, always_use_local_file=always_use_local_files, reverse_dns_map=reverse_dns_map) info["base_domain"] = base_domain info["type"] = service["type"] info["name"] = service["name"] if cache is not None: cache[ip_address] = info logger.debug(f"IP address {ip_address} added to cache") else: logger.debug(f"IP address {ip_address} reverse_dns not found") return info
def parse_email_address(original_address): if original_address[0] == "": display_name = None else: display_name = original_address[0] address = original_address[1] address_parts = address.split("@") local = None domain = None if len(address_parts) > 1: local = address_parts[0].lower() domain = address_parts[-1].lower() return OrderedDict([("display_name", display_name), ("address", address), ("local", local), ("domain", domain)])
[docs]def get_filename_safe_string(string): """ Converts a string to a string that is safe for a filename Args: string (str): A string to make safe for a filename Returns: str: A string safe for a filename """ invalid_filename_chars = ['\\', '/', ':', '"', '*', '?', '|', '\n', '\r'] if string is None: string = "None" for char in invalid_filename_chars: string = string.replace(char, "") string = string.rstrip(".") string = (string[:100]) if len(string) > 100 else string return string
[docs]def is_mbox(path): """ Checks if the given content is an MBOX mailbox file Args: path: Content to check Returns: bool: A flag that indicates if the file is an MBOX mailbox file """ _is_mbox = False try: mbox = mailbox.mbox(path) if len(mbox.keys()) > 0: _is_mbox = True except Exception as e: logger.debug("Error checking for MBOX file: {0}".format(e.__str__())) return _is_mbox
[docs]def is_outlook_msg(content): """ Checks if the given content is an Outlook msg OLE/MSG file Args: content: Content to check Returns: bool: A flag that indicates if the file is an Outlook MSG file """ return isinstance(content, bytes) and content.startswith( b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1")
[docs]def convert_outlook_msg(msg_bytes): """ Uses the ``msgconvert`` Perl utility to convert an Outlook MS file to standard RFC 822 format Args: msg_bytes (bytes): the content of the .msg file Returns: A RFC 822 string """ if not is_outlook_msg(msg_bytes): raise ValueError("The supplied bytes are not an Outlook MSG file") orig_dir = os.getcwd() tmp_dir = tempfile.mkdtemp() os.chdir(tmp_dir) with open("sample.msg", "wb") as msg_file: msg_file.write(msg_bytes) try: subprocess.check_call(["msgconvert", "sample.msg"], stdout=null_file, stderr=null_file) eml_path = "sample.eml" with open(eml_path, "rb") as eml_file: rfc822 = except FileNotFoundError: raise EmailParserError( "Failed to convert Outlook MSG: msgconvert utility not found") finally: os.chdir(orig_dir) shutil.rmtree(tmp_dir) return rfc822
[docs]def parse_email(data, strip_attachment_payloads=False): """ A simplified email parser Args: data: The RFC 822 message string, or MSG binary strip_attachment_payloads (bool): Remove attachment payloads Returns: dict: Parsed email data """ if isinstance(data, bytes): if is_outlook_msg(data): data = convert_outlook_msg(data) data = data.decode("utf-8", errors="replace") parsed_email = mailparser.parse_from_string(data) headers = json.loads(parsed_email.headers_json).copy() parsed_email = json.loads(parsed_email.mail_json).copy() parsed_email["headers"] = headers if "received" in parsed_email: for received in parsed_email["received"]: if "date_utc" in received: if received["date_utc"] is None: del received["date_utc"] else: received["date_utc"] = received["date_utc"].replace("T", " ") if "from" not in parsed_email: if "From" in parsed_email["headers"]: parsed_email["from"] = parsed_email["Headers"]["From"] else: parsed_email["from"] = None if parsed_email["from"] is not None: parsed_email["from"] = parse_email_address(parsed_email["from"][0]) if "date" in parsed_email: parsed_email["date"] = parsed_email["date"].replace("T", " ") else: parsed_email["date"] = None if "reply_to" in parsed_email: parsed_email["reply_to"] = list(map(lambda x: parse_email_address(x), parsed_email["reply_to"])) else: parsed_email["reply_to"] = [] if "to" in parsed_email: parsed_email["to"] = list(map(lambda x: parse_email_address(x), parsed_email["to"])) else: parsed_email["to"] = [] if "cc" in parsed_email: parsed_email["cc"] = list(map(lambda x: parse_email_address(x), parsed_email["cc"])) else: parsed_email["cc"] = [] if "bcc" in parsed_email: parsed_email["bcc"] = list(map(lambda x: parse_email_address(x), parsed_email["bcc"])) else: parsed_email["bcc"] = [] if "delivered_to" in parsed_email: parsed_email["delivered_to"] = list( map(lambda x: parse_email_address(x), parsed_email["delivered_to"]) ) if "attachments" not in parsed_email: parsed_email["attachments"] = [] else: for attachment in parsed_email["attachments"]: if "payload" in attachment: payload = attachment["payload"] try: if "content_transfer_encoding" in attachment: if attachment["content_transfer_encoding"] == "base64": payload = decode_base64(payload) else: payload = str.encode(payload) attachment["sha256"] = hashlib.sha256(payload).hexdigest() except Exception as e: logger.debug("Unable to decode attachment: {0}".format( e.__str__() )) if strip_attachment_payloads: for attachment in parsed_email["attachments"]: if "payload" in attachment: del attachment["payload"] if "subject" not in parsed_email: parsed_email["subject"] = None parsed_email["filename_safe_subject"] = get_filename_safe_string( parsed_email["subject"]) if "body" not in parsed_email: parsed_email["body"] = None return parsed_email