import csv
import json
import os
import time
from datetime import datetime
from io import StringIO

import pandas as pd
from bs4 import BeautifulSoup as bs
from dateutil.relativedelta import relativedelta
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
# Initialize Chrome options
chrome_options = Options()

today = datetime.today().strftime("%Y-%m-%d")
SCROLL_PAUSE_TIME = 1.5
MAX_SCROLLS = False  # set to an integer to cap the number of scrolls per page
FILENAME = "./tracker-export-2024-10-28_Arpit_shortened.csv"
COLUMN_NAME = "company-linkedin"

# LinkedIn credentials
username = "[email protected]"
password = "!@#$%^&*()"
## Set up the export directory
# Construct the base export directory path
export_dir_base = os.path.join(os.getcwd(), "export")
export_dir = ""

# Use ./export/ if it does not exist yet
if not os.path.exists(export_dir_base):
    export_dir = export_dir_base
else:
    # Otherwise find the next available indexed directory (./export0/, ./export1/, ...)
    index = 0
    while os.path.exists(f"{export_dir_base}{index}"):
        index += 1
    export_dir = f"{export_dir_base}{index}"

# Create the export directory
os.makedirs(export_dir, exist_ok=True)

# Initialize the WebDriver for Chrome with the options defined above
browser = webdriver.Chrome(options=chrome_options)
# Helper functions for date and reaction conversions
def get_past_date(days=0, weeks=0, months=0, years=0):
    date_format = "%Y-%m-%d"
    dtObj = datetime.strptime(today, date_format)
    past_date = dtObj - relativedelta(
        days=days, weeks=weeks, months=months, years=years
    )
    past_date_str = past_date.strftime(date_format)
    return past_date_str
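
# Illustrative usage (assuming the script runs on 2024-11-13, so `today` holds that date):
#   get_past_date(weeks=2)  -> "2024-10-30"
#   get_past_date(months=1) -> "2024-10-13"
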
def get_actual_date(date):
    today = datetime.today().strftime("%Y-%m-%d")
    current_year = datetime.today().strftime("%Y")
    past_date = date
    if "hour" in date:
        past_date = today
    elif "day" in date:
        past_date = get_past_date(days=int(date.split(" ")[0]))
    elif "week" in date:
        past_date = get_past_date(weeks=int(date.split(" ")[0]))
    elif "month" in date:
        past_date = get_past_date(months=int(date.split(" ")[0]))
    elif "year" in date:
        past_date = get_past_date(years=int(date.split(" ")[0]))
    else:
        # Absolute dates come through as "month-day" or "month-day-year"
        split_date = date.split("-")
        if len(split_date) == 2:
            past_month = split_date[0]
            past_day = split_date[1]
            if len(past_month) < 2:
                past_month = "0" + past_month
            if len(past_day) < 2:
                past_day = "0" + past_day
            past_date = f"{current_year}-{past_month}-{past_day}"
        elif len(split_date) == 3:
            past_month = split_date[0]
            past_day = split_date[1]
            past_year = split_date[2]
            if len(past_month) < 2:
                past_month = "0" + past_month
            if len(past_day) < 2:
                past_day = "0" + past_day
            past_date = f"{past_year}-{past_month}-{past_day}"
    return past_date
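
# Illustrative inputs and outputs (assuming today is 2024-11-13):
#   "3 hours ago" -> "2024-11-13"      "2 weeks ago" -> "2024-10-30"
#   "11-5"        -> "2024-11-05"      "11-5-2023"   -> "2023-11-05"
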
def convert_abbreviated_to_number(s):
    if "K" in s:
        return int(float(s.replace("K", "")) * 1000)
    elif "M" in s:
        return int(float(s.replace("M", "")) * 1000000)
    else:
        return int(s)
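
# Examples: convert_abbreviated_to_number("1.2K") -> 1200,
#           convert_abbreviated_to_number("3M")   -> 3000000
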
# Functions to extract text from a container
def get_text(container, selector, attributes):
    try:
        element = container.find(selector, attributes)
        if element:
            return element.text.strip()
    except Exception as e:
        print(e)
    return ""

def get_aria_label(container, selector, attributes):
    try:
        element = container.find(selector, attributes)
        if element:
            return element.get("aria-label")
        else:
            return "NA"
    except Exception as e:
        print(e)
    return ""
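
# get_text returns the visible text of the first matching element (empty string on a
# miss or error); get_aria_label returns its aria-label attribute ("NA" if no element
# matches, empty string on error).
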
# Function to extract media information
def get_media_info(container):
    media_info = [
        ("div", {"class": "update-components-video"}, "Video"),
        ("div", {"class": "update-components-linkedin-video"}, "Video"),
        ("div", {"class": "update-components-image"}, "Image"),
        ("article", {"class": "update-components-article"}, "Article"),
        ("div", {"class": "feed-shared-external-video__meta"}, "Youtube Video"),
        (
            "div",
            {
                "class": "feed-shared-mini-update-v2 feed-shared-update-v2__update-content-wrapper artdeco-card"
            },
            "Shared Post",
        ),
        (
            "div",
            {"class": "feed-shared-poll ember-view"},
            "Other: Poll, Shared Post, etc",
        ),
    ]
    for selector, attrs, media_type in media_info:
        element = container.find(selector, attrs)
        if element:
            link = element.find("a", href=True)
            return link["href"] if link else "None", media_type
    return "None", "Unknown"
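
# Illustrative return values: a post embedding an article card might yield
# ("https://example.com/some-article", "Article") (the URL here is a placeholder),
# while a plain text post with no recognised media yields ("None", "Unknown").
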
def save_csv_file(data, file_name="file.csv"):
    """
    Save a pandas DataFrame as a CSV file under the export directory chosen at
    startup (./export/, or ./export0/, ./export1/, ... if ./export/ already existed).

    Parameters:
        data (pandas.DataFrame): The data to be saved to the CSV file.
        file_name (str, optional): The name of the CSV file. Defaults to 'file.csv'.
    """
    # Construct the full file path
    csv_export_dir = os.path.join(export_dir, "csv")
    os.makedirs(csv_export_dir, exist_ok=True)
    file_path = os.path.join(csv_export_dir, file_name)

    # Save the DataFrame to a CSV file
    data.to_csv(file_path, index=False)
    print(f"CSV file saved at: {file_path}")
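
# Illustrative call (the row shown is made up):
#   save_csv_file(pd.DataFrame([{"post_date": "2024-11-13", "post_text": "hello"}]),
#                 "nike_posts.csv")
# writes <export_dir>/csv/nike_posts.csv
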
# Navigate to the posts page of the company and scrape every post
def scrape_page(page):
    post_page = page + "/posts"
    post_page = post_page.replace("//posts", "/posts")
    browser.get(post_page)

    # Extract the company name from the URL
    company_name = page.rstrip("/").split("/")[-1].replace("-", " ").title()
    print(company_name)

    # Set parameters for scrolling through the page
    last_height = browser.execute_script("return document.body.scrollHeight")
    scrolls = 0
    no_change_count = 0

    # Scroll through the page until no new content is loaded
    while True:
        # Expand all "...more" texts
        buttons = browser.find_elements(
            By.CSS_SELECTOR,
            "button.feed-shared-inline-show-more-text__see-more-less-toggle",
        )
        for button in buttons:
            if button.is_displayed():
                try:
                    webdriver.ActionChains(browser).move_to_element(
                        button
                    ).perform()  # Scroll to the button
                    # NOTE: this click can fail, especially if a popup is on the screen
                    button.click()
                    time.sleep(1)  # Add a small delay after each click
                except Exception as e:
                    print(f"Could not click on button: {e}")
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(SCROLL_PAUSE_TIME)
        new_height = browser.execute_script("return document.body.scrollHeight")
        no_change_count = no_change_count + 1 if new_height == last_height else 0
        if no_change_count >= 3 or (MAX_SCROLLS and scrolls >= MAX_SCROLLS):
            break
        last_height = new_height
        scrolls += 1

    # Parse the page source with BeautifulSoup
    company_page = browser.page_source
    linkedin_soup = bs(company_page.encode("utf-8"), "html.parser")

    # Save the parsed HTML to a file
    file_name = os.path.join(export_dir, "soup")
    os.makedirs(file_name, exist_ok=True)
    file_name = os.path.join(file_name, f"{company_name}_soup.txt")
    with open(file_name, "w+", encoding="utf-8") as t:
        t.write(linkedin_soup.prettify())

    # Extract post containers from the HTML
    containers = [
        container
        for container in linkedin_soup.find_all(
            "div", {"class": "feed-shared-update-v2"}
        )
        if "activity" in container.get("data-urn", "")
    ]
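    # Each kept container carries a data-urn of the form "urn:li:activity:<id>",
    # which is what the "activity" substring check above relies on.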
    # Define a data structure to hold all the post information
    posts_data = []

    # Main loop to process each container
    index = 0
    for container in containers:
        post_text = get_text(
            container, "div", {"class": "feed-shared-update-v2__description-wrapper"}
        )
        post_date = get_aria_label(
            container,
            "a",
            {"class": "app-aware-link update-components-actor__sub-description-link"},
        )
        post_date = get_actual_date(post_date)
        # media_link, media_type = get_media_info(container)
        # # Reactions (likes)
        # reactions_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'reaction' in tag['aria-label'].lower())
        # reactions_idx = 1 if len(reactions_element) > 1 else 0
        # post_reactions = reactions_element[reactions_idx].text.strip() if reactions_element and reactions_element[reactions_idx].text.strip() != '' else 0
        # # Comments
        # comment_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'comment' in tag['aria-label'].lower())
        # comment_idx = 1 if len(comment_element) > 1 else 0
        # post_comments = comment_element[comment_idx].text.strip() if comment_element and comment_element[comment_idx].text.strip() != '' else 0
        # # Shares
        # shares_element = container.find_all(lambda tag: tag.name == 'button' and 'aria-label' in tag.attrs and 'repost' in tag['aria-label'].lower())
        # shares_idx = 1 if len(shares_element) > 1 else 0
        # post_shares = shares_element[shares_idx].text.strip() if shares_element and shares_element[shares_idx].text.strip() != '' else 0
        posts_data.append({
            "index": index,
            "post_date": post_date,
            "post_text": post_text,
            # "media_link": media_link,
            # "media_type": media_type,
        })
        index += 1

    # Sort the posts by date and export them to CSV
    try:
        final = json.dumps(posts_data, indent=2)
        df = pd.read_json(StringIO(final))
        df.sort_values(by="post_date", inplace=True, ascending=False)
        csv_file = f"{company_name}_posts.csv"
        save_csv_file(df, csv_file)
        print(f"Data exported to {csv_file}")
    except Exception as e:
        print("error: ", e)
def get_linkedin_urls_from_csv(file_path):
    """
    Read a CSV file and return the values from the COLUMN_NAME column
    ("company-linkedin" by default).

    Parameters:
        file_path (str): The path to the CSV file.

    Returns:
        list: A list of LinkedIn company page URLs.
    """
    linkedin_urls = []
    with open(file_path, "r") as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            linkedin_urls.append(row[COLUMN_NAME])
    return linkedin_urls
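
# The input CSV only needs the COLUMN_NAME column; any other columns are ignored.
# A minimal example (the "company" column is hypothetical):
#   company,company-linkedin
#   Nike,https://www.linkedin.com/company/nike
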
def init():
    # Example LinkedIn company page URL (the pages actually scraped come from the CSV below)
    page = "https://www.linkedin.com/company/nike"

    # Open the LinkedIn login page
    browser.get("https://www.linkedin.com/login")

    # Enter login credentials and submit
    elementID = browser.find_element(By.ID, "username")
    elementID.send_keys(username)
    elementID = browser.find_element(By.ID, "password")
    elementID.send_keys(password)
    elementID.submit()

    # Wait until any human verification / security check is completed and the feed loads
    while browser.current_url != "https://www.linkedin.com/feed/":
        print("It seems a verification prompt appeared; please complete it manually. Checking again in 5 secs..")
        time.sleep(5)
    print("Completed verification")

    pages = get_linkedin_urls_from_csv(FILENAME)
    for page in pages:
        scrape_page(page)


if __name__ == "__main__":
    init()