Skip to content

Instantly share code, notes, and snippets.

@nitori
Created February 5, 2023 11:37
Show Gist options
  • Save nitori/d825201d9669d028c3bc286151299d1f to your computer and use it in GitHub Desktop.
Save nitori/d825201d9669d028c3bc286151299d1f to your computer and use it in GitHub Desktop.
from multiprocessing import Process, Queue
import os
import time
from urllib.parse import urlencode, urljoin
import requests
from dotenv import load_dotenv
# Load settings via python-dotenv before the os.environ lookups below, so
# values kept in a local .env file are visible to them.
load_dotenv()
# Base URL of the Streamlabs API. endpoint() joins relative paths onto it
# with urljoin, so the trailing slash must stay.
STREAMLABS_API_BASE = 'https://streamlabs.com/api/v1.0/'
# Required OAuth2 settings. Indexing os.environ raises KeyError at import
# time when one of them is not set.
STREAMLABS_REDIRECT_URI = os.environ['STREAMLABS_REDIRECT_URI']
STREAMLABS_CLIENT_ID = os.environ['STREAMLABS_CLIENT_ID']
STREAMLABS_CLIENT_SECRET = os.environ['STREAMLABS_CLIENT_SECRET']
def endpoint(path):
    """Build the absolute Streamlabs API URL for *path*.

    Leading slashes are stripped from *path* before joining, so it is
    resolved relative to ``STREAMLABS_API_BASE`` instead of replacing the
    path component of the base URL.
    """
    relative_path = path.lstrip('/')
    return urljoin(STREAMLABS_API_BASE, relative_path)
def get_access_token():
    """Return the stored access token, or ``None`` if there is none.

    Reads ``access_token.txt`` from the current working directory and strips
    surrounding whitespace.

    Returns:
        The token string, or ``None`` when the file does not exist or holds
        only whitespace.
    """
    # EAFP: open the file directly instead of checking os.path.exists()
    # first, which closes the window where the file could vanish between
    # the check and the open.
    try:
        with open('access_token.txt', 'r', encoding='utf-8') as f:
            data = f.read().strip()
    except FileNotFoundError:
        return None
    # An empty file counts as "no token".
    return data or None
def write_access_token(access_token):
    """Store *access_token* in ``access_token.txt``, replacing old content."""
    token_file = open('access_token.txt', 'w', encoding='utf-8')
    with token_file:
        token_file.write(access_token)
def run_flask_app(q: Queue, client_id, client_secret):
    """Serve the OAuth2 redirect callback and exchange the code for a token.

    Starts a Flask development server on ``localhost:5001`` and blocks in
    ``app.run``; it is meant to be used as the target of a separate process.

    Routes:
        ``/auth``: reads the ``code`` query parameter, posts it to the
            ``/token`` API endpoint and puts the decoded JSON response on
            *q*, then redirects to ``/end``.
        ``/end``: puts ``None`` on *q* and shows a final message.

    Args:
        q: Queue used to hand results back to the parent process.
        client_id: OAuth2 client id sent with the token request.
        client_secret: OAuth2 client secret sent with the token request.
    """
    # Function-local import: Flask is only imported when this function runs.
    from flask import Flask, request, redirect, url_for

    app = Flask('oauth2')

    @app.route('/end')
    def end():
        # Sentinel telling the parent that the browser reached the last page.
        q.put(None)
        return 'Your Request is done. Look in access_token.txt file'

    @app.route('/auth')
    def auth():
        code = request.args.get('code')
        if not code:
            # Without a code the token request cannot succeed; answer with a
            # client error instead of posting ``code=None`` to the API.
            return 'Missing "code" query parameter.', 400
        r = requests.post(
            endpoint('/token'),
            data={
                'grant_type': 'authorization_code',
                'client_id': client_id,
                'client_secret': client_secret,
                'redirect_uri': STREAMLABS_REDIRECT_URI,
                'code': code,
            },
            # requests never times out by default; bound the wait so a
            # stalled API call cannot hang the callback forever.
            timeout=30,
        )
        r.raise_for_status()
        q.put(r.json())
        return redirect(url_for('end'))

    # host/port should match redirect uri
    app.run(host='localhost', port=5001)
def do_oauth2_flow(client_id, client_secret):
    """Run the callback server in a child process and wait for the token.

    Args:
        client_id: OAuth2 client id passed on to ``run_flask_app``.
        client_secret: OAuth2 client secret passed on to ``run_flask_app``.

    Returns:
        The first item the child process puts on the queue (the decoded JSON
        token response).

    Raises:
        queue.Empty: If either queue item does not arrive within 120 seconds.
    """
    q = Queue()
    proc = Process(target=run_flask_app, args=(q, client_id, client_secret))
    proc.start()
    try:
        # wait for the token to get sent from the process through the Queue
        print('Waiting for access token ...')
        json_data = q.get(timeout=120)
        # Second item is the ``None`` sentinel put by the ``/end`` route.
        q.get(timeout=120)
        # Short grace period before the server process is killed.
        time.sleep(.5)
    finally:
        # Always stop the child, even when a q.get() above timed out;
        # otherwise the server process would be left running.
        proc.terminate()
        proc.join()
    return json_data
def main():
    """Obtain a Streamlabs access token and print it.

    Uses the token stored in ``access_token.txt`` when one is available.
    Otherwise prints the authorization link, runs the local OAuth2 flow and
    stores the newly received token before printing it.
    """
    access_token = get_access_token()
    if not access_token:
        params = {
            'response_type': 'code',
            'client_id': STREAMLABS_CLIENT_ID,
            'redirect_uri': STREAMLABS_REDIRECT_URI,
            'scope': 'alerts.create',
        }
        auth_url = endpoint('/authorize') + '?' + urlencode(params)

        border = '+--------------------------------------------...'
        print(border)
        print('| Follow this link to authenticate your user:')
        print(f'| {auth_url}')
        print(border)

        token_response = do_oauth2_flow(
            STREAMLABS_CLIENT_ID,
            STREAMLABS_CLIENT_SECRET,
        )
        access_token = token_response['access_token']
        write_access_token(access_token)

    print(f'Your Access Token, DO NOT SHARE: {access_token}')
# Entry-point guard: this script starts a multiprocessing.Process, and with
# the "spawn" start method the child re-imports this module, so main() must
# not run at import time.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment