Quick and dirty download speedtest for Ubuntu mirrors. The script looks up the largest package available on all mirrors (from the focal main/amd64 package index) and downloads it from each one. Download speeds are summarized at the end.
import os
import gzip
import requests
from collections import defaultdict
from urllib.parse import urljoin
from datetime import datetime, timedelta

# List of Ubuntu mirrors to benchmark
mirrors = [
    "http://archive.ubuntu.com/ubuntu/",
    "http://is.archive.ubuntu.com/ubuntu/",
    "http://mirror.haf.is.nord.genesiscloudusercontent.com/ubuntu/",
    "http://no.archive.ubuntu.com/ubuntu/",
    "http://de.archive.ubuntu.com/ubuntu/",
]

def download_gzipped_metadata(mirror):
    # Fetch the focal/main amd64 Packages.gz index from the mirror
    url = urljoin(mirror, "dists/focal/main/binary-amd64/Packages.gz")
    response = requests.get(url)
    if response.status_code == 200:
        with open("Packages.gz", "wb") as f:
            f.write(response.content)
        return True
    return False

def parse_packages_file(content):
    """Parse a Packages index into {package name: (size in bytes, filename)}."""
    packages = {}
    for paragraph in content.split("\n\n"):
        fields = {}
        for line in paragraph.split("\n"):
            if line and ":" in line:
                key, value = line.split(":", 1)
                fields[key.strip()] = value.strip()
        if "Package" in fields and "Size" in fields and "Filename" in fields:
            packages[fields["Package"]] = (int(fields["Size"]), fields["Filename"])
    return packages

def find_largest_common_package():
    """Return the largest package available on every mirror, or None."""
    common_packages = None
    for mirror in mirrors:
        if download_gzipped_metadata(mirror):
            with gzip.open("Packages.gz", "rt") as f:
                content = f.read()
            packages = parse_packages_file(content)
            if common_packages is None:
                common_packages = packages
            else:
                # Keep only packages present on all mirrors seen so far
                common_packages = {k: v for k, v in common_packages.items() if k in packages}
        else:
            print(f"Failed to download metadata from {mirror}")
    if not common_packages:
        return None
    # Pick the package with the largest size field
    largest_package = max(common_packages.items(), key=lambda x: x[1][0])
    return largest_package

def download_with_timeout(mirror, url, timeout):
    """Download url from mirror, canceling after `timeout` seconds; return (completed, KB/s)."""
    full_url = urljoin(mirror, url)
    completed = True
    with requests.get(full_url, stream=True, timeout=timeout) as response:
        start_time = datetime.now()
        total_size = int(response.headers.get("content-length", 0))  # reported size (informational)
        downloaded = 0
        with open("temp_file", "wb") as f:
            for chunk in response.iter_content(chunk_size=524288):
                downloaded += len(chunk)
                f.write(chunk)
                # Cancel the download once the time budget is exhausted
                if datetime.now() - start_time > timedelta(seconds=timeout):
                    completed = False
                    break
    elapsed = datetime.now() - start_time
    speed = downloaded / elapsed.total_seconds() / 1024
    print(f"Download speed: {speed:.2f} KB/s")
    return completed, speed

def main():
    largest_package = find_largest_common_package()
    if largest_package is None:
        print("No common package found.")
        return
    package_name, (size, url) = largest_package
    print(f"Largest package: {package_name}")

    download_speeds = defaultdict(list)
    for mirror in mirrors:
        print(f"Downloading from {mirror}")
        success, speed = download_with_timeout(mirror, url, 30)
        download_speeds[mirror].append(speed)
        if success:
            print("Download successful")
        else:
            print("Download timeout, canceled")
    os.remove("temp_file")
    os.remove("Packages.gz")

    # Summarize results per mirror
    print("\nAverage download speeds:")
    print("-" * 40)
    print(f"{'Speed (KB/s)':>10} | {'Mirror':<30}")
    print("-" * 40)
    for mirror, speeds in download_speeds.items():
        average_speed = sum(speeds) / len(speeds)
        print(f"{average_speed:>10.2f} | {mirror:<30}")


if __name__ == "__main__":
    main()
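
To try it out (a minimal sketch; the file name mirror_speedtest.py is only an illustrative assumption, and the third-party requests package must be installed):

    pip install requests
    python3 mirror_speedtest.py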