Created
March 27, 2020 01:13
-
-
Save chumaumenze/065a6cf6d5b926718d4d5567a793437b to your computer and use it in GitHub Desktop.
Another midnight scraper tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
from asgiref.sync import sync_to_async | |
from selenium import webdriver | |
from selenium.webdriver.support.ui import WebDriverWait | |
class Crawler(object):
    """Selenium-driven crawler for LinkedIn interview-prep pages.

    ``launch`` drives a Chrome session that walks the configured ``path``
    steps, collecting question/answer video links into ``all_video_data``;
    ``download_videos`` runs concurrently on the same event loop, draining
    that queue and fetching each link with ``wget``.

    The blocking Selenium helpers (``_run_action``, ``_get_markers``) are
    wrapped with ``sync_to_async`` so they do not stall the event loop
    while the downloader task is polling.
    """

    def __init__(self, url: str, path: dict, cookies: list = None,
                 headers: dict = None, test_mode: bool = False, timeout=30,
                 username=None, password=None) -> None:
        """Configure the crawler; no browser is started until ``launch``.

        :param url: page to open first.
        :param path: mapping of named step lists; each step is a 4-tuple
            ``(marker_type, marker, vrange, action)`` (see ``_get_markers``).
        :param cookies: optional list of cookie dicts to install instead of
            logging in with credentials.
        :param headers: optional extra headers (stored but currently unused).
        :param test_mode: stored but currently unused by the crawler.
        :param timeout: implicit-wait timeout (seconds) for the driver.
        :param username: login name, used only when no cookies are given.
        :param password: login password, used only when no cookies are given.
        """
        self.url = url
        self.cookies = cookies
        self.headers = headers or {}
        self.test_mode = test_mode
        # NOTE(review): falls back to a module-level PATH that only exists
        # when this file runs as a script; library callers must pass `path`.
        self.path = path or PATH
        self.time_out = timeout
        self.username = username
        self.password = password
        self.all_video_data = []        # queue of scraped link records
        self.processed_video_data = []  # records downloaded successfully
        self.failed_video_data = []     # leftover fields that failed
        self.browser = None

    async def launch(self):
        """Start Chrome, authenticate (cookies or credentials), then crawl.

        The browser is always quit and reset to ``None`` in ``finally`` so
        the ``download_videos`` loop (which polls ``self.browser``) can
        terminate even if crawling raises.
        """
        # `options=` replaces the deprecated `chrome_options=` keyword
        # (supported since Selenium 3.8).
        self.browser = webdriver.Chrome(options=webdriver.ChromeOptions())
        self.browser.maximize_window()
        self.browser.implicitly_wait(self.time_out)
        self.browser.get(self.url)
        if self.cookies:
            for cookie in self.cookies:
                self.browser.add_cookie(cookie)
            # Reload so the freshly installed cookies take effect.
            self.browser.refresh()
        elif self.username and self.password:
            await self.login()
        try:
            await self.crawl()
        finally:
            self.browser.quit()
            self.browser = None

    async def login(self):
        """Run the ROOT_PATH_USER_LOGIN steps (sign-in link, fields, submit)."""
        for sub_path in self.path['ROOT_PATH_USER_LOGIN']:
            targets, action = await self._get_markers(sub_path)
            await self._run_action(action, targets[0])

    async def get_video_link(self, sub_path):
        """Execute one list of steps and return the last 'src' value found.

        Best-effort: a failing step is logged and skipped so a missing
        element does not abort the whole crawl. Returns '' when no step
        yields a source URL.
        """
        video_link = ''
        for p in sub_path:
            try:
                elements, action = await self._get_markers(p)
            except Exception as e:  # deliberate best-effort: log and continue
                print(f'ERROR OCCURRED: Failed to execute self._get_markers({p}) - {e}')
            else:
                for el in elements:
                    value = await self._run_action(action, el)
                    if action == 'src':
                        video_link = value
        return video_link

    @sync_to_async
    def _run_action(self, action, element):
        """Perform `action` on a resolved element and return its value.

        Actions: 'click' (scroll into view, wait until enabled, click),
        'text' / 'src' (read attribute), 'username' / 'password' (wait
        until visible, then type the matching credential attribute).
        """
        value = None
        if action == 'click':
            self.browser.execute_script("arguments[0].scrollIntoView();", element)
            value = WebDriverWait(self.browser, 30).until(
                lambda driver=None: element
                if element and element.is_enabled()
                else False
            ).click()
        elif action == 'text':
            value = element.get_attribute('innerText')
        elif action == 'src':
            value = element.get_attribute('src')
        elif action in ('username', 'password'):
            value = WebDriverWait(self.browser, 15).until(
                lambda driver=None: element
                if element and element.is_enabled() and element.is_displayed()
                else False
            ).send_keys(getattr(self, action))
        return value

    @sync_to_async
    def _get_markers(self, sub_path):
        """Resolve one (marker_type, marker, vrange, action) step.

        Returns ``(targets, action)`` where targets is the located element
        list, truncated to ``vrange`` items when vrange >= 1 (-1 means all).

        :raises ValueError: for an unknown marker_type.
        """
        marker_type, marker, vrange, action = sub_path
        if action == 'src':
            # Video sources live inside the first iframe on the page.
            video_iframe = self.browser.find_elements_by_tag_name("iframe")[0]
            self.browser.switch_to.frame(video_iframe)
        else:
            self.browser.switch_to.parent_frame()
        if marker_type == 'selector':
            targets = WebDriverWait(self.browser, 10).until(
                lambda driver=None: self.browser.find_elements_by_css_selector(marker)
            )
        elif marker_type == 'xpath':
            targets = WebDriverWait(self.browser, 10).until(
                lambda driver=None: self.browser.find_elements_by_xpath(marker),
                message=f'XPathError: {marker} not found.'
            )
        else:
            raise ValueError(f'Invalid marker type: {marker_type}')
        # The original chained comparison `-1 <= vrange >= 1` reduces to
        # `vrange >= 1`: a positive vrange truncates, -1 keeps everything.
        if vrange >= 1:
            targets = targets[:vrange]
        return targets, action

    async def crawl(self):
        """Visit every listed video page and queue its q/a links.

        Appends one record per page to ``all_video_data`` with keys
        q_name/q_link/a_name/a_link.
        """
        video_pages, action = await self._get_markers(self.path['ROOT_PATH'][0])
        print(self.path['ROOT_PATH'][0])
        print(len(video_pages))
        for i, page_element in enumerate(video_pages):
            self.browser.switch_to.parent_frame()
            self.browser.execute_script("arguments[0].scrollIntoView();", page_element)
            page_element.click()
            # Get video title
            video_title_element, action = await self._get_markers(self.path['ROOT_PATH'][1])
            if len(video_title_element) >= 1:
                video_title = await self._run_action(action, video_title_element[0])
            else:
                raise ValueError('No markers found')
            video_question_link = await self.get_video_link(self.path['ROOT_PATH_QUESTIONS'])
            print(f'Found question src: {video_question_link}')
            video_answer_link = await self.get_video_link(self.path['ROOT_PATH_ANSWERS'])
            print(f'Found answer src: {video_answer_link}')
            self.all_video_data.append({
                'q_name': f'{i}_{video_title}_question',
                'q_link': video_question_link,
                'a_name': f'{i}_{video_title}_answer',
                'a_link': video_answer_link
            })
            # Hide the processed list entry so it is not clicked again.
            self.browser.execute_script("arguments[0].style.display='none';", page_element)

    async def _download_one(self, name, link, download_dir, dry_run):
        """Download a single video via wget; return the process exit code.

        SECURITY FIX: uses ``create_subprocess_exec`` with an argument list
        (no shell) so a scraped URL cannot inject shell syntax, replacing
        the original interpolated shell string.
        """
        file_name = os.path.join(download_dir, f"{name.replace('.', '').strip()}.mp4")
        command = ['wget', link, '-O', file_name]
        if dry_run:
            print(f"Issuing command: {' '.join(command)}")
            return 0
        process = await asyncio.create_subprocess_exec(*command)
        # BUG FIX: the original read `process.returncode` immediately after
        # spawning, which is always None; wait for completion instead.
        return await process.wait()

    async def download_videos(self, dry_run=False):
        """Drain ``all_video_data``, downloading each question/answer pair.

        Runs concurrently with ``launch``: keeps polling while the crawler
        is still active (``self.browser`` set) or work remains queued;
        ``launch`` resets ``self.browser`` to None on exit, ending the loop.

        :param dry_run: print the wget command instead of executing it.
        """
        download_dir = os.path.join(
            os.path.expanduser('~'), 'Downloads', 'Linkedin-Interview-Prep'
        )
        # makedirs + exist_ok also creates a missing parent directory and
        # is race-free, unlike the original exists()/mkdir pair.
        os.makedirs(download_dir, exist_ok=True)
        print(self.all_video_data)
        while self.all_video_data or self.browser:
            print('Downloading...')
            if self.all_video_data:
                video_data = self.all_video_data.pop(0)
                for kind in ('q', 'a'):
                    name_key, link_key = f'{kind}_name', f'{kind}_link'
                    if video_data.get(name_key) and video_data.get(link_key):
                        print(f"Downloading video...: {video_data[name_key]}")
                        return_code = await self._download_one(
                            video_data[name_key], video_data[link_key],
                            download_dir, dry_run
                        )
                        if return_code == 0:
                            self.processed_video_data.append({
                                name_key: video_data.pop(name_key),
                                link_key: video_data.pop(link_key),
                            })
                # BUG FIX: only record a failure when fields remain; the
                # original appended an empty dict for every record that was
                # fully processed.
                if video_data:
                    self.failed_video_data.append(video_data)
            await asyncio.sleep(2)
if __name__ == '__main__':
    # Navigation "paths" consumed by Crawler._get_markers: each step is a
    # 4-tuple (marker_type, marker, vrange, action) where vrange limits how
    # many matched elements are kept (-1 keeps all).
    PATH = {
        'ROOT_PATH': [
            # Video list entries
            ('selector', '#ember60 > section > div > div > div > ol > li', -1,
             'click'),
            # Video title
            ('selector', (
                '#ember156 > div.interview-prep-question-details__container.container'
                '-with-shadow.p0.mb4 > header > h1'), 1, 'text'),
        ],
        'ROOT_PATH_QUESTIONS': [
            # Open the question video
            ('xpath', (
                '/html/body/div[6]/div[4]/div[3]/div[1]/div/article/div[1]/div[2]/article/button'),
             1, 'click'),
            # Press play
            ('xpath', '/html/body/div[4]/div/div/div[2]/div/div[1]/button', 1,
             'click'),
            # Grab the <video> src
            ('xpath', '/html/body/div/div/div[5]/div[1]/video', 1, 'src'),
            # Dismiss the video modal
            ('selector', 'button[data-test-modal-close-btn=""]', 1, 'click'),
        ],
        'ROOT_PATH_ANSWERS': [
            # Open the answer video
            ('xpath', (
                '/html/body/div[6]/div[4]/div[3]/div[1]/div/article/div[1]/div[3]/article/button'),
             1, 'click'),
            # Press play
            ('xpath', '/html/body/div[4]/div/div/div[2]/div/div[1]/button', 1,
             'click'),
            # Grab the <video> src
            ('xpath', '/html/body/div/div/div[5]/div[1]/video', 1, 'src'),
            # Dismiss the video modal
            ('selector', 'button[data-test-modal-close-btn=""]', 1, 'click'),
        ],
        'ROOT_PATH_USER_LOGIN': [
            # Click "Sign in"
            ('xpath', '/html/body/div/main/p/a', 1, 'click'),
            # Enter username
            ('selector', '#username', 1, 'username'),
            # Enter password
            ('selector', '#password', 1, 'password'),
            # Submit the login form
            ('selector', '#app__container > main > div > form > div.login__form_action_container > button', 1, 'click'),
        ],
    }
    COOKIES = None
    APP_MODE = True

    site_url = ('https://www.linkedin.com/interview-prep/assessments'
                '/urn:li:fs_assessment:(1,a)/question'
                '/urn:li:fs_assessmentQuestion:(10011,aq11)/')
    # Credentials come from the environment so they never live in the file.
    username = os.getenv('LINKEDIN_USER')
    password = os.getenv('LINKEDIN_PASS')

    crawler = Crawler(site_url, PATH, COOKIES, username=username,
                      password=password, test_mode=APP_MODE, timeout=5)

    # Run the crawler and the downloader concurrently on one loop: the
    # downloader drains the queue the crawler fills.
    event_loop = asyncio.get_event_loop()
    pending = [
        crawler.launch(),
        crawler.download_videos(dry_run=True),
    ]
    event_loop.run_until_complete(asyncio.gather(*pending))
    print(f"Unprocessed Videos:{len(crawler.all_video_data)} {crawler.all_video_data}\n"
          f"Processed Videos:{len(crawler.processed_video_data)} {crawler.processed_video_data}\n"
          f"Failed Videos:{len(crawler.failed_video_data)} {crawler.failed_video_data}\n")
    event_loop.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
asgiref
selenium
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment