Last active
January 26, 2025 16:21
-
-
Save jeffehobbs/57e4a692903fcdac16bb4953ae43aab4 to your computer and use it in GitHub Desktop.
record radio, transcribe it, make art
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# radiodreambot | [email protected]
# record radio, transcribe it, make art
import requests, os, shutil, configparser, datetime, time, multiprocessing, openai, tweepy, subprocess | |
from mastodon import Mastodon | |
from faster_whisper import WhisperModel | |
from atproto import Client | |
from PIL import Image | |
# Audio stream to record; the commented-out URL is an alternate source.
#STREAM_URL = 'https://live.wostreaming.net/direct/saga-wrsifmaac-ibc2'
STREAM_URL = 'http://icecast.radiofrance.fr/fip-midfi.mp3'

# Directory that receives the recording, the transcript and the video.
LOCAL_STREAM_OUTPUT_DIRECTORY = '/tmp/'

# How long to record, in seconds.
RUN_TIME = 60

# Start time at minute resolution; used as the base name of every output file.
TIMESTAMP = f"{datetime.datetime.now():%Y-%m-%d_%H:%M}"

# When True the script builds all artifacts but skips the posting step.
DEBUG = False
# Load API credentials from secrets.txt, an INI-style file that lives in the
# same directory as this script.
SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
_secrets_path = os.path.join(SCRIPT_PATH, 'secrets.txt')
config = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so an empty result means the credentials are missing.
# Fail here with a clear message instead of a NoSectionError further down.
if not config.read(_secrets_path):
    raise FileNotFoundError(f'could not read credentials file: {_secrets_path}')

OPENAI_APIKEY = config.get('openai', 'api_key')
TWITTER_CONSUMER_KEY = config.get('twitter', 'api_key')
TWITTER_CONSUMER_SECRET = config.get('twitter', 'api_key_secret')
TWITTER_ACCESS_TOKEN = config.get('twitter', 'access_token')
TWITTER_ACCESS_TOKEN_SECRET = config.get('twitter', 'access_token_secret')
MASTODON_ACCESS_TOKEN = config.get('mastodon', 'access_token')
BLUESKY_USERNAME = config.get('bluesky', 'username')
BLUESKY_PASSWORD = config.get('bluesky', 'password')
def main():
    """Record the live stream to ``<output dir>/<TIMESTAMP>.mp3``.

    The stream has no natural end, so this function keeps writing until the
    process running it is terminated; the script entry point does that after
    ``RUN_TIME`` seconds.

    Raises:
        requests.HTTPError: if the stream URL answers with an error status,
            so an error page is never saved as audio.
        requests.Timeout: if the server cannot be reached or stops sending.
    """
    print(f'recording for {RUN_TIME} seconds...')
    output_path = LOCAL_STREAM_OUTPUT_DIRECTORY + TIMESTAMP + '.mp3'
    # timeout=(connect, read): without it a dead server blocks forever.
    with requests.get(STREAM_URL, stream=True, timeout=(10, 30)) as r:
        r.raise_for_status()
        with open(output_path, 'wb') as f:
            for block in r.iter_content(1024):
                f.write(block)
                # The process is stopped with terminate(), which skips normal
                # cleanup; flush each block so no buffered audio is lost.
                f.flush()
    return
def transcribe():
    """Transcribe the recorded MP3 and save the transcript next to it.

    Reads ``<output dir>/<TIMESTAMP>.mp3``, prints each transcribed segment,
    and writes the transcript (one segment per line) to
    ``<output dir>/<TIMESTAMP>.txt``.

    Returns:
        str: the full transcript, each segment followed by a newline.
    """
    print(f'transcribing {TIMESTAMP}.mp3 ...')
    # tiny.en, tiny, base.en, base, small.en, small, medium.en, medium, large-v1, large-v2
    # adjust the model depending on how much RAM you have to toss at this
    model_size = 'large-v2'
    model = WhisperModel(model_size, device="cpu", compute_type="int8")
    base_path = LOCAL_STREAM_OUTPUT_DIRECTORY + TIMESTAMP
    segments, _info = model.transcribe(base_path + '.mp3', beam_size=5)

    lines = []
    for segment in segments:
        # Strip leading whitespace instead of blindly dropping the first
        # character, which would eat a letter when there is no leading space.
        line = segment.text.lstrip()
        print(line)
        lines.append(line)
    text = ''.join(f'{line}\n' for line in lines)

    # Explicit UTF-8 so non-ASCII transcripts do not depend on the locale.
    with open(base_path + '.txt', 'w', encoding='utf-8') as f:
        f.write(text)
    return text
# generate image from post text | |
# def get_openai_image(text, num_images): | |
# openai.api_key = OPENAI_APIKEY | |
# file_path = f'/tmp/{TIMESTAMP}.jpg' | |
# text = text[:400] | |
# response = openai.Image.create(prompt=text, n=num_images, size="1024x1024") | |
# image_url = response['data'][0]['url'] | |
# response = requests.get(image_url, stream=True) | |
# with open(file_path, 'wb') as out_file: | |
# shutil.copyfileobj(response.raw, out_file) | |
# del response | |
# return(file_path) | |
# generate image from post text | |
def get_openai_image(text, num_images):
    """Generate an image from ``text`` with DALL-E 3 and download it.

    Args:
        text: prompt for the image model (the radio transcript).
        num_images: number of images to request; only the first one returned
            is downloaded.

    Returns:
        str: path of the downloaded file, ``<output dir>/<TIMESTAMP>.jpg``.
        The bytes are stored exactly as received; the script entry point
        re-encodes the file as JPEG afterwards.

    Raises:
        requests.HTTPError: if the image download answers with an error status.
    """
    print('...making image from text...')
    openai.api_key = OPENAI_APIKEY
    # Same location as before ('/tmp/'), but taken from the shared constant so
    # all output files stay together if the directory is changed.
    file_path = LOCAL_STREAM_OUTPUT_DIRECTORY + TIMESTAMP + '.jpg'
    # dall-e-3 rejects prompts longer than 4000 characters.
    prompt = f'{text}'[:4000]
    response = openai.Image.create(prompt=prompt, model='dall-e-3', style='vivid', quality='hd', n=num_images, size="1024x1024")
    image_url = response['data'][0]['url']
    with requests.get(image_url, stream=True, timeout=60) as download:
        # Do not save an error page as if it were the image.
        download.raise_for_status()
        with open(file_path, 'wb') as out_file:
            # iter_content decodes any transfer encoding; response.raw does not.
            for chunk in download.iter_content(8192):
                out_file.write(chunk)
    return file_path
def make_video_from_image_and_audio(image_path, audio_path):
    """Combine a still image and an audio file into an MP4 using ffmpeg.

    Args:
        image_path: image shown for the whole duration of the video.
        audio_path: audio track; ``-shortest`` ends the video with it.

    Returns:
        str: path of the video, i.e. ``audio_path`` with its extension
        replaced by ``.mp4``. The path is returned even when ffmpeg fails or
        is not installed, matching the original best-effort behaviour.
    """
    print('...building video with image and audio...')
    # Swap only the final extension, so a non-.mp3 input can never yield an
    # output path identical to the input.
    video_path = os.path.splitext(audio_path)[0] + '.mp4'
    # Argument list with no shell: paths containing spaces or shell
    # metacharacters are passed to ffmpeg verbatim.
    ffmpeg_command = [
        'ffmpeg', '-loop', '1', '-i', image_path, '-i', audio_path,
        '-c:v', 'libx264', '-tune', 'stillimage',
        '-c:a', 'aac', '-b:a', '192k',
        '-pix_fmt', 'yuv420p', '-shortest', video_path,
    ]
    print(' '.join(ffmpeg_command))
    try:
        subprocess.call(ffmpeg_command)
    except OSError as err:
        # Without a shell a missing ffmpeg binary raises instead of returning
        # exit status 127; report it and carry on as before.
        print(f'could not run ffmpeg: {err}')
    return video_path
# tweet that stuff | |
def send_tweet(status, image_file_path):
    """Upload a media file and post it to Twitter together with ``status``.

    Args:
        status: tweet text; anything longer than 270 characters is cut to
            267 characters plus "...".
        image_file_path: path of the media file to attach.
    """
    print('...posting on twitter...')

    # tweepy.Client creates the tweet ...
    poster = tweepy.Client(
        consumer_key=TWITTER_CONSUMER_KEY,
        consumer_secret=TWITTER_CONSUMER_SECRET,
        access_token=TWITTER_ACCESS_TOKEN,
        access_token_secret=TWITTER_ACCESS_TOKEN_SECRET,
    )
    # ... while tweepy.API, built from the same credentials, uploads the media.
    uploader = tweepy.API(
        tweepy.OAuth1UserHandler(
            TWITTER_CONSUMER_KEY,
            TWITTER_CONSUMER_SECRET,
            TWITTER_ACCESS_TOKEN,
            TWITTER_ACCESS_TOKEN_SECRET,
        )
    )

    uploaded = uploader.media_upload(image_file_path)
    attachments = [uploaded.media_id]

    body = status if len(status) <= 270 else status[:267] + "..."
    poster.create_tweet(text=body, user_auth=True, media_ids=attachments)
def send_mastodon(status, image_file_path):
    """Upload a media file to mastodon.social and post it with ``status``.

    Args:
        status: post text; anything longer than 500 characters is cut to
            497 characters plus "...".
        image_file_path: path of the media file to attach.
    """
    print('...posting on mastodon...')
    api = Mastodon(
        access_token=MASTODON_ACCESS_TOKEN,
        api_base_url='https://mastodon.social/',
    )
    attachment = api.media_post(image_file_path, description="")

    body = status if len(status) <= 500 else status[:497] + "..."

    # One-minute pause between upload and post; presumably gives the server
    # time to process the media -- confirm before changing.
    time.sleep(60)
    api.status_post(body, media_ids=attachment)
def send_bluesky(status, image_file_path):
    """Post an image with ``status`` as its text on Bluesky.

    Args:
        status: post text; anything longer than 300 characters is cut to
            297 characters plus "...", mirroring the other send_* functions.
        image_file_path: path of the image to attach.
    """
    print('...posting on bluesky...')
    # Bluesky rejects post text over 300 graphemes. Counting characters is
    # conservative, since a grapheme is never shorter than one character.
    post_text = status if len(status) <= 300 else status[:297] + "..."
    client = Client()
    client.login(BLUESKY_USERNAME, BLUESKY_PASSWORD)
    with open(image_file_path, 'rb') as f:
        img_data = f.read()
    # The alt text keeps the untruncated status, as it did before.
    client.send_image(text=post_text, image=img_data, image_alt=status)
    return
if __name__ == '__main__':
    # Pipeline: record -> transcribe -> generate image -> build video -> post.
    print('starting...')

    # main() never returns on its own (the stream is endless), so run it in a
    # child process and stop that process after RUN_TIME seconds.
    p = multiprocessing.Process(target=main, name="Main")
    p.start()
    time.sleep(RUN_TIME)
    p.terminate()
    p.join()

    text = transcribe()
    image_file = get_openai_image(text, 1)

    # Re-encode the download as an RGB JPEG. get_openai_image() already names
    # its file *.jpg, so the replace() is normally a no-op and the file is
    # rewritten in place.
    image_file_jpg = image_file.replace('.png', '.jpg')
    # Close the source handle before saving, since both paths may be the same.
    with Image.open(image_file) as downloaded:
        img = downloaded.convert("RGB")
    img.save(image_file_jpg)

    video_file = make_video_from_image_and_audio(image_file, LOCAL_STREAM_OUTPUT_DIRECTORY + TIMESTAMP + '.mp3')

    if DEBUG:
        print('debug mode, exiting early,')
    else:
        print('posting...')
        # Each network is best-effort: a failure on one must not stop the
        # others. Catch Exception (not a bare except) so Ctrl-C still works,
        # and report the cause.
        try:
            send_tweet(text, video_file)
            print('posted to twitter')
        except Exception as err:
            print(f'twitter error: {err!r}')
        try:
            send_mastodon(text, image_file)
            print('posted to mastodon')
        except Exception as err:
            print(f'mastodon error: {err!r}')
        try:
            send_bluesky(text, image_file_jpg)
            # Report success only after the post actually went through.
            print('posted to bluesky')
        except Exception as err:
            print(f'bluesky error: {err!r}')
    print('...end.')
# fin
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment