Google images downloader
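A Python 3 script that fetches a Google Images results page with requests and BeautifulSoup, pulls the original-image URLs out of the `rg_meta` JSON blobs, and downloads the files concurrently through a thread pool. Google's result markup changes over time, so the `rg_meta` scraping may need updating against current pages.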
#!/usr/bin/env python3
import os
import sys
import json
import argparse
import string
import concurrent.futures
import multiprocessing
import subprocess
from urllib import parse, request
from hashlib import md5

try:
    import requests
    from bs4 import BeautifulSoup
except ImportError as e:
    # Third-party dependencies are missing; try to install them once, then re-import.
    print('\033[31m@ Error: %s \033[0m' % str(e))
    print('pip3 install requests bs4')
    pip3output = subprocess.check_output(
        'pip3 install requests bs4',
        stderr=subprocess.STDOUT,
        shell=True
    )
    print(pip3output)
    try:
        import requests
        from bs4 import BeautifulSoup
    except ImportError as e:
        print('\033[31m@ Error: %s \033[0m' % str(e))
        sys.exit(1)
GOOGLE_IMAGES_URL = 'https://www.google.com/search?q={query}&source=lnms&tbm=isch'
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36'
class DownloaderException(Exception):
    pass


class Filesystem(object):
    @staticmethod
    def makedirs(directory):
        if not os.path.exists(directory):
            os.makedirs(directory)

    @staticmethod
    def get_queries_from_pictures_directory(directory):
        # Each immediate subdirectory name is treated as a previously used query.
        queries = set()
        if os.path.exists(directory):
            for f in os.scandir(directory):
                if f.is_dir():
                    queries.add(os.path.basename(f.path))
        return list(queries)
class Downloader(object):
    @staticmethod
    def download_webpage(query):
        session = requests.Session()
        session.headers['User-Agent'] = USER_AGENT
        # Fetch the Google Images results page for this query.
        response = session.get(GOOGLE_IMAGES_URL.format(query=query))
        return BeautifulSoup(response.text, 'html.parser')

    @staticmethod
    def extract_images(soup, directory, query):
        images = []  # holds the link to the large original image, plus its type
        for a in soup.find_all("div", {"class": "rg_meta"}):
            data = json.loads(a.text)
            # Keys in the rg_meta JSON blob:
            #   "ou": original url    "ow"/"oh": original width/height
            #   "tu": thumb url       "tw"/"th": thumb width/height
            #   "ity": image type     "ru": resource url    "pt": page title
            p = os.path.splitext(
                parse.unquote(os.path.basename(parse.urlparse(data['ou']).path.strip('/')))
            )
            # Sanitize the basename; fall back to .jpg when the URL has no usable extension.
            stem = (p[0]
                    .replace('+', '-')
                    .replace('--', '-')
                    .replace('--', '-')
                    .replace('__', '_')
                    .replace('__', '_'))
            if not p[1] or p[1] not in ['.jpg', '.png', '.gif', '.jpeg', '.bmp']:
                filename = stem + '.jpg'
                if len(filename) > 250:
                    filename = filename[:250]
            else:
                filename = stem + p[1]
                if len(filename) > 210:
                    filename = filename[:210]
            path = os.path.join(directory, filename)
            path = ''.join(filter(lambda x: x in string.printable, path))
            images.append({
                'url': data['ou'],
                'path': path,
                'type': data['ity'],
                'query': query
            })
        return images
    @staticmethod
    def download_images(timeout=180):
        # Returns a per-item callback suitable for Runner.run_io_bound.
        def inner_di(arg):
            try:
                req = request.Request(arg['url'])
                req.add_header('User-Agent', USER_AGENT)
                result = request.urlopen(req, timeout=timeout)
                if result.status == 200:
                    res = result.read()
                    if res:
                        # Append the MD5 of the payload so identical filenames do not collide.
                        p = os.path.splitext(arg['path'])
                        arg['path'] = '%s-%s%s' % (p[0], md5(res).hexdigest(), p[1])
                        print(arg['path'])
                        with open(arg['path'], 'wb') as f:
                            f.write(res)
            except (Exception, KeyboardInterrupt) as e:
                print('\033[31mError:\033[0m q:%s - p:%s - u:%s - e:%s' % (arg['query'], arg['path'], arg['url'], e))
            return None
        return inner_di
class Runner(object):
    @staticmethod
    def run_io_bound(items, callback):
        # Fan the items out across a thread pool sized to the CPU count.
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=int(multiprocessing.cpu_count())) as ex:
                future_to_item = {ex.submit(callback, i): i for i in items}
                for future in concurrent.futures.as_completed(future_to_item):
                    item = future_to_item[future]
                    try:
                        res = future.result()
                    except Exception as exc:
                        # print('%r generated an exception: %s' % (item, exc))
                        raise exc
                    finally:
                        pass  # print(res)
        except KeyboardInterrupt:
            pass
    @staticmethod
    def process_pictures_query(timeout):
        # Returns a per-query callback; the timeout also travels inside each item dict.
        def inner_ppq(args):
            query = string.capwords(args['query'])
            pictures_directory = args['pictures_directory']
            parent_directory = args['parent_directory']
            download_images_timeout = args['timeout']
            if pictures_directory != '':
                directory = pictures_directory
            else:
                directory = os.path.join(parent_directory, query)
            Filesystem.makedirs(directory)
            html = Downloader.download_webpage(query.replace(' ', '+'))
            images = Downloader.extract_images(html, directory, query)
            Runner.run_io_bound(images, Downloader.download_images(download_images_timeout))
        return inner_ppq
def main(argv):
    parser = argparse.ArgumentParser(
        add_help=False, description='Download Google Images'
    )
    parser.add_argument(
        '--help', '-h', action='help', default=argparse.SUPPRESS, help='Show this help message and exit'
    )
    parser.add_argument(
        '--query', '-q', help='Search queries', action='append'
    )
    parser.add_argument(
        '--parent-directory', '-P', help='Parent directory; default: "./Pictures"', default='./Pictures'
    )
    parser.add_argument(
        '--pictures-directory', '-p', help='Pictures directory, overrides the "parent-directory + query" path', default=''
    )
    parser.add_argument(
        '--timeout', '-t', help='Timeout', default=180, type=int
    )
    parser.add_argument(
        '--refresh', '-r', help='Redownload images for all query directories in parent-directory', dest='refresh', action='store_true', default=False
    )
    operations = []

    def add_operation(args, q):
        data = {
            'parent_directory': args.parent_directory,
            'pictures_directory': args.pictures_directory,
            'timeout': args.timeout,
            'query': q
        }
        operations.append(data)

    try:
        args = parser.parse_args(argv)
        if not args.refresh:
            print(args.query)
            # args.query is None when no -q flag was given.
            for q in args.query or []:
                add_operation(args, q)
            Runner.run_io_bound(operations, Runner.process_pictures_query(args.timeout))
        else:
            queries = Filesystem.get_queries_from_pictures_directory(args.parent_directory)
            print(queries)
            for q in queries:
                add_operation(args, q)
            Runner.run_io_bound(operations, Runner.process_pictures_query(args.timeout))
    except Exception as e:
        print('\033[31m@ Error: %s \033[0m' % str(e))
        sys.exit(1)


if __name__ == "__main__":
    main(sys.argv[1:])
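A minimal usage sketch of the command-line flags defined above; the filename google_images_downloader.py is hypothetical, save the script under any name:

# Download images for two queries into ./Pictures/<Query>/
python3 google_images_downloader.py -q "golden retriever" -q "siamese cat"

# Download one query into an explicit directory with a 60-second timeout
python3 google_images_downloader.py -q "sunset" -p ./sunsets -t 60

# Redownload images for every query directory already under ./Pictures
python3 google_images_downloader.py --refresh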