212 lines
7.1 KiB
Python
Executable file
212 lines
7.1 KiB
Python
Executable file
#!/usr/bin/env python
|
|
# Lain Iwakura <lain@serialexperiments.club> 2019 ~ 2025
|
|
|
|
|
|
import argparse
|
|
import configparser
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Tuple, Sequence
|
|
|
|
import ifaddr
|
|
import miniupnpc
|
|
import requests
|
|
|
|
script_dir = os.path.abspath(os.path.dirname(__file__))
|
|
config = configparser.RawConfigParser()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Cloudflare:
    """Small client for the Cloudflare v4 DNS-record API plus acme.sh issuance.

    Every request authenticates with the ``X-Auth-Email`` / ``X-Auth-Key``
    header pair built from the ``mail`` and ``key`` given to the constructor.
    """

    def __init__(
        self,
        host: str,
        mail: str,
        key: str,
        api_server: str = 'https://api.cloudflare.com',
        timeout: float = 30.0,
    ) -> None:
        """Store the credentials and connection settings.

        Args:
            host: Apex domain the records and certificates belong to.
            mail: Account e-mail sent as ``X-Auth-Email``.
            key: API key sent as ``X-Auth-Key``.
            api_server: Base URL of the API, without a trailing slash.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.api_server = api_server
        self.mail = mail
        self.key = key
        self.host = host
        self.timeout = timeout

    def _auth_headers(self) -> Dict[str, str]:
        """Return the authentication headers shared by all API calls."""
        return {
            'X-Auth-Email': self.mail,
            'X-Auth-Key': self.key,
        }

    def list_dns(self, zone_id: str, type_: str) -> Dict[str, Any]:
        """Fetch the DNS records of one type for a zone.

        Args:
            zone_id: Cloudflare zone identifier.
            type_: Record type to filter on, e.g. ``'A'`` or ``'AAAA'``.

        Returns:
            The decoded JSON body of the API response.
        """
        response = requests.get(
            f'{self.api_server}/client/v4/zones/{zone_id}/dns_records',
            # Let requests encode the query string instead of formatting it by hand.
            params={'type': type_},
            headers=self._auth_headers(),
            timeout=self.timeout,
        )
        return response.json()

    def update_dns(
        self,
        domain: str,
        zone_id: str,
        record_id: str,
        record_type: str,
        address: str,
        proxied: bool = True,
        ttl: int = 1,
    ) -> 'requests.Response':
        """Overwrite one DNS record with a new address.

        Args:
            domain: Record name to write.
            zone_id: Cloudflare zone identifier.
            record_id: Identifier of the record being replaced.
            record_type: Record type, e.g. ``'A'`` or ``'AAAA'``.
            address: New record content (the IP address).
            proxied: Whether Cloudflare should proxy traffic for the record.
            ttl: Time to live; numeric strings (as read from an ini file) are
                accepted and converted, because the payload must carry a number.

        Returns:
            The HTTP response of the PUT request; callers inspect
            ``status_code``.

        Raises:
            ValueError: If ``ttl`` cannot be converted to an integer.
        """
        payload = {
            'type': record_type,
            'name': domain,
            'proxied': proxied,
            'ttl': int(ttl),
            'content': address,
            'comment': f"Updated at {datetime.now().ctime()}",
        }

        # ``json=`` serialises the payload and sets the JSON content type.
        return requests.put(
            f"{self.api_server}/client/v4/zones/{zone_id}/dns_records/{record_id}",
            headers=self._auth_headers(),
            json=payload,
            timeout=self.timeout,
        )

    def get_remote_address(self, zone_id: str, type_: str) -> str:
        """Return the address currently published for this host.

        The record whose name equals ``self.host`` is preferred, so that other
        records of the same type in the zone are not mistaken for it. When no
        name matches, the first record is used.

        Args:
            zone_id: Cloudflare zone identifier.
            type_: Record type to look up.

        Raises:
            IndexError: If the API returned no records of that type.
        """
        records = self.list_dns(zone_id, type_).get('result') or []
        if not records:
            raise IndexError(f'No {type_} records returned for zone {zone_id}')

        for record in records:
            if record.get('name') == self.host:
                return record['content']

        return records[0]['content']

    def issue_cert(self, ssl_home: str, acme_directory: str, domains: List[str]) -> None:
        """Run ``acme.sh --issue`` with the Cloudflare DNS hook.

        The certificate covers ``self.host`` plus ``<domain>.<host>`` for each
        entry of ``domains``; empty entries are skipped.

        Args:
            ssl_home: Directory exported to acme.sh as ``HOME``.
            acme_directory: Directory, relative to the script directory,
                that contains ``acme.sh``.
            domains: Sub-domain labels to add to the certificate.

        Raises:
            subprocess.CalledProcessError: If acme.sh exits with a status
                other than 2.
        """
        # NOTE(review): the child gets ONLY these variables (no inherited PATH etc.);
        # kept as in the original - confirm this isolation is intentional.
        env = {
            'HOME': ssl_home,
            'CF_Key': self.key,
            'CF_Email': self.mail,
        }

        acme = os.path.join(script_dir, acme_directory, 'acme.sh')
        command = [acme, '--issue', '--dns', 'dns_cf', '-d', self.host]

        for domain in domains:
            if domain:  # an empty label would produce the invalid name ".<host>"
                command.extend(['-d', f"{domain}.{self.host}"])

        try:
            subprocess.check_call(command, env=env)
        except subprocess.CalledProcessError as exception:
            # Exit status 2 is tolerated (presumably acme.sh's "skipped, not yet
            # due for renewal" - confirm against the acme.sh documentation).
            if exception.returncode != 2:
                raise
|
|
|
|
|
|
class CloudflareAction(argparse.Action):
    """argparse action that executes one Cloudflare command per option.

    The option string that triggered the action (``--list-dns``,
    ``--update-dns``, ``--issue-cert`` or ``--maintenance``) selects the
    command. Settings are read from the module-level ``config`` parser when
    the action is constructed, which argparse does inside ``add_argument``,
    so the configuration file must be loaded before the options are declared.
    """

    def __init__(self, **kwargs) -> None:
        """Read the Cloudflare settings and build the API client."""
        super().__init__(**kwargs)
        mail = config.get('Cloudflare', 'mail')
        key = config.get('Cloudflare', 'api_key')
        self.zone_id = config.get('Cloudflare', 'zone_id')
        self.domain = config.get('General', 'domain')
        self.device = config.get('General', 'device')

        self.a_id = config.get('Cloudflare', 'a_record_id')
        self.a_proxied = config.getboolean('Cloudflare', 'a_proxied')
        # TTLs are numbers in the API payload, so parse them as integers.
        self.a_ttl = config.getint('Cloudflare', 'a_ttl')

        self.aaaa_id = config.get('Cloudflare', 'aaaa_record_id')
        self.aaaa_proxied = config.getboolean('Cloudflare', 'aaaa_proxied')
        self.aaaa_ttl = config.getint('Cloudflare', 'aaaa_ttl')

        self.cloudflare = Cloudflare(self.domain, mail, key)

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: str | Sequence[Any] | None,
        option_string: str | None = None,
    ) -> None:
        """Dispatch to the command matching ``option_string``."""
        if option_string == '--list-dns':
            self._list_dns(parser, values)
        elif option_string == '--update-dns':
            self._update_dns()
        elif option_string == '--issue-cert':
            self._issue_cert()
        elif option_string == '--maintenance':
            self._enable_maintenance()

    def _list_dns(
        self,
        parser: argparse.ArgumentParser,
        values: str | Sequence[Any] | None,
    ) -> None:
        """Print the zone's records of the requested type as indented JSON."""
        if not isinstance(values, (list, tuple)) or not values:
            # Explicit check instead of ``assert``, which is stripped under -O.
            parser.error('--list-dns requires a record type, e.g. A or AAAA')
        dns_list = self.cloudflare.list_dns(self.zone_id, values[0])
        print(json.dumps(dns_list, indent=4))

    def _update_dns(self) -> None:
        """Bring the A and AAAA records in line with the local addresses."""
        local_a_address = get_local_address(self.device, (socket.AF_INET, 0))
        local_aaaa_address = get_local_address(self.device, (socket.AF_INET6, 1))

        self._sync_record('A', self.a_id, local_a_address, self.a_proxied, self.a_ttl)
        self._sync_record(
            'AAAA', self.aaaa_id, local_aaaa_address, self.aaaa_proxied, self.aaaa_ttl
        )

    def _sync_record(
        self,
        record_type: str,
        record_id: str,
        local_address: str | None,
        proxied: bool,
        ttl: int,
    ) -> None:
        """Update one record if its published address differs from the local one.

        Prints ``<type> record: <HTTP status> [<address>]`` after an update and
        ``<type> record: Already updated [<address>]`` when nothing changed.
        """
        if not local_address:
            # Never push an empty address to the API; leave the record alone.
            print(f'{record_type} record: No local address found')
            return

        remote_address = self.cloudflare.get_remote_address(self.zone_id, record_type)
        if local_address != remote_address:
            response = self.cloudflare.update_dns(
                self.domain,
                self.zone_id,
                record_id,
                record_type,
                local_address,
                proxied,
                ttl,
            )
            status = response.status_code
        else:
            status = 'Already updated'

        print(f'{record_type} record: {status} [{local_address}]')

    def _issue_cert(self) -> None:
        """Issue a certificate for the domain and its configured sub-domains."""
        ssl_home = config.get('ssl', 'ssl_home')
        acme_directory = config.get('ssl', 'acme_directory')
        subdomains = config.get('General', 'subdomains').replace(' ', '').split(',')
        self.cloudflare.issue_cert(ssl_home, acme_directory, subdomains)

    def _enable_maintenance(self) -> None:
        """Point both records at placeholder addresses with proxying enabled.

        192.0.2.1 and 2001:db8:: lie in the ranges reserved for documentation,
        so they never route to a real host.
        """
        self.cloudflare.update_dns(
            self.domain, self.zone_id, self.a_id, 'A', '192.0.2.1', True
        )
        self.cloudflare.update_dns(
            self.domain, self.zone_id, self.aaaa_id, 'AAAA', '2001:db8::', True
        )
        print('Enabled')
|
|
|
|
def get_local_address(
    interface: str, type_: Tuple[socket.AddressFamily, int]
) -> str | None:
    """Return the address of this machine that should be published in DNS.

    For ``socket.AF_INET`` the external address is requested from the UPnP
    internet gateway and ``interface`` is not consulted. For any other family
    the IPv6 addresses of the adapter named ``interface`` are inspected: the
    first globally routable one is returned, and if there is none the first
    IPv6 address found is returned instead.

    Args:
        interface: Adapter name as reported by ``ifaddr``.
        type_: ``(address family, index)`` pair; only the family is used.
            NOTE(review): the second element is never read - confirm whether
            it was meant to select a particular address.

    Returns:
        The address as a string, or ``None`` when the adapter is missing or
        has no IPv6 address.
    """
    if type_[0] == socket.AF_INET:
        upnp = miniupnpc.UPnP()
        upnp.discoverdelay = 200
        upnp.discover()
        upnp.selectigd()
        return upnp.externalipaddress()

    # Imported here so this function stays self-contained.
    import ipaddress

    fallback = None
    for adapter in ifaddr.get_adapters():
        if adapter.name != interface:
            continue

        for entry in adapter.ips:
            if entry.is_IPv4:
                continue

            address = entry.ip[0]
            try:
                # Drop a possible "%scope" suffix before parsing.
                is_global = ipaddress.ip_address(address.split('%', 1)[0]).is_global
            except ValueError:
                is_global = False

            if is_global:
                return address
            if fallback is None:
                # Remember the first candidate in case nothing global turns up.
                fallback = address

    return fallback
|
|
|
|
|
|
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    # ConfigParser.read() skips unreadable files silently and returns the list
    # of files it parsed, so an empty result means the configuration is missing.
    config_path = os.path.join(script_dir, 'config.ini')
    if not config.read(config_path):
        raise SystemExit(f'Could not read configuration file: {config_path}')

    # Each option runs its command as soon as argparse encounters it.
    command_parser = argparse.ArgumentParser(
        description='Manage Cloudflare DNS records and certificates.'
    )
    command_parser.add_argument(
        '--list-dns',
        action=CloudflareAction,
        nargs=1,
        metavar='TYPE',
        help='print the DNS records of the given type (e.g. A, AAAA) as JSON',
    )
    command_parser.add_argument(
        '--update-dns',
        action=CloudflareAction,
        nargs=0,
        help='update the A and AAAA records with the current local addresses',
    )
    command_parser.add_argument(
        '--issue-cert',
        action=CloudflareAction,
        nargs=0,
        help='issue a certificate for the domain and its sub-domains via acme.sh',
    )
    command_parser.add_argument(
        '--maintenance',
        action=CloudflareAction,
        nargs=0,
        help='point the A and AAAA records at placeholder addresses',
    )
    command_parser.parse_args()
|