Created
July 5, 2022 17:31
-
-
Save kafkaesqu3/b18d9c040a843c3e38c78ef446892d15 to your computer and use it in GitHub Desktop.
Python HTTP "miner" scripts: bulk-download GitLab repository archives listed in repos.txt.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bs4 import BeautifulSoup | |
import requests | |
import shutil | |
import os | |
def uniquify(path):
    """Return a file path that does not collide with an existing file.

    If *path* already exists, try 'name (1).ext', 'name (2).ext', ...
    until an unused name is found, and return that.
    """
    stem, ext = os.path.splitext(path)
    candidate = path
    n = 0
    while os.path.exists(candidate):
        n += 1
        candidate = "{0} ({1}){2}".format(stem, n, ext)
    return candidate
# Sequentially download each repo listed in repos.txt as a master-branch
# zip archive from the internal GitLab server.
baseurl = "https://gitlab.contoso.com"

# NOTE(review): this session (with its local intercepting proxy) is configured
# but the loop below calls requests.get directly and bypasses it — confirm
# whether the proxy is still wanted, and switch to session.get if so.
session = requests.Session()
session.proxies = {
    'http': 'http://127.0.0.1:8000'}

# repos.txt: one "/group/project" path per line.
# FIX: close the file instead of leaking the handle.
with open("repos.txt", 'r') as f:
    contents = f.readlines()

cookies = {}
headers = {}
for s in contents:
    s = s.strip()
    # "/group/proj" -> ".../group/proj/-/archive/master/proj-master.zip"
    url = baseurl + s + "/-/archive/master/" + s.split('/')[-1] + "-master.zip"
    filename = url.split('/')[-1]
    # verify=False: presumably a self-signed internal cert — TODO confirm.
    response = requests.get(url, headers=headers, cookies=cookies, stream=True, verify=False)
    if response.status_code != 200:
        print("Received status {0} for url {1} ({2})".format(response.status_code, url, s))
        # FIX: release the streamed connection on the error path too.
        response.close()
        continue
    # FIX: don't shadow the repos-file handle name; stream body straight to disk
    # under a collision-free name.
    with open(uniquify(filename), 'wb') as out:
        shutil.copyfileobj(response.raw, out)
    response.close()
    print("Got {0} as {1}".format(url, filename))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from email import header | |
from re import U | |
import grequests | |
import os | |
import sys | |
# Build the list of master-branch archive URLs for every repo in repos.txt.
baseurl = "https://gitlab.contoso.com"

# FIX: close the file instead of leaking the handle.
with open("repos.txt", 'r') as f:
    contents = f.readlines()

# Shared request state read by AsyncDownload.async_get below.
cookies = {}
headers = {}
urls = []
for s in contents:
    s = s.strip()
    # "/group/proj" -> ".../group/proj/-/archive/master/proj-master.zip"
    url = baseurl + s + "/-/archive/master/" + s.split('/')[-1] + "-master.zip"
    urls.append(url)
print("Read in {0} repos".format(len(urls)))
class AsyncDownload:
    """Download a batch of URLs concurrently with grequests, saving each
    response body to a collision-free local filename."""

    def __init__(self, urls):
        # URLs to fetch in this batch.
        self.urls = urls

    def exception(self, request, exception):
        # grequests exception handler: log the failure and carry on.
        print("ERROR exception: {}: {}".format(request.url, exception))

    def uniquify(self, path):
        """Return *path*, or a 'name (N).ext' variant if *path* exists."""
        filename, extension = os.path.splitext(path)
        counter = 1
        while os.path.exists(path):
            path = filename + " (" + str(counter) + ")" + extension
            counter += 1
        return path

    def async_get(self):
        """Fetch all URLs (3 at a time) and write each 200 body to disk.

        cookies/headers are read from module scope. verify=False:
        presumably a self-signed internal cert — TODO confirm.
        """
        results = grequests.map(
            (grequests.get(u, cookies=cookies, headers=headers, verify=False, stream=True)
             for u in self.urls),
            exception_handler=self.exception, size=3)
        if not results:
            print("ERROR issuing request. Exiting")
            sys.exit()
        for result in results:
            # BUG FIX: grequests.map yields None for requests that raised
            # (already logged by self.exception); the old code crashed on
            # result.status_code for those entries.
            if result is None:
                continue
            if result.status_code != 200:
                # BUG FIX: report THIS result's URL; the old code printed the
                # stale module-level 'url'/'s' left over from the URL-build loop.
                print("ERROR: status {0} for url {1}".format(result.status_code, result.url))
            else:
                filename = result.url.split('/')[-1]
                with open(self.uniquify(filename), 'wb') as f:
                    f.write(result.content)
                print("Got {0} as {1}".format(result.url, filename))
def chunkify(lst, n):
    """Split *lst* into *n* round-robin sub-lists.

    Element i of *lst* lands in sub-list i % n, so the sub-lists differ in
    length by at most one. Note *n* is the number of chunks produced, not
    the size of each chunk.
    """
    buckets = [[] for _ in range(n)]
    for idx, item in enumerate(lst):
        buckets[idx % n].append(item)
    return buckets
# Fan the URL list out into round-robin chunks and download each chunk
# concurrently.
# NOTE(review): chunk_size is actually the NUMBER of chunks handed to
# chunkify, not the size of each chunk — confirm the intent.
chunk_size = 15
for chunk in chunkify(urls, chunk_size):
    print("Next {0} chunks".format(chunk_size))
    downloader = AsyncDownload(chunk)
    downloader.async_get()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment