import csv
import json
import logging
import os
import random
import re
import threading
import time

import requests
import yaml
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from lxml import etree
from selenium import webdriver
from selenium.webdriver import ActionChains

mail_regex = r"([a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+){0,4}@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+){0,4})"
phone_regex = r"(1(3\d|4[4-9]|5[0-35-9]|6[67]|7[013-8]|8[0-9]|9[0-9])\d{8})"

mail_pattern = re.compile(mail_regex)
phone_pattern = re.compile(phone_regex)
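# Both patterns wrap everything in capture groups, so findall() returns tuples
# whose first element is the full match -- hence the `[0]` indexing in
# parse_job_and_save. phone_regex targets 11-digit Chinese mainland mobile numbers.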

# load config
with open("config.yaml") as f:
    conf = yaml.load(f, Loader=yaml.FullLoader)

ip_proxy_api = conf["ip_proxy_api"]
thread_maxsize = conf["thread_maxsize"]
max_retry = conf["max_retry"]
use_selenium = conf["use_selenium"]
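# config.yaml must provide the four keys read above. An illustrative example
# (values and the proxy URL are placeholders, not authoritative):
#
#   ip_proxy_api: "http://your-proxy-provider/api/get"   # hypothetical endpoint
#   thread_maxsize: 4
#   max_retry: 5
#   use_selenium: true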

mutex = threading.Lock()


def get_logger():
    '''
    build and return the module logger (stream handler, level LOG_LEVEL)
    '''
    formatter = logging.Formatter("%(asctime)s - %(message)s")
    logger = logging.getLogger("monitor")
    logger.setLevel(LOG_LEVEL)

    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko',
    'Accept-Language': 'zh-Hans-CN, zh-Hans;q=0.5'
}


START_URL = (
    "http://search.51job.com/list/010000%252C020000%252C030200%252C040000"
    ",000000,0000,00,9,99,{},2,{}.html?lang=c&stype=1&postchannel=00"
    "00&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&lon"
    "lat=0%2C0&radius=-1&ord_field=1&confirmdate=9&fromType=1&dibiaoid=0&"
    "address=&line=&specialarea=00&from=&welfare="
)
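# The two `{}` placeholders are filled with the search keyword and the page
# number in job_spider_for_one_page. The encoded prefix (010000, 020000, ...)
# appears to be 51job area codes that restrict the search to a few cities.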


LOG_LEVEL = logging.INFO

logger = get_logger()


class JobSpider:
    def __init__(self, proxy_api="", proxies=None):
        self.keyword = "python"
        keyword = input("input keyword please: ").strip()
        if keyword != "":
            self.keyword = keyword
        self.csv_file = "%s.csv" % self.keyword
        self.init_csv_writer()
        self.proxy_api = proxy_api
        # avoid a mutable default argument: fall back to a fresh list per instance
        self.proxies = proxies if proxies is not None else []
        self.headers = [HEADERS]

    def get_random_headers(self):
        try:
            headers = {'User-Agent': str(UserAgent().random)}
        except Exception:
            pass
        else:
            mutex.acquire()
            self.headers.append(headers)
            if len(self.headers) > 1000:
                self.headers = self.headers[500:]
            mutex.release()
        return random.choice(self.headers)

    def init_csv_writer(self):
        is_csv_null = True
        if os.path.exists(self.csv_file):
            with open(self.csv_file, "r", encoding="utf-8") as rf:
                is_csv_null = (rf.read() == "")
        # append instead of "w+" so earlier rows survive; newline="" avoids blank rows
        self.csv_writer = csv.writer(open(
            self.csv_file, "a", encoding="utf-8", newline=""))
        if is_csv_null:
            self.csv_writer.writerow([
                "job_name", "providesalary_text", "company_name",
                "workarea_text", "companytype_text", "jobwelf",
                "attribute_text", "companysize_text", "companyind_text",
                "job_detail", "mails", "phones",
            ])

    def init_webdriver(self, proxy=None):
        options = webdriver.ChromeOptions()
        options.add_argument('--user-agent=%s' % self.get_random_headers()["User-Agent"])
        # options.add_argument('--headless')
        # options.add_argument('--disable-gpu')
        if proxy is not None:
            options.add_argument('--proxy-server=' + proxy["https"])
        driver = webdriver.Chrome(options=options)
        # bypass aliyun nocaptcha detection
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": """
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                })
            """
        })
        return driver
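    # The CDP script above runs before any page script, masking the
    # navigator.webdriver flag that bot-detection checks look for. Headless mode
    # is left commented out, presumably so the slider captcha stays interactable.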

    def save_to_csv(self, job, job_detail, mails, phones):
        name = job.get("job_name", "")
        money = job.get("providesalary_text", "")
        company_name = job.get("company_name", "")
        workarea_text = job.get("workarea_text", "")
        companytype_text = job.get("companytype_text", "")
        jobwelf = job.get("jobwelf", "")
        attribute_text = job.get("attribute_text", "")
        attribute_text = "".join(attribute_text)
        companysize_text = job.get("companysize_text", "")
        companyind_text = job.get("companyind_text", "")
        mutex.acquire()
        self.csv_writer.writerow([
            name, money, company_name, workarea_text, companytype_text,
            jobwelf, attribute_text, companysize_text, companyind_text,
            job_detail, ','.join(mails), ','.join(phones),
        ])
        mutex.release()

    def parse_jobs_from_html(self, html):
        html = etree.HTML(html)
        jobs_str = str(html.xpath('//script[@type="text/javascript"]/text()')[0])
        jobs_str = jobs_str.split('=', 1)[1]
        obj = json.loads(jobs_str)
        jobs = obj['engine_jds']
        return jobs
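    # Note: the search page embeds its results as a JavaScript assignment inside
    # a <script> tag; splitting on the first '=' keeps only the JSON literal,
    # whose 'engine_jds' field holds the job entries.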

    def parse_job_detail_from_html(self, html):
        job_detail = BeautifulSoup(html, "lxml").find(
            "div", class_="bmsg job_msg inbox"
        ).text
        return job_detail

    def spide_with_requests(self, url, is_proxy=False, retry=0):
        if retry > max_retry:
            return ""
        logger.info("spide_with_requests retry=%d" % retry)
        proxy = None
        if is_proxy:
            proxy = self.choose_proxy()
            if proxy is None:
                logger.error("cannot get proxy")
                req = requests.get(url, headers=self.get_random_headers())
            else:
                # logger.info("use proxy: %s, url=%s" % (proxy["https"], url))
                logger.info("use proxy: %s" % proxy["https"])
                req = requests.get(url, headers=self.get_random_headers(), proxies=proxy)
        else:
            req = requests.get(url, headers=self.get_random_headers())
        req.encoding = "gbk"
        html = req.text
        if "TraceID" in html and use_selenium:
            html = self.spide_with_selenium(url, proxy)
        while "TraceID" in html or "arg1=" in html:  # still blocked: retry through a proxy
            html = self.spide_with_requests(url, True, retry + 1)
        return html
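    # "TraceID" and "arg1=" seem to be markers of the anti-crawler challenge page
    # served instead of real results; when they show up the code falls back to
    # selenium and/or retries through a proxy until max_retry is exhausted.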

    def choose_proxy(self):
        req = requests.get(self.proxy_api)
        if req.status_code == 200 and "err" not in req.text and ":" in req.text:
            res = req.text.replace("\r", "")
            proxies = res.split("\n")
            for pxy in proxies:
                if pxy == "":
                    continue
                proxy = {
                    "http": "http://%s" % pxy,
                    "https": "http://%s" % pxy,
                }
                mutex.acquire()
                if len(self.proxies) > 1000:  # drop stale entries
                    self.proxies = self.proxies[500:]
                self.proxies.append(proxy)
                mutex.release()
        if len(self.proxies) == 0:
            return None
        return random.choice(self.proxies)
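    # The proxy API is expected to return newline-separated host:port entries;
    # each one is wrapped into a requests-style proxies dict and cached in
    # self.proxies for random reuse.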

    def spide_with_selenium(self, url, proxy=None):
        driver = self.init_webdriver(proxy)
        driver.get(url)
        try:
            slider = driver.find_element("xpath", '//*[@id="nc_1_n1z"]')
            if slider:
                self.do_slide(driver, slider, 340)
                time.sleep(0.5)
        except Exception:
            # no slider means the page loaded normally
            pass
        finally:
            html = driver.page_source
            driver.quit()  # quit() (not close()) so the chromedriver process is released
        return html

    def do_slide(self, driver, slider, distance):
        '''
        drag the slider `distance` pixels with a human-like speed curve
        '''
        v, t, trace = 0, 0.2, []
        current, mid = 0, distance * 4 / 5

        while current < distance:
            a = 100 if current < mid else -5
            v0 = v
            v = v0 + a * t
            move = v0 * t + 1 / 2 * a * (t ** 2)
            current += move
            trace.append(round(move))
        ActionChains(driver).click_and_hold(slider).perform()
        for x in trace:
            ActionChains(driver).move_by_offset(xoffset=x, yoffset=0).perform()
        time.sleep(0.5)
        ActionChains(driver).release().perform()
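    # The trace accelerates for roughly the first 4/5 of the distance and then
    # decelerates (a = 100, then -5), so the replayed move_by_offset steps look
    # more like a human drag than one constant-speed jump.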

    def job_spider(self, thread_id=0):
        '''
        spider entrance for one worker thread
        '''
        cur_page, self.total_page = 1, 1024  # temp number, corrected after the first page
        while cur_page <= self.total_page:
            if cur_page % thread_maxsize == thread_id:
                self.job_spider_for_one_page(cur_page)
            cur_page += 1
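    # Pages are partitioned across threads by page number modulo thread_maxsize,
    # so each worker crawls a disjoint subset of the result pages.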

    def job_spider_for_one_page(self, page, retry=0):
        if retry > max_retry:
            return
        url = START_URL.format(self.keyword, page)
        # logger.info("spide page {}, url {}".format(page, url))
        logger.info("spide page {}, retry {}".format(page, retry))
        html = self.spide_with_requests(url, False)
        if "total_page" not in html:
            self.job_spider_for_one_page(page, retry + 1)
        else:
            try:
                self.total_page = int(re.findall('"total_page":"(.*?)",', html)[0])
                jobs = self.parse_jobs_from_html(html)
            except Exception as err:
                logger.error("[exception]: cannot find total_page or parse jobs from html, retrying, err {}".format(err))
                self.job_spider_for_one_page(page, retry + 1)
            else:
                for job in jobs:
                    self.parse_job_and_save(job)

    def get_job_detail(self, job_url, retry=0):
        job_detail = "unknown"
        if retry > max_retry:
            return job_detail
        try:
            html = self.spide_with_requests(job_url)
            job_detail = self.parse_job_detail_from_html(html)
        except Exception as err:
            logger.error("[exception]: just ignore it, job_url=%s, err=%s, retry=%d" % (job_url, err, retry))
            # with open("error_htmls/" + "_".join(job_url[8:].split("?")[0].split("/")), "w") as wf:
            #     wf.write(html)  # debug
            return self.get_job_detail(job_url, retry + 1)
        return job_detail.replace("\n", " ")

    def parse_job_and_save(self, job):
        job_url = job.get("job_href", "")
        job_detail = self.get_job_detail(job_url)
        mails = mail_pattern.findall(job_detail)
        phones = phone_pattern.findall(job_detail)
        self.save_to_csv(job, job_detail,
                         [mail[0] for mail in mails],
                         [phone[0] for phone in phones])

    def run(self):
        '''
        start the worker threads and wait for them to finish
        '''
        threads = []
        for i in range(thread_maxsize):
            thread = threading.Thread(target=self.job_spider, args=(i,))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()
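    # All worker threads share this one JobSpider instance, so the shared
    # csv_writer, headers list and proxies list are guarded by the module-level mutex.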


def test():
    spider = JobSpider(proxy_api=ip_proxy_api)
    cnt = 0
    while True:
        cnt += 1
        print(cnt)
        try:
            # req = requests.get("https://jobs.51job.com/shanghai/123006503.html?s=sou_sou_soulb&t=0_0",
            #                    headers=HEADERS, proxies=spider.choose_proxy())
            html = spider.spide_with_selenium(
                "https://jobs.51job.com/shanghai/123006503.html?s=sou_sou_soulb&t=0_0",
                proxy=spider.choose_proxy())
        except Exception:
            continue
        else:
            if "arg1=" in html:
                continue
            with open("%d.html" % cnt, "w") as wf:
                wf.write(html)
            # print(html)
            break


if __name__ == "__main__":
    spider = JobSpider(proxy_api=ip_proxy_api)

    start = time.time()
    spider.run()
    logger.info("total cost {} seconds".format(time.time() - start))

    # test()