Forked from eighthave/find-https-debian-archives.py
Last active
February 9, 2018 10:57
-
-
Save sipdbg2/f6e7db720bdbd7ad6775eaa099c256df to your computer and use it in GitHub Desktop.
Script to find official Debian mirrors that support HTTPS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Debian repository HTTPS-enabled mirrors scanner
# The package "apt-transport-https" does not ship an easy way to find an HTTPS
# mirror to use. This script can fix that.
#
# rewrite of eighthave's code from https://gist.github.com/eighthave/7285154
# 09/2016 -- modem <[email protected]>
#
import threading | |
import sys | |
import re | |
import copy | |
import argparse | |
import urllib2 | |
import ssl | |
# Default number of scanner threads; used as the default of --thread.
THREAD_COUNT = 64
# Default timeout handed to urlopen() for each probe; used as the default of
# --timeout-connect.
TIMEOUT_CONNECT = 2
class retcode(object):
    """Outcome codes for one HTTPS probe of a mirror URL.

    success -- the request completed
    fail    -- the URL could not be opened (URL / connection level error)
    bad     -- an SSL error was reported
    error   -- any other exception
    """
    success, fail, bad, error = 1, 2, 3, 4
class bcolors:
    """ANSI escape sequences used to colour the per-mirror status word."""
    # Reset sequence, written after every coloured status word.
    ENDC = '\033[0m'
    # Prefix for the "fail" / "bad" / "error" status words.
    FAIL = '\033[91m'
    # Prefix for the "SUCCESS" status word.
    OKGREEN = '\033[92m'
class MirrorWorker(object):
    """Lock-protected queue of candidate URLs shared by the scanner threads.

    The list passed in is used directly (not copied) and is consumed from the
    front as hosts are handed out.
    """

    def __init__(self, host_list):
        """Store *host_list* and create the lock guarding access to it."""
        self.host_list = host_list
        self.lock = threading.RLock()

    def get_host(self):
        """Remove and return the next URL, or None when the list is empty."""
        # "with" releases the lock on every exit path, including exceptions,
        # instead of relying on a matching release() call per branch.
        with self.lock:
            if not self.host_list:
                return None
            return self.host_list.pop(0)
class MirrorConsumer(threading.Thread):
    """Scanner thread: pulls URLs from a shared worker and probes each one.

    Parameters:
        timeout_connect -- timeout passed to urllib2.urlopen() for each probe
        noerr           -- when true, only successful probes are reported
        worker          -- object whose get_host() returns the next URL, or a
                           false value once there is nothing left to scan
    """

    def __init__(self, timeout_connect, noerr, worker):
        threading.Thread.__init__(self)
        self.timeout_connect = timeout_connect
        self.worker = worker
        self.noerr = noerr

    def run(self):
        """Probe and report URLs until the worker has none left."""
        while True:
            url = self.worker.get_host()
            if not url:
                return
            self.log(url, self.https_try_connect(url))

    def https_try_connect(self, url):
        """Open *url* and return the retcode value describing the outcome."""
        try:
            response = urllib2.urlopen(url, timeout=self.timeout_connect)
        except urllib2.URLError as err:
            # urllib2 wraps errors raised while connecting -- TLS handshake
            # and certificate failures included -- in URLError, keeping the
            # original exception in .reason.  Without this check those would
            # be reported as a plain "fail" instead of "bad".
            if isinstance(getattr(err, 'reason', None), ssl.SSLError):
                return retcode.bad
            return retcode.fail
        except ssl.SSLError:
            # SSL errors that reach us unwrapped.
            return retcode.bad
        except Exception:
            return retcode.error
        else:
            # Only reachability matters; release the connection right away
            # rather than leaving it to the garbage collector.
            response.close()
            return retcode.success

    def log(self, url, ret):
        """Write one status line for *url*.

        Successes go to stdout.  Failures go to stderr and are suppressed
        entirely when self.noerr is set.
        """
        url = url.ljust(70, ' ')
        if retcode.success == ret:
            sys.stdout.write(
                url + bcolors.OKGREEN + "SUCCESS" + bcolors.ENDC + "\n")
            return
        if self.noerr:
            return
        labels = {
            retcode.fail: "fail",
            retcode.bad: "bad",
            retcode.error: "error",
        }
        label = labels.get(ret)
        if label is not None:
            sys.stderr.write(
                url + bcolors.FAIL + label + bcolors.ENDC + "\n")
# Mirror listings that can be scanned: (argparse destination, page URL, regex).
# Group 1 of each regex is the mirror URL with its leading "http" removed.
_MIRROR_SOURCES = (
    ('generic', 'http://www.debian.org/mirror/list',
     '.*<td valign="top"><a rel="nofollow" href="http(.*)">.*'),
    ('security', 'http://www.debian.org/mirror/list-full',
     '.*</tt><br>Security updates over HTTP: <tt><a rel="nofollow" href="http(.*)">.*/debian-security/</a>.*'),
    ('backports', 'http://backports-master.debian.org/Mirrors/',
     '.*<td><a href="http(.*)">.*/debian-backports/</a>.*'),
    ('cd', 'http://www.debian.org/CD/http-ftp/',
     '.*<a rel="nofollow" href="http(:.*)">HTTP</a></li>.*'),
)


def find_https_candidates(lines, pattern):
    """Return 'https' + group 1 for every line in *lines* matching *pattern*.

    *pattern* is applied with match(), i.e. anchored at the start of each
    line.  Lines that do not match are skipped; order is preserved.
    """
    regex = re.compile(pattern)
    candidates = []
    for line in lines:
        found = regex.match(line)
        if found:
            candidates.append('https' + found.group(1))
    return candidates


def _fetch_candidates(page_url, pattern):
    """Download *page_url* and extract the candidate HTTPS URLs from it."""
    page = urllib2.urlopen(page_url)
    try:
        return find_https_candidates(page.readlines(), pattern)
    finally:
        # Always release the HTTP connection, even if reading fails.
        page.close()


def main():
    """Parse the command line, scrape one mirror list and scan it.

    Exactly one of --generic / --security / --backports / --cd selects the
    list to scrape.  Without any of them the help text is printed.  Every
    exit path uses status 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--timeout-connect', dest='timeout_connect', type=int,
                        help='connect() timeout',
                        default=TIMEOUT_CONNECT)
    parser.add_argument('--thread', dest='thread', type=int,
                        default=THREAD_COUNT,
                        help='the threads count')
    parser.add_argument('--no-err', action='store_true',
                        help='does not print failure')
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--generic', action='store_true',
                       help='find generic mirrors')
    group.add_argument('--security', action='store_true',
                       help='find security mirrors')
    group.add_argument('--backports', action='store_true',
                       help='find the backports mirrors')
    group.add_argument('--cd', action='store_true',
                       help='find the CD image mirrors')
    args = parser.parse_args()

    # The options are mutually exclusive, so at most one source is selected.
    selected = [src for src in _MIRROR_SOURCES if getattr(args, src[0])]
    if not selected:
        # No mode requested: show usage without touching the network.
        parser.print_help()
        sys.exit(0)

    _, page_url, pattern = selected[0]
    host_list = _fetch_candidates(page_url, pattern)
    if not host_list:
        sys.stdout.write("No mirrors found; exiting\n")
        sys.exit(0)

    sys.stderr.write("starting, scanning %d hosts\n" % len(host_list))
    mw = MirrorWorker(host_list)
    consumer_array = []
    for _ in range(args.thread):
        consumer = MirrorConsumer(args.timeout_connect, args.no_err, mw)
        consumer.start()
        consumer_array.append(consumer)
    for consumer in consumer_array:
        consumer.join()
    sys.stdout.write("All %d threads ended!! :-)\n" % args.thread)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
now it has threading 👍