import re
import time
import streamlit as st
import pandas as pd
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import vertexai
from vertexai.generative_models import GenerativeModel
import vertexai.preview.generative_models as generative_models
from concurrent.futures import ThreadPoolExecutor, as_completed
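
# Streamlit app that searches Google Places for businesses in a given region,
# crawls each result's website for contact emails, and defines a Gemini-based
# helper for drafting German cold-outreach emails. Start the UI with
# `streamlit run <this_file>.py` (replace <this_file> with the script's name).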

# Function to decode Cloudflare-protected emails
def decode_cf_email(encoded_string):
    r = int(encoded_string[:2], 16)
    email = ''.join([chr(int(encoded_string[i:i+2], 16) ^ r) for i in range(2, len(encoded_string), 2)])
    return email
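# Cloudflare's obfuscation stores the address as a hex string in the
# data-cfemail attribute: the first byte is an XOR key and each following byte
# is one character of the address XORed with that key, so for example
# decode_cf_email("107150723e73") returns "a@b.c".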


# Functions from the provided code
def get_place_details(place_id, api_key):
    url = "https://maps.googleapis.com/maps/api/place/details/json"
    params = {
        'place_id': place_id,
        'fields': 'name,formatted_address,website,formatted_phone_number',
        'key': api_key
    }
    response = requests.get(url, params=params, verify=False)
    if response.status_code == 200:
        return response.json().get('result', {})
    else:
        return None


def save_content_to_file(content, filename):
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(content)


def is_valid_link(link, base_domain):
    parsed_link = urlparse(link)
    if not (parsed_link.scheme in ['http', 'https'] and not parsed_link.path.startswith('mailto') and parsed_link.netloc == base_domain):
        return False
    # Exclude links to common file types (images, documents, etc.)
    excluded_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.zip', '.rar', '.tar', '.gz', '.mp3', '.mp4', '.avi', '.mkv')
    if any(parsed_link.path.lower().endswith(ext) for ext in excluded_extensions):
        return False
    return True


def normalize_url(url):
    if url is None:
        return None
    parsed_url = urlparse(url)
    netloc = parsed_url.netloc
    if isinstance(netloc, str) and netloc.startswith('www.'):
        netloc = netloc[4:]
    return parsed_url._replace(netloc=netloc).geturl()


# Function to scrape website content including subpages and extract emails
def scrape_website_content(url, max_depth=1):
    visited = set()
    to_visit = [(normalize_url(url), 0)]
    all_text = []
    found_pages = []
    emails = set()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    base_domain = urlparse(normalize_url(url)).netloc
    email_pattern = re.compile(r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+')

    while to_visit:
        current_url, depth = to_visit.pop(0)
        if current_url in visited or depth > max_depth:
            continue
        visited.add(current_url)
        try:
            response = requests.get(current_url, headers=headers, verify=False)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'html.parser')
                page_text = soup.get_text()
                all_text.append(page_text)
                found_pages.append(current_url)

                # Extract emails from the current page
                page_emails = email_pattern.findall(page_text)
                emails.update(page_emails)

                # Check for Cloudflare email protection
                for email_element in soup.select('a.__cf_email__'):
                    encoded_email = email_element['data-cfemail']
                    decoded_email = decode_cf_email(encoded_email)
                    emails.add(decoded_email)

                # Find all links and add to the to_visit list
                for link in soup.find_all('a', href=True):
                    absolute_url = normalize_url(urljoin(current_url, link['href']))
                    if is_valid_link(absolute_url, base_domain) and absolute_url not in visited:
                        to_visit.append((absolute_url, depth + 1))
            else:
                st.error(f"Failed to retrieve {current_url}: {response.status_code}")
        except requests.RequestException as e:
            st.error(f"An error occurred while trying to retrieve {current_url}: {e}")

    return ' '.join(all_text), found_pages, emails
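# The crawler does a breadth-first walk over same-domain links, capped by
# max_depth (the default of 1 visits the start page plus pages it links to
# directly). Note that verify=False skips TLS certificate verification on
# every request.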


def write_cold_outreach(content):
    vertexai.init(project="sascha-playground-doit", location="us-central1")
    model = GenerativeModel("gemini-1.5-pro-001")
    responses = model.generate_content(
        [f"""You are an expert at writing cold outreach emails in German, formatted as Markdown.
Use the email address and, if available, the name of the company's contact person.
Only return the cold outreach email, no explanations.
If there is no contact person name, write a generic salutation.

A few pieces of information about me:
- I am Heiko Grünagel
- my phone number is 0176 111 111 111
- my company name is: Onpoint Digitalagentur
- I run an SEO company that helps small and medium-sized businesses move their websites up to visible positions on Google

customer information:
{content}"""],
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=False,
    )
    return responses.candidates[0].content.parts[0].text
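# generation_config and safety_settings are defined below; they are only looked
# up when write_cold_outreach is called, so the module loads fine. The Streamlit
# UI further down does not call this function yet.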

generation_config = {
    "max_output_tokens": 8192,
    "temperature": 1,
    "top_p": 0.95,
}

safety_settings = {
    generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
}
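# generation_config caps responses at 8192 tokens and uses fairly exploratory
# sampling (temperature 1, nucleus sampling with top_p 0.95); safety_settings
# block content that Gemini rates as medium risk or higher in each category.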


def get_all_places(api_key, query):
    places = []
    search_url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
    search_params = {
        'query': query,
        'key': api_key
    }
    response = requests.get(search_url, params=search_params, verify=False)
    results = response.json().get('results', [])
    places.extend(results)
    next_page_token = response.json().get('next_page_token')

    while next_page_token:
        search_params['pagetoken'] = next_page_token
        time.sleep(2)  # Sleep to allow the next_page_token to become valid
        response = requests.get(search_url, params=search_params, verify=False)
        results = response.json().get('results', [])
        places.extend(results)
        next_page_token = response.json().get('next_page_token')

    return places
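# The Places Text Search API returns results in pages of up to 20 entries and
# serves at most 60 results per query; next_page_token only becomes valid after
# a short delay, hence the sleep before each follow-up request.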


def process_place(place, api_key):
    place_id = place['place_id']
    details = get_place_details(place_id, api_key)
    result = {}
    if details:
        name = details.get('name')
        address = details.get('formatted_address')
        website = details.get('website')
        phone_number = details.get('formatted_phone_number')
        result = {
            "Name": name,
            "Website": website,
            "Emails found on the website": "",
            "Address": address,
            "Phone Number": phone_number,
        }
        # Scrape website for content if website is available
        if website:
            content, found_pages, emails = scrape_website_content(website)
            if content:
                filename = f"{name.replace(' ', '_')}.txt"
                save_content_to_file(content, filename)
            if emails:
                result["Emails found on the website"] = ", ".join(emails)
    return result
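# process_place runs concurrently via the ThreadPoolExecutor below; each scraped
# site is also written to a local .txt file, presumably as input for
# write_cold_outreach.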


# Streamlit UI
st.set_page_config(layout="wide")  # Set the app to widescreen
st.title("Business Finder and Cold Outreach Email Generator")

api_key = st.text_input("API Key", type="password", value="AIzaSyDoD8xsUezDHd0ZKF1U7mSoSFv2twDont0")
business_type = st.text_input("Type of Business or Professional", value="architekten")
location = st.text_input("City or Region", value="Zweibrücken")

if st.button("Find Businesses"):
    if api_key and business_type and location:
        with st.spinner("Searching for businesses..."):
            query = f'{business_type} in {location}'
            places = get_all_places(api_key, query)
            st.write(f"Found {len(places)} places")

            results = []
            status_text = st.empty()
            status_list = st.empty()  # Placeholder for the status list
            results_table = st.empty()  # Placeholder for the results table
            statuses = []

            with ThreadPoolExecutor(max_workers=10) as executor:
                future_to_place = {executor.submit(process_place, place, api_key): place for place in places}
                for future in as_completed(future_to_place):
                    place = future_to_place[future]
                    try:
                        result = future.result()
                        if result:
                            results.append(result)
                            # Update the results table
                            df = pd.DataFrame(results)
                            results_table.dataframe(df, height=int(35.2 * (len(places) + 1)))
                            # Update the status list
                            statuses.append(f"{len(results)}/{len(places)}: {result['Name']} - Processed")
                            status_list.text("\n".join(statuses))
                    except Exception as exc:
                        statuses.append(f"{place['name']} generated an exception: {exc}")
                        status_list.text("\n".join(statuses))

            status_text.text("All places processed.")
    else:
        st.error("Please provide API Key, Type of Business, and City or Region")