Skip to content

Instantly share code, notes, and snippets.

@jeffehobbs
Last active March 1, 2025 23:47
Show Gist options
  • Save jeffehobbs/e19a9c5b61fea35a89f81d617667745e to your computer and use it in GitHub Desktop.
venue event scraper: watches events sites and reports new events
# venue.py | [email protected]
#
# watches events sites and reports new events
#
# [X] get list of sitemaps
# [X] parse sitemaps
# [X] get existing data from sqlite db
# [X] compare listings to existing data from sqlite
# [X] save data in sqlite db
# [X] make list of new pages
# [X] scrape html of page
# [X] screenshot page
# [X] parse screenshot with AI
# [X] email summary of new events
# [X] post to bluesky, mastodon, twitter
# [ ] introduce bluesky tagging
import requests, json, xmltodict, sqlite3, asyncio, hashlib, base64, os, configparser, time, tweepy
from mastodon import Mastodon
from datetime import datetime
from trafilatura import fetch_url, extract
from pyppeteer import launch
from PIL import Image
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from atproto import Client, client_utils
# Resolve all paths relative to this script so cron/systemd launches work
# regardless of the current working directory.
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))

# API credentials live in an un-committed INI file next to the script.
# interpolation=None lets values contain raw '%' characters.
config = configparser.ConfigParser(interpolation=None)
config.read(SCRIPT_PATH + '/apikeys.txt')
SENDGRID_APIKEY = config.get('sendgrid', 'apikey')
OPENAI_APIKEY = config.get('openai', 'apikey')
BLUESKY_USERNAME = config.get('bluesky', 'username')
BLUESKY_PASSWORD = config.get('bluesky', 'password')
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')
TWITTER_CONSUMER_KEY = config.get('apikey', 'key')
TWITTER_CONSUMER_SECRET = config.get('apikey', 'secret')
TWITTER_ACCESS_TOKEN = config.get('token', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('token', 'access_token_secret')

# OpenAI vision model and prompt used by describe_image().
MODEL = "gpt-4o-mini"
PROMPT = """Describe factually the event that is in this image and text in less than 240 characters.
Give details about time, date, price and anything else useful to know.
Text description of event: """

# URLs/descriptions mentioning a past year are assumed to be stale events.
# Generated dynamically (every year from 2000 up to, but excluding, the
# current year) so the list stays current without manual edits — the
# original was a hard-coded list of '2000'..'2024'.
BLOCKLIST = [str(year) for year in range(2000, datetime.today().year)]

MAX_TOKENS = 500    # cap on the OpenAI completion length
DEBUG_MODE = False  # NOTE(review): defined but not referenced in this file
def create_database(db_path=None):
    """Create the events table if it does not already exist.

    Replaces the old bare ``except`` (which masked every error, not just
    "table exists") with CREATE TABLE IF NOT EXISTS, and always closes
    the connection.

    Args:
        db_path: path to the sqlite file; defaults to venue_data.sqlite
            next to this script.
    """
    if db_path is None:
        db_path = SCRIPT_PATH + '/venue_data.sqlite'
    conn = sqlite3.connect(db_path)
    try:
        # ON CONFLICT REPLACE on the primary key lets save_sitemap_data()
        # blindly INSERT: a re-seen loc overwrites its previous row.
        conn.execute(
            'CREATE TABLE IF NOT EXISTS events '
            '(loc VARCHAR, lastmod VARCHAR, PRIMARY KEY (loc) ON CONFLICT REPLACE)'
        )
        conn.commit()
        print('database ready')
    finally:
        conn.close()
def get_venues(venues_path=None):
    """Read venue sitemap definitions from venues.txt.

    Each line is "venue|sitemap_url"; lines starting with '#' are
    commented out.  Blank lines and comments are skipped up front — the
    original crashed on blank lines (``venue[0]`` IndexError) and on
    comment lines that contained no '|'.

    Args:
        venues_path: path to the venues file; defaults to venues.txt
            next to this script.

    Returns:
        list of {'venue': name, 'url': url} dicts.
    """
    if venues_path is None:
        venues_path = SCRIPT_PATH + '/venues.txt'
    sitemaps = []
    with open(venues_path, 'r') as f:
        for raw in f:
            stripped = raw.strip()
            if not stripped or stripped.startswith('#'):
                continue
            venue, url = raw.split('|', 1)
            sitemaps.append({'venue': venue, 'url': url.strip()})
    return sitemaps
def parse_sitemaps(url):
    """Fetch a sitemap URL and parse its XML into a dict.

    Args:
        url: sitemap URL to fetch.

    Returns:
        dict from xmltodict; urlset entries live under
        data['urlset']['url'].
    """
    print(f'checking {url}')
    # Explicit timeout so one dead venue site cannot hang the whole run
    # (requests.get blocks forever without one).
    r = requests.get(url, timeout=30)
    return xmltodict.parse(r.text)
def get_database_data(db_path=None):
    """Return every (loc, lastmod) row currently in the events table.

    Args:
        db_path: path to the sqlite file; defaults to venue_data.sqlite
            next to this script.

    Returns:
        list of (loc, lastmod) tuples.
    """
    if db_path is None:
        db_path = SCRIPT_PATH + '/venue_data.sqlite'
    conn = sqlite3.connect(db_path)
    try:
        # try/finally so the connection is closed even if the query fails.
        return conn.execute('SELECT * FROM events').fetchall()
    finally:
        conn.close()
def save_sitemap_data(data, db_path=None):
    """Upsert every <url> entry of a parsed sitemap into the events table.

    The table's PRIMARY KEY (loc) ON CONFLICT REPLACE makes plain
    INSERTs behave as upserts.

    Args:
        data: dict from xmltodict with entries under data['urlset']['url'].
        db_path: path to the sqlite file; defaults to venue_data.sqlite
            next to this script.
    """
    if db_path is None:
        db_path = SCRIPT_PATH + '/venue_data.sqlite'
    conn = sqlite3.connect(db_path)
    try:
        for event in data['urlset']['url']:
            # Parameterized query: the old hand-quoted f-string broke on
            # URLs containing apostrophes and was SQL-injectable.
            # .get() replaces the old bare try/except for missing lastmod.
            conn.execute(
                'INSERT INTO events (loc, lastmod) VALUES (?, ?)',
                (event['loc'], event.get('lastmod', '')),
            )
        # One commit for the batch instead of one per row.
        conn.commit()
    finally:
        conn.close()
def scrape_url(url):
    """Download a page and extract its main content with trafilatura.

    Returns:
        dict of trafilatura's JSON output (includes title, image,
        raw_text used downstream).
    """
    print(f'scraping {url}')
    page = fetch_url(url)
    extracted = extract(page, output_format='json', with_metadata=True)
    parsed = json.loads(extracted)
    # Echo the full extraction for the run log.
    print(json.dumps(parsed, indent=4))
    return parsed
async def screenshot(url):
    """Screenshot a page with headless Chromium (uBlock extension loaded).

    Args:
        url: page to capture.

    Returns:
        path of the saved full-page JPEG, named after a hash of the URL.
    """
    print(f'screenshotting {url}')
    browser = await launch({
        'executablePath': '/usr/bin/chromium',
        'autoClose': False,
        'args': ['--load-extension=' + SCRIPT_PATH + '/extension/uBlock.zip'],
    })
    # autoClose is False, so close the browser ourselves even on error;
    # the original leaked a Chromium process when goto/screenshot raised.
    try:
        page = await browser.newPage()
        await page.setViewport({'width': 1280, 'height': 720})
        await page.goto(url, {'waitUntil': 'networkidle0'})
        # BUG FIX: str(bytes) embedded a literal "b'...'" in the filename;
        # decode the url-safe base64 digest to a clean string instead.
        digest = hashlib.sha1(url.encode('utf-8')).digest()
        token = base64.urlsafe_b64encode(digest).decode('ascii')
        screenshot_filename = SCRIPT_PATH + '/screenshots/' + token + '.jpg'
        # Extra grace period for late-loading assets after networkidle0.
        await page.waitFor(5000)
        await page.screenshot(path=screenshot_filename, fullPage=True)
    finally:
        await browser.close()
    return screenshot_filename
def describe_image(filename, raw_text):
    """Ask the OpenAI vision model to describe an event screenshot.

    Args:
        filename: path to a JPEG screenshot of the event page.
        raw_text: scraped page text, appended to the prompt for context.

    Returns:
        decoded OpenAI chat-completions response (dict); the description
        is at ['choices'][0]['message']['content'].
    """
    # BUG FIX: the original printed a literal "(unknown)" — an f-string
    # with no placeholder; report the actual file being described.
    print(f'describing screenshot {filename}')
    with open(filename, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode('utf-8')
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENAI_APIKEY}",
    }
    payload = {
        "model": MODEL,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": PROMPT + ' ' + raw_text
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}",
                            "detail": "auto"
                        }
                    }
                ]
            }
        ],
        "max_tokens": MAX_TOKENS
    }
    # Explicit timeout so a stalled API call cannot hang the whole run.
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers=headers,
        json=payload,
        timeout=120,
    )
    return response.json()
def send_email(recipient, html):
    """Email the HTML digest of newly found events via SendGrid.

    Failures are logged, not raised — email is best-effort.
    """
    print(f'sending email to {recipient}')
    # NOTE(review): '%-d' (no-pad day) is a glibc strftime extension —
    # presumably this runs on Linux; confirm before porting.
    today = datetime.today().strftime('%B %-d, %Y')
    message = Mail(
        from_email="🤖 VENUEBOT<[email protected]>",
        to_emails=recipient,
        subject='🤖 Events found ' + str(today),
        html_content=html,
    )
    try:
        response = SendGridAPIClient(SENDGRID_APIKEY).send(message)
        print(f'email sent with {response.status_code}')
    except Exception as e:
        print(str(e))
def send_bluesky(status, image, url):
    """Post an event to Bluesky with its image and a 'More info' link.

    Args:
        status: event description (trimmed to fit the post length limit).
        image: path to the local JPEG to attach.
        url: event page URL, attached as a rich-text link.
    """
    print('sending post to bluesky')
    client = Client()
    client.login(BLUESKY_USERNAME, BLUESKY_PASSWORD)
    # BUG FIX: read the image path that was passed in; the original
    # ignored the parameter and re-read a hard-coded /tmp file, which
    # could attach a stale image left over from a previous event.
    with open(image, 'rb') as f:
        img_data = f.read()
    # Leave room for the trailing space + 'More info' link facet.
    if (len(status) > 250):
        status = status[:247] + "..."
    status = status + ' '
    text = client_utils.TextBuilder().text(status).link('More info', url)
    client.send_image(text=text, image=img_data, image_alt=status)
    return
def send_mastodon(status, image_file_path, url):
    """Post an event (image + description + link) to mastodon.social."""
    print('sending post to mastodon')
    api = Mastodon(
        access_token=MASTODON_ACCESS_TOKEN,
        api_base_url='https://mastodon.social/',
    )
    # Upload the image first (the description doubles as alt text),
    # then attach it to the status post.
    uploaded = api.media_post(image_file_path, description=status)
    api.status_post(f'{status} {url}', media_ids=uploaded)
    return
def send_twitter(status, image_file_path, url):
    """Post an event to Twitter/X.

    Media upload still requires the v1.1 API (OAuth1UserHandler + API),
    while the tweet itself goes through the v2 Client.
    """
    print('sending post to twitter')
    # Trim the description, then append the event link.
    if len(status) > 140:
        status = status[:140] + "..."
    status = status + ' ' + url
    client = tweepy.Client(
        consumer_key=TWITTER_CONSUMER_KEY,
        consumer_secret=TWITTER_CONSUMER_SECRET,
        access_token=TWITTER_ACCESS_TOKEN,
        access_token_secret=TWITTER_ACCESS_TOKEN_SECRET,
    )
    auth = tweepy.OAuth1UserHandler(
        TWITTER_CONSUMER_KEY,
        TWITTER_CONSUMER_SECRET,
        TWITTER_ACCESS_TOKEN,
        TWITTER_ACCESS_TOKEN_SECRET,
    )
    uploaded = tweepy.API(auth).media_upload(image_file_path)
    media_ids = [uploaded.media_id]
    # Final safety trim before posting.
    if len(status) > 270:
        status = status[:267] + "..."
    client.create_tweet(text=status, user_auth=True, media_ids=media_ids)
    return
def compress_local_image(image_url):
    """Recompress a local image in place as a quality-75 RGB JPEG.

    Returns:
        the (unchanged) path, for convenient chaining by the caller.
    """
    print('compressing image...')
    with Image.open(image_url) as img:
        # Convert to RGB first so PNG/RGBA sources can be saved as JPEG.
        img.convert('RGB').save(image_url, optimize=True, quality=75)
    return image_url
# main process
def main():
    """Full pipeline: refresh sitemap data, diff against the previous run,
    describe each new event page with AI, then email and cross-post."""
    create_database()
    sitemaps = get_venues()
    previous_data = get_database_data()
    print(f'{len(previous_data)} previous records found')

    # Refresh the database from every venue's sitemap.
    for sitemap in sitemaps:
        current_data = parse_sitemaps(sitemap['url'])
        save_sitemap_data(current_data)

    # Anything in the db now that wasn't there before is a new event page.
    current_data = get_database_data()
    print(f'{len(current_data)} current records found')
    delta = list(set(current_data) - set(previous_data))

    email_content = ''
    for new_item in delta:
        url = new_item[0]

        # BUG FIX: skip stale pages (URL mentions a past year) BEFORE any
        # expensive work — the original only flagged them as invalid but
        # still scraped, screenshotted, and paid for an OpenAI call.
        blocked = next((b for b in BLOCKLIST if b in url), None)
        if blocked is not None:
            print(f'url contains {blocked}, event invalid')
            continue

        event_data = scrape_url(url)
        title = event_data['title']
        image = event_data['image']
        raw_text = event_data['raw_text']

        screenshot_filename = asyncio.run(screenshot(url))
        description = describe_image(screenshot_filename, raw_text)
        description_content = description['choices'][0]['message']['content']
        print(f'\n{description_content}\n')

        # Second blocklist pass on the AI description itself.
        blocked = next((b for b in BLOCKLIST if b in description_content), None)
        if blocked is not None:
            print(f'description contains {blocked}, event invalid')
            continue

        email_content += (
            f'<img src={image} style="width:100%;height:auto">'
            f'<h3><a href="{url}">{title}</a></h3>'
            f'<p>{description_content}</p><br/>'
        )

        # Mirror the event image locally so the social posts can attach it;
        # if the download fails, the remote URL is passed through as-is.
        response = requests.get(image, timeout=30)
        if response.status_code == 200:
            with open('/tmp/venue_image.jpg', 'wb') as f:
                f.write(response.content)
            image = compress_local_image('/tmp/venue_image.jpg')

        # Each network is best-effort and independent; log the actual
        # error instead of swallowing it silently.
        for poster, network in ((send_bluesky, 'bluesky'),
                                (send_mastodon, 'mastodon'),
                                (send_twitter, 'twitter')):
            try:
                poster(description_content, image, url)
                print(f'posted to {network}')
            except Exception as e:
                print(f'{network} post failed: {e}')

    if email_content:
        send_email('[email protected]', email_content)
    print('...done.')
# Entry point: run the pipeline only when executed directly, not on import.
if __name__ == '__main__':
    main()
#fin
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment