Source code for parsedmarc.opensearch

# -*- coding: utf-8 -*-

from collections import OrderedDict

from opensearchpy import (
    Q,
    connections,
    Object,
    Document,
    Index,
    Nested,
    InnerDoc,
    Integer,
    Text,
    Boolean,
    Ip,
    Date,
    Search,
)
from opensearchpy.helpers import reindex

from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_datetime
from parsedmarc import InvalidForensicReport


class OpenSearchError(Exception):
    """Raised when an OpenSearch error occurs"""

class _PolicyOverride(InnerDoc):
    type = Text()
    comment = Text()


class _PublishedPolicy(InnerDoc):
    domain = Text()
    adkim = Text()
    aspf = Text()
    p = Text()
    sp = Text()
    pct = Integer()
    fo = Text()


class _DKIMResult(InnerDoc):
    domain = Text()
    selector = Text()
    result = Text()


class _SPFResult(InnerDoc):
    domain = Text()
    scope = Text()
    results = Text()


class _AggregateReportDoc(Document):
    class Index:
        name = "dmarc_aggregate"

    xml_schema = Text()
    org_name = Text()
    org_email = Text()
    org_extra_contact_info = Text()
    report_id = Text()
    date_range = Date()
    date_begin = Date()
    date_end = Date()
    errors = Text()
    published_policy = Object(_PublishedPolicy)
    source_ip_address = Ip()
    source_country = Text()
    source_reverse_dns = Text()
    source_base_domain = Text()
    source_type = Text()
    source_name = Text()
    message_count = Integer()
    disposition = Text()
    dkim_aligned = Boolean()
    spf_aligned = Boolean()
    passed_dmarc = Boolean()
    policy_overrides = Nested(_PolicyOverride)
    header_from = Text()
    envelope_from = Text()
    envelope_to = Text()
    dkim_results = Nested(_DKIMResult)
    spf_results = Nested(_SPFResult)

    def add_policy_override(self, type_, comment):
        self.policy_overrides.append(_PolicyOverride(type=type_, comment=comment))

    def add_dkim_result(self, domain, selector, result):
        self.dkim_results.append(
            _DKIMResult(domain=domain, selector=selector, result=result)
        )

    def add_spf_result(self, domain, scope, result):
        self.spf_results.append(_SPFResult(domain=domain, scope=scope, result=result))

    def save(self, **kwargs):
        # A record passes DMARC if either SPF or DKIM is in alignment
        self.passed_dmarc = self.spf_aligned or self.dkim_aligned
        return super().save(**kwargs)


class _EmailAddressDoc(InnerDoc):
    display_name = Text()
    address = Text()


class _EmailAttachmentDoc(Document):
    filename = Text()
    content_type = Text()
    sha256 = Text()


class _ForensicSampleDoc(InnerDoc):
    raw = Text()
    headers = Object()
    headers_only = Boolean()
    to = Nested(_EmailAddressDoc)
    subject = Text()
    filename_safe_subject = Text()
    _from = Object(_EmailAddressDoc)
    date = Date()
    reply_to = Nested(_EmailAddressDoc)
    cc = Nested(_EmailAddressDoc)
    bcc = Nested(_EmailAddressDoc)
    body = Text()
    attachments = Nested(_EmailAttachmentDoc)

    def add_to(self, display_name, address):
        self.to.append(_EmailAddressDoc(display_name=display_name, address=address))

    def add_reply_to(self, display_name, address):
        self.reply_to.append(
            _EmailAddressDoc(display_name=display_name, address=address)
        )

    def add_cc(self, display_name, address):
        self.cc.append(_EmailAddressDoc(display_name=display_name, address=address))

    def add_bcc(self, display_name, address):
        self.bcc.append(_EmailAddressDoc(display_name=display_name, address=address))

    def add_attachment(self, filename, content_type, sha256):
        self.attachments.append(
            _EmailAttachmentDoc(
                filename=filename, content_type=content_type, sha256=sha256
            )
        )


class _ForensicReportDoc(Document):
    class Index:
        name = "dmarc_forensic"

    feedback_type = Text()
    user_agent = Text()
    version = Text()
    original_mail_from = Text()
    arrival_date = Date()
    domain = Text()
    original_envelope_id = Text()
    authentication_results = Text()
    delivery_results = Text()
    source_ip_address = Ip()
    source_country = Text()
    source_reverse_dns = Text()
    source_authentication_mechanisms = Text()
    source_auth_failures = Text()
    dkim_domain = Text()
    original_rcpt_to = Text()
    sample = Object(_ForensicSampleDoc)


class _SMTPTLSFailureDetailsDoc(InnerDoc):
    result_type = Text()
    sending_mta_ip = Ip()
    receiving_mx_hostname = Text()
    receiving_mx_helo = Text()
    receiving_ip = Ip()
    failed_session_count = Integer()
    additional_information_uri = Text()
    failure_reason_code = Text()


class _SMTPTLSPolicyDoc(InnerDoc):
    policy_domain = Text()
    policy_type = Text()
    policy_strings = Text()
    mx_host_patterns = Text()
    successful_session_count = Integer()
    failed_session_count = Integer()
    failure_details = Nested(_SMTPTLSFailureDetailsDoc)

    def add_failure_details(
        self,
        result_type,
        ip_address,
        receiving_ip,
        receiving_mx_helo,
        failed_session_count,
        receiving_mx_hostname=None,
        additional_information_uri=None,
        failure_reason_code=None,
    ):
        # Wrap the values in a _SMTPTLSFailureDetailsDoc before appending;
        # list.append() does not accept keyword arguments
        self.failure_details.append(
            _SMTPTLSFailureDetailsDoc(
                result_type=result_type,
                sending_mta_ip=ip_address,
                receiving_mx_hostname=receiving_mx_hostname,
                receiving_mx_helo=receiving_mx_helo,
                receiving_ip=receiving_ip,
                failed_session_count=failed_session_count,
                additional_information_uri=additional_information_uri,
                failure_reason_code=failure_reason_code,
            )
        )


class _SMTPTLSFailureReportDoc(Document):
    class Index:
        name = "smtp_tls"

    organization_name = Text()
    date_range = Date()
    date_begin = Date()
    date_end = Date()
    contact_info = Text()
    report_id = Text()
    policies = Nested(_SMTPTLSPolicyDoc)

    def add_policy(
        self,
        policy_type,
        policy_domain,
        successful_session_count,
        failed_session_count,
        policy_string=None,
        mx_host_patterns=None,
        failure_details=None,
    ):
        self.policies.append(
            _SMTPTLSPolicyDoc(
                policy_type=policy_type,
                policy_domain=policy_domain,
                successful_session_count=successful_session_count,
                failed_session_count=failed_session_count,
                policy_strings=policy_string,
                mx_host_patterns=mx_host_patterns,
                failure_details=failure_details,
            )
        )

class AlreadySaved(ValueError):
    """Raised when a report to be saved matches an existing report"""

def set_hosts(
    hosts,
    use_ssl=False,
    ssl_cert_path=None,
    username=None,
    password=None,
    apiKey=None,
    timeout=60.0,
):
    """
    Sets the OpenSearch hosts to use

    Args:
        hosts (str|list): A hostname or URL, or list of hostnames or URLs
        use_ssl (bool): Use an HTTPS connection to the server
        ssl_cert_path (str): Path to the certificate chain
        username (str): The username to use for authentication
        password (str): The password to use for authentication
        apiKey (str): The Base64 encoded API key to use for authentication
        timeout (float): Timeout in seconds
    """
    if not isinstance(hosts, list):
        hosts = [hosts]
    conn_params = {"hosts": hosts, "timeout": timeout}
    if use_ssl:
        conn_params["use_ssl"] = True
        if ssl_cert_path:
            conn_params["verify_certs"] = True
            conn_params["ca_certs"] = ssl_cert_path
        else:
            conn_params["verify_certs"] = False
    if username:
        conn_params["http_auth"] = username + ":" + password
    if apiKey:
        conn_params["api_key"] = apiKey
    connections.create_connection(**conn_params)

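# Example usage (an illustrative sketch, not part of this module): point
# parsedmarc at a TLS-enabled OpenSearch node with basic authentication.
# The hostname, certificate path, and credentials below are placeholders.
#
#     set_hosts(
#         "opensearch.example.com:9200",
#         use_ssl=True,
#         ssl_cert_path="/etc/ssl/certs/ca.pem",
#         username="parsedmarc",
#         password="example-password",
#     )
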
def create_indexes(names, settings=None):
    """
    Create OpenSearch indexes

    Args:
        names (list): A list of index names
        settings (dict): Index settings
    """
    for name in names:
        index = Index(name)
        try:
            if not index.exists():
                logger.debug("Creating OpenSearch index: {0}".format(name))
                if settings is None:
                    index.settings(number_of_shards=1, number_of_replicas=0)
                else:
                    index.settings(**settings)
                index.create()
        except Exception as e:
            raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))

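# Example usage (an illustrative sketch): pre-create a daily aggregate index
# with explicit shard and replica settings; the index name is a placeholder.
#
#     create_indexes(
#         ["dmarc_aggregate-2024-01-01"],
#         settings=dict(number_of_shards=1, number_of_replicas=0),
#     )
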
def migrate_indexes(aggregate_indexes=None, forensic_indexes=None):
    """
    Updates index mappings

    Args:
        aggregate_indexes (list): A list of aggregate index names
        forensic_indexes (list): A list of forensic index names
    """
    version = 2
    if aggregate_indexes is None:
        aggregate_indexes = []
    if forensic_indexes is None:
        forensic_indexes = []
    for aggregate_index_name in aggregate_indexes:
        if not Index(aggregate_index_name).exists():
            continue
        aggregate_index = Index(aggregate_index_name)
        doc = "doc"
        fo_field = "published_policy.fo"
        fo = "fo"
        fo_mapping = aggregate_index.get_field_mapping(fields=[fo_field])
        fo_mapping = fo_mapping[list(fo_mapping.keys())[0]]["mappings"]
        if doc not in fo_mapping:
            continue
        fo_mapping = fo_mapping[doc][fo_field]["mapping"][fo]
        fo_type = fo_mapping["type"]
        if fo_type == "long":
            new_index_name = "{0}-v{1}".format(aggregate_index_name, version)
            body = {
                "properties": {
                    "published_policy.fo": {
                        "type": "text",
                        "fields": {
                            "keyword": {"type": "keyword", "ignore_above": 256}
                        },
                    }
                }
            }
            Index(new_index_name).create()
            Index(new_index_name).put_mapping(doc_type=doc, body=body)
            reindex(connections.get_connection(), aggregate_index_name, new_index_name)
            Index(aggregate_index_name).delete()
    for forensic_index in forensic_indexes:
        pass

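# Example usage (an illustrative sketch): re-map an existing aggregate index
# whose published_policy.fo field was mapped as a number. The index name is a
# placeholder; forensic indexes currently require no migration.
#
#     migrate_indexes(aggregate_indexes=["dmarc_aggregate-2024-01-01"])
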
def save_aggregate_report_to_opensearch(
    aggregate_report,
    index_suffix=None,
    index_prefix=None,
    monthly_indexes=False,
    number_of_shards=1,
    number_of_replicas=0,
):
    """
    Saves a parsed DMARC aggregate report to OpenSearch

    Args:
        aggregate_report (OrderedDict): A parsed aggregate report
        index_suffix (str): The suffix of the name of the index to save to
        index_prefix (str): The prefix of the name of the index to save to
        monthly_indexes (bool): Use monthly indexes instead of daily indexes
        number_of_shards (int): The number of shards to use in the index
        number_of_replicas (int): The number of replicas to use in the index

    Raises:
        AlreadySaved
    """
    logger.info("Saving aggregate report to OpenSearch")
    aggregate_report = aggregate_report.copy()
    metadata = aggregate_report["report_metadata"]
    org_name = metadata["org_name"]
    report_id = metadata["report_id"]
    domain = aggregate_report["policy_published"]["domain"]
    begin_date = human_timestamp_to_datetime(metadata["begin_date"], to_utc=True)
    end_date = human_timestamp_to_datetime(metadata["end_date"], to_utc=True)
    begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
    end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
    if monthly_indexes:
        index_date = begin_date.strftime("%Y-%m")
    else:
        index_date = begin_date.strftime("%Y-%m-%d")
    aggregate_report["begin_date"] = begin_date
    aggregate_report["end_date"] = end_date
    date_range = [aggregate_report["begin_date"], aggregate_report["end_date"]]

    org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
    report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
    domain_query = Q(dict(match_phrase={"published_policy.domain": domain}))
    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
    end_date_query = Q(dict(match=dict(date_end=end_date)))

    if index_suffix is not None:
        search_index = "dmarc_aggregate_{0}*".format(index_suffix)
    else:
        search_index = "dmarc_aggregate*"
    if index_prefix is not None:
        search_index = "{0}{1}".format(index_prefix, search_index)
    search = Search(index=search_index)
    query = org_name_query & report_id_query & domain_query
    query = query & begin_date_query & end_date_query
    search.query = query

    try:
        existing = search.execute()
    except Exception as error_:
        raise OpenSearchError(
            "OpenSearch error while searching for an existing report: "
            "{0}".format(error_.__str__())
        )
    if len(existing) > 0:
        raise AlreadySaved(
            "An aggregate report ID {0} from {1} about {2} "
            "with a date range of {3} UTC to {4} UTC already "
            "exists in OpenSearch".format(
                report_id, org_name, domain, begin_date_human, end_date_human
            )
        )

    published_policy = _PublishedPolicy(
        domain=aggregate_report["policy_published"]["domain"],
        adkim=aggregate_report["policy_published"]["adkim"],
        aspf=aggregate_report["policy_published"]["aspf"],
        p=aggregate_report["policy_published"]["p"],
        sp=aggregate_report["policy_published"]["sp"],
        pct=aggregate_report["policy_published"]["pct"],
        fo=aggregate_report["policy_published"]["fo"],
    )

    for record in aggregate_report["records"]:
        agg_doc = _AggregateReportDoc(
            xml_schema=aggregate_report["xml_schema"],
            org_name=metadata["org_name"],
            org_email=metadata["org_email"],
            org_extra_contact_info=metadata["org_extra_contact_info"],
            report_id=metadata["report_id"],
            date_range=date_range,
            date_begin=aggregate_report["begin_date"],
            date_end=aggregate_report["end_date"],
            errors=metadata["errors"],
            published_policy=published_policy,
            source_ip_address=record["source"]["ip_address"],
            source_country=record["source"]["country"],
            source_reverse_dns=record["source"]["reverse_dns"],
            source_base_domain=record["source"]["base_domain"],
            source_type=record["source"]["type"],
            source_name=record["source"]["name"],
            message_count=record["count"],
            disposition=record["policy_evaluated"]["disposition"],
            dkim_aligned=record["policy_evaluated"]["dkim"] is not None
            and record["policy_evaluated"]["dkim"].lower() == "pass",
            spf_aligned=record["policy_evaluated"]["spf"] is not None
            and record["policy_evaluated"]["spf"].lower() == "pass",
            header_from=record["identifiers"]["header_from"],
            envelope_from=record["identifiers"]["envelope_from"],
            envelope_to=record["identifiers"]["envelope_to"],
        )
        for override in record["policy_evaluated"]["policy_override_reasons"]:
            agg_doc.add_policy_override(
                type_=override["type"], comment=override["comment"]
            )
        for dkim_result in record["auth_results"]["dkim"]:
            agg_doc.add_dkim_result(
                domain=dkim_result["domain"],
                selector=dkim_result["selector"],
                result=dkim_result["result"],
            )
        for spf_result in record["auth_results"]["spf"]:
            agg_doc.add_spf_result(
                domain=spf_result["domain"],
                scope=spf_result["scope"],
                result=spf_result["result"],
            )

        index = "dmarc_aggregate"
        if index_suffix:
            index = "{0}_{1}".format(index, index_suffix)
        if index_prefix:
            index = "{0}{1}".format(index_prefix, index)
        index = "{0}-{1}".format(index, index_date)
        index_settings = dict(
            number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
        )
        create_indexes([index], index_settings)
        agg_doc.meta.index = index

        try:
            agg_doc.save()
        except Exception as e:
            raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))

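# Example usage (an illustrative sketch): save a freshly parsed aggregate
# report. This assumes a report dict produced by parsedmarc's parsing helpers
# (e.g. parse_report_file, which returns a dict with "report_type" and
# "report" keys); the file path is a placeholder, and set_hosts() must have
# been called first.
#
#     set_hosts("localhost:9200")
#     results = parse_report_file("aggregate_report.xml")
#     if results["report_type"] == "aggregate":
#         save_aggregate_report_to_opensearch(
#             results["report"], monthly_indexes=True
#         )
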
def save_forensic_report_to_opensearch(
    forensic_report,
    index_suffix=None,
    index_prefix=None,
    monthly_indexes=False,
    number_of_shards=1,
    number_of_replicas=0,
):
    """
    Saves a parsed DMARC forensic report to OpenSearch

    Args:
        forensic_report (OrderedDict): A parsed forensic report
        index_suffix (str): The suffix of the name of the index to save to
        index_prefix (str): The prefix of the name of the index to save to
        monthly_indexes (bool): Use monthly indexes instead of daily indexes
        number_of_shards (int): The number of shards to use in the index
        number_of_replicas (int): The number of replicas to use in the index

    Raises:
        AlreadySaved
    """
    logger.info("Saving forensic report to OpenSearch")
    forensic_report = forensic_report.copy()
    sample_date = None
    if forensic_report["parsed_sample"]["date"] is not None:
        sample_date = forensic_report["parsed_sample"]["date"]
        sample_date = human_timestamp_to_datetime(sample_date)
    original_headers = forensic_report["parsed_sample"]["headers"]
    headers = OrderedDict()
    for original_header in original_headers:
        headers[original_header.lower()] = original_headers[original_header]
    arrival_date_human = forensic_report["arrival_date_utc"]
    arrival_date = human_timestamp_to_datetime(arrival_date_human)

    if index_suffix is not None:
        search_index = "dmarc_forensic_{0}*".format(index_suffix)
    else:
        search_index = "dmarc_forensic*"
    if index_prefix is not None:
        search_index = "{0}{1}".format(index_prefix, search_index)
    search = Search(index=search_index)
    arrival_query = {"match": {"arrival_date": arrival_date}}
    q = Q(arrival_query)

    from_ = None
    to_ = None
    subject = None
    if "from" in headers:
        from_ = headers["from"]
        from_query = {"match_phrase": {"sample.headers.from": from_}}
        q = q & Q(from_query)
    if "to" in headers:
        to_ = headers["to"]
        to_query = {"match_phrase": {"sample.headers.to": to_}}
        q = q & Q(to_query)
    if "subject" in headers:
        subject = headers["subject"]
        subject_query = {"match_phrase": {"sample.headers.subject": subject}}
        q = q & Q(subject_query)

    search.query = q
    existing = search.execute()
    if len(existing) > 0:
        raise AlreadySaved(
            "A forensic sample to {0} from {1} "
            "with a subject of {2} and arrival date of {3} "
            "already exists in OpenSearch".format(
                to_, from_, subject, arrival_date_human
            )
        )

    parsed_sample = forensic_report["parsed_sample"]
    sample = _ForensicSampleDoc(
        raw=forensic_report["sample"],
        headers=headers,
        headers_only=forensic_report["sample_headers_only"],
        date=sample_date,
        subject=forensic_report["parsed_sample"]["subject"],
        filename_safe_subject=parsed_sample["filename_safe_subject"],
        body=forensic_report["parsed_sample"]["body"],
    )

    for address in forensic_report["parsed_sample"]["to"]:
        sample.add_to(display_name=address["display_name"], address=address["address"])
    for address in forensic_report["parsed_sample"]["reply_to"]:
        sample.add_reply_to(
            display_name=address["display_name"], address=address["address"]
        )
    for address in forensic_report["parsed_sample"]["cc"]:
        sample.add_cc(display_name=address["display_name"], address=address["address"])
    for address in forensic_report["parsed_sample"]["bcc"]:
        sample.add_bcc(
            display_name=address["display_name"], address=address["address"]
        )
    for attachment in forensic_report["parsed_sample"]["attachments"]:
        sample.add_attachment(
            filename=attachment["filename"],
            content_type=attachment["mail_content_type"],
            sha256=attachment["sha256"],
        )
    try:
        forensic_doc = _ForensicReportDoc(
            feedback_type=forensic_report["feedback_type"],
            user_agent=forensic_report["user_agent"],
            version=forensic_report["version"],
            original_mail_from=forensic_report["original_mail_from"],
            arrival_date=arrival_date,
            domain=forensic_report["reported_domain"],
            original_envelope_id=forensic_report["original_envelope_id"],
            authentication_results=forensic_report["authentication_results"],
            delivery_results=forensic_report["delivery_result"],
            source_ip_address=forensic_report["source"]["ip_address"],
            source_country=forensic_report["source"]["country"],
            source_reverse_dns=forensic_report["source"]["reverse_dns"],
            source_base_domain=forensic_report["source"]["base_domain"],
            authentication_mechanisms=forensic_report["authentication_mechanisms"],
            auth_failure=forensic_report["auth_failure"],
            dkim_domain=forensic_report["dkim_domain"],
            original_rcpt_to=forensic_report["original_rcpt_to"],
            sample=sample,
        )

        index = "dmarc_forensic"
        if index_suffix:
            index = "{0}_{1}".format(index, index_suffix)
        if index_prefix:
            index = "{0}{1}".format(index_prefix, index)
        if monthly_indexes:
            index_date = arrival_date.strftime("%Y-%m")
        else:
            index_date = arrival_date.strftime("%Y-%m-%d")
        index = "{0}-{1}".format(index, index_date)
        index_settings = dict(
            number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
        )
        create_indexes([index], index_settings)
        forensic_doc.meta.index = index
        try:
            forensic_doc.save()
        except Exception as e:
            raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
    except KeyError as e:
        raise InvalidForensicReport(
            "Forensic report missing required field: {0}".format(e.__str__())
        )

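# Example usage (an illustrative sketch): forensic reports follow the same
# pattern as aggregate reports. AlreadySaved signals a duplicate rather than
# a fatal error, so callers typically log it and move on; forensic_report is
# a placeholder for a parsed report dict.
#
#     try:
#         save_forensic_report_to_opensearch(forensic_report)
#     except AlreadySaved as e:
#         logger.warning(e)
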
def save_smtp_tls_report_to_opensearch(
    report,
    index_suffix=None,
    index_prefix=None,
    monthly_indexes=False,
    number_of_shards=1,
    number_of_replicas=0,
):
    """
    Saves a parsed SMTP TLS report to OpenSearch

    Args:
        report (OrderedDict): A parsed SMTP TLS report
        index_suffix (str): The suffix of the name of the index to save to
        index_prefix (str): The prefix of the name of the index to save to
        monthly_indexes (bool): Use monthly indexes instead of daily indexes
        number_of_shards (int): The number of shards to use in the index
        number_of_replicas (int): The number of replicas to use in the index

    Raises:
        AlreadySaved
    """
    logger.info("Saving SMTP TLS report to OpenSearch")
    org_name = report["organization_name"]
    report_id = report["report_id"]
    begin_date = human_timestamp_to_datetime(report["begin_date"], to_utc=True)
    end_date = human_timestamp_to_datetime(report["end_date"], to_utc=True)
    begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ")
    end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ")
    if monthly_indexes:
        index_date = begin_date.strftime("%Y-%m")
    else:
        index_date = begin_date.strftime("%Y-%m-%d")
    report["begin_date"] = begin_date
    report["end_date"] = end_date

    org_name_query = Q(dict(match_phrase=dict(org_name=org_name)))
    report_id_query = Q(dict(match_phrase=dict(report_id=report_id)))
    begin_date_query = Q(dict(match=dict(date_begin=begin_date)))
    end_date_query = Q(dict(match=dict(date_end=end_date)))

    if index_suffix is not None:
        search_index = "smtp_tls_{0}*".format(index_suffix)
    else:
        search_index = "smtp_tls*"
    if index_prefix is not None:
        search_index = "{0}{1}".format(index_prefix, search_index)
    search = Search(index=search_index)
    query = org_name_query & report_id_query
    query = query & begin_date_query & end_date_query
    search.query = query

    try:
        existing = search.execute()
    except Exception as error_:
        raise OpenSearchError(
            "OpenSearch error while searching for an existing report: "
            "{0}".format(error_.__str__())
        )

    if len(existing) > 0:
        raise AlreadySaved(
            f"An SMTP TLS report ID {report_id} from "
            f"{org_name} with a date range of "
            f"{begin_date_human} UTC to "
            f"{end_date_human} UTC already "
            "exists in OpenSearch"
        )

    index = "smtp_tls"
    if index_suffix:
        index = "{0}_{1}".format(index, index_suffix)
    if index_prefix:
        index = "{0}{1}".format(index_prefix, index)
    index = "{0}-{1}".format(index, index_date)
    index_settings = dict(
        number_of_shards=number_of_shards, number_of_replicas=number_of_replicas
    )

    smtp_tls_doc = _SMTPTLSFailureReportDoc(
        organization_name=report["organization_name"],
        date_range=[report["begin_date"], report["end_date"]],
        date_begin=report["begin_date"],
        date_end=report["end_date"],
        contact_info=report["contact_info"],
        report_id=report["report_id"],
    )

    for policy in report["policies"]:
        policy_strings = None
        mx_host_patterns = None
        if "policy_strings" in policy:
            policy_strings = policy["policy_strings"]
        if "mx_host_patterns" in policy:
            mx_host_patterns = policy["mx_host_patterns"]
        policy_doc = _SMTPTLSPolicyDoc(
            policy_domain=policy["policy_domain"],
            policy_type=policy["policy_type"],
            policy_strings=policy_strings,
            mx_host_patterns=mx_host_patterns,
        )
        if "failure_details" in policy:
            failure_details = policy["failure_details"]
            receiving_mx_hostname = None
            additional_information_uri = None
            failure_reason_code = None
            if "receiving_mx_hostname" in failure_details:
                receiving_mx_hostname = failure_details["receiving_mx_hostname"]
            if "additional_information_uri" in failure_details:
                additional_information_uri = failure_details[
                    "additional_information_uri"
                ]
            if "failure_reason_code" in failure_details:
                failure_reason_code = failure_details["failure_reason_code"]
            policy_doc.add_failure_details(
                result_type=failure_details["result_type"],
                ip_address=failure_details["ip_address"],
                receiving_ip=failure_details["receiving_ip"],
                receiving_mx_helo=failure_details["receiving_mx_helo"],
                failed_session_count=failure_details["failed_session_count"],
                receiving_mx_hostname=receiving_mx_hostname,
                additional_information_uri=additional_information_uri,
                failure_reason_code=failure_reason_code,
            )
        smtp_tls_doc.policies.append(policy_doc)

    create_indexes([index], index_settings)
    smtp_tls_doc.meta.index = index

    try:
        smtp_tls_doc.save()
    except Exception as e:
        raise OpenSearchError("OpenSearch error: {0}".format(e.__str__()))
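
# Example usage (an illustrative sketch): save a parsed SMTP TLS report under
# a custom index prefix and suffix, which yields an index name such as
# "example_smtp_tls_prod-2024-01-01"; report is a placeholder for a parsed
# report dict.
#
#     save_smtp_tls_report_to_opensearch(
#         report, index_prefix="example_", index_suffix="prod"
#     )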