Source code for parsedmarc.splunk

from urllib.parse import urlparse
import socket
import json

import urllib3
import requests

from parsedmarc import __version__
from parsedmarc.log import logger
from parsedmarc.utils import human_timestamp_to_unix_timestamp

# Silence urllib3's InsecureRequestWarning for the whole process at import
# time. HECClient accepts verify=False; presumably this keeps unverified HEC
# requests from emitting that warning on every call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class SplunkError(RuntimeError):
    """Raised when a Splunk API error occurs"""
class HECClient(object):
    """A client for a Splunk HTTP Events Collector (HEC)"""

    # http://docs.splunk.com/Documentation/Splunk/latest/Data/AboutHEC
    # http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector

    def __init__(self, url, access_token, index,
                 source="parsedmarc", verify=True, timeout=60):
        """
        Initializes the HECClient

        Only the scheme and network location of ``url`` are kept; the
        request path is always ``/services/collector/event/1.0``.

        Args:
            url (str): The URL of the HEC
            access_token (str): The HEC access token, with or without a
                leading ``"Splunk "`` authorization scheme
            index (str): The name of the index
            source (str): The source name
            verify (bool): Verify SSL certificates
            timeout (float): Number of seconds to wait for the server to send
                data before giving up
        """
        url = urlparse(url)
        self.url = "{0}://{1}/services/collector/event/1.0".format(
            url.scheme, url.netloc)
        self.access_token = self._strip_auth_scheme(access_token)
        self.index = index
        self.host = socket.getfqdn()
        self.source = source
        self.session = requests.Session()
        self.timeout = timeout
        self.session.verify = verify
        # Fields attached to every event sent through this client.
        self._common_data = dict(host=self.host, source=self.source,
                                 index=self.index)
        self.session.headers = {
            "User-Agent": "parsedmarc/{0}".format(__version__),
            "Authorization": "Splunk {0}".format(self.access_token)
        }

    @staticmethod
    def _strip_auth_scheme(access_token):
        """
        Removes a leading ``"Splunk "`` scheme from an access token

        ``str.lstrip("Splunk ")`` must not be used here: it strips any run
        of the characters ``S p l u n k`` and space, which corrupts tokens
        that happen to start with one of those characters.

        Args:
            access_token (str): The token, optionally prefixed by "Splunk "

        Returns:
            str: The bare token
        """
        scheme = "Splunk "
        token = access_token.lstrip(" ")
        if token.startswith(scheme):
            token = token[len(scheme):].lstrip(" ")
        return token

    @staticmethod
    def _flatten_aggregate_record(report, record):
        """
        Builds the flat event dictionary for one aggregate report record

        Args:
            report (dict): The aggregate report that contains ``record``
            record (dict): A single entry of ``report["records"]``

        Returns:
            dict: The report metadata merged with the record's fields
        """
        source = record["source"]
        alignment = record["alignment"]
        identifiers = record["identifiers"]
        auth_results = record["auth_results"]

        new_report = dict(report["report_metadata"])
        new_report["published_policy"] = report["policy_published"]
        new_report["source_ip_address"] = source["ip_address"]
        new_report["source_country"] = source["country"]
        new_report["source_reverse_dns"] = source["reverse_dns"]
        new_report["source_base_domain"] = source["base_domain"]
        new_report["source_type"] = source["type"]
        new_report["source_name"] = source["name"]
        new_report["message_count"] = record["count"]
        new_report["disposition"] = record["policy_evaluated"]["disposition"]
        new_report["spf_aligned"] = alignment["spf"]
        new_report["dkim_aligned"] = alignment["dkim"]
        new_report["passed_dmarc"] = alignment["dmarc"]
        new_report["header_from"] = identifiers["header_from"]
        new_report["envelope_from"] = identifiers["envelope_from"]
        # Authentication results are optional per record.
        if "dkim" in auth_results:
            new_report["dkim_results"] = auth_results["dkim"]
        if "spf" in auth_results:
            new_report["spf_results"] = auth_results["spf"]
        return new_report

    def _serialize_event(self, sourcetype, timestamp, event):
        """
        Serializes one event as a newline-terminated JSON document

        Args:
            sourcetype (str): The Splunk sourcetype of the event
            timestamp: The event time, as returned by
                ``human_timestamp_to_unix_timestamp``
            event (dict): The event payload

        Returns:
            str: The JSON document followed by a newline
        """
        # Work on a copy so the shared common fields are never mutated.
        payload = self._common_data.copy()
        payload["sourcetype"] = sourcetype
        payload["time"] = timestamp
        payload["event"] = event
        return "{0}\n".format(json.dumps(payload))

    def _post_events(self, event_lines):
        """
        Sends serialized events to the HEC in a single request

        Args:
            event_lines (list): Strings produced by ``_serialize_event``

        Raises:
            SplunkError: If the request fails, the response is not JSON, or
                the HEC answers with a non-zero ``code``
        """
        # Nothing to send (e.g. aggregate reports without records); skip the
        # request instead of posting an empty body.
        if not event_lines:
            return
        if not self.session.verify:
            logger.debug("Skipping certificate verification for Splunk HEC")
        try:
            response = self.session.post(self.url,
                                         data="".join(event_lines),
                                         timeout=self.timeout)
            response = response.json()
        except Exception as e:
            raise SplunkError(str(e)) from e
        if response["code"] != 0:
            raise SplunkError(response["text"])

    def save_aggregate_reports_to_splunk(self, aggregate_reports):
        """
        Saves aggregate DMARC reports to Splunk

        One event with sourcetype ``dmarc:aggregate`` is sent per record,
        timestamped with the report's ``begin_date``.

        Args:
            aggregate_reports: A list of aggregate report dictionaries
                to save in Splunk (a single dictionary is also accepted)

        Raises:
            SplunkError: If the events could not be saved
        """
        logger.debug("Saving aggregate reports to Splunk")
        if isinstance(aggregate_reports, dict):
            aggregate_reports = [aggregate_reports]

        if not aggregate_reports:
            return

        event_lines = []
        for report in aggregate_reports:
            for record in report["records"]:
                new_report = self._flatten_aggregate_record(report, record)
                timestamp = human_timestamp_to_unix_timestamp(
                    new_report["begin_date"])
                event_lines.append(self._serialize_event(
                    "dmarc:aggregate", timestamp, new_report))

        self._post_events(event_lines)

    def save_forensic_reports_to_splunk(self, forensic_reports):
        """
        Saves forensic DMARC reports to Splunk

        One event with sourcetype ``dmarc:forensic`` is sent per report,
        timestamped with the report's ``arrival_date_utc``.

        Args:
            forensic_reports (list): A list of forensic report dictionaries
                to save in Splunk (a single dictionary is also accepted)

        Raises:
            SplunkError: If the events could not be saved
        """
        logger.debug("Saving forensic reports to Splunk")
        if isinstance(forensic_reports, dict):
            forensic_reports = [forensic_reports]

        if not forensic_reports:
            return

        event_lines = []
        for report in forensic_reports:
            timestamp = human_timestamp_to_unix_timestamp(
                report["arrival_date_utc"])
            event_lines.append(self._serialize_event(
                "dmarc:forensic", timestamp, report))

        self._post_events(event_lines)

    def save_smtp_tls_reports_to_splunk(self, reports):
        """
        Saves SMTP TLS reports to Splunk

        One event with sourcetype ``smtp:tls`` is sent per report,
        timestamped with the report's ``begin_date``.

        Args:
            reports: A list of SMTP TLS report dictionaries
                to save in Splunk (a single dictionary is also accepted)

        Raises:
            SplunkError: If the events could not be saved
        """
        logger.debug("Saving SMTP TLS reports to Splunk")
        if isinstance(reports, dict):
            reports = [reports]

        if not reports:
            return

        event_lines = []
        for report in reports:
            timestamp = human_timestamp_to_unix_timestamp(
                report["begin_date"])
            event_lines.append(self._serialize_event(
                "smtp:tls", timestamp, report))

        self._post_events(event_lines)