Linkedin groups
#! /usr/bin/env python3.6
# Script for scraping linkedin group members
# Aleksandar Josifoski for Jordan Hollander
# 2017 May 09
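#
# Requirements (assumed from the imports and calls below, not stated in the
# original gist): a Selenium 3.x release (the find_element_by_* helpers and the
# FirefoxProfile/capabilities arguments used here were removed or deprecated in
# Selenium 4), geckodriver, Firefox, pyvirtualdisplay with Xvfb for headless
# mode, beautifulsoup4 and openpyxl. A linkedin_parameters.py file is expected
# in the working directory; an assumed example is shown below.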
from pyvirtualdisplay import Display
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import openpyxl
import datetime
import random
import time
import html
import os
import re
import sys
import codecs

lgroup_members_url = [
    "https://www.linkedin.com/groups/121543/"
]
# reading parameters file
print("loading linkedin_parameters.py file")
with codecs.open("linkedin_parameters.py", "r", "utf-8") as fp:
    sparam = ''
    for line in fp:
        if len(line.strip()) > 0:
            if not line.strip()[0] == '#':
                sparam += line
try:
    dparameters = dict(eval(sparam))
except Exception as e:
    # the error log file is not open yet at this point, so just report and exit
    print(str(e))
    sys.exit()
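# linkedin_parameters.py is eval'ed into a dict; a minimal assumed example
# (all values below are placeholders, not taken from the original gist):
# {
#     "dir_in": "/home/user/linkedin/",            # output directory, must end with '/'
#     "timeout": 30,                                # seconds, used for Selenium waits
#     "geckodriverexcecutablePath": "/usr/local/bin/geckodriver",
#     "usegecko": True,
#     "ffProfilePath": "/home/user/.mozilla/firefox/xxxxxxxx.default",
#     "ffWidth": 1366,
#     "ffHeight": 768,
#     "scrollbypx": 500,                            # pixels scrolled to trigger lazy loading
#     "headlessMode": True,                         # run under a virtual display (Xvfb)
# }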
dir_in = dparameters["dir_in"].strip()
timeout = dparameters["timeout"]
geckodriverexcecutablePath = dparameters["geckodriverexcecutablePath"].strip()
usegecko = dparameters["usegecko"]
ffProfilePath = dparameters["ffProfilePath"]
ffWidth = dparameters["ffWidth"]
ffHeight = dparameters["ffHeight"]
scrollbypx = dparameters["scrollbypx"]
headlessMode = dparameters["headlessMode"]
log = codecs.open(dir_in + "linkedin_errorslog.txt", "a", "utf-8")
time1 = time.time()

def open_tag_by_css(css_selector):
    '''function to click item based on css selector'''
    driver.find_element_by_css_selector(css_selector).click()

def open_tag_by_xpath(xpath):
    '''function to click item based on xpath'''
    driver.find_element_by_xpath(xpath).click()

def enter_in_tag_by_css(css_selector, text):
    '''function to enter text based on css selector'''
    driver.find_element_by_css_selector(css_selector).send_keys(text)

def enter_in_tag_by_xpath(xpath, text):
    '''function to enter text based on xpath'''
    driver.find_element_by_xpath(xpath).send_keys(text)

def save_response_to_file(text):
    '''temporary function to analyse html response'''
    with codecs.open(dir_in + "rawresponse.txt", "w", "utf-8") as fresp:
        fresp.write(html.unescape(text))

def waitForLoadbyCSS(CSS_SELECTOR):
    '''function to wait until web element is available via css check'''
    wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, CSS_SELECTOR)))

def waitForLoadbyXpath(xpath):
    '''function to wait until web element is available via xpath check'''
    try:
        wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        return True
    except:
        return False

def openurl(url):
    '''function to open url using selenium'''
    try:
        driver.get(url)
    except Exception as e:
        now = str(datetime.datetime.now())[:16]
        log.write(now + ' ' + str(e) + os.linesep)
        print(str(e))

def setbrowser():
    ''' function for preparing browser for automation '''
    print("Preparing browser")
    global driver
    global wait
    profile = webdriver.FirefoxProfile(profile_directory = ffProfilePath)
    capabilities = DesiredCapabilities.FIREFOX
    if usegecko:
        capabilities["marionette"] = True
    driver = webdriver.Firefox(firefox_profile = profile,
                               capabilities = capabilities,
                               executable_path = geckodriverexcecutablePath)
    driver.set_window_size(ffWidth, ffHeight)
    driver.implicitly_wait(timeout)
    wait = WebDriverWait(driver, timeout)
def scroll_smoothly():
    '''scroll to the top, then down by scrollbypx to trigger lazy loading'''
    driver.execute_script("window.scrollTo(0, 0);")
    time.sleep(0.5)
    driver.execute_script("window.scrollBy(0, %d);" % (scrollbypx))
    time.sleep(0.5)
    #driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    #time.sleep(1)
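# navigate2group flow: open the group page and read the member count, then open
# <group url>/members and page through the member list. Each member's name,
# headline and profile link is appended to the worksheet, and "Next" is clicked
# until no 'member-view' entries remain (the failed "Next" click ends the loop
# via the except branch). The workbook is then saved to dir_in as
# <group id>_<YYYYMMDD_HH_MM>.xlsx and the browser is closed.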
def navigate2group(url):
    '''function to navigate to group url'''
    global row
    print()
    sys.stdout.flush()
    try:
        openurl(url)
        time.sleep(3)
        scroll_smoothly()
        xpath = "//a[@class='module-link js-members-count']"
        snum_of_members = driver.find_element_by_xpath(xpath).get_attribute("innerHTML")
        print(snum_of_members)
        num_of_members = int(snum_of_members.split()[0].replace(',', ''))
        # we have num of members, now go to members page
        if url.endswith('/'):
            urlg = url + 'members'
        else:
            urlg = url + '/members'
        openurl(urlg)
        time.sleep(3)
        clicknum = 0
        print('writing to xlsx file...')
        keep_looping = True
        while keep_looping:
            # get members on current page
            print(str(clicknum + 1), end = ' ')
            sys.stdout.flush()
            scroll_smoothly()
            xpath = "//li[@class='member-view']"
            elems = driver.find_elements_by_xpath(xpath)
            if len(elems) == 0:
                keep_looping = False
                print('no more members found')
            for elem in elems:
                row += 1
                selemi = elem.get_attribute('innerHTML')
                soup = BeautifulSoup(selemi, 'html.parser')
                name = soup.find("span", {"class": "js-hovercard entity-name-text"}).text.strip()
                headline = soup.find("p", {"class": "entity-headline"}).text.strip()
                inlink = soup.find("a", {"class": "entity-container entity-link js-member-entity-link"})["href"]
                # write element
                #print(name)
                #print(headline)
                #print(inlink)
                #print('-' * 10)
                sheet.cell(row = row, column=1).value = name
                sheet.cell(row = row, column=2).value = headline
                sheet.cell(row = row, column=3).value = inlink
            # click on Next
            clicknum += 1
            xpath = "//span[@class='link-text' and contains(.,'Next')]"
            driver.find_element_by_xpath(xpath).click()
            time.sleep(0.7 + random.uniform(0.1, 0.6))
    except Exception as e:
        print('Exception: ' + str(e))
        now = str(datetime.datetime.now())[:16]
        log.write(now + ' ' + str(e) + os.linesep)
    # now save xlsx file
    wb.save(dir_in + group_members_url.split('/')[-2] + '_' + str(datetime.datetime.now())[:16].replace('-', '').replace(' ', '_').replace(':', '_') + '.xlsx')
    driver.close()
    if headlessMode:
        display.stop()
def is_element_present(xpath):
    '''check whether an element is present based on xpath'''
    try:
        driver.find_element_by_xpath(xpath)
        bprocess = True
    except:
        bprocess = False
    return bprocess

def calculate_time():
    '''function to calculate elapsed time'''
    time2 = time.time()
    hours = int((time2-time1)/3600)
    minutes = int((time2-time1 - hours * 3600)/60)
    sec = time2 - time1 - hours * 3600 - minutes * 60
    print("processed in %dh:%dm:%ds" % (hours, minutes, sec))
if __name__ == '__main__':
    for group_members_url in lgroup_members_url:
        wb = openpyxl.Workbook()
        sheet = wb.get_active_sheet()
        sheet.title = group_members_url.split('/')[-2]
        sheet.cell(row=1, column=1).value = "Name"
        sheet.cell(row=1, column=2).value = "Headline"
        sheet.cell(row=1, column=3).value = "Linkedin link"
        row = 1
        if headlessMode:
            display = Display(visible=0, size=(ffWidth, ffHeight))
            display.start()
        print('*' * 80)
        print(group_members_url)
        print('*' * 80)
        setbrowser()
        navigate2group(group_members_url)
        calculate_time()
        time1 = time.time()
    log.close()
    print('Done.')