Last active
June 8, 2019 17:47
-
-
Save ripiuk/326b7dcf25b162f2004c293842c92600 to your computer and use it in GitHub Desktop.
Search for images on Google and parse the resulting HTML page.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import uuid | |
import asyncio | |
import typing as typ | |
import urllib.parse as urlparse | |
from lxml import html | |
from aiohttp import ClientSession | |
# Query sent to the image search; it also names the download sub-directory.
SEARCH = "Dogs"
# Number of result pages requested from the search engine.
PAGES = 5
# Directory that downloaded images are written into.
DOWNLOAD_DIR = f"imgs/{SEARCH}"
def custom_search() -> typ.List[str]:
    """Unimplemented placeholder; currently does nothing and returns ``None``.

    Reference left by the author: https://developers.google.com/custom-search/
    """
    return None
async def download_imgs(imgs: typ.List[str], session) -> None:
    """Download every image URL concurrently into ``DOWNLOAD_DIR``.

    Each successfully fetched image is stored under a random UUID file name
    with a ``.jpg`` extension.

    Args:
        imgs: Image URLs to fetch. Empty or ``None`` entries are skipped.
        session: An open ``aiohttp.ClientSession`` (or compatible object)
            used to issue the GET requests.

    Raises:
        Any exception raised by ``session.get`` or by the file write of a
        download is propagated through ``asyncio.gather``.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)

    async def _download_image(img_url: str) -> None:
        """Fetch one URL and write its body to a fresh file."""
        async with session.get(img_url) as response:
            if response.status != 200:
                # Do not store an error page as if it were an image.
                print(f"Skipping {img_url}: HTTP {response.status}")
                return
            img_data = await response.read()
        file_path = os.path.join(DOWNLOAD_DIR, f"{uuid.uuid4()}.jpg")
        with open(file_path, "wb") as file:
            file.write(img_data)

    # An <img> tag without a src attribute yields None in the URL list;
    # passing that to session.get() would raise, so filter such entries out.
    await asyncio.gather(*(_download_image(img) for img in imgs if img))
async def parse_html(session, what_to_search: str, pages: int = 1) -> typ.List[str]:
    """Fetch image-search result pages and collect the ``src`` of every image.

    Pages are requested one after another. Paging stops early at the first
    page that has no ``table.images_table`` element; images gathered from the
    earlier pages are still returned.

    Args:
        session: An open ``aiohttp.ClientSession`` (or compatible object).
        what_to_search: The search query.
        pages: How many result pages to request.

    Returns:
        The ``src`` attribute values of all ``<img>`` elements found inside
        the results tables; ``<img>`` elements without a ``src`` are omitted.
        Prints ``"No images found"`` and returns an empty list when nothing
        was collected.
    """
    base_url = "https://www.google.com.ua/search?"
    results_per_page = 20  # "start" offset: 0 - first page, 20 - second page, ...

    # The headers never change between pages, so build them once.
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,"
                  "*/*;q=0.8,application/signed-exchange;v=b3",
        "accept-language": "uk-UA,uk;q=0.9,ru;q=0.8,en-US;q=0.7,en;q=0.6",
        "referer": "https://www.google.com.ua/",
        "upgrade-insecure-requests": "1",
        "user-agent": "python-requests/2.22.0",
    }

    img_urls = []  # type: typ.List[str]
    for page_index in range(pages):
        params = {
            "q": what_to_search,
            "authuser": "0",
            "hl": "uk",
            "biw": "963",
            "bih": "983",
            "ie": "UTF-8",
            "tbm": "isch",
            "ei": "SMnyXNHKFruBk74Pn_2v4Aw",
            "start": str(page_index * results_per_page),
            "sa": "N",
        }
        url = base_url + urlparse.urlencode(params)

        async with session.get(url, headers=headers) as response:
            resp = await response.text()

        page = html.fromstring(resp)  # type: html.HtmlElement
        tables = page.cssselect("table.images_table")
        if not tables:
            # No results table on this page: stop paging, but keep what was
            # already collected instead of discarding it.
            break

        for img in tables[0].cssselect("img"):
            src = img.attrib.get("src")
            if src:
                img_urls.append(src)

    if not img_urls:
        print("No images found")
    return img_urls
def content_type_jpeg() -> typ.List[str]:
    """Unimplemented placeholder; currently does nothing and returns ``None``."""
    return None
async def main() -> None:
    """Search for ``SEARCH`` images and download every result that was found.

    One ``ClientSession`` is shared by the search-page requests and the image
    downloads, and is closed when the work is finished.
    """
    # No semaphore here: one that is acquired a single time around the whole
    # workflow cannot throttle the individual downloads. Concurrency limits,
    # if needed, belong around each request inside download_imgs.
    async with ClientSession() as session:
        images = await parse_html(session, SEARCH, pages=PAGES)
        await download_imgs(images, session)
if __name__ == "__main__":
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments.
    start = time.perf_counter()
    # Create the loop explicitly: calling asyncio.get_event_loop() /
    # ensure_future() with no running loop is deprecated on current Python.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Always release the loop's resources, even if main() failed.
        loop.close()
    # Measured by the author: 0.8931670188903809 - 1 page, 3.6354501247406006 - 5 pages
    print("Time:", time.perf_counter() - start)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment