-
-
Save executed/32f8d248a3d703d0fcf3fa7fd4add990 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
import os | |
import json | |
import requests | |
from apscheduler.executors.pool import ThreadPoolExecutor | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from apscheduler.triggers.date import DateTrigger | |
from flask import Flask, request, jsonify | |
from playwright.sync_api import sync_playwright | |
from cryptography.fernet import Fernet | |
import random | |
import datetime | |
# Configure logging | |
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger() | |
IP_LOG_FILE_NAME = "data/selected_ips.log" | |
AUTH_CACHE_FILE = "data/auth_cache.enc" | |
# 1 hour | |
SERVER_CONSIDERED_ALREADY_USED_SEC = 3600 | |
# 23 hours -> every 24 hours it's expired by ProtonVPN | |
AUTH_REUSE_THRESHOLD_SEC = 23 * 3600 | |
USERNAME_FILE = "/run/secrets/vpn_username" | |
PASSWORD_FILE = "/run/secrets/vpn_password" | |
DECRYPTION_KEY_FILE = "/run/secrets/decryption_key" | |
LOAD_UPPER_LIMIT_PCT = 90 | |
BLACKLIST_FILE_NAME = "data/ip_blacklist.log" | |
DAILY_AUTH_APPROX_HOUR = 9 | |
DAILY_AUTH_HOUR_DELTA_VARIATION = 3 | |
scheduler = None | |
app = Flask(__name__) | |
# Function to read blacklisted IPs | |
def get_blacklist_ips(): | |
"""Read the blacklist file and return a set of blacklisted IPs.""" | |
try: | |
if not os.path.exists(BLACKLIST_FILE_NAME): | |
return set() | |
with open(BLACKLIST_FILE_NAME, "r") as f: | |
ips = [line.strip() for line in f if line.strip()] | |
return set(ips) | |
except Exception as e: | |
logger.error(f"Error reading blacklist file: {str(e)}", exc_info=True) | |
raise | |
@app.route('/best-server', methods=['GET']) | |
def get_best_server(): | |
"""Return the best VPN server, skipping blacklisted and recently used IPs.""" | |
try: | |
country_code = request.args.get("country_code", "US") | |
logger.info(f"Received request for best server for country code: {country_code}") | |
auth_info = get_auth() | |
logger.info("Authentication successful or skipped.") | |
url = f"https://account.protonvpn.com/api/vpn/logicals?Tier=0&ExitCountry={country_code}" | |
headers = { | |
'accept': 'application/vnd.protonmail.v1+json', | |
'accept-language': 'en-US,en;q=0.6', | |
'dnt': '1', | |
'priority': 'u=1, i', | |
'referer': 'https://account.protonvpn.com/downloads', | |
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) Gecko/20100101 Firefox/110.0', | |
'x-pm-uid': auth_info["pm_uuid"], | |
'Cookie': f'AUTH-{auth_info["pm_uuid"]}={auth_info["auth_token"]}; Session-Id={auth_info["session_id"]}' | |
} | |
response = requests.get(url, headers=headers) | |
if response.status_code != 200: | |
logger.error(f"Failed to fetch servers: {response.status_code} - {response.text}") | |
raise Exception(f"Failed to fetch servers: {response.status_code} - {response.text}") | |
logger.info("Server response received successfully.") | |
servers_dict = response.json() | |
if not servers_dict.get("LogicalServers"): | |
logger.error("No servers found in response!") | |
raise Exception("No servers found in response!") | |
recent_ips = get_recent_ips() | |
blacklist_ips = get_blacklist_ips() | |
logger.info("Sorting best server") | |
# Primary sort by 'Score', secondary sort by 'Load' | |
# The lower the score - the better | |
sorted_servers = sorted( | |
servers_dict["LogicalServers"], | |
key=lambda s: (s.get("Score", 100), s.get("Load", 100)), | |
reverse=False | |
) | |
minutes_since_last_selection = minutes_since_last_best_server_selection() | |
logger.info(f"Minutes since last selection: {minutes_since_last_selection}") | |
first_weird_ip_skipped = False | |
for server in sorted_servers: | |
if not server.get("Servers"): | |
continue | |
if server["Load"] > LOAD_UPPER_LIMIT_PCT: | |
logger.info(f"Skipping best score server because load is too heavy: {get_api_human_readable_server_details_str(server)}. Exceeds upper limit {LOAD_UPPER_LIMIT_PCT}") | |
continue | |
entry_ip = server["Servers"][0]["EntryIP"] | |
if entry_ip in blacklist_ips: | |
logger.info(f"Skipping server because it's blacklisted: {get_api_human_readable_server_details_str(server)}") | |
continue | |
if entry_ip in recent_ips: | |
logger.info(f"Skipping server because it's recently used: {get_api_human_readable_server_details_str(server)}") | |
continue | |
if (minutes_since_last_selection * 60) > SERVER_CONSIDERED_ALREADY_USED_SEC and not first_weird_ip_skipped: | |
first_weird_ip_skipped = True | |
logger.info(f"Skipping weird best server probably without internet connection: {get_api_human_readable_server_details_str(server)}") | |
continue | |
log_selected_ip(entry_ip) | |
logger.info(f"Selected server: {get_api_human_readable_server_details_str(server)}") | |
return jsonify({"ip": entry_ip, "name": server['Name'], "loadPct": server['Load'], "city": server['City'], "score": server['Score']}) | |
logger.warning("No suitable server found that hasn't been used recently or isn't blacklisted!") | |
raise Exception("No suitable server found that hasn't been used recently or isn't blacklisted!") | |
except Exception as e: | |
logger.error(f"Exception in get_best_server: {str(e)}", exc_info=True) | |
return jsonify({"error": str(e)}), 500 | |
@app.route('/blacklist-ip', methods=['POST']) | |
def blacklist_ip(): | |
"""Append an IP address to the blacklist file.""" | |
try: | |
data = request.get_json() | |
if not data or 'ip' not in data: | |
return jsonify({"error": "Invalid request, 'ip' field is required."}), 400 | |
ip = data['ip'] | |
os.makedirs(os.path.dirname(BLACKLIST_FILE_NAME), exist_ok=True) | |
with open(BLACKLIST_FILE_NAME, "a") as f: | |
f.write(ip + "\n") | |
logger.info(f"Added IP to blacklist: {ip}") | |
return jsonify({"message": "IP added to blacklist successfully."}), 200 | |
except Exception as eBlck: | |
logger.error(f"Error in blacklist_ip: {str(eBlck)}", exc_info=True) | |
return jsonify({"error": str(eBlck)}), 500 | |
@app.route('/blacklist-recent-ip', methods=['POST']) | |
def blacklist_recent_ip(): | |
"""Blacklist the most recently selected IP from the selected_ips.log file.""" | |
try: | |
# Get recent IPs from selected_ips.log | |
recent_ips = get_recent_ips() | |
if not recent_ips: | |
return jsonify({"error": "No recent IP found to blacklist."}), 400 | |
# Select the last IP (most recent within the threshold) | |
ip = list(recent_ips.keys())[-1] | |
# Check if the IP is already blacklisted | |
blacklist_ips = get_blacklist_ips() | |
if ip in blacklist_ips: | |
return jsonify({"message": "Recent IP is already blacklisted."}), 200 | |
# Ensure the directory exists | |
os.makedirs(os.path.dirname(BLACKLIST_FILE_NAME), exist_ok=True) | |
# Append the IP to the blacklist file | |
with open(BLACKLIST_FILE_NAME, "a") as f: | |
f.write(ip + "\n") | |
logger.info(f"Added recent IP to blacklist: {ip}") | |
return jsonify({"message": "Recent IP added to blacklist successfully."}), 200 | |
except Exception as e: | |
logger.error(f"Error in blacklist_recent_ip: {str(e)}", exc_info=True) | |
return jsonify({"error": str(e)}), 500 | |
def get_auth(): | |
try: | |
if should_skip_auth(): | |
logger.info("Skipping authentication, loading from cache.") | |
return load_cached_auth() | |
logger.info("Performing authentication.") | |
auth_info = perform_auth() | |
save_auth(auth_info) | |
logger.info("Authentication successful and saved.") | |
return auth_info | |
except Exception as e: | |
logger.error(f"Exception in get_auth: {str(e)}", exc_info=True) | |
raise | |
def should_skip_auth(): | |
try: | |
if not os.path.exists(IP_LOG_FILE_NAME) or not os.path.exists(AUTH_CACHE_FILE): | |
return False | |
with open(IP_LOG_FILE_NAME, "r") as f: | |
lines = f.readlines() | |
if lines: | |
last_timestamp = float(lines[-1].strip().split()[1]) | |
if time.time() - last_timestamp < AUTH_REUSE_THRESHOLD_SEC: | |
logger.info("Authentication skipped due to recent use.") | |
return True | |
return False | |
except Exception as e: | |
logger.error(f"Exception in should_skip_auth: {str(e)}", exc_info=True) | |
raise | |
def load_cached_auth(): | |
try: | |
with open(DECRYPTION_KEY_FILE, "rb") as key_file: | |
key = key_file.read() | |
cipher = Fernet(key) | |
with open(AUTH_CACHE_FILE, "rb") as f: | |
encrypted_data = f.read() | |
decrypted_data = cipher.decrypt(encrypted_data) | |
auth_info = json.loads(decrypted_data) | |
logger.info("Loaded cached authentication info successfully.") | |
return auth_info | |
except Exception as e: | |
logger.error(f"Exception in load_cached_auth: {str(e)}", exc_info=True) | |
raise | |
def save_auth(auth_info): | |
try: | |
with open(DECRYPTION_KEY_FILE, "rb") as key_file: | |
key = key_file.read() | |
cipher = Fernet(key) | |
encrypted_data = cipher.encrypt(json.dumps(auth_info).encode()) | |
with open(AUTH_CACHE_FILE, "wb") as f: | |
f.write(encrypted_data) | |
logger.info("Saved authentication info successfully.") | |
except Exception as e: | |
logger.error(f"Exception in save_auth: {str(e)}", exc_info=True) | |
raise | |
def perform_auth(): | |
try: | |
with open(USERNAME_FILE, "r") as f: | |
username = f.read().strip() | |
with open(PASSWORD_FILE, "r") as f: | |
password = f.read().strip() | |
with sync_playwright() as p: | |
browser = p.firefox.launch(headless=True) | |
page = browser.new_page() | |
page.goto('https://account.protonvpn.com/login') | |
page.fill('#username', username) | |
page.locator("xpath=/html/body/div[1]/div[4]/div[1]/main/div[1]/div[2]/form/button").click() | |
time.sleep(3) | |
page.fill('#password', password) | |
page.locator("xpath=/html/body/div[1]/div[4]/div[1]/main/div[1]/div[2]/form/button").click() | |
time.sleep(5) | |
cookies = page.context.cookies() | |
browser.close() | |
pm_uuid, auth_token, session_id = None, None, None | |
for cookie in cookies: | |
if cookie['name'].startswith("AUTH-"): | |
pm_uuid = cookie['name'].split("-")[1] | |
auth_token = cookie['value'] | |
elif cookie['name'] == "Session-Id": | |
session_id = cookie['value'] | |
if not (pm_uuid and auth_token and session_id): | |
raise Exception("Failed to retrieve authentication tokens.") | |
logger.info("Authentication successful") | |
return {"pm_uuid": pm_uuid, "auth_token": auth_token, "session_id": session_id} | |
except Exception as e: | |
logger.error(f"Exception in perform_auth: {str(e)}", exc_info=True) | |
raise | |
def get_recent_ips(): | |
try: | |
recent_ips = {} | |
if os.path.exists(IP_LOG_FILE_NAME): | |
with open(IP_LOG_FILE_NAME, "r") as f: | |
for line in f: | |
parts = line.strip().split() | |
if len(parts) == 2: | |
ip, timestamp = parts | |
timestamp = float(timestamp) | |
if time.time() - timestamp < SERVER_CONSIDERED_ALREADY_USED_SEC: | |
recent_ips[ip] = timestamp | |
return recent_ips | |
except Exception as e: | |
logger.error(f"Exception in get_recent_ips: {str(e)}", exc_info=True) | |
raise | |
def minutes_since_last_best_server_selection(): | |
try: | |
# Check if the log file exists | |
if not os.path.exists(IP_LOG_FILE_NAME): | |
return 0 | |
# Variable to store the last valid timestamp | |
last_timestamp = None | |
# Open and read the file line by line | |
with open(IP_LOG_FILE_NAME, "r") as f: | |
for line in f: | |
# Split the line into parts and ensure it has exactly two components | |
parts = line.strip().split() | |
if len(parts) == 2: | |
try: | |
# Attempt to parse the timestamp as a float | |
timestamp = float(parts[1]) | |
# Update last_timestamp with the current valid timestamp | |
last_timestamp = timestamp | |
except ValueError: | |
# Skip lines where the timestamp isn't a valid float | |
continue | |
# If no valid timestamp was found, return None | |
if last_timestamp is None: | |
return 0 | |
# Calculate the difference in seconds between now and the last timestamp | |
current_time = time.time() | |
delta_seconds = current_time - last_timestamp | |
# Convert the difference to minutes | |
delta_minutes = delta_seconds / 60 | |
# Return the delta in minutes as a float | |
return delta_minutes | |
except Exception as e: | |
# Log any exceptions and re-raise, consistent with the original function | |
logger.error(f"Exception in get_last_entry_delta_minutes: {str(e)}", exc_info=True) | |
raise | |
def log_selected_ip(ip): | |
try: | |
with open(IP_LOG_FILE_NAME, "a") as f: | |
f.write(f"{ip} {time.time()}\n") | |
except Exception as e: | |
logger.error(f"Exception in log_selected_ip: {str(e)}", exc_info=True) | |
raise | |
def get_api_human_readable_server_details_str(server): | |
return f"Name: {server['Name']}, Load: {server['Load']}, Score: {server['Score']} City: {server['City']} IP: {server['Servers'][0]['EntryIP']}" | |
def daily_auth(): | |
try: | |
logger.info("Running daily authentication") | |
auth_info = perform_auth() | |
save_auth(auth_info) | |
logger.info("Daily authentication completed") | |
except Exception as e: | |
logger.error(f"Error in daily_auth: {str(e)}", exc_info=True) | |
finally: | |
schedule_next_daily_auth() | |
def schedule_next_daily_auth(): | |
try: | |
# Get the current date and time | |
now = datetime.datetime.now() | |
# Define the range for the random hour, ensuring it stays between 0 and 23 | |
lower_hour = max(0, DAILY_AUTH_APPROX_HOUR - DAILY_AUTH_HOUR_DELTA_VARIATION) | |
upper_hour = min(23, DAILY_AUTH_APPROX_HOUR + DAILY_AUTH_HOUR_DELTA_VARIATION) | |
# Generate random hour, minute, and second | |
random_hour = random.randint(lower_hour, upper_hour) | |
random_minute = random.randint(0, 59) | |
random_second = random.randint(0, 59) | |
# Create a potential run time for today with the random time | |
potential_run_time_today = now.replace(hour=random_hour, minute=random_minute, second=random_second, microsecond=0) | |
# Check if this time is still in the future | |
if potential_run_time_today > now: | |
# If the time is in the future, schedule for today | |
next_run_time = potential_run_time_today | |
else: | |
# If the time has passed today, schedule for tomorrow | |
next_run_time = potential_run_time_today + datetime.timedelta(days=1) | |
# Schedule the job | |
scheduler.add_job(daily_auth, trigger=DateTrigger(run_date=next_run_time)) | |
logger.info(f"Scheduled next daily authentication for {next_run_time}") | |
except Exception as eSched: | |
logger.error(f"Error scheduling next daily auth: {str(eSched)}", exc_info=True) | |
if __name__ == "__main__": | |
#global scheduler | |
try: | |
logger.info("Starting Proton Explorer") | |
scheduler = BackgroundScheduler({ | |
'apscheduler.executors.default': { | |
'class': ThreadPoolExecutor, | |
'max_workers': 1 | |
} | |
}) | |
scheduler.start() | |
schedule_next_daily_auth() | |
app.run(host='0.0.0.0', port=43999) | |
except Exception as e: | |
logger.error(f"Exception while starting Proton Explorer: {str(e)}", exc_info=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment