Cloudflare DDNS 自用更新脚本,自建邮件 + 博客服务器专用。记录一下,以备下次需要时使用。
使用方法:
在crontab里面添加如下行,每20分钟运行一次:
*/20 * * * * /usr/bin/python3 /usr/local/bin/update_all.py >> /var/log/update_all.log 2>&1
#!/bin/env python3
# -*- coding: UTF-8 -*-
import requests
from requests.adapters import HTTPAdapter
import re
import json
import time
import sys
import os
# Root of the Cloudflare v4 API path under which each zone's resources live.
base_url = "https://api.cloudflare.com/client/v4/zones"

# One HTTP session shared by every request in this script. Both schemes get
# their own adapter configured with max_retries=9.
req = requests.Session()
req.mount('http://', HTTPAdapter(max_retries=9))
req.mount('https://', HTTPAdapter(max_retries=9))
def timeStamp():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    # With no explicit time tuple, strftime() formats the current local time.
    return time.strftime("%Y-%m-%d %H:%M:%S")
def prtMsg(msgType="None", msgConn="Not defined msg type."):
    """Print one timestamped log line to stdout.

    msgType selects the severity tag: "err" prints `` [ERROR]``, "msg"
    prints `` [MESSAGE]``, and any other value prints no tag at all.
    msgConn is the message text and is appended after a colon.
    """
    stamp = timeStamp()
    if msgType == "err":
        tag = " [ERROR]"
    elif msgType == "msg":
        tag = " [MESSAGE]"
    else:
        tag = ""
    print(stamp + tag + ":" + msgConn)
def get_ipv6_addr():
    """Return an IPv6 address of this host, or False when none is found.

    The address is obtained from a shell pipeline around ``/sbin/ip``:
    interface names matching `` ens`` or `` eth`` are taken from ``ip a``,
    their inet6 lines containing ``240e`` are kept, and the ``/prefix``
    suffix is cut off. When several addresses remain, the longest string
    is returned.
    """
    cmd = """/sbin/ip -6 addr show $(/sbin/ip a|awk -F ":" '/ ens| eth/&&!/inet/ {print $2}'
)|awk '/inet6/&&/240e/ {print $2}'|awk -F "/" '{print $1}'"""
    # Close the pipe explicitly so the child process is waited for instead of
    # being left for the garbage collector to clean up.
    with os.popen(cmd) as pipe:
        output = pipe.read()
    candidates = [line for line in output.splitlines() if line]
    if not candidates:
        return False
    return max(candidates, key=len)
def get_ipv4_addr():
    """Return the public IPv4 address reported by myip.ipip.net, or False.

    The response body is scanned for the first dotted-quad pattern. False is
    returned when the request fails or the body contains no such pattern.
    """
    try:
        body = req.get('https://myip.ipip.net', timeout=5).text
    except requests.RequestException as exc:
        # Only transport/HTTP errors are expected here; anything else
        # (including KeyboardInterrupt) is allowed to propagate.
        prtMsg('err', "Query https://myip.ipip.net failed: {0}".format(exc))
        return False
    found = re.search(r'(\d+\.\d+\.\d+\.\d+)', body)
    return found.group(1) if found else False
def get_cf_all_record(get_headers, zone_id):
    """Fetch the DNS records of one Cloudflare zone.

    get_headers: HTTP headers (authentication included) sent with the request.
    zone_id: identifier of the zone whose records are listed.

    Returns the ``result`` value of the JSON response. When the response has
    no ``result`` or it is null, an empty list is returned so callers can
    iterate the value safely.

    NOTE(review): a single request is made; if the API paginates the record
    list, later pages are not fetched -- confirm against the zone's size.
    """
    api_url = "{0}/{1}/dns_records".format(base_url, zone_id)
    # The timeout keeps an unresponsive API from hanging the cron job forever.
    res = req.get(api_url, headers=get_headers, timeout=10)
    payload = json.loads(res.text)
    if not isinstance(payload, dict):
        return []
    return payload.get('result') or []
def cr_headers_data(email, apikey):
    """Build the HTTP headers used for the Cloudflare API calls.

    email is sent as ``X-Auth-Email`` and apikey as ``X-Auth-Key``; the
    content type is always ``application/json``.
    """
    return {
        "Content-Type": "application/json",
        "X-Auth-Email": email,
        "X-Auth-Key": apikey,
    }
def cr_record_dict(headers, zone_id):
    """Index a zone's DNS records by ``"<name>-<type>"``.

    headers: HTTP headers passed through to get_cf_all_record().
    zone_id: identifier of the zone to read.

    Returns ``(records, zone_name)``. ``records`` maps ``"<name>-<type>"`` to
    a dict with the keys ``domain``, ``id``, ``type``, ``content`` and
    ``proxied``. ``zone_name`` is the first non-empty ``zone_name`` value
    seen on a record. ``({}, "")`` is returned when no record was indexed.

    NOTE(review): records sharing the same name and type overwrite each
    other, so only the last one listed is kept.
    """
    rec_dict, zone_name = {}, ""
    # `or []`: a missing/null record list means "no records", not a TypeError.
    for rec in get_cf_all_record(headers, zone_id) or []:
        # Check the entry's shape before touching any key on it.
        if not isinstance(rec, dict):
            continue
        if not zone_name:
            zone_name = rec.get('zone_name') or ""
        if "name" not in rec:
            continue
        key = "{0}-{1}".format(rec['name'], rec['type'])
        rec_dict[key] = {
            'domain': rec['name'],
            'id': rec['id'],
            'type': rec['type'],
            'content': rec['content'],
            'proxied': rec['proxied'],
        }
    return (rec_dict, zone_name) if rec_dict else ({}, "")
def cr_post_data(post_type, post_domain, post_content, proxy_type):
    """Build the JSON body for a DNS record update.

    post_type, post_domain and post_content become the record's ``type``,
    ``name`` and ``content``; proxy_type is stored under ``proxied``.
    ``ttl`` is always 1 (presumably Cloudflare's "automatic" TTL -- confirm
    against the API documentation).
    """
    return dict(
        type=post_type,
        name=post_domain,
        content=post_content,
        ttl=1,
        proxied=proxy_type,
    )
def put_cf_record(zone_id, record_id, put_headers, post_data):
    """Overwrite one DNS record via HTTP PUT.

    zone_id / record_id: identify the record inside the zone.
    put_headers: HTTP headers (authentication included) sent with the request.
    post_data: dict sent as the JSON request body.

    Returns True when the response status is 200, otherwise False. A
    transport-level failure is logged and also reported as False, so one
    failed update does not abort the caller's loop.
    """
    api_url = "{0}/{1}/dns_records/{2}".format(base_url, zone_id, record_id)
    try:
        # The timeout keeps an unresponsive API from hanging the cron job.
        res = req.put(api_url, headers=put_headers, json=post_data, timeout=10)
    except requests.RequestException as exc:
        prtMsg('err', "PUT {0} failed: {1}".format(api_url, exc))
        return False
    return res.status_code == 200
def main():
    """Bring the configured Cloudflare DNS records in line with this host's IPs.

    Steps: read the zone's current records, detect the current IPv6 and IPv4
    addresses, then for every ``"<label>-<type>"`` entry in ``cf_list``
    compare the wanted content with the stored content and PUT an update
    when they differ. AAAA records get the IPv6 address, A records the IPv4
    address, and any other type gets an SPF string built from both; TXT
    entries are applied to the zone name itself rather than to a sub-domain.

    Exits with status 10 when either address cannot be determined.
    """
    email = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
    apikey = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
    zone_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
    headers = cr_headers_data(email, apikey)
    all_rec_dict, zone_name = cr_record_dict(headers, zone_id)
    cf_list = ["www-AAAA", "gra-AAAA", "hub-AAAA", "ai-AAAA", "zbx-AAAA", "mail-TXT", "m-AAAA", "mail-A"]
    ipv6 = get_ipv6_addr()
    ipv4 = get_ipv4_addr()
    print("-" * 20)
    if not (ipv6 and ipv4):
        prtMsg('err', "GET IP address failed.")
        sys.exit(10)
    prtMsg('msg', "Current IPV6 address is {0}".format(ipv6))
    prtMsg('msg', "Current IPV4 address is {0}".format(ipv4))

    for cf in cf_list:
        # Split on the LAST dash so labels that contain "-" stay intact.
        post_domain, post_type = cf.rsplit("-", 1)
        if post_type == "AAAA":
            post_content = ipv6
        elif post_type == "A":
            post_content = ipv4
        else:
            post_content = "v=spf1 mx ip4:{0} ip6:{1} ~all".format(ipv4, ipv6)

        if post_type == "TXT":
            post_domain_name = zone_name
        else:
            post_domain_name = "{0}.{1}".format(post_domain, zone_name)
        post_key = "{0}-{1}".format(post_domain_name, post_type)

        # Exact key lookup. A substring test on str(dict) would let
        # "name-A" match inside "name-AAAA" and then fail with KeyError.
        record = all_rec_dict.get(post_key)
        if record is None:
            prtMsg('err', 'GET {0} Record for {1} failed.'.format(post_domain_name, post_type))
            continue
        if post_content == record['content']:
            prtMsg('msg', 'No needs to update {0} record for {1}.'.format(post_domain_name, post_type))
            continue

        # Keep the record's existing proxy setting when rewriting it.
        post_data = cr_post_data(post_type, post_domain_name, post_content, record['proxied'])
        if put_cf_record(zone_id, record['id'], headers, post_data):
            prtMsg('msg', 'Update {0} record for {1} success.'.format(post_domain_name, post_type))
        else:
            prtMsg('err', 'Update {0} record for {1} failed.'.format(post_domain_name, post_type))
# Run the updater only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
