Skip to content

Instantly share code, notes, and snippets.

@executed
Created April 18, 2025 01:20
Show Gist options
  • Save executed/32f8d248a3d703d0fcf3fa7fd4add990 to your computer and use it in GitHub Desktop.
import logging
import time
import os
import json
import requests
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.date import DateTrigger
from flask import Flask, request, jsonify
from playwright.sync_api import sync_playwright
from cryptography.fernet import Fernet
import random
import datetime
# Configure logging for the whole service (DEBUG level, timestamped lines)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
# Append-only log of "ip <unix-timestamp>" lines for selected servers
IP_LOG_FILE_NAME = "data/selected_ips.log"
# Fernet-encrypted cache of the last successful auth tokens
AUTH_CACHE_FILE = "data/auth_cache.enc"
# 1 hour
SERVER_CONSIDERED_ALREADY_USED_SEC = 3600
# 23 hours -> every 24 hours it's expired by ProtonVPN
AUTH_REUSE_THRESHOLD_SEC = 23 * 3600
# Docker-secrets style files holding the VPN credentials and the Fernet key
USERNAME_FILE = "/run/secrets/vpn_username"
PASSWORD_FILE = "/run/secrets/vpn_password"
DECRYPTION_KEY_FILE = "/run/secrets/decryption_key"
# Servers whose Load percentage exceeds this are skipped
LOAD_UPPER_LIMIT_PCT = 90
BLACKLIST_FILE_NAME = "data/ip_blacklist.log"
# Daily re-auth runs around this hour, jittered by +/- the variation below
DAILY_AUTH_APPROX_HOUR = 9
DAILY_AUTH_HOUR_DELTA_VARIATION = 3
# Assigned in the __main__ block; used by schedule_next_daily_auth()
scheduler = None
app = Flask(__name__)
def get_blacklist_ips():
    """Load the blacklist file and return its IPs as a set.

    Returns an empty set when the file does not exist; any read error is
    logged and re-raised.
    """
    try:
        if not os.path.exists(BLACKLIST_FILE_NAME):
            return set()
        with open(BLACKLIST_FILE_NAME, "r") as blacklist_file:
            stripped_lines = (raw_line.strip() for raw_line in blacklist_file)
            return {entry for entry in stripped_lines if entry}
    except Exception as e:
        logger.error(f"Error reading blacklist file: {str(e)}", exc_info=True)
        raise
@app.route('/best-server', methods=['GET'])
def get_best_server():
    """Return the best ProtonVPN server for a country as JSON.

    Fetches the free-tier logical servers for the requested country, sorts
    them by (Score, Load) ascending (lower score is better), then skips
    overloaded, blacklisted and recently used entry IPs. The chosen IP is
    appended to the selection log so it is not reused within
    SERVER_CONSIDERED_ALREADY_USED_SEC.

    Query params:
        country_code: ISO country code, defaults to "US".

    Returns:
        200 with {"ip", "name", "loadPct", "city", "score"} on success,
        500 with {"error": ...} on any failure.
    """
    try:
        country_code = request.args.get("country_code", "US")
        logger.info(f"Received request for best server for country code: {country_code}")
        auth_info = get_auth()
        logger.info("Authentication successful or skipped.")
        url = f"https://account.protonvpn.com/api/vpn/logicals?Tier=0&ExitCountry={country_code}"
        headers = {
            'accept': 'application/vnd.protonmail.v1+json',
            'accept-language': 'en-US,en;q=0.6',
            'dnt': '1',
            'priority': 'u=1, i',
            'referer': 'https://account.protonvpn.com/downloads',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Gecko/20100101 Firefox/110.0',
            'x-pm-uid': auth_info["pm_uuid"],
            'Cookie': f'AUTH-{auth_info["pm_uuid"]}={auth_info["auth_token"]}; Session-Id={auth_info["session_id"]}'
        }
        # fix: without a timeout a stalled upstream connection would hang
        # this request handler forever
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code != 200:
            logger.error(f"Failed to fetch servers: {response.status_code} - {response.text}")
            raise Exception(f"Failed to fetch servers: {response.status_code} - {response.text}")
        logger.info("Server response received successfully.")
        servers_dict = response.json()
        if not servers_dict.get("LogicalServers"):
            logger.error("No servers found in response!")
            raise Exception("No servers found in response!")
        recent_ips = get_recent_ips()
        blacklist_ips = get_blacklist_ips()
        logger.info("Sorting best server")
        # Primary sort by 'Score', secondary sort by 'Load'
        # The lower the score - the better
        sorted_servers = sorted(
            servers_dict["LogicalServers"],
            key=lambda s: (s.get("Score", 100), s.get("Load", 100)),
            reverse=False
        )
        minutes_since_last_selection = minutes_since_last_best_server_selection()
        logger.info(f"Minutes since last selection: {minutes_since_last_selection}")
        first_weird_ip_skipped = False
        for server in sorted_servers:
            if not server.get("Servers"):
                continue
            if server["Load"] > LOAD_UPPER_LIMIT_PCT:
                logger.info(f"Skipping best score server because load is too heavy: {get_api_human_readable_server_details_str(server)}. Exceeds upper limit {LOAD_UPPER_LIMIT_PCT}")
                continue
            entry_ip = server["Servers"][0]["EntryIP"]
            if entry_ip in blacklist_ips:
                logger.info(f"Skipping server because it's blacklisted: {get_api_human_readable_server_details_str(server)}")
                continue
            if entry_ip in recent_ips:
                logger.info(f"Skipping server because it's recently used: {get_api_human_readable_server_details_str(server)}")
                continue
            # After a long gap since the last selection, skip the very first
            # candidate once — presumably it kept floating to the top because
            # nobody could actually use it (TODO confirm this heuristic)
            if (minutes_since_last_selection * 60) > SERVER_CONSIDERED_ALREADY_USED_SEC and not first_weird_ip_skipped:
                first_weird_ip_skipped = True
                logger.info(f"Skipping weird best server probably without internet connection: {get_api_human_readable_server_details_str(server)}")
                continue
            log_selected_ip(entry_ip)
            logger.info(f"Selected server: {get_api_human_readable_server_details_str(server)}")
            return jsonify({"ip": entry_ip, "name": server['Name'], "loadPct": server['Load'], "city": server['City'], "score": server['Score']})
        logger.warning("No suitable server found that hasn't been used recently or isn't blacklisted!")
        raise Exception("No suitable server found that hasn't been used recently or isn't blacklisted!")
    except Exception as e:
        logger.error(f"Exception in get_best_server: {str(e)}", exc_info=True)
        return jsonify({"error": str(e)}), 500
@app.route('/blacklist-ip', methods=['POST'])
def blacklist_ip():
    """Append the IP from the JSON request body to the blacklist file.

    Expects {"ip": "<address>"}; responds 400 when the field is missing,
    500 on any other failure.
    """
    try:
        payload = request.get_json()
        if not payload or 'ip' not in payload:
            return jsonify({"error": "Invalid request, 'ip' field is required."}), 400
        ip_to_block = payload['ip']
        # Make sure data/ exists before appending
        os.makedirs(os.path.dirname(BLACKLIST_FILE_NAME), exist_ok=True)
        with open(BLACKLIST_FILE_NAME, "a") as blacklist_file:
            blacklist_file.write(f"{ip_to_block}\n")
        logger.info(f"Added IP to blacklist: {ip_to_block}")
        return jsonify({"message": "IP added to blacklist successfully."}), 200
    except Exception as e:
        logger.error(f"Error in blacklist_ip: {str(e)}", exc_info=True)
        return jsonify({"error": str(e)}), 500
@app.route('/blacklist-recent-ip', methods=['POST'])
def blacklist_recent_ip():
    """Blacklist the most recently selected IP from the selection log.

    Responds 400 when no recent IP exists, 200 either way when the IP is
    (or already was) blacklisted, and 500 on unexpected failures.
    """
    try:
        recent_ips = get_recent_ips()
        if not recent_ips:
            return jsonify({"error": "No recent IP found to blacklist."}), 400
        # Dicts keep insertion order, so the last key is the newest selection
        newest_ip = list(recent_ips.keys())[-1]
        if newest_ip in get_blacklist_ips():
            return jsonify({"message": "Recent IP is already blacklisted."}), 200
        # Make sure data/ exists before appending
        os.makedirs(os.path.dirname(BLACKLIST_FILE_NAME), exist_ok=True)
        with open(BLACKLIST_FILE_NAME, "a") as blacklist_file:
            blacklist_file.write(f"{newest_ip}\n")
        logger.info(f"Added recent IP to blacklist: {newest_ip}")
        return jsonify({"message": "Recent IP added to blacklist successfully."}), 200
    except Exception as e:
        logger.error(f"Error in blacklist_recent_ip: {str(e)}", exc_info=True)
        return jsonify({"error": str(e)}), 500
def get_auth():
    """Return auth tokens, reusing the encrypted cache when still fresh.

    Falls back to a full browser login (perform_auth) and persists the
    result when the cache cannot be reused. Errors are logged and re-raised.
    """
    try:
        if should_skip_auth():
            logger.info("Skipping authentication, loading from cache.")
            return load_cached_auth()
        logger.info("Performing authentication.")
        fresh_auth = perform_auth()
        save_auth(fresh_auth)
        logger.info("Authentication successful and saved.")
        return fresh_auth
    except Exception as e:
        logger.error(f"Exception in get_auth: {str(e)}", exc_info=True)
        raise
def should_skip_auth():
    """Decide whether the cached authentication can be reused.

    Reuse is allowed only when both the IP log and the auth cache exist and
    the last logged selection is younger than AUTH_REUSE_THRESHOLD_SEC.
    """
    try:
        files_present = os.path.exists(IP_LOG_FILE_NAME) and os.path.exists(AUTH_CACHE_FILE)
        if not files_present:
            return False
        with open(IP_LOG_FILE_NAME, "r") as log_file:
            log_lines = log_file.readlines()
        if not log_lines:
            return False
        # Each line is "ip <unix-timestamp>"; take the timestamp of the last one
        last_timestamp = float(log_lines[-1].strip().split()[1])
        if time.time() - last_timestamp < AUTH_REUSE_THRESHOLD_SEC:
            logger.info("Authentication skipped due to recent use.")
            return True
        return False
    except Exception as e:
        logger.error(f"Exception in should_skip_auth: {str(e)}", exc_info=True)
        raise
def load_cached_auth():
    """Decrypt AUTH_CACHE_FILE with the secrets key and return the auth dict."""
    try:
        with open(DECRYPTION_KEY_FILE, "rb") as key_file:
            cipher = Fernet(key_file.read())
        with open(AUTH_CACHE_FILE, "rb") as cache_file:
            encrypted_payload = cache_file.read()
        auth_info = json.loads(cipher.decrypt(encrypted_payload))
        logger.info("Loaded cached authentication info successfully.")
        return auth_info
    except Exception as e:
        logger.error(f"Exception in load_cached_auth: {str(e)}", exc_info=True)
        raise
def save_auth(auth_info):
    """Encrypt the auth dict with the secrets key and write it to the cache file."""
    try:
        with open(DECRYPTION_KEY_FILE, "rb") as key_file:
            cipher = Fernet(key_file.read())
        encrypted_payload = cipher.encrypt(json.dumps(auth_info).encode())
        with open(AUTH_CACHE_FILE, "wb") as cache_file:
            cache_file.write(encrypted_payload)
        logger.info("Saved authentication info successfully.")
    except Exception as e:
        logger.error(f"Exception in save_auth: {str(e)}", exc_info=True)
        raise
def perform_auth():
    """Log in to ProtonVPN with a headless Firefox and harvest auth cookies.

    Reads the username/password from the Docker-secrets files, drives the
    two-step login form, then extracts the AUTH-<uid> cookie, its value and
    the Session-Id cookie.

    Returns:
        dict with "pm_uuid", "auth_token" and "session_id".

    Raises:
        Exception: when any of the three tokens is missing after login.
    """
    try:
        with open(USERNAME_FILE, "r") as f:
            username = f.read().strip()
        with open(PASSWORD_FILE, "r") as f:
            password = f.read().strip()
        with sync_playwright() as p:
            browser = p.firefox.launch(headless=True)
            page = browser.new_page()
            page.goto('https://account.protonvpn.com/login')
            page.fill('#username', username)
            page.locator("xpath=/html/body/div[1]/div[4]/div[1]/main/div[1]/div[2]/form/button").click()
            time.sleep(3)  # crude wait for the password step to render
            page.fill('#password', password)
            page.locator("xpath=/html/body/div[1]/div[4]/div[1]/main/div[1]/div[2]/form/button").click()
            time.sleep(5)  # crude wait for login to finish and cookies to be set
            cookies = page.context.cookies()
            browser.close()
        pm_uuid, auth_token, session_id = None, None, None
        for cookie in cookies:
            if cookie['name'].startswith("AUTH-"):
                # fix: split("-", 1) keeps the full UID even if it contains a
                # hyphen; plain split("-")[1] would truncate at the next one
                pm_uuid = cookie['name'].split("-", 1)[1]
                auth_token = cookie['value']
            elif cookie['name'] == "Session-Id":
                session_id = cookie['value']
        if not (pm_uuid and auth_token and session_id):
            raise Exception("Failed to retrieve authentication tokens.")
        logger.info("Authentication successful")
        return {"pm_uuid": pm_uuid, "auth_token": auth_token, "session_id": session_id}
    except Exception as e:
        logger.error(f"Exception in perform_auth: {str(e)}", exc_info=True)
        raise
def get_recent_ips():
    """Return {ip: timestamp} for IPs selected within the reuse window.

    Scans IP_LOG_FILE_NAME ("ip <unix-timestamp>" per line) and keeps entries
    younger than SERVER_CONSIDERED_ALREADY_USED_SEC. Malformed lines are
    skipped instead of aborting the whole read.
    """
    try:
        recent_ips = {}
        if os.path.exists(IP_LOG_FILE_NAME):
            now = time.time()  # hoisted: one clock read for the whole scan
            with open(IP_LOG_FILE_NAME, "r") as f:
                for line in f:
                    parts = line.strip().split()
                    if len(parts) != 2:
                        continue
                    ip, raw_timestamp = parts
                    try:
                        timestamp = float(raw_timestamp)
                    except ValueError:
                        # fix: a corrupt timestamp used to raise ValueError
                        # and break every caller of this function
                        continue
                    if now - timestamp < SERVER_CONSIDERED_ALREADY_USED_SEC:
                        recent_ips[ip] = timestamp
        return recent_ips
    except Exception as e:
        logger.error(f"Exception in get_recent_ips: {str(e)}", exc_info=True)
        raise
def minutes_since_last_best_server_selection():
    """Return minutes elapsed since the last valid entry in the IP log.

    Returns 0 when the log file is missing or contains no parseable
    "ip <unix-timestamp>" line. Lines with a non-numeric timestamp are
    ignored.
    """
    try:
        if not os.path.exists(IP_LOG_FILE_NAME):
            return 0
        last_timestamp = None
        with open(IP_LOG_FILE_NAME, "r") as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) != 2:
                    continue
                try:
                    # Keep overwriting so we end with the last valid timestamp
                    last_timestamp = float(parts[1])
                except ValueError:
                    continue
        if last_timestamp is None:
            return 0
        return (time.time() - last_timestamp) / 60
    except Exception as e:
        # fix: the log message used to name a different function
        # ("get_last_entry_delta_minutes"), which made tracebacks confusing
        logger.error(f"Exception in minutes_since_last_best_server_selection: {str(e)}", exc_info=True)
        raise
def log_selected_ip(ip):
    """Append "ip <unix-timestamp>" to the selection log.

    Creates the log's parent directory when missing, mirroring what
    blacklist_ip already does.
    """
    try:
        # fix: without this, the first write on a fresh container fails
        # because data/ does not exist yet
        os.makedirs(os.path.dirname(IP_LOG_FILE_NAME), exist_ok=True)
        with open(IP_LOG_FILE_NAME, "a") as f:
            f.write(f"{ip} {time.time()}\n")
    except Exception as e:
        logger.error(f"Exception in log_selected_ip: {str(e)}", exc_info=True)
        raise
def get_api_human_readable_server_details_str(server):
    """Format one logical-server dict as a single human-readable log line."""
    first_entry_ip = server['Servers'][0]['EntryIP']
    return (
        f"Name: {server['Name']}, Load: {server['Load']}, "
        f"Score: {server['Score']} City: {server['City']} IP: {first_entry_ip}"
    )
def daily_auth():
    """Refresh the cached authentication once a day, then queue the next run."""
    try:
        logger.info("Running daily authentication")
        save_auth(perform_auth())
        logger.info("Daily authentication completed")
    except Exception as e:
        logger.error(f"Error in daily_auth: {str(e)}", exc_info=True)
    finally:
        # Always re-arm the schedule, even when this run failed
        schedule_next_daily_auth()
def schedule_next_daily_auth():
    """Queue the next daily_auth run at a randomly jittered time of day.

    The hour is drawn from DAILY_AUTH_APPROX_HOUR +/- DAILY_AUTH_HOUR_DELTA_VARIATION
    (clamped to 0..23); if that moment has already passed today, the job is
    pushed to tomorrow. Errors are logged but never propagated.
    """
    try:
        now = datetime.datetime.now()
        earliest_hour = max(0, DAILY_AUTH_APPROX_HOUR - DAILY_AUTH_HOUR_DELTA_VARIATION)
        latest_hour = min(23, DAILY_AUTH_APPROX_HOUR + DAILY_AUTH_HOUR_DELTA_VARIATION)
        candidate = now.replace(
            hour=random.randint(earliest_hour, latest_hour),
            minute=random.randint(0, 59),
            second=random.randint(0, 59),
            microsecond=0,
        )
        # Keep today's slot only when it is still ahead of us
        next_run_time = candidate if candidate > now else candidate + datetime.timedelta(days=1)
        scheduler.add_job(daily_auth, trigger=DateTrigger(run_date=next_run_time))
        logger.info(f"Scheduled next daily authentication for {next_run_time}")
    except Exception as e:
        logger.error(f"Error scheduling next daily auth: {str(e)}", exc_info=True)
if __name__ == "__main__":
    # This runs at module scope, so assigning `scheduler` here updates the
    # module-level variable that schedule_next_daily_auth() reads.
    try:
        logger.info("Starting Proton Explorer")
        executor_config = {
            'apscheduler.executors.default': {
                'class': ThreadPoolExecutor,
                'max_workers': 1
            }
        }
        scheduler = BackgroundScheduler(executor_config)
        scheduler.start()
        schedule_next_daily_auth()
        app.run(host='0.0.0.0', port=43999)
    except Exception as e:
        logger.error(f"Exception while starting Proton Explorer: {str(e)}", exc_info=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment