import re
import time
import streamlit as st
import pandas as pd
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import vertexai
from vertexai.generative_models import GenerativeModel
import vertexai.preview.generative_models as generative_models
from concurrent.futures import ThreadPoolExecutor, as_completed
# Function to decode Cloudflare-protected emails
def decode_cf_email(encoded_string):
    """Decode a Cloudflare-obfuscated email address.

    The input is a hex string: the first two hex digits are an XOR key and
    every following pair of hex digits is one character XOR-ed with that key.

    Args:
        encoded_string: Hex payload, e.g. the value of a ``data-cfemail``
            attribute.

    Returns:
        The decoded email address as a string.

    Raises:
        ValueError: If the key or any pair is not valid hexadecimal
            (including an empty input).
    """
    key = int(encoded_string[:2], 16)
    decoded_chars = []
    for offset in range(2, len(encoded_string), 2):
        hex_pair = encoded_string[offset:offset + 2]
        decoded_chars.append(chr(int(hex_pair, 16) ^ key))
    return ''.join(decoded_chars)
# Functions from the provided code
def get_place_details(place_id, api_key):
    """Fetch name, address, website and phone number for one place.

    Args:
        place_id: Place identifier as returned by the place search.
        api_key: API key sent as the ``key`` query parameter.

    Returns:
        The ``result`` object of the JSON response (``{}`` if the key is
        absent), or ``None`` when the HTTP status is not 200.

    Raises:
        requests.RequestException: On connection problems or timeouts.
    """
    # NOTE(review): the endpoint was an empty string, which makes requests
    # raise MissingSchema. The parameters ('place_id', 'fields', 'key') and the
    # 'result' response key match the Google Places "Place Details" web
    # service, so that endpoint is restored here -- confirm before deploying.
    url = "https://maps.googleapis.com/maps/api/place/details/json"
    params = {
        'place_id': place_id,
        'fields': 'name,formatted_address,website,formatted_phone_number',
        'key': api_key,
    }
    # TLS verification stays enabled (no verify=False): the request carries
    # the API key. A timeout keeps a stalled connection from blocking a worker.
    response = requests.get(url, params=params, timeout=10)
    if response.status_code == 200:
        return response.json().get('result', {})
    return None
def save_content_to_file(content, filename):
    """Write ``content`` to ``filename`` as UTF-8 text.

    An existing file at that path is overwritten.

    Args:
        content: Text to store.
        filename: Destination path.
    """
    with open(filename, 'w', encoding='utf-8') as file:
        # The write itself was missing, leaving the created file empty.
        file.write(content)
def is_valid_link(link, base_domain):
    """Decide whether ``link`` is a crawlable page on ``base_domain``.

    A link qualifies when it uses http or https, its path does not start
    with ``mailto``, its network location equals ``base_domain`` exactly,
    and its path does not end in a known binary/document extension.

    Args:
        link: Absolute URL to check.
        base_domain: Network location the link must match.

    Returns:
        ``True`` if the link should be followed, ``False`` otherwise.
    """
    parts = urlparse(link)
    if parts.scheme not in ('http', 'https'):
        return False
    if parts.path.startswith('mailto'):
        return False
    if parts.netloc != base_domain:
        return False
    # Exclude links to common file types (images, documents, etc.)
    skipped_suffixes = (
        '.jpg', '.jpeg', '.png', '.gif', '.pdf', '.doc', '.docx', '.xls',
        '.xlsx', '.ppt', '.pptx', '.zip', '.rar', '.tar', '.gz', '.mp3',
        '.mp4', '.avi', '.mkv',
    )
    return not parts.path.lower().endswith(skipped_suffixes)
# Helpers for scraping website content (including subpages) and extracting emails
def normalize_url(url):
    """Return ``url`` with a leading ``www.`` removed from its host part.

    Args:
        url: URL to normalise, or ``None``.

    Returns:
        ``None`` when ``url`` is ``None``; otherwise the URL re-assembled
        from its parsed components, with a lowercase ``www.`` prefix stripped
        from the network location if present.
    """
    if url is None:
        return None
    parts = urlparse(url)
    host = parts.netloc
    has_www_prefix = isinstance(host, str) and host.startswith('www.')
    stripped_host = host[len('www.'):] if has_www_prefix else host
    return parts._replace(netloc=stripped_host).geturl()
def scrape_website_content(url, max_depth=1, timeout=10):
    """Crawl a website breadth-first, collecting page text and email addresses.

    Starting at ``url``, same-domain links accepted by ``is_valid_link`` are
    followed up to ``max_depth`` hops away. Emails are taken both from the
    visible page text and from Cloudflare-protected ``a.__cf_email__``
    elements.

    Args:
        url: Start URL; normalised with ``normalize_url`` before crawling.
        max_depth: Maximum link depth to fetch (0 fetches only the start page).
        timeout: Seconds to wait for each HTTP request.

    Returns:
        A tuple ``(text, pages, emails)``: the text of all fetched pages
        joined by single spaces, the list of successfully fetched URLs in
        fetch order, and the set of email addresses found.

    Failed requests and non-200 responses are reported through ``st.error``
    and skipped; they do not abort the crawl.
    """
    start_url = normalize_url(url)
    if not start_url:
        return '', [], set()

    base_domain = urlparse(start_url).netloc
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    email_pattern = re.compile(r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+')

    visited = set()
    to_visit = [(start_url, 0)]
    all_text = []
    found_pages = []
    emails = set()

    while to_visit:
        current_url, depth = to_visit.pop(0)
        if current_url in visited or depth > max_depth:
            continue
        # Mark before fetching so a failing URL is never retried.
        visited.add(current_url)

        try:
            # NOTE(review): verify=False is kept from the original; it disables
            # TLS certificate checks for the crawled sites -- confirm intended.
            response = requests.get(
                current_url, headers=headers, verify=False, timeout=timeout
            )
        except requests.RequestException as e:
            st.error(f"An error occurred while trying to retrieve {current_url}: {e}")
            continue

        if response.status_code != 200:
            st.error(f"Failed to retrieve {current_url}: {response.status_code}")
            continue

        soup = BeautifulSoup(response.text, 'html.parser')
        page_text = soup.get_text()
        all_text.append(page_text)
        found_pages.append(current_url)

        # Extract emails from the current page
        emails.update(email_pattern.findall(page_text))

        # Check for Cloudflare email protection
        for email_element in soup.select('a.__cf_email__'):
            encoded_email = email_element.get('data-cfemail')
            if not encoded_email:
                continue
            try:
                emails.add(decode_cf_email(encoded_email))
            except ValueError:
                # Malformed hex payload: skip this element, keep crawling.
                continue

        # Find all links and add to the to_visit list. Links one level past
        # max_depth would be discarded when popped, so they are not queued.
        if depth < max_depth:
            for link in soup.find_all('a', href=True):
                absolute_url = normalize_url(urljoin(current_url, link['href']))
                if is_valid_link(absolute_url, base_domain) and absolute_url not in visited:
                    to_visit.append((absolute_url, depth + 1))

    return ' '.join(all_text), found_pages, emails
def write_cold_outreach(content):
    """Generate a German cold-outreach email for one company via Vertex AI.

    Args:
        content: Text describing the customer; it is appended to the prompt
            after the "customer information:" marker.

    Returns:
        The text of the first part of the first candidate in the model
        response.
    """
    vertexai.init(project="sascha-playground-doit", location="us-central1")
    model = GenerativeModel("gemini-1.5-pro-001")
    # The prompt was never terminated and never included ``content``; the
    # customer text is interpolated after the final marker line.
    responses = model.generate_content(
        [f"""You are a expert of writing cold outreach in german as markdown
use the mail address and if available a name of the contact person of this company.
only return the cold outreach email no explenations
if there is no contact person name write a generic salutation
a few information about me
-I am heiko Grünagel
-my phone number is 0176 111 111 111
- my company name is: Onpoint Digitalagentur
- I am running a SEO company that helps kleinen und mittelständischen unternehmen ihre webseiten bei google sichtbar nach oben zu bringen
customer information:
{content}"""],
        generation_config=generation_config,
        safety_settings=safety_settings,
        stream=False,
    )
    # A candidate is not subscriptable; its text lives under content.parts.
    return responses.candidates[0].content.parts[0].text


# Sampling parameters passed to generate_content by write_cold_outreach.
generation_config = {
    "max_output_tokens": 8192,
    "temperature": 1,
    "top_p": 0.95,
}

# Every listed harm category is blocked at medium severity and above.
safety_settings = {
    generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
}
def get_all_places(api_key, query):
    """Run a text search and return the results from every result page.

    Args:
        api_key: API key sent as the ``key`` query parameter.
        query: Free-text search, e.g. ``"architekten in Zweibrücken"``.

    Returns:
        A list with the ``results`` entries of all pages, in page order.

    Raises:
        requests.RequestException: On connection problems or timeouts.
    """
    # NOTE(review): the endpoint was an empty string, which makes requests
    # raise MissingSchema. The parameters ('query', 'key', 'pagetoken') and the
    # 'results' / 'next_page_token' response keys match the Google Places
    # "Text Search" web service, so that endpoint is restored -- confirm.
    search_url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
    search_params = {
        'query': query,
        'key': api_key,
    }
    places = []
    while True:
        # TLS verification stays enabled (no verify=False): the request
        # carries the API key.
        response = requests.get(search_url, params=search_params, timeout=10)
        payload = response.json()
        # Each page's results were fetched but never stored, so the function
        # always returned an empty list.
        places.extend(payload.get('results', []))
        next_page_token = payload.get('next_page_token')
        if not next_page_token:
            break
        search_params['pagetoken'] = next_page_token
        time.sleep(2)  # Sleep to allow the next_page_token to become valid
    return places
def process_place(place, api_key):
    """Look up one place, scrape its website and collect contact data.

    Args:
        place: Search result dict; must contain ``'place_id'``.
        api_key: API key forwarded to ``get_place_details``.

    Returns:
        A dict with the keys ``"Name"``, ``"Website"``,
        ``"Emails found on the website"``, ``"Address"`` and
        ``"Phone Number"``, or ``{}`` when no details were returned.

    Side effects:
        When the website yields text, it is saved to ``<name>.txt`` in the
        current working directory.
    """
    place_id = place['place_id']
    details = get_place_details(place_id, api_key)
    if not details:
        return {}

    name = details.get('name')
    website = details.get('website')
    result = {
        "Name": name,
        "Website": website,
        "Emails found on the website": "",
        "Address": details.get('formatted_address'),
        "Phone Number": details.get('formatted_phone_number'),
    }

    # Scrape website for content if website is available
    if website:
        content, _found_pages, emails = scrape_website_content(website)
        if content:
            # ``name`` may be missing, and business names can contain
            # characters (such as '/') that are path separators or invalid in
            # file names; replace those so the file lands in the working
            # directory.
            base_name = (name or place_id).replace(' ', '_')
            safe_name = re.sub(r'[\\/:*?"<>|\x00-\x1f]', '_', base_name)
            save_content_to_file(content, f"{safe_name}.txt")
        if emails:
            # Sorted so the joined string is stable across runs.
            result["Emails found on the website"] = ", ".join(sorted(emails))
    return result
# Streamlit UI
st.set_page_config(layout="wide")  # Set the app to widescreen
st.title("Business Finder and Cold Outreach Email Generator")

# The API key must be entered by the user. A credential hard-coded as the
# default value ships with the source and is exposed to everyone who can read
# the file, so the field now starts empty.
api_key = st.text_input("API Key", type="password", value="")
business_type = st.text_input("Type of Business or Professional", value="architekten")
location = st.text_input("City or Region", value="Zweibrücken")

if st.button("Find Businesses"):
    if api_key and business_type and location:
        with st.spinner("Searching for businesses..."):
            query = f'{business_type} in {location}'
            places = get_all_places(api_key, query)
            st.write(f"Found {len(places)} places")

            results = []
            status_text = st.empty()
            status_list = st.empty()  # Placeholder for the status list
            results_table = st.empty()  # Placeholder for the results table
            statuses = []

            # Place lookups and scraping are network-bound, so they run in a
            # thread pool; results are handled as each one completes.
            with ThreadPoolExecutor(max_workers=10) as executor:
                future_to_place = {
                    executor.submit(process_place, place, api_key): place
                    for place in places
                }
                for future in as_completed(future_to_place):
                    place = future_to_place[future]
                    try:
                        result = future.result()
                    except Exception as exc:
                        # One failing place must not abort the whole run.
                        statuses.append(
                            f"{place.get('name', place.get('place_id'))} generated an exception: {exc}"
                        )
                    else:
                        if result:
                            results.append(result)
                            # Update the results table
                            df = pd.DataFrame(results)
                            results_table.dataframe(df, height=int(35.2 * (len(places) + 1)))
                            # Update the status list
                            statuses.append(f"{len(results)}/{len(places)}: {result['Name']} - Processed")
                    status_list.text("\n".join(statuses))

            status_text.text("All places processed.")
    else:
        st.error("Please provide API Key, Type of Business, and City or Region")
