Last active
January 26, 2025 16:20
-
-
Save jeffehobbs/9d75fdfc14af13d768a5729d805d5aac to your computer and use it in GitHub Desktop.
Police log bot: 1) pulls and filters an RSS feed for a search phrase, 2) scrapes the matching page, 3) extracts the log text and redacts names, 4) generates an illustration, 5) posts to Twitter, Mastodon, and Bluesky.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# policelog.py // [email protected] | |
# | |
# todo: | |
# | |
# 1. get latest police log from RSS feed | |
# 2. scrape html and get individual log entries | |
# 3. if unique: pull text from entry, redact people's names, create illustration | |
# 4. tweet text and illustration | |
# 5. mastodon too | |
# 6. also bluesky | |
import feedparser | |
from bs4 import BeautifulSoup | |
import openai, tweepy, requests, configparser, os, shutil, hashlib, random, urllib.parse | |
from mastodon import Mastodon | |
from atproto import Client | |
from PIL import Image | |
from flair.models import SequenceTagger | |
from flair.data import Sentence | |
# set up API keys from external config apikeys.txt file
# All credentials live in apikeys.txt next to this script, parsed with
# configparser; missing sections/keys raise at import time.
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
config = configparser.ConfigParser()
config.read(SCRIPT_PATH +'/apikeys.txt')
OPENAI_APIKEY = config.get('apikeys', 'openai_apikey')
TWITTER_CONSUMER_KEY = config.get('twitter', 'consumer_key')
TWITTER_CONSUMER_SECRET = config.get('twitter', 'consumer_secret')
TWITTER_ACCESS_TOKEN = config.get('twitter', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('twitter', 'access_token_secret')
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')
BLUESKY_USERNAME = config.get('bluesky', 'username')
BLUESKY_PASSWORD = config.get('bluesky', 'password')
# Site/feed constants for the Greenfield Recorder police-log pages.
HOMEPAGE = 'https://www.recorder.com'
URL_SLUG = 'Police-Logs'
FEED = 'https://www.recorder.com/SpecialPages/RSS?feed=News' # feed URL to parse
SEARCH_TERM = 'Police Logs'
TEXT_CONTAINER_ID = 'article-text'
# fallback.txt holds the last matching article URL, written by
# get_current_post(); read once at import so a feed miss can reuse it.
# NOTE(review): raises FileNotFoundError on first run if the file is absent.
with open(SCRIPT_PATH + '/fallback.txt', 'r') as f:
    FALLBACK = f.read()
PHOTO_PROMPT_PREFIX = 'A photograph without text of the following incident: '
# get posts from RSS | |
def get_current_post(feed):
    """Return the link of the newest feed entry whose title contains SEARCH_TERM.

    The matching link is persisted to fallback.txt so a later run can reuse
    it; when no entry matches, the previously saved FALLBACK url is returned.
    """
    parsed = feedparser.parse(feed)
    match = next((e for e in parsed.entries if SEARCH_TERM in e.title), None)
    if match is None:
        print('no search term found, falling back to last url')
        return FALLBACK
    # Remember this url for runs where the feed no longer carries the log.
    with open(SCRIPT_PATH + '/fallback.txt', 'w') as f:
        f.write(match.link)
    print(match.link)
    return match.link
# get content of post | |
def get_current_post_scrape(url):
    """Scrape *url* and return the first absolute link whose href contains URL_SLUG.

    Raises ValueError with a descriptive message when no matching anchor is
    found, instead of the opaque IndexError the bare ``headlines[0]`` used to
    produce on an empty result.
    """
    soup = BeautifulSoup(requests.get(url).content, 'html.parser')
    # NOTE(review): assumes matching hrefs are site-relative paths; an
    # absolute href would produce a doubled domain — confirm against the site.
    headlines = [HOMEPAGE + a['href']
                 for a in soup.find_all('a', href=True)
                 if URL_SLUG in a['href']]
    if not headlines:
        raise ValueError(f'no link containing {URL_SLUG!r} found at {url}')
    return headlines[0]
# get content of post | |
def get_article_content(url):
    """Fetch *url* and return the text of every <p> inside the article-body div."""
    print(f'getting content from {url}...')
    page = BeautifulSoup(requests.get(url).content, 'html.parser')
    body = page.find("div", {"id": TEXT_CONTAINER_ID})
    return [graf.text for graf in body.find_all('p')]
# redact proper nouns | |
def redact_text(text):
    """Replace every person name flair's NER tagger finds in *text* with '(REDACTED)'.

    The SequenceTagger model is loaded lazily on first call and cached on the
    function object: loading the 'ner' model is by far the most expensive step
    and the original code re-loaded it on every invocation.
    """
    if getattr(redact_text, '_tagger', None) is None:
        redact_text._tagger = SequenceTagger.load('ner')
    sentence = Sentence(text)
    redact_text._tagger.predict(sentence)
    for entity in sentence.get_spans('ner'):
        text_fragment = entity.text
        label = entity.get_label('ner').value
        print(f'{text_fragment} is: "{label}"')
        if (label == 'PER'):
            # str.replace hits every occurrence, so repeated mentions of the
            # same name are all redacted in one pass.
            text = text.replace(text_fragment, '(REDACTED)')
    print(text)
    return text
# generate image from post text | |
def get_openai_image(text, num_images):
    """Generate *num_images* DALL-E 3 images for *text*; return the first image's URL.

    NOTE(review): only response['data'][0] is ever used, and dall-e-3 is
    documented to accept n=1 only — confirm num_images > 1 is ever passed.
    """
    openai.api_key = OPENAI_APIKEY
    prompt = f'{PHOTO_PROMPT_PREFIX}{text}'
    response = openai.Image.create(prompt=prompt, model='dall-e-3', style='vivid', quality='hd', n=num_images, size="1024x1024")
    return response['data'][0]['url']
# tweet that stuff | |
def send_tweet(status, image_file_path, url):
    """Post *status* + *url* to Twitter/X with the image attached.

    Fixes the original bug where the truncated text was assigned to an unused
    variable (``tweet_text``) while the untruncated string — built *before*
    the length check — was what actually got posted, so over-long statuses
    were never shortened.
    """
    # v2 client posts the tweet; the v1.1 API is still required for media upload.
    client = tweepy.Client(consumer_key=TWITTER_CONSUMER_KEY,
                                 consumer_secret=TWITTER_CONSUMER_SECRET,
                                 access_token=TWITTER_ACCESS_TOKEN,
                                 access_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
    auth = tweepy.OAuth1UserHandler(
        TWITTER_CONSUMER_KEY,
        TWITTER_CONSUMER_SECRET,
        TWITTER_ACCESS_TOKEN,
        TWITTER_ACCESS_TOKEN_SECRET,
    )
    api = tweepy.API(auth)
    media_upload_response = api.media_upload(image_file_path)
    # Truncate to leave room for the trailing url (shortened by t.co).
    if (len(status) > 256):
        status = status[:253] + "..."
    tweet = status + " " + url
    client.create_tweet(text=tweet, user_auth=True, media_ids=[media_upload_response.media_id])
    return
def send_mastodon(status, image_file_path, url):
    """Toot *status* + *url* on mastodon.social with the image attached.

    The status text doubles as the image's alt-text description.
    """
    api = Mastodon(
        access_token = MASTODON_ACCESS_TOKEN,
        api_base_url = 'https://mastodon.social/'
    )
    uploaded = api.media_post(image_file_path, description=status)
    api.status_post(status + ' ' + url, media_ids=uploaded)
    return
def send_bluesky(status, image_file_path, url):
    """Post *status* with the image (status reused as alt text) to Bluesky.

    NOTE: *url* is accepted for signature parity with the other senders but
    is not included in the Bluesky post.
    """
    bsky = Client()
    bsky.login(BLUESKY_USERNAME, BLUESKY_PASSWORD)
    with open(image_file_path, 'rb') as fh:
        payload = fh.read()
    bsky.send_image(text=status, image=payload, image_alt=status)
    return
# get posts, get post content, check if post has been made before; if not, generate art & tweet it | |
def main():
    """Pull the latest police log, redact names, illustrate one entry, and post it.

    Improvements over the original: early-return dedupe guard instead of an
    if/else pyramid; ``exit()`` replaced by ``return``; bare ``except:``
    clauses tightened to ``except Exception`` so KeyboardInterrupt/SystemExit
    still propagate; the image download now fails loudly on an HTTP error
    instead of silently saving an error page.
    """
    print("---")
    url = get_current_post(FEED)
    #url = get_current_post_scrape(HOMEPAGE)
    print(f'newest post: {url}')
    print("---")
    content = get_article_content(url)
    random_graf = random.choice(content)
    redacted_text = redact_text(random_graf)
    print(redacted_text)
    print("---")
    # The md5 of the raw paragraph doubles as a dedupe key: if the output
    # image already exists, this paragraph was handled by an earlier run.
    file_hash = hashlib.md5(str(random_graf).encode('utf-8')).hexdigest()
    file_path = SCRIPT_PATH + '/output/' + file_hash + '.png'
    print(f"file path : {file_path}")
    if os.path.isfile(file_path):
        print('file exists!')
        return
    # NOTE(review): the image prompt uses the UNredacted paragraph, so names
    # are sent to OpenAI even though the posts are redacted — confirm intended.
    image_url = get_openai_image(str(random_graf), 1)
    response = requests.get(image_url, stream=True)
    response.raise_for_status()
    with open(file_path, 'wb') as out_file:
        shutil.copyfileobj(response.raw, out_file)
    del response
    # Make a JPEG copy; only send_bluesky receives it below.
    file_path_jpg = file_path.replace('.png', '.jpg')
    img = Image.open(file_path)
    img.convert("RGB").save(file_path_jpg)
    # Each post is best-effort: one failing service must not block the others.
    try:
        send_tweet(redacted_text, file_path, url)
        print("sent tweet")
    except Exception:
        print("error tweet")
    try:
        send_mastodon(redacted_text, file_path, url)
        print("sent mastodon")
    except Exception:
        print("error mastodon")
    try:
        send_bluesky(redacted_text, file_path_jpg, url)
        print("sent bluesky")
    except Exception:
        print("error bluesky")
    # Clean up the temporary image files after posting.
    os.remove(file_path)
    os.remove(file_path_jpg)
    return
# Run the bot when executed as a script.
if __name__ == '__main__':
    main()
#fin
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment