Skip to content

Instantly share code, notes, and snippets.

@mydreambei-ai
Last active August 12, 2016 07:05
Show Gist options
  • Save mydreambei-ai/bdba961c09a78076d60a584374c53600 to your computer and use it in GitHub Desktop.
Save mydreambei-ai/bdba961c09a78076d60a584374c53600 to your computer and use it in GitHub Desktop.
Multi-process file download using HTTP Range headers
import argparse
import collections
import http.client
import multiprocessing
import os
import re
import time
from multiprocessing import Queue, Lock, Process
from select import poll, POLLIN
from urllib.parse import urlparse
# Immutable record of the resolved download parameters; built once in main().
Setting = collections.namedtuple(
    "Setting",
    ["host", "path", "process", "size", "verbose", "output", "length"],
)
class HeadError(Exception):
    """Raised when the HEAD probe of the target URL yields an unusable answer."""
class RangeError(HeadError):
    """Raised when the server does not advertise support for range requests."""
def _is_active(sock):
    """Return True when *sock* is open and has nothing pending to read.

    A zero-timeout poll for readability is issued on the socket. If the poll
    reports an event for it, the socket is treated as not reusable and False
    is returned; if the poll reports nothing, True is returned. ``None``
    (no socket at all) yields False.
    """
    if sock is None:
        return False

    poller = poll()
    poller.register(sock, POLLIN)
    for fd, _event in poller.poll(0.0):
        if fd == sock.fileno():
            return False
    return True
def is_valid_url(url):
    """Check that *url* is a well-formed http(s) URL and hand it back.

    Intended for use as an argparse ``type=`` callable.

    Returns:
        The same *url* string, unchanged, when it matches the pattern.

    Raises:
        ValueError: if *url* does not match the pattern.
    """
    pattern = re.compile(
        r'^(?:http)s?://'  # scheme
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain name
        r'localhost|'  # literal localhost
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # dotted-quad address
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    if pattern.match(url):
        return url
    raise ValueError("url is invalid")
def download(host, path, output, lock, q):
    """Worker loop: fetch byte ranges of ``http://host/path`` into *output*.

    Items are taken from *q* until a ``(None, None)`` sentinel is seen. Each
    real item is ``(range_header, output_seek)``; the range is requested with
    a GET and, on a 206 response, the body is written into the already
    existing *output* file.

    Args:
        host: network location passed to ``http.client.HTTPConnection``
            (plain HTTP only).
        path: request path on that host.
        output: path of the pre-created output file.
        lock: lock shared between workers, held while writing to *output*.
        q: queue of ``(range_header, output_seek)`` work items.

    On ``HTTPException``/``ConnectionResetError`` the item is put back on the
    queue and the loop continues; on any other exception the item is put back
    and the worker stops.
    """
    connection = http.client.HTTPConnection(host)
    connection.connect()
    while True:
        time.sleep(1)
        if not _is_active(connection.sock):
            # Release the stale socket before replacing the connection.
            connection.close()
            connection = http.client.HTTPConnection(host)
            connection.connect()
        item, output_seek = q.get()
        try:
            print(item, output_seek)
            if item is None:
                break
            connection.request("GET", path, headers={"range": item})
            response = connection.getresponse()
            if response.status != 206:
                # Not a partial response. Drop the connection instead of
                # leaving an unread (possibly full-size) body on it.
                print("unexpected status", response.status, "for", item)
                connection.close()
                continue
            # Read from the network BEFORE taking the lock so that workers
            # only serialise on the (short) file write, not on the transfer.
            body = response.read()
            # Queue items carry the chunk boundary; every chunk but the first
            # starts one byte after it. Computed into a new name so that the
            # value re-queued on failure is never modified.
            offset = output_seek + 1 if output_seek > 0 else output_seek
            with lock:
                # "r+b" keeps existing content; the buffered file object
                # writes the whole body and is closed even on error.
                with open(output, "r+b") as f:
                    f.seek(offset)
                    f.write(body)
        except (http.client.HTTPException, ConnectionResetError) as e:
            print(e)
            q.put((item, output_seek))
            continue
        except Exception as e:
            q.put((item, output_seek))
            print(e)
            break
    connection.close()
def unsupport_range_download(host, path, output):
    """Download ``http://host/path`` with one plain GET into *output*.

    Used as the fallback when range requests are not available. The response
    status is not inspected; whatever body the server sends is written.

    Args:
        host: network location passed to ``http.client.HTTPConnection``.
        path: request path on that host.
        output: path of the file to create or overwrite.
    """
    connection = http.client.HTTPConnection(host)
    try:
        # Issue the request first so a failed connection does not leave an
        # empty output file behind.
        connection.request('GET', path)
        response = connection.getresponse()
        with open(output, "wb") as f:
            # Stream in fixed-size chunks rather than buffering the whole body.
            for chunk in iter(lambda: response.read(65536), b""):
                f.write(chunk)
    finally:
        connection.close()
def get_url_length(host, path):
    """Return the Content-Length of ``http://host/path`` using a HEAD request.

    A 302 response is followed through its Location header (recursively).
    Note that only the length is returned; the redirected host/path is not
    reported back to the caller.

    Args:
        host: network location passed to ``http.client.HTTPConnection``.
        path: request path on that host.

    Returns:
        The resource length in bytes as an int.

    Raises:
        HeadError: on a 302 without a Location header, or when the response
            carries no Content-Length header.
        RangeError: when the response has no Accept-Ranges header or it is
            ``none``.
    """
    connection = http.client.HTTPConnection(host)
    try:
        connection.request('HEAD', path)
        response = connection.getresponse()
        status = response.status
        location = response.getheader("location")
        accept_ranges = response.getheader("accept-ranges")
        length = response.getheader("content-length")
    finally:
        # Close on every path, including redirects and errors.
        connection.close()

    if status == 302:
        if location is not None:
            return get_url_length(*parse_url(location))
        raise HeadError("302 not found location")
    # "Accept-Ranges: none" explicitly means ranges are NOT supported.
    if not accept_ranges or accept_ranges.strip().lower() == "none":
        raise RangeError("not support range header")
    if length is None:
        raise HeadError("not found content-length")
    return int(length)
def create_hole_file(length, filename):
    """Create (or overwrite) *filename* as a file of exactly *length* bytes.

    The file is sized with ``truncate`` instead of writing a buffer of
    *length* bytes, so memory use does not depend on *length*. The extended
    region reads back as zero bytes.

    Args:
        length: desired file size in bytes; negative values are treated as 0.
        filename: path of the file to create.
    """
    with open(filename, "wb") as f:
        f.truncate(max(length, 0))
def parse_url(url):
    """Split *url* into ``(host, path)``.

    The host is the URL's network location (including any ``:port``); the
    path defaults to ``'/'`` when the URL has none. Query string and fragment
    are not part of the result.
    """
    parts = urlparse(url)
    path = parts.path if parts.path else '/'
    return parts.netloc, path
def calculate_process_size(length, size, process):
    """Return ``(size, process)`` with the worker count limited by the length.

    When one chunk already covers the whole resource the requested worker
    count is replaced by 1. Then, if ``size * workers`` covers *length*, the
    worker count becomes ``length // size + 1``. *size* is returned unchanged.
    """
    workers = 1 if size >= length else process
    if length <= size * workers:
        workers = length // size + 1
    return size, workers
def add_item_to_queue(length, size, p):
    """Build the work queue of byte ranges followed by *p* stop sentinels.

    Each work item is ``(range_header, seek)``. The first chunk is
    ``bytes=0-size`` with seek 0; every later chunk ``i`` starts at
    ``i * size + 1`` and carries ``seek == i * size`` (the worker adds one to
    a non-zero seek to obtain the file offset). Range ends are clamped to the
    last valid byte (``length - 1``) and chunks that would start past it are
    not emitted, so no inverted or unsatisfiable range is ever queued.

    Args:
        length: total resource length in bytes.
        size: chunk size in bytes; must be positive.
        p: number of ``(None, None)`` sentinels to append (one per worker).

    Returns:
        A ``multiprocessing.Queue`` holding the items, then the sentinels.

    Raises:
        ValueError: if *size* is not positive.
    """
    if size <= 0:
        raise ValueError("size must be a positive integer")

    queue = Queue()
    last_byte = length - 1
    start, end, seek = 0, size, 0
    while start <= last_byte:
        queue.put(("bytes={}-{}".format(start, min(end, last_byte)), seek))
        # Next chunk begins one byte after this chunk's (unclamped) end.
        seek = end
        start = end + 1
        end += size
    for _ in range(p):
        queue.put((None, None))
    return queue
def main():
    """Command-line entry point: download a URL in parallel byte ranges.

    Parses the arguments, probes the resource length with a HEAD request,
    pre-creates the output file, fills the work queue and starts one worker
    process per computed slot, then waits for all of them. If the server does
    not support range requests (``RangeError``), falls back to a single plain
    download.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", '--url', nargs=1, type=is_valid_url, help="scrawle url", required=True)
    parser.add_argument("-p", nargs='?', default=10, type=int, help="process default 10")
    parser.add_argument("-o", nargs=1, type=str, help="output")
    parser.add_argument("-s", nargs='?', default=1000000, type=int, help="range size default 1000000")
    parser.add_argument("-v", action="store_true")
    args = parser.parse_args()

    host, path = parse_url(args.url[0])
    filename = path.rsplit("/", 1)[-1]
    # nargs=1 makes argparse store a one-element list, so unwrap it before
    # using it as a file name.
    requested_output = args.o[0] if args.o else None
    output = requested_output or filename or host
    try:
        length = get_url_length(host, path)
        size, process = calculate_process_size(length, args.s, args.p)
        create_hole_file(length, output)
        setting = Setting(host=host, process=process, size=size, verbose=args.v, output=output, path=path,
                          length=length)
        queue = add_item_to_queue(setting.length, setting.size, setting.process)
        lock = Lock()
        processes = []
        print(setting)
        for _ in range(setting.process):
            worker = Process(target=download, args=(setting.host, setting.path, setting.output, lock, queue))
            processes.append(worker)
            worker.start()
        # Join exactly the workers started here. A bare os.wait() loop also
        # waits on any other child process (e.g. multiprocessing's helper
        # processes under non-fork start methods) and may never finish.
        for worker in processes:
            worker.join()
            print(worker.pid, worker.exitcode)
    except RangeError as e:
        print(e)
        unsupport_range_download(host, path, output)
# Run the downloader only when this file is executed as a script.
if __name__ == "__main__":
    main()
@mydreambei-ai
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment