cloudflare/cloudflare.py
2025-01-03 10:58:38 -03:00

182 lines
6.2 KiB
Python
Executable file

#!/usr/bin/env python
# Lain Iwakura <lain@serialexperiments.club> 2019 ~ 2025
import argparse
import configparser
import json
import logging
import os
import socket
import subprocess
from typing import Any, Dict, List, Tuple, Sequence
import ifaddr
import miniupnpc
import requests
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Shared settings parser. It starts empty; the script entry point fills it
# from config.ini before anything reads from it.
config = configparser.RawConfigParser()

# Absolute path of the directory containing this file. config.ini and the
# acme.sh directory are resolved relative to it.
script_dir = os.path.dirname(os.path.abspath(__file__))
class Cloudflare:
    """Small client for the Cloudflare v4 DNS-records API plus an acme.sh wrapper.

    Requests are authenticated with the ``X-Auth-Email`` / ``X-Auth-Key``
    header pair built from ``mail`` and ``key``.
    """

    def __init__(
        self,
        host: str,
        mail: str,
        key: str,
        api_server: str = 'https://api.cloudflare.com',
        timeout: float = 30.0,
    ) -> None:
        """Store the credentials and connection settings.

        Args:
            host: Domain name this client manages (used as the certificate's
                primary name and to pick the matching DNS record).
            mail: Account e-mail sent as ``X-Auth-Email``.
            key: API key sent as ``X-Auth-Key``.
            api_server: Base URL of the API, without a trailing slash.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.api_server = api_server
        self.mail = mail
        self.key = key
        self.host = host
        self.timeout = timeout

    def _auth_headers(self) -> Dict[str, str]:
        """Return the authentication headers shared by every API request."""
        return {
            'X-Auth-Email': self.mail,
            'X-Auth-Key': self.key,
        }

    def list_dns(self, zone_id: str, type_: str) -> Dict[str, Any]:
        """Fetch the zone's DNS records of the given type.

        Args:
            zone_id: Identifier of the zone to query.
            type_: Record type to filter on (for example ``'A'``).

        Returns:
            The decoded JSON body of the response, returned as-is (no
            status-code check is performed here).
        """
        with requests.get(
            f'{self.api_server}/client/v4/zones/{zone_id}/dns_records',
            # Let requests build and URL-encode the query string.
            params={'type': type_},
            headers=self._auth_headers(),
            # Without a timeout requests may block forever on a stalled socket.
            timeout=self.timeout,
        ) as response_:
            return response_.json()

    def update_dns(
        self,
        domain: str,
        zone_id: str,
        record_id: str,
        record_type: str,
        address: str,
        proxied: bool = True,
    ) -> 'requests.Response':
        """Overwrite one DNS record with a new address.

        Args:
            domain: Record name to write.
            zone_id: Identifier of the zone that owns the record.
            record_id: Identifier of the record to replace.
            record_type: Record type (for example ``'A'`` or ``'AAAA'``).
            address: New record content.
            proxied: Value sent as the record's ``proxied`` flag.

        Returns:
            The HTTP response. Its status is not checked here; callers
            inspect ``status_code`` themselves.
        """
        headers = {
            **self._auth_headers(),
            'Content-Type': 'application/json',
        }
        payload = {
            'type': record_type,
            'name': domain,
            'proxied': proxied,
            'content': address,
        }
        with requests.put(
            f"{self.api_server}/client/v4/zones/{zone_id}/dns_records/{record_id}",
            headers=headers,
            data=json.dumps(payload),
            timeout=self.timeout,
        ) as response:
            # The body is already read at this point, so the response stays
            # usable after the context manager releases the connection.
            return response

    def get_remote_address(self, zone_id: str, type_: str) -> str:
        """Return the address currently published for this host.

        The record whose ``name`` equals ``self.host`` is preferred. When no
        record carries that name, the first record of the requested type is
        used, which was the only behaviour before name matching was added.

        Args:
            zone_id: Identifier of the zone to query.
            type_: Record type to look up.

        Raises:
            LookupError: The API returned no records of that type (including
                error responses, whose ``result`` is empty).
        """
        dns_list = self.list_dns(zone_id, type_)
        records = dns_list.get('result') or []
        if not records:
            raise LookupError(
                f"No {type_} records returned for zone {zone_id}: "
                f"{dns_list.get('errors')}"
            )
        for record in records:
            if record.get('name') == self.host:
                return record['content']
        return records[0]['content']

    def issue_cert(self, ssl_home: str, acme_directory: str, domains: List[str]) -> None:
        """Run ``acme.sh --issue`` with the ``dns_cf`` DNS hook.

        The certificate covers ``self.host`` plus ``<domain>.<self.host>`` for
        every non-empty entry in ``domains``.

        Args:
            ssl_home: Value exported as ``HOME`` to acme.sh.
            acme_directory: Directory (relative to this script) holding
                ``acme.sh``.
            domains: Subdomain labels to add to the certificate.

        Raises:
            subprocess.CalledProcessError: acme.sh exited with a status other
                than 0 or 2.
        """
        # NOTE: this dict replaces the child's whole environment; nothing
        # (not even PATH) is inherited from the current process.
        env = {
            'HOME': ssl_home,
            'CF_Key': self.key,
            'CF_Email': self.mail,
        }
        acme = os.path.join(script_dir, acme_directory, 'acme.sh')
        command = [acme, '--issue', '--dns', 'dns_cf', '-d', self.host]
        for domain in domains:
            # An empty entry (e.g. from an empty config value) would otherwise
            # become the invalid name ".<host>".
            if domain:
                command.extend(['-d', f"{domain}.{self.host}"])
        try:
            subprocess.check_call(command, env=env)
        except subprocess.CalledProcessError as exception:
            # Exit status 2 is tolerated; presumably acme.sh uses it for
            # "renewal skipped, certificate still valid" -- TODO confirm.
            if exception.returncode != 2:
                raise
class CloudflareAction(argparse.Action):
    """argparse action that performs a Cloudflare task when its option is parsed.

    Construction reads the ``Cloudflare`` and ``General`` sections of the
    module-level ``config``, so the configuration must be loaded before the
    action is registered with a parser.
    """

    def __init__(self, **kwargs) -> None:
        """Build the API client from the loaded configuration."""
        super().__init__(**kwargs)
        mail = config.get('Cloudflare', 'mail')
        key = config.get('Cloudflare', 'api_key')
        self.zone_id = config.get('Cloudflare', 'zone_id')
        self.domain = config.get('General', 'domain')
        self.cloudflare = Cloudflare(self.domain, mail, key)

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str | Sequence[Any] | None,
        option_string: str | None = None,
    ) -> None:
        """Dispatch on the option that triggered this action.

        ``--list-dns TYPE`` prints the zone's records of that type,
        ``--update-dns`` synchronises the A and AAAA records, and
        ``--issue-cert`` runs the certificate issuance.
        """
        if option_string == '--list-dns':
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            if not isinstance(values, list) or not values:
                parser.error('--list-dns requires a record type')
            self._list_dns(values[0])
        elif option_string == '--update-dns':
            self._update_dns()
        elif option_string == '--issue-cert':
            self._issue_cert()

    def _list_dns(self, record_type: str) -> None:
        """Print the zone's records of ``record_type`` as indented JSON."""
        dns_list = self.cloudflare.list_dns(self.zone_id, record_type)
        print(json.dumps(dns_list, indent=4))

    def _update_dns(self) -> None:
        """Compare local and published addresses and update what differs."""
        # Read every setting first so a configuration error aborts the run
        # before any network request is made.
        aaaa_record_id = config.get('Cloudflare', 'aaaa_record_id')
        a_record_id = config.get('Cloudflare', 'a_record_id')
        aaaa_proxied = config.getboolean('Cloudflare', 'aaaa_proxied')
        a_proxied = config.getboolean('Cloudflare', 'a_proxied')
        device = config.get('General', 'device')

        local_a_address = get_local_address(device, (socket.AF_INET, 0))
        local_aaaa_address = get_local_address(device, (socket.AF_INET6, 1))
        remote_a_address = self.cloudflare.get_remote_address(self.zone_id, 'A')
        remote_aaaa_address = self.cloudflare.get_remote_address(self.zone_id, 'AAAA')

        self._sync_record('A', a_record_id, a_proxied, local_a_address, remote_a_address)
        self._sync_record(
            'AAAA', aaaa_record_id, aaaa_proxied, local_aaaa_address, remote_aaaa_address,
        )

    def _sync_record(
        self,
        record_type: str,
        record_id: str,
        proxied: bool,
        local_address: str | None,
        remote_address: str,
    ) -> None:
        """Update one record if its published address is stale, then report."""
        if not local_address:
            # No usable local address was detected; sending an empty value
            # as the record content would only produce an API error.
            print(f'{record_type} record: No local address found, skipped')
            return
        if local_address == remote_address:
            status = 'Already updated'
        else:
            response = self.cloudflare.update_dns(
                self.domain, self.zone_id, record_id, record_type, local_address, proxied,
            )
            status = response.status_code
        print(f'{record_type} record: {status} [{local_address}]')

    def _issue_cert(self) -> None:
        """Issue a certificate for the domain and its configured subdomains."""
        ssl_home = config.get('ssl', 'ssl_home')
        acme_directory = config.get('ssl', 'acme_directory')
        subdomains = config.get('General', 'subdomains').replace(' ', '').split(',')
        self.cloudflare.issue_cert(ssl_home, acme_directory, subdomains)
