API reference
parsedmarc
A Python package for parsing DMARC reports
- exception parsedmarc.InvalidAggregateReport[source]
Raised when an invalid DMARC aggregate report is encountered
- exception parsedmarc.InvalidForensicReport[source]
Raised when an invalid DMARC forensic report is encountered
- exception parsedmarc.InvalidSMTPTLSReport[source]
Raised when an invalid SMTP TLS report is encountered
- parsedmarc.email_results(results: OrderedDict, *, host: str, mail_from: str, mail_to: str, mail_cc: list = None, mail_bcc: list = None, port: int = 0, require_encryption: bool = False, verify: bool = True, username: str = None, password: str = None, subject: str = None, attachment_filename: str = None, message: str = None)[source]
Emails parsing results as a zip file
- Parameters:
results (OrderedDict) – Parsing results
host (str) – Mail server hostname or IP address
mail_from – The value of the message from header
mail_to (list) – A list of addresses to mail to
mail_cc (list) – A list of addresses to CC
mail_bcc (list) – A list of addresses to BCC
port (int) – Port to use
require_encryption (bool) – Require a secure connection from the start
verify (bool) – Verify the SSL/TLS certificate
username (str) – An optional username
password (str) – An optional password
subject (str) – Overrides the default message subject
attachment_filename (str) – Override the default attachment filename
message (str) – Override the default plain text body
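The entry above says results are emailed as a zip file. As a rough illustration only, the following sketch builds (but does not send) such a message with the standard library. It is not parsedmarc's implementation; `build_results_message`, the default filename `results.zip`, and the default body text are hypothetical names chosen for this example.

```python
from email.message import EmailMessage


def build_results_message(zip_bytes, mail_from, mail_to, subject,
                          attachment_filename="results.zip",
                          message="Please see the attached results."):
    """Build an email that carries zip_bytes as an attachment (hypothetical helper)."""
    msg = EmailMessage()
    msg["From"] = mail_from
    msg["To"] = ", ".join(mail_to)
    msg["Subject"] = subject
    msg.set_content(message)  # plain text body
    msg.add_attachment(zip_bytes, maintype="application", subtype="zip",
                       filename=attachment_filename)
    return msg


example_message = build_results_message(
    b"PK\x05\x06" + b"\x00" * 18,  # the 22 bytes of an empty zip archive
    mail_from="dmarc@example.com",
    mail_to=["admin@example.com"],
    subject="DMARC results",
)
```

Sending the message (for example with `smtplib`) is deliberately left out, since host, port, and encryption settings depend on the mail server.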
- parsedmarc.extract_report(content: bytes | str | IO[Any]) str[source]
Extracts text from a zip or gzip file, as a base64-encoded string, file-like object, or bytes.
- Parameters:
content – Report file as a base64-encoded string, file-like object, or bytes
- Returns:
The extracted text
- Return type:
str
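To make the zip/gzip handling described above concrete, here is a minimal standard-library sketch that inspects the leading magic bytes of a payload and extracts the text. It is an assumption about one reasonable approach, not the package's actual code; `extract_text` is a hypothetical name, and the sketch assumes the first zip member is the report.

```python
import gzip
import io
import zipfile

ZIP_MAGIC = b"PK\x03\x04"   # local file header of a non-empty zip archive
GZIP_MAGIC = b"\x1f\x8b"    # gzip member header


def extract_text(data: bytes) -> str:
    """Return the text inside a zip or gzip payload, or decode plain bytes."""
    if data.startswith(ZIP_MAGIC):
        with zipfile.ZipFile(io.BytesIO(data)) as archive:
            # Assumption: the first archive member is the report
            first_member = archive.namelist()[0]
            return archive.read(first_member).decode("utf-8", errors="ignore")
    if data.startswith(GZIP_MAGIC):
        return gzip.decompress(data).decode("utf-8", errors="ignore")
    return data.decode("utf-8", errors="ignore")


sample_xml = "<feedback></feedback>"
extracted = extract_text(gzip.compress(sample_xml.encode("utf-8")))
```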
- parsedmarc.extract_report_from_file_path(file_path: str)[source]
Extracts report from a file at the given file_path
- parsedmarc.get_dmarc_reports_from_mailbox(connection: MailboxConnection, *, reports_folder: str | None = 'INBOX', archive_folder: str | None = 'Archive', delete: bool | None = False, test: bool | None = False, ip_db_path: str | None = None, always_use_local_files: bool | None = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool | None = False, nameservers: list[str] | None = None, dns_timeout: float | None = 6.0, strip_attachment_payloads: bool | None = False, results: OrderedDict[str, Any] | None = None, batch_size: int | None = 10, since: datetime | None = None, create_folders: bool | None = True, normalize_timespan_threshold_hours: float | None = 24) OrderedDict[str, OrderedDict[str, Any]][source]
Fetches and parses DMARC reports from a mailbox
- Parameters:
connection – A Mailbox connection object
reports_folder (str) – The folder where reports can be found
archive_folder (str) – The folder to move processed mail to
delete (bool) – Delete messages after processing them
test (bool) – Do not move or delete messages after processing them
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool) – Do not download files
reverse_dns_map_path (str) – Path to a reverse DNS map file
reverse_dns_map_url (str) – URL to a reverse DNS map file
offline (bool) – Do not query online for geolocation or DNS
nameservers (list) – A list of DNS nameservers to query
dns_timeout (float) – Set the DNS query timeout
strip_attachment_payloads (bool) – Remove attachment payloads from forensic report results
results (dict) – Results from the previous run
batch_size (int) – Number of messages to read and process before saving (use 0 for no limit)
since – Search for messages since a certain time (units - {"m": "minutes", "h": "hours", "d": "days", "w": "weeks"})
create_folders (bool) – Whether to create the destination folders (not used in watch)
normalize_timespan_threshold_hours (float) – Normalize timespans beyond this
- Returns:
Lists of aggregate_reports, forensic_reports, and smtp_tls_reports
- Return type:
OrderedDict
- parsedmarc.get_dmarc_reports_from_mbox(input_: str, *, nameservers: list[str] | None = None, dns_timeout: float | None = 2.0, strip_attachment_payloads: bool | None = False, ip_db_path: str | None = None, always_use_local_files: bool | None = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool | None = False, normalize_timespan_threshold_hours: float | None = 24.0) OrderedDict[str, OrderedDict[str, Any]][source]
Parses a mailbox in mbox format containing e-mails with attached DMARC reports
- Parameters:
input_ (str) – A path to a mbox file
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
dns_timeout (float) – Sets the DNS timeout in seconds
strip_attachment_payloads (bool) – Remove attachment payloads from forensic report results
always_use_local_files (bool) – Do not download files
reverse_dns_map_path (str) – Path to a reverse DNS map file
reverse_dns_map_url (str) – URL to a reverse DNS map file
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
offline (bool) – Do not make online queries for geolocation or DNS
normalize_timespan_threshold_hours (float) – Normalize timespans beyond this
- Returns:
Lists of aggregate_reports, forensic_reports, and smtp_tls_reports
- Return type:
OrderedDict
- parsedmarc.get_report_zip(results: OrderedDict[str, Any]) bytes[source]
Creates a zip file of parsed report output
- Parameters:
results (OrderedDict) – The parsed results
- Returns:
zip file bytes
- Return type:
bytes
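As an illustration of producing zip file bytes from parsed results, the sketch below writes one JSON file per report list into an in-memory archive. This is a simplified assumption, not the real function: `build_results_zip` is a hypothetical name, the member filenames are borrowed from the `save_output` defaults documented on this page, and CSV output is omitted.

```python
import io
import json
import zipfile


def build_results_zip(results: dict) -> bytes:
    """Pack each list of parsed reports into an in-memory zip as JSON."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for key, filename in (
            ("aggregate_reports", "aggregate.json"),
            ("forensic_reports", "forensic.json"),
            ("smtp_tls_reports", "smtp_tls.json"),
        ):
            # Missing keys are written as an empty JSON list
            archive.writestr(filename, json.dumps(results.get(key, []), indent=2))
    return buffer.getvalue()


zip_bytes = build_results_zip({"aggregate_reports": [{"report_id": "example"}]})
```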
- parsedmarc.parse_aggregate_report_file(_input: str | bytes | IO[Any], *, offline: bool | None = False, always_use_local_files: bool | None = None, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, ip_db_path: str | None = None, nameservers: list[str] | None = None, dns_timeout: float | None = 2.0, keep_alive: Callable | None = None, normalize_timespan_threshold_hours: float | None = 24.0) OrderedDict[str, any][source]
Parses a file at the given path, a file-like object, or bytes as an aggregate DMARC report
- Parameters:
_input (str | bytes | IO) – A path to a file, a file-like object, or bytes
offline (bool) – Do not query online for geolocation or DNS
always_use_local_files (bool) – Do not download files
reverse_dns_map_path (str) – Path to a reverse DNS map file
reverse_dns_map_url (str) – URL to a reverse DNS map file
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
dns_timeout (float) – Sets the DNS timeout in seconds
keep_alive (callable) – Keep alive function
normalize_timespan_threshold_hours (float) – Normalize timespans beyond this
- Returns:
The parsed DMARC aggregate report
- Return type:
OrderedDict
- parsedmarc.parse_aggregate_report_xml(xml: str, *, ip_db_path: str | None = None, always_use_local_files: bool | None = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool | None = False, nameservers: list[str] | None = None, timeout: float | None = 2.0, keep_alive: Callable | None = None, normalize_timespan_threshold_hours: float = 24.0) OrderedDict[str, Any][source]
Parses a DMARC XML report string and returns a consistent OrderedDict
- Parameters:
xml (str) – A string of DMARC aggregate report XML
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool) – Do not download files
reverse_dns_map_path (str) – Path to a reverse DNS map file
reverse_dns_map_url (str) – URL to a reverse DNS map file
offline (bool) – Do not query online for geolocation or DNS
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
timeout (float) – Sets the DNS timeout in seconds
keep_alive (callable) – Keep alive function
normalize_timespan_threshold_hours (float) – Normalize timespans beyond this
- Returns:
The parsed aggregate DMARC report
- Return type:
OrderedDict
- parsedmarc.parse_forensic_report(feedback_report: str, sample: str, msg_date: datetime, *, always_use_local_files: bool | None = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool | None = False, ip_db_path: str | None = None, nameservers: list[str] | None = None, dns_timeout: float | None = 2.0, strip_attachment_payloads: bool | None = False) OrderedDict[str, Any][source]
Converts a DMARC forensic report and sample to an OrderedDict
- Parameters:
feedback_report (str) – A message’s feedback report as a string
sample (str) – The RFC 822 headers or RFC 822 message sample
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool) – Do not download files
reverse_dns_map_path (str) – Path to a reverse DNS map file
reverse_dns_map_url (str) – URL to a reverse DNS map file
offline (bool) – Do not query online for geolocation or DNS
msg_date (str) – The message’s date header
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
dns_timeout (float) – Sets the DNS timeout in seconds
strip_attachment_payloads (bool) – Remove attachment payloads from forensic report results
- Returns:
A parsed report and sample
- Return type:
OrderedDict
- parsedmarc.parse_report_email(input_: bytes | str, *, offline: bool | None = False, ip_db_path: str | None = None, always_use_local_files: bool | None = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, nameservers: list[str] = None, dns_timeout: float | None = 2.0, strip_attachment_payloads: bool | None = False, keep_alive: callable | None = None, normalize_timespan_threshold_hours: float | None = 24.0) OrderedDict[str, Any][source]
Parses a DMARC report from an email
- Parameters:
input_ – An emailed DMARC report in RFC 822 format, as bytes or a string
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool) – Do not download files
reverse_dns_map_path (str) – Path to a reverse DNS map
reverse_dns_map_url (str) – URL to a reverse DNS map
offline (bool) – Do not query online for geolocation or DNS
nameservers (list) – A list of one or more nameservers to use
dns_timeout (float) – Sets the DNS timeout in seconds
strip_attachment_payloads (bool) – Remove attachment payloads from forensic report results
keep_alive (callable) – Keep alive function
normalize_timespan_threshold_hours (float) – Normalize timespans beyond this
- Returns:
report_type: aggregate or forensic
report: The parsed report
- Return type:
OrderedDict
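The return value described above pairs a `report_type` with the parsed `report`. A caller might branch on that type as in the following sketch. The `parsed` dict is a hand-written stand-in rather than real parser output, and `describe_parsed_report` is a hypothetical helper.

```python
def describe_parsed_report(parsed: dict) -> str:
    """Summarize a result shaped like {'report_type': ..., 'report': ...}."""
    report_type = parsed["report_type"]
    report = parsed["report"]
    if report_type == "aggregate":
        return f"aggregate report with {len(report)} top-level fields"
    if report_type == "forensic":
        return f"forensic report with {len(report)} top-level fields"
    # Any other type is reported rather than silently dropped
    return f"unhandled report type: {report_type}"


# Stand-in data for illustration; real reports contain many more fields
parsed = {"report_type": "aggregate", "report": {"records": []}}
summary = describe_parsed_report(parsed)
```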
- parsedmarc.parse_report_file(input_: bytes | str | IO[Any], *, nameservers: list[str] | None = None, dns_timeout: float | None = 2.0, strip_attachment_payloads: bool | None = False, ip_db_path: str | None = None, always_use_local_files: bool | None = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool | None = False, keep_alive: Callable | None = None, normalize_timespan_threshold_hours: float | None = 24) OrderedDict[str, Any][source]
Parses a DMARC aggregate or forensic file at the given path, a file-like object, or bytes
- Parameters:
input_ (str | bytes | IO) – A path to a file, a file-like object, or bytes
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
dns_timeout (float) – Sets the DNS timeout in seconds
strip_attachment_payloads (bool) – Remove attachment payloads from forensic report results
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool) – Do not download files
reverse_dns_map_path (str) – Path to a reverse DNS map
reverse_dns_map_url (str) – URL to a reverse DNS map
offline (bool) – Do not make online queries for geolocation or DNS
keep_alive (callable) – Keep alive function
- Returns:
The parsed DMARC report
- Return type:
OrderedDict
- parsedmarc.parsed_aggregate_reports_to_csv(reports: list[OrderedDict[str, Any]]) str[source]
Converts one or more parsed aggregate reports to flat CSV format, including headers
- Parameters:
reports – A parsed aggregate report or list of parsed aggregate reports
- Returns:
Parsed aggregate report data in flat CSV format, including headers
- Return type:
str
- parsedmarc.parsed_aggregate_reports_to_csv_rows(reports: list[OrderedDict[str, Any]]) list[dict[str, Any]][source]
Converts one or more parsed aggregate reports to list of dicts in flat CSV format
- Parameters:
reports – A parsed aggregate report or list of parsed aggregate reports
- Returns:
Parsed aggregate report data as a list of dicts in flat CSV format
- Return type:
list
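The two aggregate CSV helpers above flatten nested reports into one row per record and then serialize those rows with a header line. The sketch below shows that idea with the standard `csv` module. The field names (`org_name`, `source_ip`, `count`) and the report shape are simplified assumptions for illustration, not the package's real column set.

```python
import csv
import io


def flatten_reports(reports):
    """Return one flat dict per record; accepts one report or a list of reports."""
    if isinstance(reports, dict):
        reports = [reports]
    rows = []
    for report in reports:
        for record in report.get("records", []):
            rows.append({
                "org_name": report.get("org_name"),
                "source_ip": record.get("source_ip"),
                "count": record.get("count"),
            })
    return rows


def rows_to_csv(rows):
    """Serialize flat rows to CSV text, header line first."""
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=["org_name", "source_ip", "count"])
    writer.writeheader()
    writer.writerows(rows)
    return output.getvalue()


# Hypothetical, heavily simplified report
example_report = {
    "org_name": "example.net",
    "records": [
        {"source_ip": "192.0.2.1", "count": 3},
        {"source_ip": "198.51.100.7", "count": 1},
    ],
}
csv_text = rows_to_csv(flatten_reports(example_report))
```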
- parsedmarc.parsed_forensic_reports_to_csv(reports: list[dict[str, Any]]) str[source]
Converts one or more parsed forensic reports to flat CSV format, including headers
- Parameters:
reports – A parsed forensic report or list of parsed forensic reports
- Returns:
Parsed forensic report data in flat CSV format, including headers
- Return type:
str
- parsedmarc.parsed_forensic_reports_to_csv_rows(reports: list[OrderedDict[str, Any]])[source]
Converts one or more parsed forensic reports to a list of dicts in flat CSV format
- Parameters:
reports – A parsed forensic report or list of parsed forensic reports
- Returns:
Parsed forensic report data as a list of dicts in flat CSV format
- Return type:
list
- parsedmarc.parsed_smtp_tls_reports_to_csv(reports: OrderedDict[str, Any]) str[source]
Converts one or more parsed SMTP TLS reports to flat CSV format, including headers
- Parameters:
reports – A parsed SMTP TLS report or list of parsed SMTP TLS reports
- Returns:
Parsed SMTP TLS report data in flat CSV format, including headers
- Return type:
str
- parsedmarc.parsed_smtp_tls_reports_to_csv_rows(reports: OrderedDict[str, Any] | List[OrderedDict[str, Any]])[source]
Converts one or more parsed SMTP TLS reports into a list of single-layer OrderedDict objects suitable for use in a CSV
- parsedmarc.save_output(results: OrderedDict[str, Any], *, output_directory: str | None = 'output', aggregate_json_filename: str | None = 'aggregate.json', forensic_json_filename: str | None = 'forensic.json', smtp_tls_json_filename: str | None = 'smtp_tls.json', aggregate_csv_filename: str | None = 'aggregate.csv', forensic_csv_filename: str | None = 'forensic.csv', smtp_tls_csv_filename: str | None = 'smtp_tls.csv')[source]
Save report data in the given directory
- Parameters:
results (OrderedDict) – Parsing results
output_directory (str) – The path to the directory to save in
aggregate_json_filename (str) – Filename for the aggregate JSON file
forensic_json_filename (str) – Filename for the forensic JSON file
smtp_tls_json_filename (str) – Filename for the SMTP TLS JSON file
aggregate_csv_filename (str) – Filename for the aggregate CSV file
forensic_csv_filename (str) – Filename for the forensic CSV file
smtp_tls_csv_filename (str) – Filename for the SMTP TLS CSV file
- parsedmarc.watch_inbox(mailbox_connection: MailboxConnection, callback: Callable, *, reports_folder: str | None = 'INBOX', archive_folder: str | None = 'Archive', delete: bool | None = False, test: bool | None = False, check_timeout: int | None = 30, ip_db_path: str | None = None, always_use_local_files: bool | None = False, reverse_dns_map_path: str | None = None, reverse_dns_map_url: str | None = None, offline: bool | None = False, nameservers: list[str] | None = None, dns_timeout: float | None = 6.0, strip_attachment_payloads: bool | None = False, batch_size: int | None = None, normalize_timespan_threshold_hours: float | None = 24)[source]
Watches the mailbox for new messages and sends the results to a callback function
- Parameters:
mailbox_connection – The mailbox connection object
callback – The callback function to receive the parsing results
reports_folder (str) – The IMAP folder where reports can be found
archive_folder (str) – The folder to move processed mail to
delete (bool) – Delete messages after processing them
test (bool) – Do not move or delete messages after processing them
check_timeout (int) – Number of seconds to wait for an IMAP IDLE response or the number of seconds until the next mail check
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
always_use_local_files (bool) – Do not download files
reverse_dns_map_path (str) – Path to a reverse DNS map file
reverse_dns_map_url (str) – URL to a reverse DNS map file
offline (bool) – Do not query online for geolocation or DNS
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
dns_timeout (float) – Set the DNS query timeout
strip_attachment_payloads (bool) – Replace attachment payloads in forensic report samples with None
batch_size (int) – Number of messages to read and process before saving
normalize_timespan_threshold_hours (float) – Normalize timespans beyond this
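A callback passed to `watch_inbox` receives the parsing results. Assuming those results use the same `aggregate_reports`, `forensic_reports`, and `smtp_tls_reports` keys listed for `get_dmarc_reports_from_mailbox`, a callback could look like this sketch; `summarize_results` is a hypothetical name and the sample input is a stand-in.

```python
REPORT_KEYS = ("aggregate_reports", "forensic_reports", "smtp_tls_reports")


def summarize_results(results: dict) -> dict:
    """Example callback: count the reports of each kind and print a summary."""
    counts = {key: len(results.get(key, [])) for key in REPORT_KEYS}
    for key, count in counts.items():
        print(f"{key}: {count}")
    return counts


# Stand-in results for illustration
counts = summarize_results({
    "aggregate_reports": [{}, {}],
    "forensic_reports": [],
    "smtp_tls_reports": [{}],
})
```

Such a function would be supplied as the `callback` argument alongside a mailbox connection object.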
parsedmarc.elastic
- exception parsedmarc.elastic.AlreadySaved[source]
Raised when a report to be saved matches an existing report
- parsedmarc.elastic.create_indexes(names: list[str], settings: dict[str, Any] | None = None)[source]
Create Elasticsearch indexes
- Parameters:
names (list) – A list of index names
settings (dict) – Index settings
- parsedmarc.elastic.migrate_indexes(aggregate_indexes: list[str] | None = None, forensic_indexes: list[str] | None = None)[source]
Updates index mappings
- Parameters:
aggregate_indexes (list) – A list of aggregate index names
forensic_indexes (list) – A list of forensic index names
- parsedmarc.elastic.save_aggregate_report_to_elasticsearch(aggregate_report: OrderedDict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int | None = 1, number_of_replicas: int | None = 0)[source]
Saves a parsed DMARC aggregate report to Elasticsearch
- Parameters:
aggregate_report (OrderedDict) – A parsed aggregate report
index_suffix (str) – The suffix of the name of the index to save to
index_prefix (str) – The prefix of the name of the index to save to
monthly_indexes (bool) – Use monthly indexes instead of daily indexes
number_of_shards (int) – The number of shards to use in the index
number_of_replicas (int) – The number of replicas to use in the index
- Raises:
AlreadySaved
- parsedmarc.elastic.save_forensic_report_to_elasticsearch(forensic_report: OrderedDict[str, Any], index_suffix: Any | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int = 1, number_of_replicas: int = 0)[source]
Saves a parsed DMARC forensic report to Elasticsearch
- Parameters:
forensic_report (OrderedDict) – A parsed forensic report
index_suffix (str) – The suffix of the name of the index to save to
index_prefix (str) – The prefix of the name of the index to save to
monthly_indexes (bool) – Use monthly indexes instead of daily indexes
number_of_shards (int) – The number of shards to use in the index
number_of_replicas (int) – The number of replicas to use in the index
- Raises:
AlreadySaved
- parsedmarc.elastic.save_smtp_tls_report_to_elasticsearch(report: OrderedDict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int | None = 1, number_of_replicas: int | None = 0)[source]
Saves a parsed SMTP TLS report to Elasticsearch
- Parameters:
report (OrderedDict) – A parsed SMTP TLS report
index_suffix (str) – The suffix of the name of the index to save to
index_prefix (str) – The prefix of the name of the index to save to
monthly_indexes (bool) – Use monthly indexes instead of daily indexes
number_of_shards (int) – The number of shards to use in the index
number_of_replicas (int) – The number of replicas to use in the index
- Raises:
AlreadySaved
- parsedmarc.elastic.set_hosts(hosts: str | list[str], *, use_ssl: bool | None = False, ssl_cert_path: str | None = None, username: str | None = None, password: str | None = None, api_key: str | None = None, timeout: float | None = 60.0)[source]
Sets the Elasticsearch hosts to use
- Parameters:
hosts (str | list[str]) – A single hostname or URL, or list of hostnames or URLs
use_ssl (bool) – Use an HTTPS connection to the server
ssl_cert_path (str) – Path to the certificate chain
username (str) – The username to use for authentication
password (str) – The password to use for authentication
api_key (str) – The Base64 encoded API key to use for authentication
timeout (float) – Timeout in seconds
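The save functions above take `index_prefix`, `index_suffix`, and `monthly_indexes`, which together determine which index a report lands in. The reference does not spell out the resulting name, so the following is only an illustrative sketch of how such options could combine. The base name, the separators, and the helper `build_index_name` are all assumptions, not the package's documented scheme.

```python
from datetime import date


def build_index_name(base, day, index_prefix=None, index_suffix=None,
                     monthly_indexes=False):
    """Compose an index name from a base, optional prefix/suffix, and a date."""
    name = base
    if index_suffix:
        name = f"{name}_{index_suffix}"      # assumed separator
    if index_prefix:
        name = f"{index_prefix}{name}"       # assumed: prefix used verbatim
    # Monthly indexes drop the day component of the date
    date_format = "%Y-%m" if monthly_indexes else "%Y-%m-%d"
    return f"{name}-{day.strftime(date_format)}"


daily_name = build_index_name("dmarc_aggregate", date(2024, 3, 5))
monthly_name = build_index_name("dmarc_aggregate", date(2024, 3, 5),
                                index_suffix="prod", monthly_indexes=True)
```

Monthly indexes produce fewer, larger indexes, which can be easier to manage for low report volumes.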
parsedmarc.opensearch
- exception parsedmarc.opensearch.AlreadySaved[source]
Raised when a report to be saved matches an existing report
- parsedmarc.opensearch.create_indexes(names: list[str], settings: dict[str, Any] | None = None)[source]
Create OpenSearch indexes
- Parameters:
names (list) – A list of index names
settings (dict) – Index settings
- parsedmarc.opensearch.migrate_indexes(aggregate_indexes: list[str] | None = None, forensic_indexes: list[str] | None = None)[source]
Updates index mappings
- Parameters:
aggregate_indexes (list) – A list of aggregate index names
forensic_indexes (list) – A list of forensic index names
- parsedmarc.opensearch.save_aggregate_report_to_opensearch(aggregate_report: OrderedDict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int | None = 1, number_of_replicas: int | None = 0)[source]
Saves a parsed DMARC aggregate report to OpenSearch
- Parameters:
aggregate_report (OrderedDict) – A parsed aggregate report
index_suffix (str) – The suffix of the name of the index to save to
index_prefix (str) – The prefix of the name of the index to save to
monthly_indexes (bool) – Use monthly indexes instead of daily indexes
number_of_shards (int) – The number of shards to use in the index
number_of_replicas (int) – The number of replicas to use in the index
- Raises:
AlreadySaved
- parsedmarc.opensearch.save_forensic_report_to_opensearch(forensic_report: OrderedDict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int = 1, number_of_replicas: int = 0)[source]
Saves a parsed DMARC forensic report to OpenSearch
- Parameters:
forensic_report (OrderedDict) – A parsed forensic report
index_suffix (str) – The suffix of the name of the index to save to
index_prefix (str) – The prefix of the name of the index to save to
monthly_indexes (bool) – Use monthly indexes instead of daily indexes
number_of_shards (int) – The number of shards to use in the index
number_of_replicas (int) – The number of replicas to use in the index
- Raises:
AlreadySaved
- parsedmarc.opensearch.save_smtp_tls_report_to_opensearch(report: OrderedDict[str, Any], index_suffix: str | None = None, index_prefix: str | None = None, monthly_indexes: bool | None = False, number_of_shards: int | None = 1, number_of_replicas: int | None = 0)[source]
Saves a parsed SMTP TLS report to OpenSearch
- Parameters:
report (OrderedDict) – A parsed SMTP TLS report
index_suffix (str) – The suffix of the name of the index to save to
index_prefix (str) – The prefix of the name of the index to save to
monthly_indexes (bool) – Use monthly indexes instead of daily indexes
number_of_shards (int) – The number of shards to use in the index
number_of_replicas (int) – The number of replicas to use in the index
- Raises:
AlreadySaved
- parsedmarc.opensearch.set_hosts(hosts: str | list[str], *, use_ssl: bool | None = False, ssl_cert_path: str | None = None, username: str | None = None, password: str | None = None, api_key: str | None = None, timeout: float | None = 60.0)[source]
Sets the OpenSearch hosts to use
- Parameters:
hosts (str|list[str]) – A single hostname or URL, or list of hostnames or URLs
use_ssl (bool) – Use an HTTPS connection to the server
ssl_cert_path (str) – Path to the certificate chain
username (str) – The username to use for authentication
password (str) – The password to use for authentication
api_key (str) – The Base64 encoded API key to use for authentication
timeout (float) – Timeout in seconds
parsedmarc.splunk
- class parsedmarc.splunk.HECClient(url: str, access_token: str, index: str, source: str = 'parsedmarc', verify=True, timeout=60)[source]
Initializes the HECClient
- Parameters:
url (str) – The URL of the HEC
access_token (str) – The HEC access token
index (str) – The name of the index
source (str) – The source name
verify (bool) – Verify SSL certificates
timeout (float) – Number of seconds to wait for the server to send data before giving up
- save_aggregate_reports_to_splunk(aggregate_reports: list[OrderedDict[str, Any]] | OrderedDict[str, Any])[source]
Saves aggregate DMARC reports to Splunk
- Parameters:
aggregate_reports – A list of aggregate report dictionaries to save in Splunk
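For orientation, a Splunk HTTP Event Collector (HEC) request carries JSON event objects and an `Authorization: Splunk <token>` header. The sketch below builds such a payload without sending it. It is not how `HECClient` is implemented; the helper names, the `sourcetype` value, and the `host` value are assumptions chosen for illustration.

```python
import json


def build_hec_headers(access_token: str) -> dict:
    """HTTP headers for an HEC request."""
    return {
        "Authorization": f"Splunk {access_token}",
        "Content-Type": "application/json",
    }


def build_hec_payload(events, index, source="parsedmarc",
                      sourcetype="dmarc:aggregate", host="localhost",
                      timestamp=0):
    """Serialize events as newline-separated HEC JSON objects."""
    lines = []
    for event in events:
        lines.append(json.dumps({
            "time": timestamp,
            "host": host,              # assumed value
            "source": source,
            "sourcetype": sourcetype,  # assumed value
            "index": index,
            "event": event,
        }))
    return "\n".join(lines)


headers = build_hec_headers("example-token")
payload = build_hec_payload([{"source_ip": "192.0.2.1"}, {"source_ip": "192.0.2.2"}],
                            index="email")
```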
parsedmarc.utils
Utility functions that might be useful for other projects
- exception parsedmarc.utils.DownloadError[source]
Raised when an error occurs when downloading a file
- parsedmarc.utils.convert_outlook_msg(msg_bytes: bytes) str[source]
Uses the msgconvert Perl utility to convert an Outlook MSG file to standard RFC 822 format
- Parameters:
msg_bytes (bytes) – The content of the .msg file
- Returns:
An RFC 822 string
- parsedmarc.utils.decode_base64(data) bytes[source]
Decodes a base64 string, with padding being optional
- Parameters:
data (str) – A base64 encoded string
- Returns:
The decoded bytes
- Return type:
bytes
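"Padding being optional" means the trailing `=` characters may be missing from the input. One way to tolerate that, shown here as a sketch rather than the package's exact code, is to pad the input to a multiple of four characters before decoding; `decode_base64_lenient` is a hypothetical name.

```python
import base64


def decode_base64_lenient(data: str) -> bytes:
    """Decode base64 text, restoring any '=' padding that was stripped."""
    raw = data.encode("ascii")
    missing = len(raw) % 4
    if missing:
        # Pad up to the next multiple of four characters
        raw += b"=" * (4 - missing)
    return base64.b64decode(raw)


unpadded = decode_base64_lenient("aGVsbG8")   # padding stripped
padded = decode_base64_lenient("aGVsbG8=")    # padding present
```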
- parsedmarc.utils.get_base_domain(domain: str) str[source]
Gets the base domain name for the given domain
Note
Results are based on a list of public domain suffixes at https://publicsuffix.org/list/public_suffix_list.dat and overrides included in parsedmarc.resources.maps.psl_overrides.txt
- Parameters:
domain (str) – A domain or subdomain
- Returns:
The base domain of the given domain
- Return type:
str
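To show what "base domain" means in terms of public suffixes, here is a toy sketch that uses a tiny hard-coded suffix set in place of the real public suffix list. The set `EXAMPLE_SUFFIXES` and the function `base_domain` are hypothetical, and the real list has wildcard and exception rules that this sketch ignores.

```python
# A tiny stand-in for the public suffix list (illustration only)
EXAMPLE_SUFFIXES = {"com", "org", "uk", "co.uk"}


def base_domain(domain: str, suffixes=EXAMPLE_SUFFIXES) -> str:
    """Return the longest matching public suffix plus one label to its left."""
    labels = domain.lower().strip(".").split(".")
    # Scanning from the left finds the longest matching suffix first
    for i in range(len(labels)):
        candidate = ".".join(labels[i:])
        if candidate in suffixes:
            if i == 0:
                return candidate  # the input is itself a public suffix
            return ".".join(labels[i - 1:])
    return ".".join(labels)  # no known suffix: return the input unchanged


uk_result = base_domain("mail.example.co.uk")
com_result = base_domain("www.example.com")
```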
- parsedmarc.utils.get_filename_safe_string(string: str) str[source]
Converts a string to a string that is safe for a filename
- Parameters:
string (str) – A string to make safe for a filename
- Returns:
A string safe for a filename
- Return type:
str
- parsedmarc.utils.get_ip_address_country(ip_address: str, *, db_path: str | None = None) str[source]
Returns the ISO code for the country associated with the given IPv4 or IPv6 address
- Parameters:
ip_address (str) – The IP address to query for
db_path (str) – Path to a MMDB file from MaxMind or DBIP
- Returns:
An ISO country code associated with the given IP address
- Return type:
str
- parsedmarc.utils.get_ip_address_info(ip_address, *, ip_db_path: str | None = None, reverse_dns_map_path: str | None = None, always_use_local_files: bool | None = False, reverse_dns_map_url: str | None = None, cache: ExpiringDict | None = None, reverse_dns_map: dict | None = None, offline: bool | None = False, nameservers: list[str] | None = None, timeout: float | None = 2.0) OrderedDict[str, str][source]
Returns reverse DNS and country information for the given IP address
- Parameters:
ip_address (str) – The IP address to check
ip_db_path (str) – Path to a MMDB file from MaxMind or DBIP
reverse_dns_map_path (str) – Path to a reverse DNS map file
reverse_dns_map_url (str) – URL to the reverse DNS map file
always_use_local_files (bool) – Do not download files
cache (ExpiringDict) – Cache storage
reverse_dns_map (dict) – A reverse DNS map
offline (bool) – Do not make online queries for geolocation or DNS
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
timeout (float) – Sets the DNS timeout in seconds
- Returns:
ip_address, reverse_dns, country
- Return type:
OrderedDict
- parsedmarc.utils.get_reverse_dns(ip_address, *, cache: ExpiringDict | None = None, nameservers: list[str] = None, timeout: int = 2.0) str[source]
Resolves an IP address to a hostname using a reverse DNS query
- Parameters:
ip_address (str) – The IP address to resolve
cache (ExpiringDict) – Cache storage
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
timeout (float) – Sets the DNS query timeout in seconds
- Returns:
The reverse DNS hostname (if any)
- Return type:
str
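A reverse DNS query asks for the PTR record of a special name derived from the IP address. The standard library can compute that name offline, as in the sketch below; performing the actual lookup needs a resolver and network access and is not shown. `reverse_dns_query_name` is a hypothetical helper, not part of the package.

```python
import ipaddress


def reverse_dns_query_name(ip_address: str) -> str:
    """Return the in-addr.arpa / ip6.arpa name queried for a PTR record."""
    return ipaddress.ip_address(ip_address).reverse_pointer


ipv4_name = reverse_dns_query_name("192.0.2.1")
ipv6_name = reverse_dns_query_name("2001:db8::1")
```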
- parsedmarc.utils.get_service_from_reverse_dns_base_domain(base_domain, *, always_use_local_file: bool | None = False, local_file_path: bool | None = None, url: bool | None = None, offline: bool | None = False, reverse_dns_map: bool | None = None) str[source]
Returns the service name of a given base domain name from reverse DNS.
- Parameters:
base_domain (str) – The base domain of the reverse DNS lookup
always_use_local_file (bool) – Always use a local map file
local_file_path (str) – Path to a local map file
url (str) – URL to a reverse DNS map
offline (bool) – Use the built-in copy of the reverse DNS map
reverse_dns_map (dict) – A reverse DNS map
- Returns:
A dictionary containing name and type. If the service is unknown, the name will be the supplied reverse_dns_base_domain and the type will be None
- Return type:
dict
- parsedmarc.utils.human_timestamp_to_datetime(human_timestamp: str, *, to_utc: bool | None = False) datetime[source]
Converts a human-readable timestamp into a Python datetime object
- Parameters:
human_timestamp (str) – A timestamp string
to_utc (bool) – Convert the timestamp to UTC
- Returns:
The converted timestamp
- Return type:
datetime
- parsedmarc.utils.human_timestamp_to_unix_timestamp(human_timestamp: str) int[source]
Converts a human-readable timestamp into a UNIX timestamp
- Parameters:
human_timestamp (str) – A timestamp in YYYY-MM-DD HH:MM:SS format
- Returns:
The converted timestamp
- Return type:
float
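As a sketch of the conversion described above, the following parses a `YYYY-MM-DD HH:MM:SS` string with the standard library. One assumption is made explicit: the timestamp is interpreted as UTC so the result does not depend on the machine's time zone. The reference does not state which time zone the package assumes, and `to_unix_timestamp_utc` is a hypothetical name.

```python
from datetime import datetime, timezone


def to_unix_timestamp_utc(human_timestamp: str) -> float:
    """Parse 'YYYY-MM-DD HH:MM:SS' as UTC and return seconds since the epoch."""
    parsed = datetime.strptime(human_timestamp, "%Y-%m-%d %H:%M:%S")
    # Assumption: treat the naive timestamp as UTC
    return parsed.replace(tzinfo=timezone.utc).timestamp()


one_day = to_unix_timestamp_utc("1970-01-02 00:00:00")
y2k = to_unix_timestamp_utc("2000-01-01 00:00:00")
```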
- parsedmarc.utils.is_mbox(path: str) bool[source]
Checks if the given content is an MBOX mailbox file
- Parameters:
path – Content to check
- Returns:
A flag that indicates if the file is an MBOX mailbox file
- Return type:
bool
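One plausible way to perform this kind of check, sketched here with the standard `mailbox` module, is to open the path as an mbox and see whether it yields any messages. This is an assumption about the approach rather than a copy of the package's code; `looks_like_mbox` is a hypothetical name.

```python
import mailbox
import os
import tempfile


def looks_like_mbox(path: str) -> bool:
    """Return True if the file at path parses as an mbox with at least one message."""
    try:
        box = mailbox.mbox(path, create=False)  # do not create missing files
        try:
            return len(box.keys()) > 0
        finally:
            box.close()
    except Exception:
        return False


with tempfile.TemporaryDirectory() as tmp:
    mbox_path = os.path.join(tmp, "reports.mbox")
    with open(mbox_path, "w", newline="\n") as handle:
        handle.write("From sender@example.com Thu Jan  1 00:00:00 1970\n"
                     "Subject: test\n\nbody\n")
    xml_path = os.path.join(tmp, "report.xml")
    with open(xml_path, "w", newline="\n") as handle:
        handle.write("<feedback></feedback>\n")
    mbox_result = looks_like_mbox(mbox_path)
    xml_result = looks_like_mbox(xml_path)
    missing_result = looks_like_mbox(os.path.join(tmp, "missing.mbox"))
```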
- parsedmarc.utils.is_outlook_msg(content) bool[source]
Checks if the given content is an Outlook msg OLE/MSG file
- Parameters:
content – Content to check
- Returns:
A flag that indicates if the file is an Outlook MSG file
- Return type:
bool
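Outlook `.msg` files are OLE compound files, which begin with a fixed eight-byte signature. A check along those lines could look like the sketch below. It is an illustration, not necessarily the package's implementation, and the same signature also matches other OLE files such as legacy Office documents. `looks_like_outlook_msg` is a hypothetical name.

```python
# Signature at the start of every OLE compound file
OLE_MAGIC = bytes.fromhex("d0cf11e0a1b11ae1")


def looks_like_outlook_msg(content) -> bool:
    """Return True if content is bytes that start with the OLE signature."""
    return isinstance(content, bytes) and content.startswith(OLE_MAGIC)


ole_result = looks_like_outlook_msg(OLE_MAGIC + b"\x00" * 16)
text_result = looks_like_outlook_msg("Subject: not a msg file")
```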
- parsedmarc.utils.parse_email(data: bytes | str, *, strip_attachment_payloads: bool | None = False)[source]
A simplified email parser
- Parameters:
data – The RFC 822 message string, or MSG binary
strip_attachment_payloads (bool) – Remove attachment payloads
- Returns:
Parsed email data
- Return type:
dict
- parsedmarc.utils.query_dns(domain: str, record_type: str, *, cache: ExpiringDict | None = None, nameservers: list[str] = None, timeout: int = 2.0) list[str][source]
Queries DNS
- Parameters:
domain (str) – The domain or subdomain to query about
record_type (str) – The record type to query for
cache (ExpiringDict) – Cache storage
nameservers (list) – A list of one or more nameservers to use (Cloudflare’s public DNS resolvers by default)
timeout (float) – Sets the DNS timeout in seconds
- Returns:
A list of answers
- Return type:
list