#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""A Python module and CLI for managing zones and resource record sets on
Google Cloud DNS"""
from __future__ import absolute_import, print_function
import logging
import csv
import json
import collections
import io
import textwrap
import re
from google.oauth2 import service_account
from google.cloud import dns
import publicsuffix2
import click
__version__ = "1.2.9"
DEFAULT_TTL = 300
ZONE_CACHE = dict()
QUOTE_SPACE = re.compile(r"""["'`]\s*""")
logger = logging.getLogger(__name__)
[docs]class ZoneNotFound(ValueError):
"""Raised when a requested zone is not found"""
[docs]class ZoneConflict(ValueError):
"""Raised when a conflicting zone already exists"""
[docs]class RecordSetNotFound(ValueError):
"""Raised when a record set is not found"""
[docs]class ExistingRecordSetFound(ValueError):
"""Raised when an existing record set is found"""
[docs]class CSVError(ValueError):
"""Raised when a CSV parsing error occurs"""
[docs]class DNSClient(dns.Client):
"""An extended Google DNS client with helper functions"""
[docs] def get_zone(self, name):
"""
Get a Zone object by zone name or DNS name
Args:
name (str): zone name or dns_name
Raises:
gcpdns.ZoneNotFound
"""
dns_name = "{0}.".format(name.lower().rstrip("."))
zones = self.list_zones()
for zone in zones:
if zone.name == name or zone.dns_name == dns_name:
return zone
raise ZoneNotFound("The zone named {0} was not found".format(name))
[docs] def create_zone(self, dns_name, name=None, description=None):
"""
Creates a DNS zone
Args:
dns_name (str): The zone's DNS name
name (str): the zone's GCP name
description (str): A description of the zone
Raises:
gcpdns.ZoneConflict
"""
dns_name = dns_name.lower().rstrip(".")
if name is None:
name = dns_name.replace(".", "-")
dns_name = "{0}.".format(dns_name)
zones = self.list_zones()
for zone in zones:
if zone.name == name or zone.dns_name == dns_name:
raise ZoneConflict(
"A conflicting zone already exists: {0} ({1})".format(
zone.dns_name, zone.name
))
self.zone(name, dns_name=dns_name, description=description).create()
[docs] def delete_zone(self, zone_name):
"""
Deletes a zone
Args:
zone_name: The zone's DNS name of GCP name
Returns:
"""
self.get_zone(zone_name).delete()
[docs] def dump_zones(self):
"""
Outputs all managed zones for the project in JSON and CSV format
Returns:
dict: A dictionary with a csv and json key
"""
zones = []
for zone in self.list_zones():
zone_dict = collections.OrderedDict(
dns_name=zone.dns_name,
name=zone.name,
created=zone.created.isoformat(),
description=zone.description,
name_servers=zone.name_servers)
zones.append(zone_dict)
_json = json.dumps(zones, indent=2, ensure_ascii=False)
csv_str = io.StringIO()
csv_fields = ["dns_name", "name", "created",
"description", "name_servers"]
csv_rows = zones.copy()
for zone in csv_rows:
zone["name_servers"] = "|".join(zone["name_servers"])
csv_writer = csv.DictWriter(csv_str, fieldnames=csv_fields)
csv_writer.writeheader()
csv_writer.writerows(csv_rows)
_csv = csv_str.getvalue()
return dict(csv=_csv, json=_json)
[docs] def dump_records(self, zone_name):
"""
Outputs all record sets for a given zone in JSON and CSV format
Args:
zone_name (str): The zone name or DNS name
Returns:
dict: A dictionary with a csv and json key
"""
zone = self.get_zone(zone_name)
records = []
for record in zone.list_resource_record_sets():
record_dict = collections.OrderedDict(
name=record.name,
record_type=record.record_type,
ttl=record.ttl,
data=record.rrdatas
)
records.append(record_dict)
_json = json.dumps(records, indent=2, ensure_ascii=False)
csv_str = io.StringIO()
csv_fields = ["name", "record_type", "ttl", "data"]
csv_rows = records.copy()
for zone in csv_rows:
zone["data"] = "|".join(zone["data"])
csv_writer = csv.DictWriter(csv_str, fieldnames=csv_fields)
csv_writer.writeheader()
csv_writer.writerows(csv_rows)
_csv = csv_str.getvalue()
return dict(csv=_csv, json=_json)
[docs] def create_or_replace_record_set(self, name, record_type, data,
ttl=DEFAULT_TTL,
replace=False):
"""
Adds or replaces a DNS resource record set
Args:
name (str): The DNS name (i.e. the fully-qualified domain name)
record_type (str): The DNS record type
data: A list of resource record data strings,
or a string of one or more resource records separated by |
ttl (int): DNS time to live (in seconds)
replace (bool): Replace existing record set if needed
Raises:
gcpdns.ExistingRecordSetFound
"""
tld = publicsuffix2.get_public_suffix(name).lower()
if tld in ZONE_CACHE:
zone = ZONE_CACHE[tld]
else:
zone = self.get_zone(tld)
ZONE_CACHE[tld] = zone
name = "{0}{1}".format(
name.lower().rstrip(".").replace(zone.dns_name.rstrip("."), ""),
zone.dns_name).lstrip(".")
record_type = record_type.upper()
if ttl is None:
ttl = DEFAULT_TTL
ttl = int(ttl)
old_record_set = None
change = zone.changes()
for r_set in zone.list_resource_record_sets():
if r_set.name == name and r_set.record_type == record_type:
old_record_set = r_set
if not replace:
raise ExistingRecordSetFound(
"Existing record set found: {0} {1} {2} {3}".format(
r_set.name,
r_set.record_type,
r_set.ttl,
r_set.rrdatas
))
change.delete_record_set(r_set)
if type(data) == str:
if record_type == "CNAME":
data = "{0}.".format(data.rstrip("."))
data = [data]
elif record_type == "TXT":
new_data = []
data = data.split("|")
for r_set in data:
r_set = QUOTE_SPACE.sub("", r_set)
split_txt = textwrap.wrap(r_set, 253)
for i in range(len(split_txt)):
split_txt[i] = '"{0}"'.format(split_txt[i])
new_data.append("".join(split_txt))
data = new_data.copy()
else:
data = data.split("|")
if record_type in ["CNAME", "MX", "NS", "PTR", "SRV"]:
for i in range(len(data)):
data[i] = "{0}.".format(data[i].rstrip("."))
if old_record_set is None:
logging.info(
"Adding record set: {0} {1} {2} {3}".format(
name,
record_type,
ttl,
data
))
else:
logging.info(
"Replacing record set: {0} {1} {2} {3} "
"with: {4} {5} {6} {7}".format(
old_record_set.name,
old_record_set.record_type,
old_record_set.ttl,
old_record_set.rrdatas,
name,
record_type,
ttl,
data
))
r_set = zone.resource_record_set(name, record_type, ttl, rrdatas=data)
change.add_record_set(r_set)
change.create()
[docs] def delete_record_set(self, name, record_type):
"""
Deletes a record set
Args:
name (str): The DNS name (i.e. the fully-qualified domain name)
record_type (str): The DNS record type
Raises:
gcpdns.RecordSetNotFound
"""
logger.info("Deleting record set: {0} {1}".format(name, record_type))
tld = publicsuffix2.get_public_suffix(name).lower()
if tld in ZONE_CACHE:
zone = ZONE_CACHE[tld]
else:
zone = self.get_zone(tld)
ZONE_CACHE[tld] = zone
name = "{0}{1}".format(
name.lower().rstrip(".").replace(zone.dns_name.rstrip("."), ""),
zone.dns_name).lstrip(".")
record_type = record_type.upper()
record_to_delete = None
change = zone.changes()
records = zone.list_resource_record_sets()
for record in records:
if record.name == name and record.record_type == record_type:
record_to_delete = record
if record_to_delete is not None:
change.delete_record_set(record_to_delete)
change.create()
else:
raise RecordSetNotFound(
"Record set not found: {0} {1}".format(name,
record_type))
[docs] def apply_zones_csv(self, csv_file, ignore_errors=False):
"""
Apply a CSV of zones
The CSV fields are:
- ``action``
- ``create`` - Creates a zone
- ``delete`` - Deletes a zone
- ``dns_name`` - The zone's DNS name
- ``gcp_name`` - The zone's name in GCP (optional)
- ``description`` - The zone's description (optional)
Args:
csv_file: A file or file-like object
ignore_errors (bool): Log errors instead of raising an exception
Raises:
gcpdns.CSVError
"""
logger.info("Applying zones CSV")
reader = csv.DictReader(csv_file)
for row in reader:
try:
dns_name = row["dns_name"].lower()
action = row["action"].lower()
except KeyError as e:
error = "Line {0}: Missing field: {1)".format(reader.line_num,
e.__str__())
if ignore_errors:
logger.error(error)
continue
else:
raise CSVError(error)
gcp_name = None
if "gcp_name" in row:
gcp_name = row["gcp_name"]
description = None
if "description" in row:
description = row["description"]
if action == "delete":
try:
self.delete_zone(dns_name)
except ZoneNotFound as e:
error = "Line {0}: {1}".format(reader.line_num,
e.__str__())
if ignore_errors:
logger.warning(error)
else:
raise CSVError(error)
elif action == "create":
try:
self.create_zone(dns_name=dns_name, name=gcp_name,
description=description)
except ZoneConflict as e:
error = "Line {0}: {1}".format(reader.line_num,
e.__str__())
if ignore_errors:
logger.error(error)
else:
raise CSVError(error)
else:
error = "Line {0}: Invalid action".format(
reader.line_num)
if ignore_errors:
logger.error(error)
else:
raise CSVError(error)
[docs] def apply_record_sets_csv(self, csv_file, ignore_errors=False):
"""
Apply a CSV of record set changes
The CSV fields are:
- ``action``
- ``create`` - Creates a resource record set
- ``replace`` - Creates or replaces an existing record set
- ``delete`` - Deletes a resource record set
- ``name`` - The record set name (i.e. the Fully-Qualified Domain Name)
- ``record_type`` - The DNS record type
- ``ttl`` - DNS time to live (in seconds)
- ``data`` - DNS record data separated by ``|``
Args:
csv_file: A file or file-like object
ignore_errors (bool): Log errors instead of raising an exception
Raises:
gcpdns.CSVError
"""
logger.info("Applying record sets CSV")
reader = csv.DictReader(csv_file)
for row in reader:
try:
name = row["name"].lower()
action = row["action"].lower()
record_type = row["record_type"].upper()
except KeyError as e:
error = "Line {0}: Missing {1}".format(reader.line_num,
e.__str__())
if ignore_errors:
logger.error(error)
continue
else:
raise CSVError(error)
ttl = None
if "ttl" in row:
ttl = int(row["ttl"])
data = None
if "data" in row:
data = row["data"]
if action == "delete":
try:
self.delete_record_set(name, record_type)
except RecordSetNotFound as e:
error = "Line {0}: {1}".format(reader.line_num,
e.__str__())
if ignore_errors:
logger.warning(error)
else:
raise CSVError(error)
elif action in ["create", "replace"]:
if data is not None:
replace = action == "replace"
try:
self.create_or_replace_record_set(
name, record_type,
data, ttl=ttl, replace=replace)
except ExistingRecordSetFound as e:
error = "Line {0}: {1}".format(reader.line_num,
e.__str__())
if ignore_errors:
logger.error(error)
else:
raise CSVError(error)
else:
error = "Line {0}: Missing data".format(
reader.line_num)
if ignore_errors:
logger.error(error)
else:
raise CSVError(error)
else:
error = "Line {0}: Invalid action".format(
reader.line_num)
if ignore_errors:
logger.error(error)
else:
raise CSVError(error)
class _CLIConfig(object):
def __init__(self, credential_file, verbose=False):
if verbose:
logging.basicConfig(level=logging.INFO,
format="%(levelname)s: %(message)s")
else:
logging.basicConfig(level=logging.WARNING,
format="%(levelname)s: %(message)s")
scopes = ['https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/ndev.clouddns.readwrite']
credentials = service_account.Credentials.from_service_account_file(
credential_file, scopes=scopes)
self.client = DNSClient(credentials=credentials,
project=credentials.project_id)
@click.group()
@click.version_option(version=__version__)
@click.option("--verbose", is_flag=True, help="Enable verbose logging.")
@click.argument("credential_file",
type=click.Path(exists=True, dir_okay=False))
@click.pass_context
def _main(ctx, credential_file, verbose=False):
"""gcpdns: A CLI for managing zones and resource record sets on
Google Cloud DNS."""
ctx.obj = _CLIConfig(credential_file, verbose=verbose)
@_main.group("zone")
def _zone():
"""Manage DNS zones."""
@_zone.command("create")
@click.argument("dns_name")
@click.option("--gcp_name", help="Set the zone's GCP name.")
@click.option("--description", help="Set the zone's description.")
@click.pass_context
def _create_zone(ctx, dns_name, gcp_name=None, description=None):
"""Create a DNS zone."""
try:
ctx.obj.client.create_zone(dns_name, name=gcp_name,
description=description)
except Exception as e:
logger.error(e.__str__())
exit(-1)
@_zone.command("delete")
@click.confirmation_option(prompt="Are you sure you want to delete this zone?")
@click.argument("name")
@click.pass_context
def _delete_zone(ctx, name):
"""Delete a DNS zone and all its resource records."""
try:
ctx.obj.client.delete_zone(name)
except Exception as e:
logger.error(e.__str__())
exit(-1)
@_zone.command("dump")
@click.option("--format", "-f", "format_", default="json", show_default=True,
type=click.Choice(["json", "csv"]),
help="Set the screen output format")
@click.option("--output", "-o",
type=click.Path(dir_okay=False, writable=True),
multiple=True,
help="One or more output file paths that end in .csv or .json "
"(suppresses screen output).")
@click.pass_context
def _dump_zones(ctx, format_, output):
"""Dump a list of DNS zones."""
try:
zones = ctx.obj.client.dump_zones()
if len(output) == 0:
click.echo(zones[format_])
else:
for path in output:
if path.lower().endswith(".json"):
with open(path, "w", encoding="utf-8",
errors="ignore", newline="\n") as o_file:
o_file.write(zones["json"])
elif path.lower().endswith(".csv"):
with open(path, "w", encoding="utf-8",
errors="ignore", newline="\n") as o_file:
o_file.write(zones["csv"])
except Exception as e:
logger.error(e.__str__())
exit(-1)
@_zone.command("update")
@click.argument("csv_file_path",
type=click.Path(exists=True, dir_okay=False))
@click.option("--ignore-errors", is_flag=True,
help="Continue processing the CSV when errors occur.")
@click.pass_context
def _apply_zones_csv(ctx, csv_file_path, ignore_errors):
"""Create and delete zones using a CSV file."""
try:
with open(csv_file_path, encoding="utf-8",
errors="ignore") as file:
ctx.obj.client.apply_zones_csv(file, ignore_errors=ignore_errors)
except Exception as e:
logger.error(e.__str__())
exit(-1)
@_main.group("record")
def _record():
"""Manage DNS resource record sets."""
@_record.command("dump")
@click.argument("zone")
@click.option("--format", "-f", "format_", default="json", show_default=True,
type=click.Choice(["json", "csv"]),
help="Set the screen output format")
@click.option("--output", "-o",
type=click.Path(dir_okay=False, writable=True),
multiple=True,
help="One or more output file paths that end in .csv or .json "
"(suppresses screen output).")
@click.pass_context
def _dump_record_sets(ctx, zone, format_, output):
"""Dump a list of DNS resource records."""
try:
records = ctx.obj.client.dump_records(zone)
if len(output) == 0:
click.echo(records[format_])
else:
for path in output:
if path.lower().endswith(".json"):
with open(path, "w", encoding="utf-8",
errors="ignore", newline="\n") as o_file:
o_file.write(records["json"])
elif path.lower().endswith(".csv"):
with open(path, "w", encoding="utf-8",
errors="ignore", newline="\n") as o_file:
o_file.write(records["csv"])
except Exception as e:
logger.error(e.__str__())
exit(-1)
@_record.command("update")
@click.argument("csv_file_path",
type=click.Path(exists=True, dir_okay=False))
@click.option("--ignore-errors", is_flag=True,
help="Continue processing the CSV when errors occur.")
@click.pass_context
def _apply_record_sets_csv(ctx, csv_file_path, ignore_errors):
"""Create, replace, and delete resource record sets using a CSV file."""
try:
with open(csv_file_path, encoding="utf-8",
errors="ignore") as file:
ctx.obj.client.apply_record_sets_csv(
file,
ignore_errors=ignore_errors)
except Exception as e:
logger.error(e.__str__())
exit(-1)
@_record.command("create")
@click.option("--replace", "-r", is_flag=True,
help="Replace any conflicting resource record set")
@click.argument("name")
@click.argument("record_type")
@click.option("--ttl", "-t", type=int, default=DEFAULT_TTL, show_default=True,
metavar="seconds", help="DNS Time to live (in seconds)")
@click.argument("data")
@click.pass_context
def _create_or_replace_record_set(ctx, replace, name, record_type, ttl, data):
"""Create a resource record set (Data fields separated by |)."""
try:
ctx.obj.client.create_or_replace_record_set(name, record_type,
data, ttl=ttl,
replace=replace)
except Exception as e:
logger.error(e.__str__())
exit(-1)
@_record.command("delete")
@click.argument("name")
@click.argument("record_type")
@click.confirmation_option(
prompt="Are you sure you want to delete this resource record set?")
@click.pass_context
def _delete_record_set(ctx, name, record_type):
"""Delete a resource record set"""
try:
ctx.obj.client.delete_record_set(name, record_type)
except Exception as e:
logger.error(e.__str__())
exit(-1)
if __name__ == "__main__":
_main()