def get_local_address(interface: str, type_: Tuple[socket.AddressFamily, int]):
if type_[0] == socket.AF_INET:
upnp = miniupnpc.UPnP()
upnp.discoverdelay = 200
upnp.discover()
upnp.selectigd()
return upnp.externalipaddress()
for adapter in ifaddr.get_adapters():
if adapter.name == interface:
for entry in adapter.ips:
if entry.is_IPv4:
continue
return entry.ip[0]
if __name__ == '__main__':
    # Script entry point: load config.ini from the script's directory, then
    # let argparse trigger the requested CloudflareAction(s).
    logging.basicConfig(level=logging.INFO)

    config_path = os.path.join(script_dir, 'config.ini')
    # read() skips files it cannot open and returns the list it did parse;
    # an empty list would otherwise surface later as an unrelated
    # NoSectionError when the actions are constructed.
    if not config.read(config_path):
        raise SystemExit(f'Configuration file not found or unreadable: {config_path}')

    command_parser = argparse.ArgumentParser()
    # Each option runs its task as soon as it is parsed.
    for flag, nargs in (('--list-dns', 1), ('--update-dns', 0), ('--issue-cert', 0)):
        command_parser.add_argument(flag, action=CloudflareAction, nargs=nargs)
    command_parser.parse_args()