averainy's Blog

averainy

23 Apr 2022

Auto Select Fastest Proxy in Clash With Python

起因

为了让家里的电脑、电视盒子以及手机都能方便地使用代理,我在群晖上用 Docker 部署了 Clash。同时,为了能够自动选择响应速度最快的代理,我用 Python 写了这个脚本。将这个脚本加入定时任务,即可实现自动切换到最快的代理。

代码

# coding=utf-8
import requests
import json
import urllib.parse
from concurrent.futures import ThreadPoolExecutor,wait,ALL_COMPLETED
# Origin the API calls claim to come from (presumably the Clash web UI
# address -- confirm against your own setup).
_WEB_UI_ORIGIN = 'http://192.168.0.4:5080'

# Browser-like request headers shared by change_proxy() and get_proxies().
# 'Content-Type: application/json' is intentionally absent: requests adds it
# on its own whenever a body is passed through json=.
headers = {
    'Accept': '*/*',
    'Accept-Language': 'zh,en;q=0.9,zh-CN;q=0.8',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Origin': _WEB_UI_ORIGIN,
    'Pragma': 'no-cache',
    'Referer': _WEB_UI_ORIGIN + '/',
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/100.0.4896.127 Safari/537.36'
    ),
}

# JSON body sent by change_proxy(); 'name' is the proxy to select.
# set_speedest_proxy() overwrites 'name' with the fastest proxy it finds.
json_data = {'name': '标准|台湾04解锁'}
def change_proxy():
    """Ask Clash to select the proxy named in ``json_data``.

    Sends a PUT to the ``/proxies/<group>`` endpoint of the Clash API with
    the module-level ``headers`` and ``json_data`` (``{'name': <proxy>}``)
    as the JSON body. A rejected switch is reported on stdout instead of
    being silently discarded.

    Raises:
        requests.RequestException: if the controller cannot be reached or
            does not answer within 10 seconds.
    """
    response = requests.put(
        'http://192.168.0.4:9090/proxies/%F0%9F%94%B0%E5%9B%BD%E5%A4%96%E6%B5%81%E9%87%8F',
        headers=headers,
        json=json_data,
        verify=False,
        # Without a timeout a stuck controller would block the job forever.
        timeout=10,
    )
    if not response.ok:
        # Best effort: report the failure but do not abort the script.
        print('change proxy failed: HTTP {0} {1}'.format(
            response.status_code, response.text))

def get_proxies():
    """Return the names of all proxies available in the proxy group.

    Queries the ``/proxies/<group>`` endpoint of the Clash API using the
    module-level ``headers``.

    Returns:
        The value of the ``all`` field of the JSON reply.

    Raises:
        requests.RequestException: if the controller cannot be reached,
            does not answer within 10 seconds, or replies with an HTTP
            error status.
        ValueError: if the reply body is not valid JSON.
        KeyError: if the reply has no ``all`` field.
    """
    response = requests.get(
        'http://192.168.0.4:9090/proxies/%F0%9F%94%B0%E5%9B%BD%E5%A4%96%E6%B5%81%E9%87%8F',
        headers=headers,
        verify=False,
        # Without a timeout a stuck controller would block the job forever.
        timeout=10,
    )
    # Fail with the real HTTP error rather than a KeyError on 'all' below.
    response.raise_for_status()
    # response.json() decodes the raw bytes itself, so non-ASCII proxy names
    # do not depend on requests guessing the text encoding.
    return response.json()['all']
def test_speed(proxy_name):
    """Measure the latency of one proxy via Clash's delay-test endpoint.

    Calls ``GET /proxies/<name>/delay`` and reads the ``delay`` field
    (milliseconds) that Clash reports for the proxy. The time of the HTTP
    round-trip to the controller is deliberately not used: a proxy whose
    test fails immediately would otherwise look like the fastest one.

    Args:
        proxy_name: Name of the proxy to test.

    Returns:
        A ``(response_time, proxy_name)`` tuple. ``response_time`` is the
        measured delay in seconds, or ``-1`` when the test failed or the
        reply could not be understood.
    """
    headers = {
            'Accept': '*/*',
            'Accept-Language': 'zh,en;q=0.9,zh-CN;q=0.8',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Content-Type': 'application/json',
            'Origin': 'http://192.168.0.4:5080',
            'Pragma': 'no-cache',
            'Referer': 'http://192.168.0.4:5080/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36',
    }
    # safe='' also escapes '/', so a name containing a slash stays a single
    # path segment.
    url = "http://192.168.0.4:9090/proxies/{0}/delay?timeout=5000&url=http://www.gstatic.com/generate_204".format(urllib.parse.quote(proxy_name, safe=''))
    try:
        # The client timeout is longer than the 5000 ms Clash is given for
        # the test itself, so it only fires when the controller hangs.
        response = requests.get(url, headers=headers, verify=False, timeout=10)
        if response.status_code != 200:
            # Clash answers failed or timed-out tests with an error status.
            return (-1, proxy_name)
        delay_ms = response.json()['delay']
    except (requests.RequestException, ValueError, KeyError, TypeError):
        return (-1, proxy_name)
    if not isinstance(delay_ms, (int, float)) or delay_ms <= 0:
        return (-1, proxy_name)
    # Converted to seconds to keep the unit this function has always returned.
    return (delay_ms / 1000.0, proxy_name)
def set_speedest_proxy():
    """Probe the group's proxies concurrently and switch to the fastest one.

    Fetches the proxy names with ``get_proxies()``, skips every name
    containing ``直接连``, runs ``test_speed()`` on the rest in a thread pool
    and stores the name with the smallest non-negative response time in
    ``json_data['name']`` (printing it). ``change_proxy()`` is then called
    in every case; when no proxy produced a usable measurement,
    ``json_data`` still holds whatever name it had before.
    """
    # Entries that would be discarded anyway are not worth a probe.
    candidates = [name for name in get_proxies() if u'直接连' not in name]

    # The context manager waits for all probes and then shuts the pool down,
    # so no worker threads are left behind.
    with ThreadPoolExecutor(max_workers=20) as pool:
        measurements = list(pool.map(test_speed, candidates))

    # A negative time is test_speed()'s marker for a failed probe.
    usable = [(elapsed, name) for elapsed, name in measurements if elapsed >= 0]
    if usable:
        # key= compares times only; on a tie the first listed proxy wins.
        _, speedest_proxy = min(usable, key=lambda item: item[0])
        json_data['name'] = speedest_proxy
        print(speedest_proxy)
    change_proxy()
# Script entry point: perform one selection pass when run directly.
if __name__ == "__main__":
    set_speedest_proxy()

结语

请将其中的 IP 以及端口换成自己的;URL 中的代理组名称(示例为 URL 编码后的"🔰国外流量")和默认节点名称也需要按照自己的配置修改。