跳至正文
  • 193 views
  • 3 min read

cloudflare ddns自用更新脚本

新浪微博 豆瓣 QQ 百度贴吧 QQ空间

Cloudflare DDNS 自用更新脚本,自建邮件+博客服务器专用。记录一下,以备下次需要时使用。

使用方法:

在crontab里面添加如下行,每20分钟运行一次:

*/20 * * * * /usr/bin/python3 /usr/local/bin/update_all.py >> /var/log/update_all.log 2>&1
#!/bin/env python3
# -*- coding: UTF-8 -*-
import requests
from requests.adapters import HTTPAdapter
import re
import json
import time
import sys
import os

# Shared HTTP session used by every request this script makes.
req = requests.session()
# Mount adapters configured with max_retries=9 for both URL schemes.
req.mount('https://', HTTPAdapter(max_retries=9))
req.mount('http://', HTTPAdapter(max_retries=9))
# Base URL of the Cloudflare v4 zones API; the functions below append the
# zone id and the dns_records path to it.
base_url = "https://api.cloudflare.com/client/v4/zones"


def timeStamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    # strftime() uses the current local time when no time tuple is passed.
    return time.strftime("%Y-%m-%d %H:%M:%S")


def prtMsg(msgType="None", msgConn="Not defined msg type."):
    """Print one timestamped log line to stdout.

    Args:
        msgType: "err" adds an ``[ERROR]`` tag, "msg" adds a ``[MESSAGE]``
            tag; any other value prints the line without a tag.
        msgConn: The message text appended after the colon.
    """
    if msgType == "err":
        tag = " [ERROR]"
    elif msgType == "msg":
        tag = " [MESSAGE]"
    else:
        tag = ""
    print(timeStamp() + tag + ":" + msgConn)


def get_ipv6_addr():
    """Return this host's IPv6 address as reported by ``/sbin/ip``.

    The shell pipeline lists the interfaces whose ``ip a`` header line
    contains " ens" or " eth", then keeps their inet6 addresses that contain
    "240e" and strips the "/prefix-length" suffix.

    Returns:
        The longest matching address string, or False when none is found
        (including when the command produces no output).
    """
    # NOTE(review): "240e" is presumably the ISP's global IPv6 prefix, and the
    # match is not anchored to the start of the address -- confirm both.
    command = """/sbin/ip -6 addr show $(/sbin/ip a|awk -F ":" '/ ens| eth/&&!/inet/ {print $2}'
                        )|awk '/inet6/&&/240e/ {print $2}'|awk -F "/" '{print $1}'"""
    # Close the pipe deterministically instead of relying on garbage collection.
    with os.popen(command) as pipe:
        output = pipe.read()
    candidates = [line for line in output.splitlines() if line]
    if not candidates:
        return False
    # When several addresses match, the one with the longest text wins.
    return max(candidates, key=len)


def get_ipv4_addr():
    """Return the public IPv4 address reported by https://myip.ipip.net.

    Returns:
        The first dotted-quad found in the response body, or False when the
        request fails or the body contains no such pattern.
    """
    try:
        body = req.get('https://myip.ipip.net', timeout=5).text
    except requests.RequestException:
        # Best-effort lookup: a network failure is reported to the caller as
        # False. Only request errors are caught, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return False
    match = re.search(r'(\d+\.\d+\.\d+\.\d+)', body)
    return match.group(1) if match else False


def get_cf_all_record(get_headers, zone_id):
    """Fetch the DNS records of a Cloudflare zone.

    Args:
        get_headers: HTTP headers (including authentication) for the request.
        zone_id: Identifier of the zone whose records are listed.

    Returns:
        The list stored under "result" in the JSON response, or an empty
        list when the response carries no result list (for example an error
        response), so callers can always iterate over the return value.

    Raises:
        requests.RequestException: If the request fails or times out.
        ValueError: If the response body is not valid JSON.
    """
    # NOTE(review): only a single response page is read; zones with more
    # records than the API's page size would need pagination -- confirm.
    api_url = "{0}/{1}/dns_records".format(base_url, zone_id)
    # A timeout keeps an unattended (cron) run from hanging on a stalled
    # connection.
    res = req.get(api_url, headers=get_headers, timeout=10)
    payload = json.loads(res.text)
    records = payload.get('result') if isinstance(payload, dict) else None
    return records if isinstance(records, list) else []


def cr_headers_data(email, apikey):
    """Build the HTTP headers sent with each Cloudflare API request.

    Args:
        email: Value for the ``X-Auth-Email`` header.
        apikey: Value for the ``X-Auth-Key`` header.

    Returns:
        A dict with the JSON content type and the two authentication headers.
    """
    return {
        "Content-Type": "application/json",
        "X-Auth-Email": email,
        "X-Auth-Key": apikey,
    }

def cr_record_dict(headers, zone_id):
    """Index a zone's DNS records by "<record name>-<record type>".

    Args:
        headers: HTTP headers passed through to get_cf_all_record().
        zone_id: Identifier of the zone to read.

    Returns:
        A tuple ``(rec_dict, zone_name)``. ``rec_dict`` maps keys such as
        "www.example.com-AAAA" to a dict with the record's 'domain', 'id',
        'type', 'content' and 'proxied' values; ``zone_name`` is taken from
        the first record that carries a non-empty 'zone_name'. Returns
        ``({}, "")`` when no usable record was found.
    """
    rec_dict, zone_name = {}, ""
    # Tolerate a fetch that yields None instead of a list.
    all_rec = get_cf_all_record(headers, zone_id) or []
    for rec in all_rec:
        # Check the entry's shape before reading any field from it.
        if not isinstance(rec, dict):
            continue
        if not zone_name:
            # NOTE(review): if the API response does not include 'zone_name',
            # this stays "" and callers cannot build full record names --
            # confirm against the current Cloudflare response schema.
            zone_name = rec.get('zone_name') or ""
        if "name" not in rec:
            continue
        # Records sharing name and type (e.g. several TXT records on one
        # name) map to the same key; the last one seen wins.
        rec_key = "{0}-{1}".format(rec['name'], rec['type'])
        rec_dict[rec_key] = {
            'domain': rec['name'],
            'id': rec['id'],
            'type': rec['type'],
            'content': rec['content'],
            'proxied': rec['proxied'],
        }
    return (rec_dict, zone_name) if rec_dict else ({}, "")


def cr_post_data(post_type, post_domain, post_content, proxy_type):
    """Build the JSON payload for a DNS record update.

    Args:
        post_type: Record type, stored under "type".
        post_domain: Record name, stored under "name".
        post_content: Record value, stored under "content".
        proxy_type: Value stored under "proxied".

    Returns:
        A dict with the keys type, name, content, ttl and proxied; ttl is
        always 1.
    """
    # NOTE(review): ttl=1 presumably selects Cloudflare's automatic TTL --
    # confirm against the API documentation.
    return dict(
        type=post_type,
        name=post_domain,
        content=post_content,
        ttl=1,
        proxied=proxy_type,
    )


def put_cf_record(zone_id, record_id, put_headers, post_data):
    """Overwrite one DNS record through the Cloudflare API.

    Args:
        zone_id: Identifier of the zone that owns the record.
        record_id: Identifier of the record to replace.
        put_headers: HTTP headers (including authentication) for the request.
        post_data: JSON-serialisable payload describing the new record.

    Returns:
        True when the API answers with HTTP 200; False for any other status
        or when the request itself fails.
    """
    api_url = "{0}/{1}/dns_records/{2}".format(base_url, zone_id, record_id)
    try:
        # A timeout keeps an unattended (cron) run from hanging forever.
        res = req.put(api_url, headers=put_headers, json=post_data, timeout=10)
    except requests.RequestException:
        # Report a network failure as a failed update so the caller can log
        # it and continue with the remaining records.
        return False
    return res.status_code == 200


def main():
    """Bring the configured Cloudflare DNS records in line with this host's IPs.

    Reads the zone's current records, determines the host's IPv6 and public
    IPv4 addresses, and for each entry of ``cf_list`` updates the matching
    record when its content differs. AAAA records receive the IPv6 address,
    A records the IPv4 address, and any other type an SPF string built from
    both addresses. Exits with status 10 when either address is unavailable.
    """
    email = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
    apikey = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
    zone_id = "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
    headers = cr_headers_data(email, apikey)
    all_rec_dict, zone_name = cr_record_dict(headers, zone_id)
    # Entries are "<host label>-<record type>".
    cf_list = ["www-AAAA", "gra-AAAA", "hub-AAAA", "ai-AAAA", "zbx-AAAA", "mail-TXT", "m-AAAA", "mail-A"]
    ipv6 = get_ipv6_addr()
    ipv4 = get_ipv4_addr()
    print("-" * 20)
    if not (ipv6 and ipv4):
        prtMsg('err', "GET IP address failed.")
        sys.exit(10)
    prtMsg('msg', "Current IPV6 address is {0}".format(ipv6))
    prtMsg('msg', "Current IPV4 address is {0}".format(ipv4))
    for cf in cf_list:
        # Split on the last hyphen so host labels that contain "-" still work.
        post_domain, post_type = cf.rsplit("-", 1)
        if post_type == "AAAA":
            post_content = ipv6
        elif post_type == "A":
            post_content = ipv4
        else:
            post_content = "v=spf1 mx ip4:{0} ip6:{1} ~all".format(ipv4, ipv6)
        # TXT entries target the zone apex; their host label is ignored.
        if post_type == "TXT":
            post_domain_name = zone_name
        else:
            post_domain_name = "{0}.{1}".format(post_domain, zone_name)
        post_key = "{0}-{1}".format(post_domain_name, post_type)
        # Exact key lookup. A substring test against str(all_rec_dict) would
        # also accept "name-A" when only "name-AAAA" exists and then fail
        # with KeyError on the subsequent indexing.
        record = all_rec_dict.get(post_key)
        if record is None:
            prtMsg('err', 'GET {0} Record for {1} failed.'.format(post_domain_name, post_type))
            continue
        if post_content == record['content']:
            prtMsg('msg', 'No needs to update {0} record for {1}.'.format(post_domain_name, post_type))
            continue
        # Keep the record's existing proxy setting when rewriting it.
        post_data = cr_post_data(post_type, post_domain_name, post_content, record['proxied'])
        if put_cf_record(zone_id, record['id'], headers, post_data):
            prtMsg('msg', 'Update {0} record for {1} success.'.format(post_domain_name, post_type))
        else:
            prtMsg('err', 'Update {0} record for {1} failed.'.format(post_domain_name, post_type))



# Run the update only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

发表回复

联系站长