Skip to content

Instantly share code, notes, and snippets.

@jeffehobbs
Last active July 13, 2025 19:28
Show Gist options
  • Save jeffehobbs/666351c5f577cc7adc22a90a48f2a1ce to your computer and use it in GitHub Desktop.
Save jeffehobbs/666351c5f577cc7adc22a90a48f2a1ce to your computer and use it in GitHub Desktop.
curbalertbot - filters free stuff by location and tweets new items. https://twitter.com/CurbAlertBot
# curbalert 2025 | [email protected]
# refactored to use playwright
from playwright.sync_api import sync_playwright
import requests, time, os, tweepy, hashlib, configparser, logging
from mastodon import Mastodon
from atproto import Client, client_utils
# globals
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
INDEX_URL = 'https://westernmass.craigslist.org/search/zip?hasPic=1#search=1~list~0~0'
LOCATIONS = ['amherst','ashfield','easthampton','conway','deerfield','florence','goshen','greenfield','hatfield','holyoke','leeds','montague','northampton','shelburne','south deerfield','southampton','sunderland','west hatfield','whately','williamsburg']
HIGH_ALERTS = ['bookshelf','bookshelves','bookcase','bookcases','bricks','laptop','monitor','computer','tablet','phone','air conditioner']
DEBUG = False
# set up API keys from external config apikeys.txt file
config = configparser.ConfigParser()
config.read(SCRIPT_PATH +'/apikeys.txt')
TWITTER_CONSUMER_KEY = config.get('twitter', 'consumer_key')
TWITTER_CONSUMER_SECRET = config.get('twitter', 'consumer_secret')
TWITTER_ACCESS_TOKEN = config.get('twitter', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('twitter', 'access_token_secret')
PUSHOVER_USER_KEY = config.get('pushover', 'user_key')
PUSHOVER_APP_TOKEN = config.get('pushover', 'app_token')
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')
BLUESKY_USERNAME = config.get('bluesky', 'username')
BLUESKY_PASSWORD = config.get('bluesky', 'password')
logging.basicConfig(filename=SCRIPT_PATH + '/log.txt', level=logging.INFO, filemode='a', format='%(asctime)s - %(levelname)s - %(message)s')
def get_CL_index():
with sync_playwright() as p:
print('loading browser...')
browser = p.webkit.launch()
page = browser.new_page()
print(f'loading {INDEX_URL}...')
page.goto(INDEX_URL)
for i in range(1):
print('scroll...')
page.mouse.wheel(0, 15000)
time.sleep(2)
playwright_divs = page.locator('div.result-data')
count = playwright_divs.count()
print(f'found {count} items...')
individual_playwright_divs = playwright_divs.all()
data = []
for item in individual_playwright_divs:
meta = item.locator('div.meta').inner_html()
metadata = meta.split('<span class="separator">')
location = metadata[1].replace('</span>','')
if 'priceinfo' in location:
continue
url = item.locator('a').get_attribute("href")
post_id = hashlib.md5(str(url).encode('utf-8')).hexdigest()
title = item.locator('span.label').inner_html()
if DEBUG:
print(url)
print(post_id)
print(title)
print(location)
print('---')
# used to populate history on first run
#file_path = SCRIPT_PATH + '/history/' + post_id + '.html'
#with open(file_path, "w", encoding="utf-8") as f:
# f.write('')
data.append({'title': title.strip(), 'url': url, 'id': post_id, 'location': location})
browser.close()
return(data)
def get_CL_article(url):
with sync_playwright() as p:
print(f'loading {url}...')
browser = p.webkit.launch()
page = browser.new_page()
page.goto(url)
og_image_tag = page.locator('meta[property="og:image"]')
og_image_url = og_image_tag.get_attribute("content")
text_div = page.locator('#postingbody').inner_html()
text_div = text_div.split('</canvas></div>\n </div>\n')
text_div = text_div[1].replace('<br>','').replace('\n','').rstrip()
print(og_image_url)
print(text_div)
return(og_image_url, text_div)
def get_image_locally(url, id):
file_path = SCRIPT_PATH + '/images/' + id + '.jpg'
r = requests.get(url)
with open(file_path, 'wb') as f:
f.write(r.content)
return(file_path)
# send a pushover push
def send_pushover(status, image_file_path, url):
r = requests.post("https://api.pushover.net/1/messages.json", data = {
"token": PUSHOVER_APP_TOKEN,
"user": PUSHOVER_USER_KEY,
"message": status,
"url": url,
"url_title": "More info",
"priority": "1"
},
files = {
"attachment": ("image.jpg", open(image_file_path, "rb"), "image/jpeg")
})
print(r.text)
return
# tweet that shit
def send_tweet(status, image_file_path, url):
media_ids = []
if (len(status) > 256):
status = status[:240] + "..."
tweet = status + ' ' + url
client = tweepy.Client(consumer_key=TWITTER_CONSUMER_KEY,
consumer_secret=TWITTER_CONSUMER_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
auth = tweepy.OAuth1UserHandler(
TWITTER_CONSUMER_KEY,
TWITTER_CONSUMER_SECRET,
TWITTER_ACCESS_TOKEN,
TWITTER_ACCESS_TOKEN_SECRET,
)
api = tweepy.API(auth)
media_upload_response = api.media_upload(image_file_path)
media_ids.append(media_upload_response.media_id)
response = client.create_tweet(text=tweet, user_auth=True, media_ids=media_ids)
return
# send it to mastodon, why not
def send_mastodon(status, image_file_path, url):
post = status + ' ' + url
mastodon = Mastodon(
access_token = MASTODON_ACCESS_TOKEN,
api_base_url = 'https://mastodon.social/'
)
media = mastodon.media_post(image_file_path, description=status)
mastodon.status_post(post, media_ids=media)
return
def send_bluesky(status, image_file_path, url):
print('posting to bluesky...')
client = Client()
client.login(BLUESKY_USERNAME, BLUESKY_PASSWORD)
with open(image_file_path, 'rb') as f:
img_data = f.read()
status = status + ' '
text = client_utils.TextBuilder().text(status + ' ').link('More info', url)
client.send_image(text=text, image=img_data, image_alt=status)
return
# main logic
def main():
if DEBUG:
print('starting in debug mode...')
posts = get_CL_index()
for post in posts:
if DEBUG:
print(f"evaluating {post['title']}...")
if post['location'].lower() in LOCATIONS:
if DEBUG:
print(f"evaluating {post['title']} in {post['location']}...")
image_file_path = SCRIPT_PATH + '/images/' + post['id'] + '.jpg'
file_exists = os.path.isfile(image_file_path)
if DEBUG:
print(f"stub file {post['id']} exists")
if not file_exists:
print('new item, getting image and text...')
#print(file_exists)
print(post['title'] + ': ' + post['location'])
#print(image_file_path)
post['image'], post['text'] = get_CL_article(post['url'])
image_file_path = get_image_locally(post['image'], post['id'])
for item in HIGH_ALERTS:
if item.lower() in post['text'].lower():
send_pushover((post['location'].upper() + ': ' + post['text']), image_file_path, post['url'])
print('sending pushover alert...')
# give myself a 10-minute advantage :)
time.sleep(600)
if not DEBUG:
try:
send_bluesky((post['location'].upper() + ': ' + post['text'])[:250], image_file_path, post['url'])
except:
print('post to bluesky failed')
try:
send_mastodon((post['location'].upper() + ': ' + post['text'])[:450], image_file_path, post['url'])
except:
print('post to bluesky failed')
try:
send_tweet((post['location'].upper() + ': ' + post['text'])[:230], image_file_path, post['url'])
except:
print('post to twitter failed')
print('...done.')
if __name__ == '__main__':
main()
#fin
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment