Douban celebrity photo crawler (豆瓣名人图片爬虫)
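This gist is a small Python 3 crawler that pages through a Douban celebrity's photo list and saves every full-size image into a local data/ folder. It depends on the requests and beautifulsoup4 packages. Assuming the file is saved as crawler.py (an illustrative name, not part of the gist) and the dependencies are installed, running python3 crawler.py fetches the list pages one by one and then downloads each image with a five-second pause between files.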
#!/usr/bin/env python3
import logging
import typing
import os
import shutil
import time

import requests
from bs4 import BeautifulSoup

# logging
logging.basicConfig(format='%(asctime)-15s|%(name)s|%(levelname)-6s: %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
list_url = "https://movie.douban.com/celebrity/1274989/photos/"
download_folder = "data"
def retry(func: typing.Callable, retry_times: int, *args, **kwargs) -> typing.Any:
    """Call func(*args, **kwargs) up to retry_times times; re-raise the last exception if every attempt fails."""
    count = retry_times
    ex = Exception()
    while count > 0:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            ex = e
            count -= 1
    raise ex
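# A minimal usage sketch for retry() (hypothetical call, not part of the original script):
#   retry(requests.get, 3, "https://example.com", timeout=10)
# calls requests.get up to three times and only raises the last exception after all attempts fail.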
def download_file(url: str, dst: str) -> str:
    filename = url.split("/")[-1]
    # write to a ".part" temp file first and rename on success, so an interrupted
    # download never leaves a complete-looking file behind
    temp_filename = filename + ".part"
    if os.path.exists(os.path.join(dst, filename)):
        logger.warning("File %s exists, skipping", filename)
        return filename
    r = requests.get(url, headers={
        'User-Agent': user_agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6',
        'DNT': '1',
        'Upgrade-Insecure-Requests': "1",
        "Referer": list_url,  # this is the key to prevent 403 errors
    }, stream=True)
    if r.status_code == 200:
        with open(os.path.join(dst, temp_filename), 'wb') as f:
            for chunk in r:
                f.write(chunk)
        shutil.move(os.path.join(dst, temp_filename), os.path.join(dst, filename))
    else:
        logger.error("HTTP error %d: %s", r.status_code, r.text)
    return filename
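# Note: download_file() returns normally on non-200 responses instead of raising,
# so the retry() wrapper in main() below only retries network-level exceptions,
# not HTTP errors.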
def get_html(url: str, params: typing.Dict) -> str:
    r = retry(requests.get, 3, url, headers={
        'User-Agent': user_agent,
    }, params=params)
    return r.text
def convert_url(origin_url: str) -> str:
    # map the thumbnail URL to the full-size original: /m/ -> /raw/, .webp -> .jpg
    return origin_url.replace("/m/", "/raw/").replace(".webp", ".jpg")
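# A hedged example of the conversion above (the exact Douban URL layout is an
# assumption inferred from the replace rules, not confirmed elsewhere in this gist):
#   .../view/photo/m/public/p123456.webp  ->  .../view/photo/raw/public/p123456.jpg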
def main() -> None:
    # check environment
    os.makedirs(download_folder, exist_ok=True)

    # crawl the image list page by page until an empty page is returned
    image_urls = []
    current_page = 0
    current_index = 0
    while True:
        new_list = get_html(list_url, {
            "type": "C",
            "start": current_index,
            "sortby": "time",
            "size": "a",
            "subtype": "a",
        })
        soup = BeautifulSoup(new_list, 'html.parser')
        current_count = 0
        for div in soup.find_all("div", class_="cover"):
            image_url = convert_url(div.a.img.get("src"))
            image_urls.append(image_url)
            current_count += 1
        current_page += 1
        current_index += current_count
        logger.info("Page #%d, got %d images", current_page, current_count)
        if current_count == 0:
            break
    logger.info("Total %d images", len(image_urls))

    # download the images one by one, pausing between requests
    for url in image_urls:
        logger.info("Downloading file %s", url)
        retry(download_file, 3, url, download_folder)
        time.sleep(5)


if __name__ == "__main__":
    main()
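To point the crawler at a different celebrity, it should be enough to change the numeric ID in list_url (1274989 in this gist); download_folder controls where the images end up.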