Created
July 4, 2023 09:53
-
-
Save josifoski/3150f3b5d5ba4762517c165eae13352d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3.6 | |
# Script for scraping linkedin companies data with input csv with companies urls | |
# Aleksandar Josifoski for Jordan Hollander | |
# 2017 May 22; | |
from pyvirtualdisplay import Display | |
from bs4 import BeautifulSoup | |
from selenium import webdriver | |
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities | |
from selenium.webdriver.common.keys import Keys | |
from selenium.common.exceptions import TimeoutException | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support.ui import Select | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
import datetime | |
import random | |
import time | |
import html | |
import os | |
import re | |
import sys | |
import codecs | |
import csv | |
# ---------------------------------------------------------------------------
# Module-level setup: read the parameters file, load the input url list,
# prepare the output csv, error log, virtual display and counters.
# ---------------------------------------------------------------------------
print("loading linkedin_parameters2.py file")
with codecs.open("linkedin_parameters2.py", "r", "utf-8") as fp:
    # Concatenate all non-blank, non-comment lines; the result is expected
    # to be a Python literal that dict() can consume.
    sparam = ''
    for line in fp:
        if len(line.strip()) > 0:
            if not line.strip()[0] == '#':
                sparam += line
try:
    # NOTE(security): eval() executes arbitrary code from the parameters
    # file -- only ever point this at a trusted local file.
    dparameters = dict(eval(sparam))
except Exception as e:
    print(str(e))
    now = str(datetime.datetime.now())[:16]
    # BUGFIX: the original wrote to `log`, which is only opened further
    # down (and whose directory comes from the parameters that just failed
    # to parse), so this handler crashed with a NameError and hid the real
    # error.  Log to a file in the current directory instead.
    with codecs.open("linkedin_errorslog.txt", "a", "utf-8") as errlog:
        errlog.write(now + ' ' + str(e) + os.linesep)
    sys.exit()
dir_in = dparameters["dir_in"].strip()
timeout = dparameters["timeout"]
geckodriverexcecutablePath = dparameters["geckodriverexcecutablePath"].strip()
usegecko = dparameters["usegecko"]
ffProfilePath = dparameters["ffProfilePath"]
ffWidth = dparameters["ffWidth"]
ffHeight = dparameters["ffHeight"]
scrollbypx = dparameters["scrollbypx"]
headlessMode = dparameters["headlessMode"]
input_csv = dparameters["input_csv"]
# Read company page urls from the input csv placed at dir_in + input_csv.
# The first line is treated as a header and skipped.
with open(dir_in + input_csv, 'r') as fcsvinput:
    lurls = fcsvinput.readlines()
lurls = lurls[1:]
# Write the header row only when the output csv does not exist yet
# (the file is opened in append mode below, so reruns keep old rows).
write_first_row = not os.path.exists(dir_in + 'linkedin_companies_output.csv')
# Output csv uses '#' as delimiter -- presumably because scraped fields
# may contain commas.
csvdelimiter = '#'
csvFile = codecs.open(dir_in + 'linkedin_companies_output.csv', 'a', 'utf-8')
csvl = csv.writer(csvFile, delimiter = csvdelimiter)
if write_first_row:
    csvl.writerow(["logo image url", "company name", "num of employees", "description", "specialities", "location", "website url", "year founded"])
if headlessMode:
    # Run the browser inside a virtual X display (pyvirtualdisplay).
    display = Display(visible=0, size=(ffWidth, ffHeight))
    display.start()
log = codecs.open(dir_in + "linkedin_errorslog.txt", "a", "utf-8")
time1 = time.time()   # wall-clock start, used by calculate_time()
counter = 0           # number of urls opened so far, used by openurl()
def open_tag_by_css(css_selector):
    '''Click the first element matching *css_selector* (uses the global driver).'''
    element = driver.find_element_by_css_selector(css_selector)
    element.click()
def open_tag_by_xpath(xpath):
    '''Click the first element matching *xpath* (uses the global driver).'''
    element = driver.find_element_by_xpath(xpath)
    element.click()
def enter_in_tag_by_css(css_selector, text):
    '''Type *text* into the first element matching *css_selector*.'''
    element = driver.find_element_by_css_selector(css_selector)
    element.send_keys(text)
def enter_in_tag_by_xpath(xpath, text):
    '''Type *text* into the first element matching *xpath*.'''
    element = driver.find_element_by_xpath(xpath)
    element.send_keys(text)
def save_response_to_file(text):
    '''Debug helper: dump the html-unescaped *text* to rawresponse.txt in dir_in.'''
    unescaped = html.unescape(text)
    with codecs.open(dir_in + "rawresponse.txt", "w", "utf-8") as fresp:
        fresp.write(unescaped)
def waitForLoadbyCSS(CSS_SELECTOR):
    '''Block until an element matching *CSS_SELECTOR* is present in the DOM.

    Raises selenium's timeout error when the element never appears.
    '''
    locator = (By.CSS_SELECTOR, CSS_SELECTOR)
    wait.until(EC.presence_of_element_located(locator))
def waitForLoadbyXpath(xpath):
    '''Wait until an element matching *xpath* is present in the DOM.

    Returns True when the element appeared within the timeout, False
    otherwise (the expected failure is selenium's TimeoutException).
    '''
    try:
        wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        return True
    # BUGFIX: narrowed from a bare `except:`, which also swallowed
    # SystemExit and KeyboardInterrupt (breaking Ctrl-C).
    except Exception:
        return False
def openurl(url):
    '''Navigate the global driver to *url*; failures go to the error log.

    Also increments the global page counter and prints a progress banner.
    '''
    global counter
    try:
        counter += 1
        driver.get(url)
        banner = '%05d' % counter + '-' * 100
        print(banner)
        print("loading " + url)
    except Exception as exc:
        now = str(datetime.datetime.now())[:16]
        log.write(now + ' ' + str(exc) + os.linesep)
        print(str(exc))
def setbrowser():
    '''Prepare the Firefox webdriver and the module-level WebDriverWait.

    Publishes `driver` and `wait` as globals for the other helpers.
    '''
    print("Preparing browser")
    global driver
    global wait
    firefox_profile = webdriver.FirefoxProfile(profile_directory = ffProfilePath)
    caps = DesiredCapabilities.FIREFOX
    if usegecko:
        # marionette=True routes commands through geckodriver.
        caps["marionette"] = True
    driver = webdriver.Firefox(firefox_profile = firefox_profile,
                               capabilities = caps,
                               executable_path = geckodriverexcecutablePath)
    driver.set_window_size(ffWidth, ffHeight)
    # Implicit wait applies to every find_element_* call.
    driver.implicitly_wait(timeout)
    wait = WebDriverWait(driver, timeout)
def scroll_smoothly(sbypx):
    '''Scroll the page down by *sbypx* pixels, then pause briefly so that
    lazily-loaded content has a chance to render.'''
    script = "window.scrollBy(0, %d);" % (sbypx)
    driver.execute_script(script)
    time.sleep(0.3)
def is_element_present(xpath):
    '''Return True when an element matching *xpath* exists on the page.

    Note: with the driver's implicit wait active, a missing element makes
    this call block for up to `timeout` seconds before returning False.
    '''
    try:
        driver.find_element_by_xpath(xpath)
        return True
    # BUGFIX: narrowed from a bare `except:`, which also swallowed
    # SystemExit and KeyboardInterrupt (breaking Ctrl-C).
    except Exception:
        return False
# Compiled once at import time; reused for every company page.
_SRC_RE = re.compile(r'src="(.*?)"')
_NUM_RE = re.compile(r'(\d+)')

def _inner_html(xpath, default='/'):
    '''Return the stripped innerHTML of the first element matching *xpath*,
    or *default* when the element is missing (uses the global driver).'''
    try:
        return driver.find_element_by_xpath(xpath).get_attribute("innerHTML").strip()
    except Exception:
        return default

def parse_companies(url):
    '''Scrape one LinkedIn company page and append a row to the output csv.

    Extracted fields (each falls back to "/" when not found on the page):
    logo image url, company name, number of employees, description,
    specialities, headquarters, website and year founded.
    '''
    openurl(url)
    # macOS had problems with explicit waits for element presence, hence
    # the long unconditional sleep.
    time.sleep(7)
    is_element_present("//h1[contains(@dir,'ltr')]")
    # Logo url: taken from the src attribute of the logo <img>.
    try:
        outer = driver.find_element_by_xpath("//img[contains(@alt,'Logo')]").get_attribute("outerHTML")
        logo_url = _SRC_RE.search(outer).group(1)
    except Exception:
        logo_url = '/'
    print(logo_url)
    company_name = _inner_html("//h1[contains(@dir,'ltr')]")
    print(company_name)
    # Number of employees: first run of digits in "See all N employees...".
    try:
        strong_html = driver.find_element_by_xpath(
            "//strong[contains(.,'See all') and contains(.,'employees on LinkedIn')]"
        ).get_attribute("innerHTML")
        num_of_employees = _NUM_RE.search(strong_html).group(1)
    except Exception:
        num_of_employees = "/"
    print(num_of_employees)
    # Description: collapse all line breaks / tabs into spaces.
    description = _inner_html("//p[contains(@class,'org-about-us-organization-description__text description')]")
    description = description.replace('\r', '').replace('\n', ' ').replace('\t', ' ').strip()
    print(description)
    scroll_smoothly(100)
    # If the "See more" details section is collapsed, expand it and scroll
    # down so the extra fields render.
    try:
        driver.find_element_by_xpath(
            "//button[contains(@id,'show-details-btn') and contains(@aria-expanded,'false')]"
        ).click()
        time.sleep(1)
        scroll_smoothly(400)
        time.sleep(0.5)
    except Exception:
        # Already expanded (aria-expanded='true') or button not present.
        pass
    specialities = _inner_html("//p[contains(@class,'specialities mb5')]").replace('\n', ' ').strip()
    print(specialities)
    headquarters = _inner_html("//p[contains(@class,'headquarters')]")
    print(headquarters)
    website = _inner_html("//a[contains(@class,'website') and contains(@class,'link')]")
    print(website)
    founded = _inner_html("//p[contains(@class,'org-about-company-module__founded')]")
    print(founded)
    csvl.writerow([logo_url, company_name, num_of_employees, description,
                   specialities, headquarters, website, founded])
def calculate_time():
    '''Print the wall-clock time elapsed since the script started (time1).'''
    elapsed = time.time() - time1
    hours, remainder = divmod(elapsed, 3600)
    minutes, seconds = divmod(remainder, 60)
    print("processed in %dh:%dm:%ds" % (hours, minutes, seconds))
if __name__ == '__main__':
    # Entry point: start the browser, scrape every input url in order,
    # report timing and release all resources.
    setbrowser()
    for url in lurls:
        url = url.strip()
        parse_companies(url)
    calculate_time()
    log.close()
    # BUGFIX: quit() instead of close() -- close() only closes the current
    # window and leaves the geckodriver/browser process running.
    driver.quit()
    if headlessMode:
        display.stop()
    csvFile.close()
    print('Done.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment