Last active
March 1, 2025 23:47
-
-
Save jeffehobbs/e19a9c5b61fea35a89f81d617667745e to your computer and use it in GitHub Desktop.
venue event scraper: watches events sites and reports new events
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# venue.py | [email protected]
#
# watches events sites and reports new events
#
# [X] get list of sitemaps
# [X] parse sitemaps
# [X] get existing data from sqlite db
# [X] compare listings to existing data from sqlite
# [X] save data in sqlite db
# [X] make list of new pages
# [X] scrape html of page
# [X] screenshot page
# [X] parse screenshot with AI
# [X] email summary of new events
# [X] post to bluesky, mastodon, twitter
# [ ] introduce bluesky tagging
import requests, json, xmltodict, sqlite3, asyncio, hashlib, base64, os, configparser, time, tweepy | |
from mastodon import Mastodon | |
from datetime import datetime | |
from trafilatura import fetch_url, extract | |
from pyppeteer import launch | |
from PIL import Image | |
from sendgrid import SendGridAPIClient | |
from sendgrid.helpers.mail import Mail | |
from atproto import Client, client_utils | |
# --- configuration -------------------------------------------------------
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))

# API credentials live in apikeys.txt next to this script.
config = configparser.ConfigParser(interpolation=None)
config.read(SCRIPT_PATH + '/apikeys.txt')
SENDGRID_APIKEY = config.get('sendgrid', 'apikey')
OPENAI_APIKEY = config.get('openai', 'apikey')
BLUESKY_USERNAME = config.get('bluesky', 'username')
BLUESKY_PASSWORD = config.get('bluesky', 'password')
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')
# NOTE(review): the twitter credentials sit in generic [apikey]/[token]
# sections of apikeys.txt — odd-looking section names, but they must match
# the existing config file, so they are kept as-is.
TWITTER_CONSUMER_KEY = config.get('apikey', 'key')
TWITTER_CONSUMER_SECRET = config.get('apikey', 'secret')
TWITTER_ACCESS_TOKEN = config.get('token', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('token', 'access_token_secret')

MODEL = "gpt-4o-mini"  # model used by describe_image()
PROMPT = """Describe factually the event that is in this image and text in less than 240 characters.
Give details about time, date, price and anything else useful to know.
Text description of event: """
# Any URL/description containing a past year is treated as a stale event.
# Generated from the current year so the list no longer has to be extended
# by hand every January (was a hard-coded list ending at '2024').
BLOCKLIST = [str(year) for year in range(2000, datetime.now().year)]
MAX_TOKENS = 500  # cap on the AI description response
DEBUG_MODE = False
def create_database():
    """Create the events table in venue_data.sqlite if it is missing.

    Schema: events(loc VARCHAR PRIMARY KEY ON CONFLICT REPLACE,
    lastmod VARCHAR) — the conflict clause makes later INSERTs upserts.
    """
    conn = sqlite3.connect(SCRIPT_PATH + '/venue_data.sqlite')
    try:
        # IF NOT EXISTS replaces the old bare-except "table already exists"
        # dance, which also silently hid every other sqlite error.
        conn.execute('CREATE TABLE IF NOT EXISTS events '
                     '(loc VARCHAR, lastmod VARCHAR, '
                     'PRIMARY KEY (loc) ON CONFLICT REPLACE)')
        conn.commit()
        print('database ready')
    finally:
        conn.close()
def get_venues():
    """Read venues.txt ('name|sitemap_url' per line) next to this script.

    Returns a list of {'venue': name, 'url': sitemap_url} dicts. Blank
    lines and lines starting with '#' are skipped (the original crashed
    with IndexError/ValueError on blank or malformed lines).
    """
    sitemaps = []
    with open(SCRIPT_PATH + '/venues.txt', 'r') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            # maxsplit=1 tolerates '|' characters inside the URL
            venue, url = line.split('|', 1)
            sitemaps.append({'venue': venue, 'url': url.strip()})
    return sitemaps
def parse_sitemaps(url):
    """Fetch a sitemap XML from url and return it parsed into a dict.

    Raises requests.HTTPError on a 4xx/5xx response instead of handing
    an error page to the XML parser; the timeout prevents a hung run.
    """
    print(f'checking {url}')
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    return xmltodict.parse(r.text)
def get_database_data():
    """Return every (loc, lastmod) row currently stored in the events table."""
    connection = sqlite3.connect(SCRIPT_PATH + '/venue_data.sqlite')
    cursor = connection.cursor()
    rows = cursor.execute('SELECT * FROM events').fetchall()
    connection.close()
    return rows
def save_sitemap_data(data):
    """Upsert every <url> entry of a parsed sitemap into the events table.

    data is the xmltodict output of parse_sitemaps(). The table's
    PRIMARY KEY (loc) ON CONFLICT REPLACE makes each INSERT an upsert;
    entries without a <lastmod> are stored with an empty string.
    """
    urls = data['urlset']['url']
    # xmltodict yields a single dict (not a list) when the sitemap
    # contains exactly one <url> element
    if isinstance(urls, dict):
        urls = [urls]
    conn = sqlite3.connect(SCRIPT_PATH + '/venue_data.sqlite')
    try:
        cur = conn.cursor()
        for event in urls:
            # parameterized query: the old string-built SQL broke (and was
            # injectable) whenever a URL contained a quote character
            cur.execute('INSERT INTO events (loc, lastmod) VALUES (?, ?)',
                        (event['loc'], event.get('lastmod', '')))
        conn.commit()  # one commit for the batch instead of per row
    finally:
        conn.close()
def scrape_url(url):
    """Download url and extract its main content with trafilatura.

    Returns the extraction result as a dict (with_metadata=True provides
    'title', 'image' and 'raw_text'), or None when the page could not be
    fetched or yielded no extractable content — the original crashed in
    json.loads(None) in either case.
    """
    print(f'scraping {url}')
    downloaded = fetch_url(url)
    if downloaded is None:
        print(f'fetch failed for {url}')
        return None
    result = extract(downloaded, output_format='json', with_metadata=True)
    if result is None:
        print(f'no content extracted from {url}')
        return None
    decoded_result = json.loads(result)
    print(json.dumps(decoded_result, indent=4))
    return decoded_result
async def screenshot(url):
    """Screenshot url with headless chromium (uBlock extension loaded).

    Returns the jpg path. The filename is the url-safe base64 of the
    url's sha1 digest, so a given page always maps to the same file.
    """
    print(f'screenshotting {url}')
    browser = await launch({
        'executablePath': '/usr/bin/chromium',
        'autoClose': False,
        'args': ['--load-extension=' + SCRIPT_PATH + '/extension/uBlock.zip']
    })
    try:
        page = await browser.newPage()
        await page.setViewport({'width': 1280, 'height': 720})
        await page.goto(url, {'waitUntil': 'networkidle0'})
        digest = hashlib.sha1(url.encode('utf-8')).digest()
        # .decode(): str() on the bytes object embedded "b'...'" in the
        # filename, producing names like screenshots/b'xxx='.jpg
        name = base64.urlsafe_b64encode(digest).decode('ascii')
        screenshot_filename = SCRIPT_PATH + '/screenshots/' + name + '.jpg'
        await page.waitFor(5000)  # let late-loading assets settle
        await page.screenshot(path=screenshot_filename, fullPage=True)
    finally:
        # close even on navigation/screenshot failure (autoClose is off)
        await browser.close()
    return screenshot_filename
def describe_image(filename, raw_text):
    """Ask the OpenAI chat-completions API to describe a screenshot.

    Sends PROMPT + raw_text plus the base64-encoded jpeg at `filename`
    and returns the raw API response as a dict (the caller digs out
    ['choices'][0]['message']['content']).
    """
    print('describing (unknown) screenshot')
    with open(filename, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode('utf-8')
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_APIKEY}"
    }
    payload = {
        "model": MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": PROMPT + ' ' + raw_text
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}",
                            "detail": "auto"
                        }
                    }
                ]
            }
        ],
        "max_tokens": MAX_TOKENS
    }
    # timeout added: an unresponsive API previously hung the whole run
    response = requests.post("https://api.openai.com/v1/chat/completions",
                             headers=headers, json=payload, timeout=120)
    return response.json()
def send_email(recipient, html):
    """Email the html digest of newly found events to recipient via SendGrid.

    Failures are logged and swallowed so a mail outage never aborts the run.
    """
    print(f'sending email to {recipient}')
    # %-d (no zero padding) is a glibc strftime extension
    today = datetime.today().strftime('%B %-d, %Y')
    message = Mail(
        from_email="🤖 VENUEBOT<[email protected]>",
        to_emails=recipient,
        subject='🤖 Events found ' + str(today),
        html_content=html,
    )
    try:
        response = SendGridAPIClient(SENDGRID_APIKEY).send(message)
        print(f'email sent with {response.status_code}')
    except Exception as e:
        print(str(e))
def send_bluesky(status, image, url):
    """Post status with the image at path `image` and a 'More info' link.

    status is truncated to 250 chars to leave room for the link within
    Bluesky's 300-grapheme limit. The original ignored the `image`
    parameter and always read the hard-coded /tmp/venue_image.jpg,
    which could silently post a stale image from a previous event.
    """
    print('sending post to bluesky')
    client = Client()
    client.login(BLUESKY_USERNAME, BLUESKY_PASSWORD)
    with open(image, 'rb') as f:
        img_data = f.read()
    if len(status) > 250:
        status = status[:247] + "..."
    text = client_utils.TextBuilder().text(status + ' ').link('More info', url)
    client.send_image(text=text, image=img_data, image_alt=status)
def send_mastodon(status, image_file_path, url):
    """Toot status plus the event url, attaching the image at image_file_path."""
    print('sending post to mastodon')
    api = Mastodon(
        access_token=MASTODON_ACCESS_TOKEN,
        api_base_url='https://mastodon.social/',
    )
    # Mastodon.py accepts the media dict returned by media_post directly
    attachment = api.media_post(image_file_path, description=status)
    api.status_post(f'{status} {url}', media_ids=attachment)
def send_twitter(status, image_file_path, url):
    """Tweet status with the image attached and the event url appended.

    The text is truncated BEFORE the url is appended so the link can
    never be clipped (the original truncated at 140 chars, appended the
    url, then truncated again at 270 — which could cut the url off).
    Media upload needs the v1.1 API; posting uses the v2 Client.
    """
    print('sending post to twitter')
    # leave room for ' ' + url inside the 280-char tweet limit
    limit = 280 - (len(url) + 1)
    if len(status) > limit:
        status = status[:limit - 3] + '...'
    status = status + ' ' + url
    client = tweepy.Client(consumer_key=TWITTER_CONSUMER_KEY,
                           consumer_secret=TWITTER_CONSUMER_SECRET,
                           access_token=TWITTER_ACCESS_TOKEN,
                           access_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
    auth = tweepy.OAuth1UserHandler(
        TWITTER_CONSUMER_KEY,
        TWITTER_CONSUMER_SECRET,
        TWITTER_ACCESS_TOKEN,
        TWITTER_ACCESS_TOKEN_SECRET,
    )
    media = tweepy.API(auth).media_upload(image_file_path)
    client.create_tweet(text=status, user_auth=True,
                        media_ids=[media.media_id])
def compress_local_image(image_url):
    """Re-save the image at local path image_url as an optimized
    quality-75 RGB jpeg, in place. Returns the same path.
    """
    print('compressing image...')
    # context manager closes the underlying file handle promptly
    # (Image.open was previously left open until GC)
    with Image.open(image_url) as image:
        image.convert('RGB').save(image_url, optimize=True, quality=75)
    return image_url
# main process
def _first_blocked(text):
    """Return the first BLOCKLIST string found in text, or None."""
    for blocked_string in BLOCKLIST:
        if blocked_string in text:
            return blocked_string
    return None


def _process_event(url):
    """Scrape, screenshot, describe and cross-post one new event url.

    Returns an html snippet for the digest email, or '' when the event
    is blocklisted or could not be scraped.
    """
    hit = _first_blocked(url)
    if hit is not None:
        # bail out BEFORE the expensive scrape/screenshot/AI calls; the
        # original still ran all of them for already-invalid urls
        print(f'url contains {hit}, event invalid')
        return ''
    event_data = scrape_url(url)
    if not event_data:
        print(f'could not scrape {url}, skipping')
        return ''
    title = event_data['title']
    image = event_data['image']
    raw_text = event_data['raw_text']
    screenshot_filename = asyncio.run(screenshot(url))
    description = describe_image(screenshot_filename, raw_text)
    description_content = description['choices'][0]['message']['content']
    print(f'\n{description_content}\n')
    hit = _first_blocked(description_content)
    if hit is not None:
        print(f'description contains {hit}, event invalid')
        return ''
    # email keeps the remote image url; social posts use the local copy
    snippet = (f'<img src={image} style="width:100%;height:auto">'
               f'<h3><a href="{url}">{title}</a></h3>'
               f'<p>{description_content}</p><br/>')
    response = requests.get(image, timeout=30)
    if response.status_code == 200:
        with open('/tmp/venue_image.jpg', 'wb') as f:
            f.write(response.content)
        image = compress_local_image('/tmp/venue_image.jpg')
    for poster, network in ((send_bluesky, 'bluesky'),
                            (send_mastodon, 'mastodon'),
                            (send_twitter, 'twitter')):
        try:
            poster(description_content, image, url)
            print(f'posted to {network}')
        except Exception as e:
            # a failure on one network must not block the others
            print(f'{network} post failed: {e}')
    return snippet


def main():
    """End-to-end run: refresh sitemap data, diff against the previous
    snapshot, then describe, cross-post and email every new event."""
    create_database()
    sitemaps = get_venues()
    previous_data = get_database_data()
    print(f'{len(previous_data)} previous records found')
    for sitemap in sitemaps:
        current_data = parse_sitemaps(sitemap['url'])
        save_sitemap_data(current_data)
    # at this point the db is updated, so let's find what's new
    current_data = get_database_data()
    print(f'{len(current_data)} current records found')
    delta = list(set(current_data) - set(previous_data))
    email_content = ''
    for new_item in delta:
        email_content += _process_event(new_item[0])
    if email_content != '':
        send_email('[email protected]', email_content)
    print('...done.')


if __name__ == '__main__':
    main()
#fin
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment