Last active
July 13, 2025 19:28
-
-
Save jeffehobbs/666351c5f577cc7adc22a90a48f2a1ce to your computer and use it in GitHub Desktop.
curbalertbot - filters free stuff by location and tweets new items. https://twitter.com/CurbAlertBot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# curbalert 2025 | [email protected]
# refactored to use playwright
import configparser
import hashlib
import logging
import os
import re
import time

import requests
import tweepy
from atproto import Client, client_utils
from mastodon import Mastodon
from playwright.sync_api import sync_playwright
# globals

# Directory containing this script; apikeys.txt, log.txt and images/ are
# addressed relative to it.
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))

# Craigslist "free stuff" search (pictures only) that gets scraped.
INDEX_URL = 'https://westernmass.craigslist.org/search/zip?hasPic=1#search=1~list~0~0'

# Lower-case town names; a listing is only processed when its location,
# lower-cased, is exactly one of these.
LOCATIONS = [
    'amherst',
    'ashfield',
    'easthampton',
    'conway',
    'deerfield',
    'florence',
    'goshen',
    'greenfield',
    'hatfield',
    'holyoke',
    'leeds',
    'montague',
    'northampton',
    'shelburne',
    'south deerfield',
    'southampton',
    'sunderland',
    'west hatfield',
    'whately',
    'williamsburg',
]

# Keywords that trigger a Pushover alert when found in a listing's text.
HIGH_ALERTS = [
    'bookshelf',
    'bookshelves',
    'bookcase',
    'bookcases',
    'bricks',
    'laptop',
    'monitor',
    'computer',
    'tablet',
    'phone',
    'air conditioner',
]

# When True, extra diagnostics are printed and nothing is posted publicly.
DEBUG = False
# set up API keys from external config apikeys.txt file
# (INI format with [twitter], [pushover], [mastodon] and [bluesky] sections)
config = configparser.ConfigParser()
config.read(SCRIPT_PATH +'/apikeys.txt')

# Twitter: OAuth 1.0a user credentials, used by send_tweet()
TWITTER_CONSUMER_KEY = config.get('twitter', 'consumer_key')
TWITTER_CONSUMER_SECRET = config.get('twitter', 'consumer_secret')
TWITTER_ACCESS_TOKEN = config.get('twitter', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('twitter', 'access_token_secret')

# Pushover: used by send_pushover()
PUSHOVER_USER_KEY = config.get('pushover', 'user_key')
PUSHOVER_APP_TOKEN = config.get('pushover', 'app_token')

# Mastodon: used by send_mastodon()
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')

# Bluesky: used by send_bluesky()
BLUESKY_USERNAME = config.get('bluesky', 'username')
BLUESKY_PASSWORD = config.get('bluesky', 'password')

# Append INFO-and-above records to log.txt next to the script.
logging.basicConfig(
    filename=SCRIPT_PATH + '/log.txt',
    filemode='a',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
def get_CL_index():
    """Scrape the Craigslist index page and return the listings found on it.

    Launches a Playwright WebKit browser, loads ``INDEX_URL``, scrolls the
    page once (presumably so lazily rendered results appear -- confirm
    against the live page) and reads every ``div.result-data`` element.

    Returns:
        list[dict]: one dict per listing with the keys ``title``, ``url``,
        ``id`` (MD5 hex digest of the URL string) and ``location``.
        Listings are skipped when their meta block has no location
        segment, when that segment holds price markup, or when they carry
        no link.
    """
    data = []
    with sync_playwright() as p:
        print('loading browser...')
        browser = p.webkit.launch()
        try:
            page = browser.new_page()
            print(f'loading {INDEX_URL}...')
            page.goto(INDEX_URL)
            for _ in range(1):
                print('scroll...')
                page.mouse.wheel(0, 15000)
                time.sleep(2)
            playwright_divs = page.locator('div.result-data')
            print(f'found {playwright_divs.count()} items...')
            for item in playwright_divs.all():
                meta = item.locator('div.meta').inner_html()
                metadata = meta.split('<span class="separator">')
                if len(metadata) < 2:
                    # No separator means no location segment; indexing
                    # metadata[1] here used to raise IndexError.
                    continue
                location = metadata[1].replace('</span>', '').strip()
                if 'priceinfo' in location:
                    continue
                url = item.locator('a').get_attribute("href")
                if not url:
                    # Nothing to visit later, so the listing is unusable.
                    continue
                # MD5 is only a stable file-name-safe identifier here, not a
                # security measure; main() uses it to name the image file.
                post_id = hashlib.md5(str(url).encode('utf-8')).hexdigest()
                title = item.locator('span.label').inner_html()
                if DEBUG:
                    print(url)
                    print(post_id)
                    print(title)
                    print(location)
                    print('---')
                # used to populate history on first run
                #file_path = SCRIPT_PATH + '/history/' + post_id + '.html'
                #with open(file_path, "w", encoding="utf-8") as f:
                #    f.write('')
                data.append({'title': title.strip(), 'url': url, 'id': post_id, 'location': location})
        finally:
            # Close the browser even when scraping raises part-way through.
            browser.close()
    return data
# End of the QR-code container that precedes the listing text inside
# #postingbody. Whitespace between the two closing tags is matched loosely so
# the split does not depend on the page's exact indentation.
_QR_BLOCK_END = re.compile(r'</canvas></div>\n\s*</div>\n')


def _extract_posting_text(body_html):
    """Return the listing text from the inner HTML of ``#postingbody``.

    Everything up to and including the QR-code container is dropped, then
    ``<br>`` tags and newlines are removed and trailing whitespace trimmed.
    When the QR-code container is not found the whole body is used instead
    of raising IndexError (it may then still contain markup).
    """
    parts = _QR_BLOCK_END.split(body_html)
    text = parts[1] if len(parts) > 1 else parts[0]
    return text.replace('<br>', '').replace('\n', '').rstrip()


def get_CL_article(url):
    """Load a single listing page and return its image URL and body text.

    Args:
        url: address of the Craigslist listing.

    Returns:
        tuple: ``(og_image_url, text)`` where ``og_image_url`` is the
        ``content`` attribute of the page's ``og:image`` meta tag and
        ``text`` is the cleaned contents of ``#postingbody``.
    """
    with sync_playwright() as p:
        print(f'loading {url}...')
        browser = p.webkit.launch()
        try:
            page = browser.new_page()
            page.goto(url)
            og_image_tag = page.locator('meta[property="og:image"]')
            og_image_url = og_image_tag.get_attribute("content")
            text_div = _extract_posting_text(page.locator('#postingbody').inner_html())
        finally:
            # The original never closed the browser it launched.
            browser.close()
    print(og_image_url)
    print(text_div)
    return (og_image_url, text_div)
def get_image_locally(url, id):
    """Download an image and save it as ``images/<id>.jpg`` beside the script.

    Args:
        url: address of the image to download.
        id: listing identifier used as the file name (string).

    Returns:
        str: path of the file that was written.

    Raises:
        requests.RequestException: on network failure, timeout or a non-2xx
            response. Nothing is written in that case, so an error page is
            never stored under the image's name.
    """
    images_dir = SCRIPT_PATH + '/images'
    os.makedirs(images_dir, exist_ok=True)
    file_path = images_dir + '/' + id + '.jpg'
    # Without a timeout a stalled connection would hang the script forever.
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    with open(file_path, 'wb') as f:
        f.write(r.content)
    return file_path
# send a pushover push | |
def send_pushover(status, image_file_path, url):
    """Send a priority-1 Pushover notification with the image attached.

    Args:
        status: message text.
        image_file_path: path of the JPEG to attach.
        url: link included with the notification, titled "More info".

    The API response body is printed; nothing is returned.
    """
    # Open the attachment in a context manager so the handle is closed once
    # the request has been sent (the original leaked it).
    with open(image_file_path, "rb") as image_file:
        r = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": PUSHOVER_APP_TOKEN,
                "user": PUSHOVER_USER_KEY,
                "message": status,
                "url": url,
                "url_title": "More info",
                "priority": "1",
            },
            files={
                "attachment": ("image.jpg", image_file, "image/jpeg"),
            },
            # Avoid hanging indefinitely on a stalled connection.
            timeout=30,
        )
    print(r.text)
    return
# tweet that shit | |
def send_tweet(status, image_file_path, url):
    """Post *status* followed by *url* to Twitter with the image attached.

    A status longer than 256 characters is cut to its first 240 characters
    plus "...". The image is uploaded through ``tweepy.API`` and the tweet
    itself is created through ``tweepy.Client``, both using the same four
    OAuth credentials.
    """
    credentials = {
        'consumer_key': TWITTER_CONSUMER_KEY,
        'consumer_secret': TWITTER_CONSUMER_SECRET,
        'access_token': TWITTER_ACCESS_TOKEN,
        'access_token_secret': TWITTER_ACCESS_TOKEN_SECRET,
    }

    if len(status) > 256:
        status = status[:240] + "..."
    tweet = ' '.join((status, url))

    client = tweepy.Client(**credentials)
    api = tweepy.API(
        tweepy.OAuth1UserHandler(
            TWITTER_CONSUMER_KEY,
            TWITTER_CONSUMER_SECRET,
            TWITTER_ACCESS_TOKEN,
            TWITTER_ACCESS_TOKEN_SECRET,
        )
    )

    # Upload the picture first; the tweet then references it by media id.
    uploaded = api.media_upload(image_file_path)
    client.create_tweet(text=tweet, user_auth=True, media_ids=[uploaded.media_id])
# send it to mastodon, why not | |
def send_mastodon(status, image_file_path, url):
    """Post *status* followed by *url* to mastodon.social with the image.

    The image is uploaded first with *status* as its description, then the
    status is posted referencing that upload.
    """
    api = Mastodon(
        access_token=MASTODON_ACCESS_TOKEN,
        api_base_url='https://mastodon.social/',
    )
    attachment = api.media_post(image_file_path, description=status)
    api.status_post(' '.join((status, url)), media_ids=attachment)
def send_bluesky(status, image_file_path, url):
    """Post *status* to Bluesky with the image and a "More info" link.

    Logs in with the configured username and password, reads the image
    from disk, and sends it with text made of the status followed by a
    link to *url*. The space-padded status is also used as the alt text.
    """
    print('posting to bluesky...')
    session = Client()
    session.login(BLUESKY_USERNAME, BLUESKY_PASSWORD)

    with open(image_file_path, 'rb') as image_file:
        image_bytes = image_file.read()

    padded_status = status + ' '
    rich_text = client_utils.TextBuilder().text(padded_status + ' ').link('More info', url)
    session.send_image(text=rich_text, image=image_bytes, image_alt=padded_status)
# main logic | |
def main():
    """Scrape the index and announce every new listing in a watched town.

    For each listing whose location (lower-cased) is in ``LOCATIONS`` and
    whose image file ``images/<id>.jpg`` does not exist yet:

    1. fetch the listing's image URL and text, and save the image locally
       (the saved file is what marks the listing as already handled);
    2. if the text contains any ``HIGH_ALERTS`` keyword, send one Pushover
       alert and wait ten minutes;
    3. unless ``DEBUG`` is set, post to Bluesky, Mastodon and Twitter, each
       independently so one failing service does not block the others.

    Failures are printed and also written to the log configured at module
    level; a failure on one listing does not stop the remaining ones.
    """
    if DEBUG:
        print('starting in debug mode...')
    posts = get_CL_index()
    for post in posts:
        if DEBUG:
            print(f"evaluating {post['title']}...")
        if post['location'].lower() not in LOCATIONS:
            continue
        if DEBUG:
            print(f"evaluating {post['title']} in {post['location']}...")
        image_file_path = SCRIPT_PATH + '/images/' + post['id'] + '.jpg'
        if os.path.isfile(image_file_path):
            # Only report the stub as existing when it actually does.
            if DEBUG:
                print(f"stub file {post['id']} exists")
            continue

        print('new item, getting image and text...')
        print(post['title'] + ': ' + post['location'])
        try:
            post['image'], post['text'] = get_CL_article(post['url'])
            image_file_path = get_image_locally(post['image'], post['id'])
        except Exception:
            # One broken listing must not abort the run; no image file was
            # stored, so it will be retried on the next run.
            print('fetching item failed')
            logging.exception('fetching %s failed', post['url'])
            continue

        status = post['location'].upper() + ': ' + post['text']

        # Alert once per listing, however many keywords match (the original
        # sent an alert and slept ten minutes for every matching keyword).
        text_lower = post['text'].lower()
        if any(keyword.lower() in text_lower for keyword in HIGH_ALERTS):
            try:
                print('sending pushover alert...')
                send_pushover(status, image_file_path, post['url'])
            except Exception:
                print('pushover alert failed')
                logging.exception('pushover alert for %s failed', post['url'])
            else:
                # give myself a 10-minute advantage :)
                time.sleep(600)

        if not DEBUG:
            # (service name, sender, maximum status length)
            targets = (
                ('bluesky', send_bluesky, 250),
                ('mastodon', send_mastodon, 450),
                ('twitter', send_tweet, 230),
            )
            for name, sender, limit in targets:
                try:
                    sender(status[:limit], image_file_path, post['url'])
                except Exception:
                    # "except Exception" rather than a bare except, so
                    # Ctrl-C and SystemExit still stop the script.
                    print(f'post to {name} failed')
                    logging.exception('post to %s failed for %s', name, post['url'])
    print('...done.')


if __name__ == '__main__':
    main()

#fin
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment