cloudflare/cloudflare.py
2025-01-25 23:25:44 -03:00

228 lines
7.5 KiB
Python
Executable file

#!/usr/bin/env python
# Lain Iwakura <lain@serialexperiments.club> 2019 ~ 2025
import argparse
import configparser
import json
import logging
import os
import socket
import subprocess
from datetime import datetime
from typing import Any, Dict, List, Tuple, Sequence
import ifaddr
import miniupnpc
import requests
# Absolute directory holding this script; config.ini and the acme.sh
# checkout are resolved relative to it.
script_dir = os.path.dirname(os.path.abspath(__file__))
# Shared, initially empty parser; it is populated by config.read() in the
# __main__ block before any CloudflareAction is constructed.
config = configparser.RawConfigParser()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class Cloudflare:
    """Small client for the Cloudflare v4 DNS-record API plus an acme.sh wrapper.

    Every request authenticates with the ``X-Auth-Email`` / ``X-Auth-Key``
    header pair built from the credentials given to the constructor.
    """

    def __init__(
        self,
        host: str,
        mail: str,
        key: str,
        api_server: str = 'https://api.cloudflare.com',
        timeout: float = 30.0,
    ) -> None:
        """Remember the credentials and endpoint used by later calls.

        Args:
            host: Base domain. It is the primary name on issued certificates
                and the preferred record name in ``get_remote_address``.
            mail: Account e-mail, sent as ``X-Auth-Email``.
            key: API key, sent as ``X-Auth-Key`` and exported to acme.sh.
            api_server: Scheme and host of the API endpoint.
            timeout: Seconds to wait on each HTTP request. Without an
                explicit timeout ``requests`` may block indefinitely.
        """
        self.api_server = api_server
        self.mail = mail
        self.key = key
        self.host = host
        self.timeout = timeout

    def _auth_headers(self) -> Dict[str, str]:
        """Return the authentication headers shared by every API call."""
        return {
            'X-Auth-Email': self.mail,
            'X-Auth-Key': self.key,
        }

    @staticmethod
    def _select_record(records: List[Dict[str, Any]], host: str) -> Dict[str, Any]:
        """Pick the record named *host*, or the first record if none match.

        ``records`` must be non-empty. The fallback keeps the previous
        behaviour for zones whose record name differs from *host*.
        """
        for record in records:
            if record.get('name') == host:
                return record
        return records[0]

    def list_dns(self, zone_id: str, type_: str) -> Dict[str, Any]:
        """Return the decoded JSON listing of *type_* records in *zone_id*.

        Args:
            zone_id: Cloudflare zone identifier.
            type_: DNS record type to filter on (for example ``'A'``).
        """
        with requests.get(
            f'{self.api_server}/client/v4/zones/{zone_id}/dns_records',
            # Let requests build and URL-encode the query string.
            params={'type': type_},
            headers=self._auth_headers(),
            timeout=self.timeout,
        ) as response_:
            return response_.json()

    def update_dns(
        self,
        domain: str,
        zone_id: str,
        record_id: str,
        record_type: str,
        address: str,
        proxied: bool = True,
        ttl: int = 1,
    ) -> 'requests.Response':
        """Overwrite one DNS record with a PUT request.

        Args:
            domain: Record name to store.
            zone_id: Cloudflare zone identifier.
            record_id: Identifier of the record being replaced.
            record_type: DNS record type (for example ``'A'`` or ``'AAAA'``).
            address: New record content.
            proxied: Value sent as the record's ``proxied`` flag.
            ttl: TTL sent with the record. It is coerced to ``int`` because
                values read from configuration may arrive as strings.

        Returns:
            The response object, returned after its connection has been
            released by the ``with`` block.
        """
        headers = {**self._auth_headers(), 'Content-Type': 'application/json'}
        payload = {
            'type': record_type,
            'name': domain,
            'proxied': proxied,
            'ttl': int(ttl),
            'content': address,
            'comment': f"Updated at {datetime.now().ctime()}"
        }
        with requests.put(
            f"{self.api_server}/client/v4/zones/{zone_id}/dns_records/{record_id}",
            headers=headers,
            data=json.dumps(payload),
            timeout=self.timeout,
        ) as response:
            return response

    def get_remote_address(self, zone_id: str, type_: str) -> str:
        """Return the address currently published for this host.

        The record whose name equals ``self.host`` is preferred; if no record
        has that name, the first record of the requested type is used.

        Raises:
            LookupError: If the API returned no records of that type, which
                includes error responses whose ``result`` is empty or null.
        """
        dns_list = self.list_dns(zone_id, type_)
        records = dns_list.get('result') or []
        if not records:
            raise LookupError(
                f"No {type_} record found in zone {zone_id}: {dns_list.get('errors')}"
            )
        return self._select_record(records, self.host)['content']

    def _acme_command(
        self,
        acme: str,
        domains: List[str],
        server: str,
        force: bool,
    ) -> List[str]:
        """Build the acme.sh argument list for issuing the certificate."""
        command = [acme, '--server', server, '--issue', '--dns', 'dns_cf', '-d', self.host]
        if force:
            command.append('--force')
        for domain in domains:
            # A blank entry would otherwise produce the invalid name ".<host>".
            if domain:
                command.extend(['-d', f"{domain}.{self.host}"])
        return command

    def issue_cert(
        self,
        ssl_home: str,
        acme_directory: str,
        domains: List[str],
        server: str = 'zerossl',
        force: bool = False,
    ) -> None:
        """Issue a certificate for the host and its subdomains through acme.sh.

        Args:
            ssl_home: Directory exported to acme.sh as ``HOME``.
            acme_directory: Directory, relative to this script, that holds
                ``acme.sh``.
            domains: Subdomain labels; each becomes ``<label>.<host>``.
            server: Value passed to acme.sh ``--server``.
            force: Append ``--force`` to the command when true.

        Raises:
            subprocess.CalledProcessError: If acme.sh exits with a status
                other than 2.
        """
        # NOTE(review): this replaces the whole environment, so PATH and the
        # rest of the caller's variables are not inherited - confirm intended.
        env = {
            'HOME': ssl_home,
            'CF_Key': self.key,
            'CF_Email': self.mail,
        }
        acme = os.path.join(script_dir, acme_directory, 'acme.sh')
        command = self._acme_command(acme, domains, server, force)
        try:
            subprocess.check_call(command, env=env)
        except subprocess.CalledProcessError as exception:
            # Exit status 2 is tolerated; acme.sh appears to use it for
            # "skipped, not yet due for renewal" - confirm for your version.
            if exception.returncode != 2:
                raise
class CloudflareAction(argparse.Action):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
mail = config.get('Cloudflare', 'mail')
key = config.get('Cloudflare', 'api_key')
self.zone_id = config.get('Cloudflare', 'zone_id')
self.domain = config.get('General', 'domain')
self.device = config.get('General', 'device')
self.a_id = config.get('Cloudflare', 'a_record_id')
self.a_proxied = config.getboolean('Cloudflare', 'a_proxied')
self.a_ttl = config.get('Cloudflare', 'a_ttl')
self.aaaa_id = config.get('Cloudflare', 'aaaa_record_id')
self.aaaa_proxied = config.getboolean('Cloudflare', 'aaaa_proxied')
self.aaaa_ttl = config.get('Cloudflare', 'aaaa_ttl')
self.cloudflare = Cloudflare(self.domain, mail, key)
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[Any] | None,
option_string: str | None = None,
) -> None:
if option_string == '--list-dns':
assert isinstance(values, list), "Wrong params"
dns_list = self.cloudflare.list_dns(self.zone_id, values[0])
print(json.dumps(dns_list, indent=4))
if option_string == '--update-dns':
local_a_address = get_local_address(self.device, (socket.AF_INET, 0))
local_aaaa_address = get_local_address(self.device, (socket.AF_INET6, 1))
remote_a_address = self.cloudflare.get_remote_address(self.zone_id, 'A')
remote_aaaa_address = self.cloudflare.get_remote_address(self.zone_id, 'AAAA')
if local_a_address != remote_a_address:
response = self.cloudflare.update_dns(
self.domain,
self.zone_id,
self.a_id,
'A',
local_a_address,
self.a_proxied,
self.a_ttl,
)
print(f'A record: {response.status_code}', end='')
else:
print(f'A record: Already updated', end='')
print(f' [{local_a_address}]')
if local_aaaa_address != remote_aaaa_address:
response = self.cloudflare.update_dns(
self.domain,
self.zone_id,
self.aaaa_id,
'AAAA',
local_aaaa_address,
self.aaaa_proxied,
self.aaaa_ttl,
)
print(f'AAAA record: {response.status_code}', end='')
else:
print(f'AAAA record: Already updated', end='')
print(f' [{local_aaaa_address}]')
if option_string == '--issue-cert':
ssl_home = config.get('ssl', 'ssl_home')
acme_directory = config.get('ssl', 'acme_directory')
server = config.get('ssl', 'server', fallback='zerossl')
subdomains = config.get('General', 'subdomains').replace(' ', '').split(',')
force = False
if values == "force":
force = True
self.cloudflare.issue_cert(ssl_home, acme_directory, subdomains, server, force)
if option_string == '--maintenance':
self.cloudflare.update_dns(
self.domain, self.zone_id, self.a_id, 'A', '192.0.2.1', True
)
self.cloudflare.update_dns(
self.domain, self.zone_id, self.aaaa_id, 'AAAA', '2001:db8::', True
)
print(f'Enabled')
def get_local_address(
    interface: str,
    type_: Tuple[socket.AddressFamily, int],
) -> str | None:
    """Return the address this machine should publish for one address family.

    For ``socket.AF_INET`` the external address is requested from the UPnP
    gateway. For any other family the non-IPv4 addresses of the adapter named
    *interface* are inspected: the first globally routable one is returned,
    otherwise the first non-IPv4 address found.

    Args:
        interface: Adapter name to search for a non-IPv4 address.
        type_: ``(address family, index)`` pair. Only the family is used;
            the second element is ignored.

    Returns:
        The address as a string, or ``None`` if the adapter was not found or
        has no non-IPv4 address.
    """
    import ipaddress  # local import: only this function needs it

    if type_[0] == socket.AF_INET:
        upnp = miniupnpc.UPnP()
        # presumably milliseconds, per miniupnpc - TODO confirm
        upnp.discoverdelay = 200
        upnp.discover()
        upnp.selectigd()
        return upnp.externalipaddress()

    # First element of each non-IPv4 ip tuple on the requested adapter.
    candidates = [
        entry.ip[0]
        for adapter in ifaddr.get_adapters()
        if adapter.name == interface
        for entry in adapter.ips
        if not entry.is_IPv4
    ]
    for address in candidates:
        try:
            # Drop any "%scope" suffix before parsing.
            parsed = ipaddress.ip_address(address.split('%', 1)[0])
        except ValueError:
            continue
        # Link-local or otherwise non-routable addresses are useless in
        # public DNS, so prefer a global one when the adapter has it.
        if parsed.is_global:
            return address
    # No global address: keep the previous "first match" behaviour.
    return candidates[0] if candidates else None
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    config_path = os.path.join(script_dir, 'config.ini')
    # read() silently skips files it cannot open and returns the ones it
    # loaded, so an empty result means the configuration is missing.
    if not config.read(config_path):
        raise SystemExit(f'Cannot read configuration file: {config_path}')
    # The configuration must be loaded before the arguments are declared:
    # each CloudflareAction reads it in its constructor.
    command_parser = argparse.ArgumentParser(
        description='Manage Cloudflare DNS records and certificates.'
    )
    command_parser.add_argument(
        '--list-dns', action=CloudflareAction, nargs=1, metavar='TYPE',
        help='print the DNS records of the given type as JSON',
    )
    command_parser.add_argument(
        '--update-dns', action=CloudflareAction, nargs=0,
        help='update the A and AAAA records with the current local addresses',
    )
    command_parser.add_argument(
        '--issue-cert', action=CloudflareAction, nargs='?', metavar='force',
        help='issue a certificate with acme.sh; pass "force" to force it',
    )
    command_parser.add_argument(
        '--maintenance', action=CloudflareAction, nargs=0,
        help='point the A and AAAA records at placeholder addresses',
    )
    command_parser.parse_args()