# Gist by @SaschaHeyer, created October 22, 2024
import re
import time
import streamlit as st
import pandas as pd
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import vertexai
from vertexai.generative_models import GenerativeModel
import vertexai.preview.generative_models as generative_models
from concurrent.futures import ThreadPoolExecutor, as_completed
# Function to decode Cloudflare-protected emails
def decode_cf_email(encoded_string):
    r = int(encoded_string[:2], 16)
    email = ''.join([chr(int(encoded_string[i:i+2], 16) ^ r) for i in range(2, len(encoded_string), 2)])
    return email

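# Example (hypothetical values, for illustration): the first hex byte is the XOR key,
# and each following hex byte is one email character XOR'd with that key, so
# decode_cf_email("5a3b1a38743e3f") returns "a@b.de".
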
# Functions from the provided code
def get_place_details(place_id, api_key):
    url = "https://maps.googleapis.com/maps/api/place/details/json"
    params = {
        'place_id': place_id,
        'fields': 'name,formatted_address,website,formatted_phone_number',
        'key': api_key
    }
    # Note: verify=False disables TLS certificate verification for this request
    response = requests.get(url, params=params, verify=False)
    if response.status_code == 200:
        return response.json().get('result', {})
    else:
        return None

def save_content_to_file(content, filename):
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(content)

def is_valid_link(link, base_domain):
    parsed_link = urlparse(link)
    if not (parsed_link.scheme in ['http', 'https'] and not parsed_link.path.startswith('mailto') and parsed_link.netloc == base_domain):
        return False
    # Exclude links to common file types (images, documents, etc.)
    excluded_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.zip', '.rar', '.tar', '.gz', '.mp3', '.mp4', '.avi', '.mkv')
    if any(parsed_link.path.lower().endswith(ext) for ext in excluded_extensions):
        return False
    return True

# Helper to normalize URLs (strips a leading 'www.') so the same page is not crawled twice
def normalize_url(url):
    if url is None:
        return None
    parsed_url = urlparse(url)
    netloc = parsed_url.netloc
    if isinstance(netloc, str) and netloc.startswith('www.'):
        netloc = netloc[4:]
    return parsed_url._replace(netloc=netloc).geturl()

# Function to scrape website content including subpages and extract emails
def scrape_website_content(url, max_depth=1):
    visited = set()
    to_visit = [(normalize_url(url), 0)]
    all_text = []
    found_pages = []
    emails = set()
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    base_domain = urlparse(normalize_url(url)).netloc
    email_pattern = re.compile(r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+')
    while to_visit:
        current_url, depth = to_visit.pop(0)
        if current_url in visited or depth > max_depth:
            continue
        visited.add(current_url)
        try:
            response = requests.get(current_url, headers=headers, verify=False)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'html.parser')
                page_text = soup.get_text()
                all_text.append(page_text)
                found_pages.append(current_url)
                # Extract emails from the current page
                page_emails = email_pattern.findall(page_text)
                emails.update(page_emails)
                # Check for Cloudflare email protection
                for email_element in soup.select('a.__cf_email__'):
                    encoded_email = email_element['data-cfemail']
                    decoded_email = decode_cf_email(encoded_email)
                    emails.add(decoded_email)
                # Find all links and add to the to_visit list
                for link in soup.find_all('a', href=True):
                    absolute_url = normalize_url(urljoin(current_url, link['href']))
                    if is_valid_link(absolute_url, base_domain) and absolute_url not in visited:
                        to_visit.append((absolute_url, depth + 1))
            else:
                st.error(f"Failed to retrieve {current_url}: {response.status_code}")
        except requests.RequestException as e:
            st.error(f"An error occurred while trying to retrieve {current_url}: {e}")
    return ' '.join(all_text), found_pages, emails

def write_cold_outreach(content):
    vertexai.init(project="sascha-playground-doit", location="us-central1")
    model = GenerativeModel("gemini-1.5-pro-001")
    responses = model.generate_content(
        [f"""You are an expert at writing cold outreach emails in German, formatted as Markdown.
Use the email address and, if available, the name of a contact person at this company.
Only return the cold outreach email, no explanations.
If there is no contact person name, write a generic salutation.

A few details about me:
- I am Heiko Grünagel
- my phone number is 0176 111 111 111
- my company name is: Onpoint Digitalagentur
- I run an SEO company that helps small and medium-sized businesses get their websites ranked visibly at the top of Google

customer information:
{content}"""],
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=False,
    )
    return responses.candidates[0].content.parts[0].text

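# Note: write_cold_outreach is not wired into the UI below. A hypothetical call could look
# like write_cold_outreach(content + "\nEmails: " + ", ".join(emails)) for each scraped
# place, with the returned Markdown rendered via st.markdown(...).
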
generation_config = {
    "max_output_tokens": 8192,
    "temperature": 1,
    "top_p": 0.95,
}

safety_settings = {
    generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
}

def get_all_places(api_key, query):
    places = []
    search_url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
    search_params = {
        'query': query,
        'key': api_key
    }
    response = requests.get(search_url, params=search_params, verify=False)
    results = response.json().get('results', [])
    places.extend(results)
    next_page_token = response.json().get('next_page_token')
    while next_page_token:
        search_params['pagetoken'] = next_page_token
        time.sleep(2)  # Sleep to allow the next_page_token to become valid
        response = requests.get(search_url, params=search_params, verify=False)
        results = response.json().get('results', [])
        places.extend(results)
        next_page_token = response.json().get('next_page_token')
    return places

def process_place(place, api_key):
    place_id = place['place_id']
    details = get_place_details(place_id, api_key)
    result = {}
    if details:
        name = details.get('name')
        address = details.get('formatted_address')
        website = details.get('website')
        phone_number = details.get('formatted_phone_number')
        result = {
            "Name": name,
            "Website": website,
            "Emails found on the website": "",
            "Address": address,
            "Phone Number": phone_number,
        }
        # Scrape website for content if website is available
        if website:
            content, found_pages, emails = scrape_website_content(website)
            if content:
                filename = f"{name.replace(' ', '_')}.txt"
                save_content_to_file(content, filename)
            if emails:
                result["Emails found on the website"] = ", ".join(emails)
    return result

# Streamlit UI
st.set_page_config(layout="wide") # Set the app to widescreen
st.title("Business Finder and Cold Outreach Email Generator")
api_key = st.text_input("API Key", type="password", value="AIzaSyDoD8xsUezDHd0ZKF1U7mSoSFv2twDont0")
business_type = st.text_input("Type of Business or Professional", value="architekten")
location = st.text_input("City or Region", value="Zweibrücken")
if st.button("Find Businesses"):
if api_key and business_type and location:
with st.spinner("Searching for businesses..."):
query = f'{business_type} in {location}'
places = get_all_places(api_key, query)
st.write(f"Found {len(places)} places")
results = []
status_text = st.empty()
status_list = st.empty() # Placeholder for the status list
results_table = st.empty() # Placeholder for the results table
statuses = []
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_place = {executor.submit(process_place, place, api_key): place for place in places}
for future in as_completed(future_to_place):
place = future_to_place[future]
try:
result = future.result()
if result:
results.append(result)
# Update the results table
df = pd.DataFrame(results)
results_table.dataframe(df, height = int(35.2*(len(places)+1)))
# Update the status list
statuses.append(f"{len(results)}/{len(places)}: {result['Name']} - Processed")
status_list.text("\n".join(statuses))
except Exception as exc:
statuses.append(f"{place['name']} generated an exception: {exc}")
status_list.text("\n".join(statuses))
status_text.text("All places processed.")
else:
st.error("Please provide API Key, Type of Business, and City or Region")