@Jamesits
Created January 1, 2019 13:13
Douban celebrity photo scraper (豆瓣名人图片爬虫)
#!/usr/bin/env python3
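"""Douban celebrity photo scraper.

Scrapes every photo of one celebrity from movie.douban.com into a local
folder. The celebrity ID (1274989) is hard-coded in list_url below; change
it to scrape someone else. Downloaded images land in the ./data directory.
"""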
import logging
import typing
import os
import shutil
import time
import requests
from bs4 import BeautifulSoup
# logging
logging.basicConfig(format='%(asctime)-15s|%(name)s|%(levelname)-6s: %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
list_url = "https://movie.douban.com/celebrity/1274989/photos/"
download_folder = "data"
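
# generic retry wrapper: call func(*args, **kwargs) up to retry_times times,
# re-raising the last exception if every attempt fails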
def retry(func: typing.Callable, retry_times: int, *args, **kwargs) -> typing.Any:
    count = retry_times
    ex = Exception()
    while count > 0:
        try:
            return func(*args, **kwargs)
        except Exception as e:
            ex = e
            count -= 1
    raise ex
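
# fetch url into directory dst, writing to a ".part" temp file first so a
# partially downloaded image is never mistaken for a finished one; note that
# HTTP errors are logged but not raised, so retry() will not retry them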
def download_file(url: str, dst: str) -> str:
    filename = url.split("/")[-1]
    temp_filename = filename + ".part"
    if os.path.exists(os.path.join(dst, filename)):
        logger.warning("File %s exists, skipping", filename)
        return filename
    r = requests.get(url, headers={
        'User-Agent': user_agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6',
        'DNT': '1',
        'Upgrade-Insecure-Requests': '1',
        'Referer': list_url,  # this is the key to preventing 403 errors
    }, stream=True)
    if r.status_code == 200:
        with open(os.path.join(dst, temp_filename), 'wb') as f:
            for chunk in r:
                f.write(chunk)
        shutil.move(os.path.join(dst, temp_filename), os.path.join(dst, filename))
    else:
        logger.error("HTTP error %d: %s", r.status_code, r.text)
    return filename
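
# fetch a listing page as HTML, retrying up to 3 times on network errors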
def get_html(url: str, params: typing.Dict) -> str:
    r = retry(requests.get, 3, url, headers={
        'User-Agent': user_agent,
    }, params=params)
    return r.text
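
# rewrite a thumbnail URL ("/m/", ".webp") into what appears to be the
# full-resolution original ("/raw/", ".jpg")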
def convert_url(origin_url: str) -> str:
    return origin_url.replace("/m/", "/raw/").replace(".webp", ".jpg")
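
# crawl the paginated photo list until an empty page comes back, then
# download every image with a 5-second pause between requests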
def main() -> None:
    # check environment
    os.makedirs(download_folder, exist_ok=True)
    # crawl image list
    image_urls = []
    current_page = 0
    current_index = 0
    while True:
        new_list = get_html(list_url, {
            "type": "C",
            "start": current_index,
            "sortby": "time",
            "size": "a",
            "subtype": "a",
        })
        soup = BeautifulSoup(new_list, 'html.parser')
        current_count = 0
        for div in soup.find_all("div", class_="cover"):
            image_url = convert_url(div.a.img.get("src"))
            image_urls.append(image_url)
            current_count += 1
        current_page += 1
        current_index += current_count
        logger.info("Page #%d, got %d images", current_page, current_count)
        if current_count == 0:
            break
    logger.info("Total %d images", len(image_urls))
    # download images
    for url in image_urls:
        logger.info("Downloading file %s", url)
        retry(download_file, 3, url, download_folder)
        time.sleep(5)


if __name__ == "__main__":
    main()