# LinkedIn company posts scraper (gist by @proffapt, created November 13, 2024 16:33).
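#
# Rough usage (a sketch; it assumes Chrome plus a matching chromedriver are
# installed and the CSV named in FILENAME exists; the script filename below is
# hypothetical):
#   1. Set `username` / `password` to your LinkedIn credentials.
#   2. Run: python linkedin_company_posts_scraper.py
#   3. Complete any human-verification prompt in the browser window; the script
#      polls until it reaches https://www.linkedin.com/feed/.
#   4. Posts are exported as <Company>_posts.csv under ./export*/csv/ and the
#      raw page HTML is saved under ./export*/soup/.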
import csv
import json
import os
import time
from datetime import datetime
from io import StringIO

import pandas as pd
from bs4 import BeautifulSoup as bs
from dateutil.relativedelta import relativedelta
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
# Initialize Chrome options
chrome_options = Options()
today = datetime.today().strftime("%Y-%m-%d")
SCROLL_PAUSE_TIME = 1.5
MAX_SCROLLS = False  # Set to an integer to cap scrolling; False means scroll until no new content loads
FILENAME = "./tracker-export-2024-10-28_Arpit_shortened.csv"
COLUMN_NAME = "company-linkedin"
# LinkedIn Credentials
username = "[email protected]"
password = "!@#$%^&*()"
## Setup export directory
# Construct the base export directory path
export_dir_base = os.path.join(os.getcwd(), "export")
export_dir = ""
# Check if the base export directory exists
if not os.path.exists(export_dir_base):
    export_dir = export_dir_base
else:
    # If the base export directory exists, find the next available index
    index = 0
    while os.path.exists(f"{export_dir_base}{index}"):
        index += 1
    export_dir = f"{export_dir_base}{index}"
# Create the export directory
os.makedirs(export_dir, exist_ok=True)
# Initialize WebDriver for Chrome
browser = webdriver.Chrome()
# Helper functions for date and reaction conversions
def get_past_date(days=0, weeks=0, months=0, years=0):
    date_format = "%Y-%m-%d"
    dtObj = datetime.strptime(today, date_format)
    past_date = dtObj - relativedelta(
        days=days, weeks=weeks, months=months, years=years
    )
    past_date_str = past_date.strftime(date_format)
    return past_date_str
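# Illustrative examples (assuming today were "2024-11-13"):
#   get_past_date(weeks=2)  -> "2024-10-30"
#   get_past_date(months=1) -> "2024-10-13"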
def get_actual_date(date):
    today = datetime.today().strftime("%Y-%m-%d")
    current_year = datetime.today().strftime("%Y")
    past_date = date
    if "hour" in date:
        past_date = today
    elif "day" in date:
        past_date = get_past_date(days=int(date.split(" ")[0]))
    elif "week" in date:
        past_date = get_past_date(weeks=int(date.split(" ")[0]))
    elif "month" in date:
        past_date = get_past_date(months=int(date.split(" ")[0]))
    elif "year" in date:
        past_date = get_past_date(years=int(date.split(" ")[0]))
    else:
        split_date = date.split("-")
        if len(split_date) == 2:
            past_month = split_date[0]
            past_day = split_date[1]
            if len(past_month) < 2:
                past_month = "0" + past_month
            if len(past_day) < 2:
                past_day = "0" + past_day
            past_date = f"{current_year}-{past_month}-{past_day}"
        elif len(split_date) == 3:
            past_month = split_date[0]
            past_day = split_date[1]
            past_year = split_date[2]
            if len(past_month) < 2:
                past_month = "0" + past_month
            if len(past_day) < 2:
                past_day = "0" + past_day
            past_date = f"{past_year}-{past_month}-{past_day}"
    return past_date
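# Illustrative examples of the relative date strings LinkedIn shows and the
# value returned (assuming today were "2024-11-13"):
#   "3 days ago"  -> "2024-11-10"
#   "2 weeks ago" -> "2024-10-30"
#   "10-28"       -> "2024-10-28"  (month-day padded with the current year)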
def convert_abbreviated_to_number(s):
if "K" in s:
return int(float(s.replace("K", "")) * 1000)
elif "M" in s:
return int(float(s.replace("M", "")) * 1000000)
else:
return int(s)
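# Illustrative examples: "1.2K" -> 1200, "3M" -> 3000000, "87" -> 87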
# Functions to extract text from a container
def get_text(container, selector, attributes):
    try:
        element = container.find(selector, attributes)
        if element:
            return element.text.strip()
    except Exception as e:
        print(e)
    return ""
def get_aria_label(container, selector, attributes):
    try:
        element = container.find(selector, attributes)
        if element:
            return element.get("aria-label")
        else:
            return "NA"
    except Exception as e:
        print(e)
        return ""
# Function to extract media information
def get_media_info(container):
    media_info = [
        ("div", {"class": "update-components-video"}, "Video"),
        ("div", {"class": "update-components-linkedin-video"}, "Video"),
        ("div", {"class": "update-components-image"}, "Image"),
        ("article", {"class": "update-components-article"}, "Article"),
        ("div", {"class": "feed-shared-external-video__meta"}, "Youtube Video"),
        (
            "div",
            {
                "class": "feed-shared-mini-update-v2 feed-shared-update-v2__update-content-wrapper artdeco-card"
            },
            "Shared Post",
        ),
        (
            "div",
            {"class": "feed-shared-poll ember-view"},
            "Other: Poll, Shared Post, etc",
        ),
    ]
    for selector, attrs, media_type in media_info:
        element = container.find(selector, attrs)
        if element:
            link = element.find("a", href=True)
            return link["href"] if link else "None", media_type
    return "None", "Unknown"
def save_csv_file(data, file_name="file.csv"):
    """
    Save a Pandas DataFrame as a CSV file inside the csv/ subfolder of the
    export directory chosen at startup (./export/, or ./export0/, ./export1/,
    etc. when earlier export directories already exist).
    Parameters:
        data (pandas.DataFrame): The data to be saved to the CSV file.
        file_name (str, optional): The name of the CSV file. Defaults to 'file.csv'.
    """
    # Construct the full file path
    csv_export_dir = os.path.join(export_dir, "csv")
    os.makedirs(csv_export_dir, exist_ok=True)
    file_path = os.path.join(csv_export_dir, file_name)
    # Save the DataFrame to a CSV file
    data.to_csv(file_path, index=False)
    print(f"CSV file saved at: {file_path}")
# Navigate to the posts page of the company
def scrape_page(page):
    post_page = page + "/posts"
    post_page = post_page.replace("//posts", "/posts")
    browser.get(post_page)
    # Extract company name from URL
    company_name = page.rstrip("/").split("/")[-1].replace("-", " ").title()
    print(company_name)
    # Set parameters for scrolling through the page
    last_height = browser.execute_script("return document.body.scrollHeight")
    scrolls = 0
    no_change_count = 0
    # Scroll through the page until no new content is loaded
    while True:
        # Expand all "...more" texts
        buttons = browser.find_elements(
            By.CSS_SELECTOR,
            "button.feed-shared-inline-show-more-text__see-more-less-toggle",
        )
        for button in buttons:
            if button.is_displayed():
                try:
                    # Scroll to the button before clicking
                    webdriver.ActionChains(browser).move_to_element(button).perform()
                    button.click()  # [!NOTE] This can fail, especially if there is a popup on the screen
                    time.sleep(1)  # Add a small delay after each click
                except Exception as e:
                    print(f"Could not click on button: {e}")
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(SCROLL_PAUSE_TIME)
        new_height = browser.execute_script("return document.body.scrollHeight")
        no_change_count = no_change_count + 1 if new_height == last_height else 0
        if no_change_count >= 3 or (MAX_SCROLLS and scrolls >= MAX_SCROLLS):
            break
        last_height = new_height
        scrolls += 1
    # Parse the page source with BeautifulSoup
    company_page = browser.page_source
    linkedin_soup = bs(company_page.encode("utf-8"), "html.parser")
    # Save the parsed HTML to a file
    file_name = os.path.join(export_dir, "soup")
    os.makedirs(file_name, exist_ok=True)
    file_name = os.path.join(file_name, f"{company_name}_soup.txt")
    with open(file_name, "w+") as t:
        t.write(linkedin_soup.prettify())
    # Extract post containers from the HTML
    containers = [
        container
        for container in linkedin_soup.find_all(
            "div", {"class": "feed-shared-update-v2"}
        )
        if "activity" in container.get("data-urn", "")
    ]
    # Define a data structure to hold all the post information
    posts_data = []
    # Main loop to process each container
    index = 0
    for container in containers:
        post_text = get_text(
            container, "div", {"class": "feed-shared-update-v2__description-wrapper"}
        )
        post_date = get_aria_label(
            container,
            "a",
            {"class": "app-aware-link update-components-actor__sub-description-link"},
        )
        post_date = get_actual_date(post_date)
        # media_link, media_type = get_media_info(container)
        # # Reactions (likes)
        # reactions_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'reaction' in tag['aria-label'].lower())
        # reactions_idx = 1 if len(reactions_element) > 1 else 0
        # post_reactions = reactions_element[reactions_idx].text.strip() if reactions_element and reactions_element[reactions_idx].text.strip() != '' else 0
        # # Comments
        # comment_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'comment' in tag['aria-label'].lower())
        # comment_idx = 1 if len(comment_element) > 1 else 0
        # post_comments = comment_element[comment_idx].text.strip() if comment_element and comment_element[comment_idx].text.strip() != '' else 0
        # # Shares
        # shares_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'repost' in tag['aria-label'].lower())
        # shares_idx = 1 if len(shares_element) > 1 else 0
        # post_shares = shares_element[shares_idx].text.strip() if shares_element and shares_element[shares_idx].text.strip() != '' else 0
        posts_data.append({
            "index": index,
            "post_date": post_date,
            "post_text": post_text,
            # "media_link": media_link,
            # "media_type": media_type,
        })
        index += 1
    try:
        final = json.dumps(posts_data, indent=2)
        try:
            df = pd.read_json(StringIO(final))
            df.sort_values(by="post_date", inplace=True, ascending=False)
            csv_file = f"{company_name}_posts.csv"
            save_csv_file(df, csv_file)
            print(f"Data exported to {csv_file}")
        except Exception as e:
            print("error: ", e)
    except Exception as e:
        print("error: ", e)
def get_linkedin_urls_from_csv(file_path):
    """
    Read a CSV file and return the values from the column named by COLUMN_NAME
    ("company-linkedin" above).
    Parameters:
        file_path (str): The path to the CSV file.
    Returns:
        list: A list of LinkedIn company URLs.
    """
    linkedin_urls = []
    with open(file_path, "r") as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            linkedin_urls.append(row[COLUMN_NAME])
    return linkedin_urls
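# Hypothetical input CSV layout (the only requirement is a column whose header
# matches COLUMN_NAME, "company-linkedin" above; other columns are ignored):
#
#   company,company-linkedin
#   Nike,https://www.linkedin.com/company/nike
#   Acme,https://www.linkedin.com/company/acme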
def init():
    # Example LinkedIn page URL (not used directly; pages are read from the CSV below)
    page = "https://www.linkedin.com/company/nike"
    # Open LinkedIn login page
    browser.get("https://www.linkedin.com/login")
    # Enter login credentials and submit
    elementID = browser.find_element(By.ID, "username")
    elementID.send_keys(username)
    elementID = browser.find_element(By.ID, "password")
    elementID.send_keys(password)
    elementID.submit()
    while True:
        print("It seems a verification prompt appeared; please complete the human verification. Waiting 5 secs...")
        if browser.current_url == "https://www.linkedin.com/feed/":
            print("Completed verification")
            break
        time.sleep(5)
    pages = get_linkedin_urls_from_csv(FILENAME)
    for page in pages:
        scrape_page(page)
if __name__ == "__main__":
    init()