#!/usr/bin/env python
# Lain Iwakura 2019 ~ 2025
"""Command-line helper for Cloudflare DNS records and acme.sh certificates.

Settings are read from ``config.ini`` located next to this script.  Each
command-line option is handled by :class:`CloudflareAction`, which performs
its work while ``argparse`` parses the arguments.
"""
import argparse
import configparser
import ipaddress
import json
import logging
import os
import socket
import subprocess
from datetime import datetime
from typing import Any, Dict, List, Tuple, Sequence

import ifaddr
import miniupnpc
import requests

script_dir = os.path.abspath(os.path.dirname(__file__))
config = configparser.RawConfigParser()
logger = logging.getLogger(__name__)


class Cloudflare:
    """Thin client for the Cloudflare v4 DNS API plus an acme.sh wrapper."""

    def __init__(
        self,
        host: str,
        mail: str,
        key: str,
        api_server: str = 'https://api.cloudflare.com',
        timeout: float = 30.0,
    ) -> None:
        """Store credentials and connection settings.

        Args:
            host: Base domain; also the primary ``-d`` passed to acme.sh.
            mail: Value sent as the ``X-Auth-Email`` header.
            key: Value sent as the ``X-Auth-Key`` header.
            api_server: Base URL of the API server.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.api_server = api_server
        self.mail = mail
        self.key = key
        self.host = host
        self.timeout = timeout

    def _auth_headers(self) -> Dict[str, str]:
        """Return the authentication headers shared by every API request."""
        return {
            'X-Auth-Email': self.mail,
            'X-Auth-Key': self.key,
        }

    def list_dns(self, zone_id: str, type_: str) -> Dict[str, Any]:
        """Return the decoded JSON listing of ``type_`` records in a zone."""
        with requests.get(
            f'{self.api_server}/client/v4/zones/{zone_id}/dns_records',
            headers=self._auth_headers(),
            # Let requests build (and URL-encode) the query string.
            params={'type': type_},
            timeout=self.timeout,
        ) as response_:
            return response_.json()

    def update_dns(
        self,
        domain: str,
        zone_id: str,
        record_id: str,
        record_type: str,
        address: str,
        proxied: bool = True,
        ttl: int = 1,
    ) -> requests.Response:
        """Overwrite one DNS record with a PUT request and return the response.

        The returned response has already been closed by the ``with`` block;
        attributes such as ``status_code`` remain readable.
        """
        headers = {
            **self._auth_headers(),
            'Content-Type': 'application/json',
        }
        payload = {
            'type': record_type,
            'name': domain,
            'proxied': proxied,
            'ttl': ttl,
            'content': address,
            'comment': f"Updated at {datetime.now().ctime()}"
        }
        with requests.put(
            f"{self.api_server}/client/v4/zones/{zone_id}/dns_records/{record_id}",
            headers=headers,
            data=json.dumps(payload),
            timeout=self.timeout,
        ) as response:
            return response

    def get_remote_address(
        self,
        zone_id: str,
        type_: str,
        record_id: str | None = None,
    ) -> str | None:
        """Return the content of a DNS record currently stored remotely.

        Args:
            zone_id: Zone to query.
            type_: Record type, e.g. ``'A'`` or ``'AAAA'``.
            record_id: When given, only the record whose ``id`` matches is
                considered.  When omitted, the first listed record of the
                requested type is used.

        Returns:
            The record content, or ``None`` when the listing holds no
            matching record (including when the API returned no ``result``).
        """
        records = self.list_dns(zone_id, type_).get('result') or []
        if record_id is not None:
            # A zone may hold several records of one type; compare against
            # the record that will actually be updated.
            records = [record for record in records if record.get('id') == record_id]
        return records[0]['content'] if records else None

    def issue_cert(
        self,
        ssl_home: str,
        acme_directory: str,
        domains: List[str],
        server: str = 'zerossl',
        force: bool = False,
    ) -> None:
        """Run acme.sh to issue a certificate using the ``dns_cf`` DNS hook.

        Args:
            ssl_home: Value exported as ``HOME`` for the acme.sh process.
            acme_directory: Directory, relative to this script, holding
                ``acme.sh``.
            domains: Subdomain labels; each becomes ``-d <label>.<host>``.
                Empty labels are ignored.
            server: CA name passed to ``--server``.
            force: Append ``--force`` to the command line.

        Raises:
            subprocess.CalledProcessError: acme.sh exited with a status
                other than 0 or 2.
        """
        # Only these variables are passed; the parent environment
        # (including PATH) is not inherited by the child process.
        env = {
            'HOME': ssl_home,
            'CF_Key': self.key,
            'CF_Email': self.mail,
        }
        acme = os.path.join(script_dir, acme_directory, 'acme.sh')
        command = [acme, '--server', server, '--issue', '--dns', 'dns_cf', '-d', self.host]
        if force:
            command.append('--force')
        for domain in domains:
            if domain:
                command.extend(['-d', f"{domain}.{self.host}"])
        try:
            subprocess.check_call(command, env=env)
        except subprocess.CalledProcessError as exception:
            # Exit status 2 is tolerated -- presumably acme.sh's
            # "renewal skipped" status; confirm against acme.sh docs.
            if exception.returncode != 2:
                raise


class CloudflareAction(argparse.Action):
    """argparse action that executes the Cloudflare command for its option."""

    def __init__(self, **kwargs) -> None:
        """Load the settings from the module-level ``config`` object.

        ``config`` must already have been read, because argparse creates
        the action instance when ``add_argument`` is called.
        """
        super().__init__(**kwargs)
        mail = config.get('Cloudflare', 'mail')
        key = config.get('Cloudflare', 'api_key')
        self.zone_id = config.get('Cloudflare', 'zone_id')
        self.domain = config.get('General', 'domain')
        self.device = config.get('General', 'device')
        self.a_id = config.get('Cloudflare', 'a_record_id')
        self.a_proxied = config.getboolean('Cloudflare', 'a_proxied')
        # TTL goes into the JSON payload, so it must be an int, not a str.
        self.a_ttl = config.getint('Cloudflare', 'a_ttl')
        self.aaaa_id = config.get('Cloudflare', 'aaaa_record_id')
        self.aaaa_proxied = config.getboolean('Cloudflare', 'aaaa_proxied')
        self.aaaa_ttl = config.getint('Cloudflare', 'aaaa_ttl')
        self.cloudflare = Cloudflare(self.domain, mail, key)

    def _sync_record(
        self,
        record_type: str,
        record_id: str,
        local_address: str | None,
        proxied: bool,
        ttl: int,
    ) -> None:
        """Update one record if its remote content differs, then report."""
        if local_address is None:
            # Nothing usable to publish; do not send an empty record.
            print(f'{record_type} record: No local address found')
            return
        remote_address = self.cloudflare.get_remote_address(
            self.zone_id, record_type, record_id
        )
        if local_address != remote_address:
            response = self.cloudflare.update_dns(
                self.domain,
                self.zone_id,
                record_id,
                record_type,
                local_address,
                proxied,
                ttl,
            )
            status = str(response.status_code)
        else:
            status = 'Already updated'
        print(f'{record_type} record: {status} [{local_address}]')

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str | Sequence[Any] | None,
        option_string: str | None = None,
    ) -> None:
        """Run the command selected by ``option_string``.

        ``--list-dns TYPE`` prints the record listing as JSON,
        ``--update-dns`` synchronises the A and AAAA records,
        ``--issue-cert [force]`` runs acme.sh, and ``--maintenance`` points
        both records at fixed placeholder addresses.
        """
        if option_string == '--list-dns':
            if not isinstance(values, list) or not values:
                parser.error("Wrong params")
            dns_list = self.cloudflare.list_dns(self.zone_id, values[0])
            print(json.dumps(dns_list, indent=4))
        elif option_string == '--update-dns':
            local_a_address = get_local_address(self.device, (socket.AF_INET, 0))
            local_aaaa_address = get_local_address(self.device, (socket.AF_INET6, 1))
            self._sync_record('A', self.a_id, local_a_address, self.a_proxied, self.a_ttl)
            self._sync_record(
                'AAAA', self.aaaa_id, local_aaaa_address, self.aaaa_proxied, self.aaaa_ttl
            )
        elif option_string == '--issue-cert':
            ssl_home = config.get('ssl', 'ssl_home')
            acme_directory = config.get('ssl', 'acme_directory')
            server = config.get('ssl', 'server', fallback='zerossl')
            raw_subdomains = config.get('General', 'subdomains').replace(' ', '')
            # Drop empty entries so an empty setting or a stray comma does
            # not produce a "-d .<host>" argument.
            subdomains = [name for name in raw_subdomains.split(',') if name]
            force = values == "force"
            self.cloudflare.issue_cert(ssl_home, acme_directory, subdomains, server, force)
        elif option_string == '--maintenance':
            # Placeholder targets taken from the reserved documentation
            # address ranges (192.0.2.0/24 and 2001:db8::/32).
            self.cloudflare.update_dns(
                self.domain, self.zone_id, self.a_id, 'A', '192.0.2.1', True
            )
            self.cloudflare.update_dns(
                self.domain, self.zone_id, self.aaaa_id, 'AAAA', '2001:db8::', True
            )
            print('Enabled')


def _is_publishable_ipv6(address: str) -> bool:
    """Return True unless ``address`` is a link-local or loopback IPv6 address."""
    try:
        # Strip a possible "%scope" suffix before parsing.
        parsed = ipaddress.ip_address(address.split('%', 1)[0])
    except ValueError:
        return False
    return not (parsed.is_link_local or parsed.is_loopback)


def get_local_address(
    interface: str,
    type_: Tuple[socket.AddressFamily, int],
) -> str | None:
    """Return the address of this host to publish for an address family.

    For ``socket.AF_INET`` the external address reported by the UPnP
    gateway is returned.  For any other family, the first IPv6 address of
    the adapter named ``interface`` that is neither link-local nor loopback
    is returned.

    Args:
        interface: Adapter name to inspect for IPv6 addresses.
        type_: ``(address family, index)``; only the family is used here.

    Returns:
        The address as a string, or ``None`` when no suitable IPv6 address
        exists on the adapter.
    """
    if type_[0] == socket.AF_INET:
        upnp = miniupnpc.UPnP()
        upnp.discoverdelay = 200
        upnp.discover()
        upnp.selectigd()
        return upnp.externalipaddress()

    for adapter in ifaddr.get_adapters():
        if adapter.name != interface:
            continue
        for entry in adapter.ips:
            if entry.is_IPv4:
                continue
            # For IPv6 entries ``ip`` is a tuple whose first item is the
            # address string.
            address = entry.ip[0]
            if _is_publishable_ipv6(address):
                return address
    return None


def main() -> None:
    """Read ``config.ini`` and dispatch the command-line options."""
    logging.basicConfig(level=logging.INFO)
    # The configuration must be loaded before add_argument() instantiates
    # CloudflareAction, which reads it in __init__.
    config.read(os.path.join(script_dir, 'config.ini'))
    command_parser = argparse.ArgumentParser()
    command_parser.add_argument('--list-dns', action=CloudflareAction, nargs=1)
    command_parser.add_argument('--update-dns', action=CloudflareAction, nargs=0)
    command_parser.add_argument('--issue-cert', action=CloudflareAction, nargs='?')
    command_parser.add_argument('--maintenance', action=CloudflareAction, nargs=0)
    command_parser.parse_args()


if __name__ == '__main__':
    main()