@livibetter
Last active September 17, 2016 03:27
Check Twitch.tv live streams and videos

chk-jtv-lives

Some time after 2012, my old script chk-jtv-lives.sh stopped working. I didn't try to fix it and still don't intend to. It's from my dotfiles/bin.

Instead, I wrote a new script in Python 3, chk-jtv-lives.py, using Twitch.tv API v3 and the Justin.tv API. It no longer checks a username's favorites; a list of channel IDs and/or URLs has to be supplied.

My primary need isn't actually to check if a stream is live, but more often to know how long the channel has been streaming.
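As a rough illustration of that uptime calculation, here is a minimal sketch using only the standard library. The function name `uptime` and the assumption that the start time arrives as an ISO 8601 UTC string such as `2014-08-05T12:00:00Z` are mine; the actual script parses the timestamp with dateutil instead.

```python
from datetime import datetime, timezone


def uptime(created_at, now=None):
    """Return how long a stream has been up, formatted as 'Hh MMm SSs'.

    created_at is assumed to be an ISO 8601 UTC timestamp ending in 'Z'.
    now can be injected for testing; it defaults to the current UTC time.
    """
    started = datetime.strptime(created_at, '%Y-%m-%dT%H:%M:%SZ')
    started = started.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    total = int((now - started).total_seconds())
    hours, rest = divmod(total, 3600)
    minutes, seconds = divmod(rest, 60)
    return '{:d}h {:2d}m {:2d}s'.format(hours, minutes, seconds)


fixed_now = datetime(2014, 8, 5, 15, 4, 5, tzinfo=timezone.utc)
print(uptime('2014-08-05T12:00:00Z', fixed_now))
```

Passing `now` explicitly keeps the example deterministic; in normal use it would be left out.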

There is also a script called lst-ttv-videos.py for listing broadcast videos.

In August 2014, Justin.tv was discontinued, but I decided to keep "jtv" in the script name.

https://cdn.rawgit.com/livibetter/614a080fd08e05d86bb6/raw/651f923333b6983d09a0097da9be705b2bd8ff8a/chk-jtv-lives.py.png

Just for historical record.

https://cdn.rawgit.com/livibetter/614a080fd08e05d86bb6/raw/896daaa192ef220726d3c8925f0529bdb4b88f2b/chk-jtv-lives.sh.png

Checking Twitch.tv live streams:

chk-jtv-lives.py [OPTIONS] CHANNEL [CHANNEL ...]
argument                    description                                   default
-h, --help                  show help message and exit
-m INT, --monitor INT       monitoring mode, run constantly               0 (m)
-1, --one                   one stream per line                           False
-g, --game                  search by game; CHANNEL arguments are         False
                            treated as the name of a game
-l LIMIT, --limit LIMIT     maximum number of streams                     25
-o OFFSET, --offset OFFSET  offset of first stream                        0
-s SORT, --sort SORT        sorting key                                   (s)
-r REV, --reverse REV       sort in reverse order                         (r)
CHANNEL                     channel ID or URL, or game if --game is used
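To make the CHANNEL handling concrete, the following sketch mirrors what the script's `main()` does when it turns the arguments into a request URL. The helper name `build_streams_url` is hypothetical, and the endpoint is the API v3 one used by the script, which may no longer respond.

```python
from urllib.parse import quote_plus

# Endpoint taken from the script; assumed here only for illustration.
API_BASE = 'https://api.twitch.tv/kraken/streams'


def build_streams_url(channels, game=False, limit=25, offset=0):
    """Build the streams query URL from CHANNEL arguments."""
    if game:
        # With --game, all CHANNEL words are joined into one game name.
        query = '?game=' + quote_plus(' '.join(channels))
    else:
        # A URL is reduced to its last path component, lowercased.
        ids = [ch.rsplit('/', 1)[-1].lower() for ch in channels]
        query = '?channel=' + ','.join(ids)
    return '{}{}&limit={:d}&offset={:d}'.format(API_BASE, query, limit, offset)


print(build_streams_url(['http://www.twitch.tv/gsl', 'sc2proleague']))
print(build_streams_url(['StarCraft', 'II:', 'Heart', 'of', 'the', 'Swarm'],
                        game=True, limit=5))
```

Because the game name is rebuilt by joining the arguments with spaces, it does not need to be quoted on the command line.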

Notes

(m)

INT is the interval between checks, in seconds. 0 (default) means non-monitoring mode.

https://cdn.rawgit.com/livibetter/614a080fd08e05d86bb6/raw/f13409465610cb42a1293a720d39e5c356e44815/chk-jtv-lives.py.monitor.gif

This feature monitors:

  1. streams that went online since the last check,
  2. streams that went offline since the last check, and
  3. streams that changed game and/or status since the last check.
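The three categories above boil down to set operations on the stream names from two consecutive checks. A minimal sketch, assuming each check yields a dict keyed by channel name whose values carry `status` and `game` (the function name `diff_streams` is hypothetical):

```python
def diff_streams(old, new):
    """Split two snapshots into (online, offline, updated) dicts."""
    old_keys, new_keys = set(old), set(new)
    # Present now but not before: went online.
    online = {k: new[k] for k in new_keys - old_keys}
    # Present before but not now: went offline.
    offline = {k: old[k] for k in old_keys - new_keys}
    # Present in both, but game or status changed.
    updated = {
        k: new[k]
        for k in new_keys & old_keys
        if (new[k]['status'], new[k]['game'])
        != (old[k]['status'], old[k]['game'])
    }
    return online, offline, updated


previous = {
    'day9tv': {'status': 'Daily', 'game': 'StarCraft II'},
    'gsl': {'status': 'Code S', 'game': 'StarCraft II'},
}
current = {
    'gsl': {'status': 'Code A', 'game': 'StarCraft II'},
    'dreamhack': {'status': 'Finals', 'game': 'Dota 2'},
}
online, offline, updated = diff_streams(previous, current)
print(sorted(online), sorted(offline), sorted(updated))
```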

The stream list uses the same format as in non-monitoring mode.

It also displays:

  1. the live count at the last check and
  2. a countdown timer in relative time format, with the date/time of the next check.

(s)

None; viewers if --game is used.

(r)

Valid values are default, true, and false. default resolves to true if --game is used; otherwise, to false.
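The interplay between --sort, --reverse, and --game can be summarized in a few lines. This is only a sketch of the defaulting rules described in the notes; `resolve_sort` is a hypothetical helper, not a function in the script.

```python
def resolve_sort(sort=None, reverse='default', game=False):
    """Return (sort_key, descending) after applying the defaults."""
    # The sorting key defaults to viewers only when searching by game.
    if sort is None and game:
        sort = 'viewers'
    # 'default' follows --game; otherwise the literal value decides.
    if reverse == 'default':
        descending = game
    else:
        descending = reverse == 'true'
    return sort, descending


print(resolve_sort())
print(resolve_sort(game=True))
print(resolve_sort(sort='viewers', reverse='false', game=True))
```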

Examples

$ chk-jtv-lives.py Day9tv dreamhack
$ chk-jtv-lives.py -1 http://www.twitch.tv/gsl sc2proleague
$ chk-jtv-lives.py --limit 5 -g StarCraft II: Heart of the Swarm
$ chk-jtv-lives.py -l5 -s viewers -r false -g Counter-Strike: Global Offensive

Exit status

0 (EX_OK):
    At least one channel is live.
69 (EX_UNAVAILABLE):
    No live channels.
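These exit statuses make the script usable from other programs. The sketch below shows one way a caller might interpret them; `has_live_channels` is a hypothetical helper, and the two stand-in commands only reproduce the exit statuses so the example runs without the real script or network access.

```python
import subprocess
import sys

EX_OK = 0
EX_UNAVAILABLE = 69


def has_live_channels(command):
    """Run a command and map its exit status to True/False."""
    result = subprocess.run(command, stdout=subprocess.DEVNULL)
    if result.returncode == EX_OK:
        return True
    if result.returncode == EX_UNAVAILABLE:
        return False
    raise RuntimeError('unexpected exit status %d' % result.returncode)


# Stand-ins for e.g. ['chk-jtv-lives.py', 'Day9tv'].
live = [sys.executable, '-c', 'raise SystemExit(0)']
none_live = [sys.executable, '-c', 'raise SystemExit(69)']
print(has_live_channels(live), has_live_channels(none_live))
```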

Listing videos of a Twitch.tv channel:

lst-ttv-videos.py [OPTIONS] CHANNEL
argument                                    description                 default
-h, --help                                  show help message and exit
-1, --one                                   one video per line          False
-l LIMIT, --limit LIMIT                     maximum number of videos    10
-o OFFSET, --offset OFFSET                  offset of first video       0
-b {true,false}, --broadcasts {true,false}  only broadcasts             false
CHANNEL                                     channel ID or URL
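For reference, these options map directly onto query parameters of the videos endpoint. A minimal sketch, assuming the same API v3 URL layout the script uses (`build_videos_url` is a hypothetical name, and the endpoint may no longer be available):

```python
from urllib.parse import urlencode


def build_videos_url(channel, limit=10, offset=0, broadcasts=False):
    """Build the videos query URL for a channel ID or channel URL."""
    # A channel URL is reduced to its last path component, lowercased.
    channel_id = channel.rsplit('/', 1)[-1].lower()
    query = urlencode({
        'limit': limit,
        'offset': offset,
        'broadcasts': 'true' if broadcasts else 'false',
    })
    return 'https://api.twitch.tv/kraken/channels/{}/videos?{}'.format(
        channel_id, query)


print(build_videos_url('http://www.twitch.tv/Day9tv', limit=5, broadcasts=True))
```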
  • Timezone

    Timestamps are parsed and converted to the local time zone using the following libraries:

      • pytz
      • python-dateutil
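If installing third-party packages is not an option, the same conversion can be approximated with the standard library alone. This is a swapped-in technique, not what the scripts do (they import pytz and dateutil); it assumes timestamps always look like `2014-08-05T12:00:00Z`, and `to_local` is a hypothetical name.

```python
from datetime import datetime, timezone


def to_local(timestamp):
    """Parse a UTC timestamp and convert it to the local time zone."""
    utc_time = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
    utc_time = utc_time.replace(tzinfo=timezone.utc)
    # astimezone() without arguments uses the system's local time zone.
    return utc_time.astimezone()


local_time = to_local('2014-08-05T12:00:00Z')
print(local_time.isoformat())
```

The result is still an aware datetime, so subtracting it from the current UTC time gives the correct elapsed time regardless of the local offset.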

The contents of this repository were written by Yu-Jie Lin and have been placed in the public domain or licensed under WTFPL Version 2; see COPYING.

#!/usr/bin/env python3
# Check Twitch.tv live streams
# Written by Yu-Jie Lin
# WTFPL Version 2, see COPYING

import argparse
import json
import os
import re
import sys
import time
from datetime import datetime
from textwrap import fill
from urllib import request
from urllib.parse import quote_plus

import pytz
from dateutil.parser import parse
from dateutil.tz import tzlocal

__description__ = 'Check Twitch.tv live streams'

# Twitch API GET /streams http://git.io/Z2ZRqQ
API_BASE = 'https://api.twitch.tv/kraken/streams'

INDENT_SIZE = 4
INDENT = ' ' * INDENT_SIZE

FMT = (
    '\033[1;32m{name:{lens[name]}}\033[0m '
    '\033[1;31m{viewers_str:>{lens[viewers_str]}}\033[0m '
    '\033[1;33m{up_time_since:>{lens[up_time_since]}}\033[0m{PAD_C1} '
    '{up_time}\n'
    '{url:{lens[url]}}{PAD_url} '
    '\033[1;34m{game}\033[0m\n'
    '\n'
    '{wrapped_status}\n'
)
FMT_ONE = (
    'http://{url_one}/\033[1;32m{name_one:{lens[name]}}\033[0m '
    '\033[1;31m{viewers_str:>{lens[viewers_str]}}\033[0m '
    '\033[1;33m{up_time_since:>{lens[up_time_since]}}\033[0m '
    '{status_one}'
)
FMT_TIME = '{h:d}h {m:2d}m {s:2d}s'
FMT_TIME_ONE = '{h:d}:{m:02d}:{s:02d}'

RE_WHITESPACES = re.compile(r'\s{2,}')


def relative_time(d, FMT='{h:2d}h {m:2d}m {s:2d}s'):
    # d is either a timedelta or a plain number of seconds

    t = abs(d.seconds if hasattr(d, 'seconds') else int(d))
    s = t % 60
    m = t // 60 % 60
    h = t // 3600 + (d.days if hasattr(d, 'days') else 0) * 24

    return FMT.format(**locals())


def extract_stream(stream):
    # flatten the channel fields needed for display into the stream dict

    channel = stream['channel']
    stream['name'] = channel['display_name']
    stream['status'] = channel['status'].strip()
    stream['url'] = channel['url']
    stream['viewers_str'] = '{:,}'.format(stream['viewers'])

    return stream['name'], stream


def main():

    parser = argparse.ArgumentParser(description=__description__)
    parser.add_argument('-m', '--monitor', default=0, type=int, metavar='MON',
                        help=('monitoring mode, run constantly '
                              '(Default: %(default)s)'))
    parser.add_argument('-1', '--one', action='store_true',
                        help='one stream per line')
    parser.add_argument('-g', '--game', action='store_true',
                        help=('search by game, CHANNEL arguments would be '
                              'treated as name of game'))
    parser.add_argument('-l', '--limit', default=25, type=int,
                        help='maximum number of streams (Default: %(default)s)')
    parser.add_argument('-o', '--offset', default=0, type=int,
                        help='offset of first stream (Default: %(default)s)')
    parser.add_argument('-s', '--sort',
                        help='sorting key (Default: None; viewers if --game)')
    parser.add_argument('-r', '--reverse', default='default',
                        choices=('default', 'true', 'false'),
                        help=('sort in reverse order '
                              '(Default: false; true if --game)'))
    parser.add_argument('channels', metavar='CHANNEL', nargs='+',
                        help='channel ID or URL, or game if --game is used')
    args = parser.parse_args()

    if args.game:
        URL = API_BASE + '?game=' + quote_plus(' '.join(args.channels))
        if args.sort is None:
            args.sort = 'viewers'
    else:
        # accept both channel IDs and channel URLs
        m = map(lambda ch: ch.rsplit('/', 1)[-1].lower(), args.channels)
        URL = API_BASE + '?channel=' + ','.join(m)
    URL += '&limit=%d&offset=%d' % (args.limit, args.offset)
    args.reverse = (args.game if args.reverse == 'default' else
                    args.reverse == 'true')

    if not args.monitor:
        streams = get_live_streams(args, URL)
        if not streams:
            return os.EX_UNAVAILABLE
        show_streams(args, streams)
    else:
        try:
            monitor(args, URL)
        except KeyboardInterrupt:
            print()

    return os.EX_OK


def monitor(args, URL):

    old_streams = {}
    k_old_streams = set(old_streams.keys())
    while True:
        t = time.asctime(time.localtime())
        streams = get_live_streams(args, URL)
        k_streams = set(streams.keys())

        new_streams = {k: streams[k] for k in k_streams - k_old_streams}
        off_streams = {k: old_streams[k] for k in k_old_streams - k_streams}
        upd_streams = {
            key: streams[key]
            for key in k_streams & k_old_streams
            if streams[key]['status'] != old_streams[key]['status'] or
            streams[key]['game'] != old_streams[key]['game']
        }

        TPL_MSG = '\033[1;%%dm=== %%s at %s ===\033[0m' % t
        monitor_list_streams(args, new_streams, TPL_MSG % (32, 'Online'))
        monitor_list_streams(args, off_streams, TPL_MSG % (31, 'Offline'))
        monitor_list_streams(args, upd_streams, TPL_MSG % (33, 'Updated'))
        print()

        old_streams = streams
        k_old_streams = k_streams

        # redraw the two status lines in place once per second
        msg = ('\033[2K\033[1A\033[2K\033[1G'
               '%s: %d lives\n'
               '%s: next check in %s')
        c = time.time() + args.monitor
        n = time.asctime(time.localtime(c))
        lives = len(streams)
        while time.time() <= c:
            print(msg % (t, lives, n, relative_time(c - time.time())), end='')
            sys.stdout.flush()
            time.sleep(1)
        print('\033[2K\033[1A\033[2K\033[1G', end='')


def monitor_list_streams(args, streams, msg):

    if not streams:
        return

    print(msg, end='\n' if args.one else '\n\n')
    show_streams(args, streams)


def get_live_streams(args, URL):

    with request.urlopen(URL) as f:
        data = json.loads(f.read().decode('utf8'))

    streams = data['streams']
    streams = filter(lambda stream: 'url' in stream['channel'], streams)
    streams = dict(map(extract_stream, streams))

    for stream in streams.values():
        if not args.one:
            stream['wrapped_status'] = fill(
                text=stream['status'],
                width=72,
                initial_indent=INDENT,
                subsequent_indent=INDENT,
            )
        else:
            stream['name_one'] = stream['name'].replace(' ', '_')
            stream['status_one'] = RE_WHITESPACES.sub(' ', stream['status'])

        up_time = parse(stream['created_at'])
        up_time = up_time.astimezone(tzlocal())
        d = datetime.utcnow().replace(tzinfo=pytz.utc) - up_time
        d = relative_time(d, FMT=FMT_TIME_ONE if args.one else FMT_TIME)
        stream['up_time'] = up_time
        stream['up_time_since'] = d

    return streams


def show_streams(args, streams):

    # column widths are derived from the longest value of each field
    get_len = lambda key: max(len(s[key]) for s in streams.values())
    keys = ('viewers_str', 'name', 'url', 'up_time_since')
    lens = dict((key, get_len(key)) for key in keys)
    if not args.one:
        lens['C1'] = lens['name'] + 2 + lens['viewers_str'] + 2
        lens['C1'] += lens['up_time_since']
        lens['LEFT_COLUMN'] = max(lens['url'], lens['C1'])

    streams = list(streams.values())
    if args.sort:
        streams.sort(key=lambda stream: stream[args.sort], reverse=args.reverse)

    for stream in streams:
        if args.one:
            print(FMT_ONE.format(
                lens=lens,
                url_one=stream['url'].split('/')[2].replace('www.', ''),
                **stream
            ))
        else:
            PAD_C1 = ' ' * (lens['LEFT_COLUMN'] - lens['C1'])
            PAD_url = ' ' * (lens['LEFT_COLUMN'] - lens['url'])
            print(FMT.format(lens=lens, PAD_C1=PAD_C1, PAD_url=PAD_url, **stream))


if __name__ == '__main__':
    sys.exit(main())
#!/bin/bash
# WTFPL License Version 2.0, see COPYING
#
# Checks Justin.tv user's favored channels live status
#
# Usage
# chk-jtv-lives.sh [USERNAME]
#
# USERNAME default is $USER
#
# Requires
# td.sh <http://code.google.com/p/yjl/source/browse/Bash/td.sh>
# xsltproc
# wget
XSLT="$(readlink "$0")"
XSLT="${XSLT%.sh}.xslt"
JTV_USERNAME="${1:-$USER}"
# Remove http://..../ part
JTV_USERNAME="${JTV_USERNAME##*/}"
# Favorites are whom this user follows
# http://apiwiki.justin.tv/mediawiki/index.php/User/favorites
LOGINS="$JTV_USERNAME$(wget -q "http://api.justin.tv/api/user/favorites/$JTV_USERNAME.xml?live=true" -O - |
sed -n '/login/ {s/ \+<login>\([^<]\+\)<\/login>/\1/;H} ; $ {x;s/\n/,/g;p}')"
i=0
API_URL="http://api.justin.tv/api/stream/list.xml?channel=$LOGINS"
wget -q "$API_URL" -O - |
  xsltproc "$XSLT" - |
  sed $'s/ANSI/\033/g' |
  while read line; do
    if [[ "$line" =~ "%DATE%" ]]; then
      echo "NOFOLD$(td.sh $(($(date +%s) - $(TZ=US/Pacific date -d "${line:6}" +%s)))) ago"
    else
      echo "$line"
    fi
  done |
  fold -w 68 -s |
  sed '/NOFOLD/ {s/NOFOLD//;p;d} ; s/^./ &/'
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:output method="text" disable-output-escaping="yes"/>
  <xsl:template match="/">
    <xsl:for-each select="*/stream">
      <xsl:text>NOFOLDANSI[1;32m</xsl:text>
      <xsl:value-of select="channel/title"/>
      <xsl:text>ANSI[0m</xsl:text>
      <xsl:text> </xsl:text>
      <xsl:value-of select="channel_count"/>
      <xsl:text> (</xsl:text>
      <xsl:value-of select="embed_count"/>
      <xsl:text> embeds)&#xA;</xsl:text>
      <xsl:text>NOFOLD</xsl:text>
      <xsl:value-of select="channel/channel_url"/>
      <xsl:text>&#xA;</xsl:text>
      <xsl:text>%DATE%</xsl:text>
      <xsl:value-of select="up_time"/>
      <xsl:text>&#xA;&#xA;</xsl:text>
      <xsl:text>ANSI[1;37m</xsl:text>
      <xsl:choose>
        <xsl:when test="channel/status != ''">
          <xsl:value-of select="channel/status"/>
        </xsl:when>
        <xsl:otherwise>
          <xsl:text>[no status]</xsl:text>
        </xsl:otherwise>
      </xsl:choose>
      <xsl:text>ANSI[0m</xsl:text>
      <xsl:text>&#xA;&#xA;</xsl:text>
    </xsl:for-each>
  </xsl:template>
</xsl:stylesheet>
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
Version 2, December 2004

Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>

Everyone is permitted to copy and distribute verbatim or modified
copies of this license document, and changing it is allowed as long
as the name is changed.

DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

0. You just DO WHAT THE FUCK YOU WANT TO.

#!/usr/bin/env python3
# List videos of a Twitch.tv channel
# Written by Yu-Jie Lin
# WTFPL Version 2, see COPYING

import argparse
import json
import re
from datetime import datetime
from textwrap import fill
from urllib import request

import pytz
from dateutil.parser import parse
from dateutil.tz import tzlocal

__description__ = 'List videos of a Twitch.tv channel'

INDENT_SIZE = 4
INDENT = ' ' * INDENT_SIZE

FMT = (
    '\033[1;32m{name:{lens[name]}}\033[0m '
    '\033[1;31m{views_str:>{lens[views_str]}}\033[0m '
    '\033[1;33m{readable_length:>{lens[readable_length]}}\033[0m '
    '\033[1;35m{recorded_at_since:>{lens[recorded_at_since]}}\033[0m{PAD_C1} '
    '{recorded_at}\n'
    '{url:{lens[url]}}{PAD_url} '
    '\033[1;34m{game}\033[0m\n'
    '\n'
    '{wrapped_title}\n'
)
FMT_ONE = (
    'http://{u[2]}/\033[1;32m{name:{lens[name]}}\033[0m/{u[4]}/{u[5]} '
    '\033[1;31m{views_str:>{lens[views_str]}}\033[0m '
    '\033[1;33m{readable_length:>{lens[readable_length]}}\033[0m '
    '\033[1;35m{recorded_at_since:>{lens[recorded_at_since]}}\033[0m '
    '{title_out}'
)

RE_WHITESPACES = re.compile(r'\s{2,}')


def relative_time(d, FMT='{d:d}d {h:2d}h {m:2d}m {s:2d}s'):

    t = abs(d.seconds)
    s = t % 60
    m = t // 60 % 60
    h = t // 3600
    d = d.days

    return FMT.format(**locals())


def readable_length(t, FMT='{h:d}h {m:2d}m {s:2d}s'):

    t = abs(t)
    s = t % 60
    m = t // 60 % 60
    h = t // 3600

    return FMT.format(**locals())


def main():

    parser = argparse.ArgumentParser(description=__description__)
    parser.add_argument('-l', '--limit', default=10,
                        help='maximum number of videos (Default: %(default)s)')
    parser.add_argument('-o', '--offset', default=0,
                        help='offset of first video (Default: %(default)s)')
    parser.add_argument('-b', '--broadcasts', default='false',
                        choices=('true', 'false'),
                        help='only broadcasts (Default: %(default)s)')
    parser.add_argument('-1', '--one', action='store_true',
                        help='one video per line')
    parser.add_argument('channel', metavar='CHANNEL', help='channel ID or URL')
    args = parser.parse_args()

    channel = args.channel.rsplit('/', 1)[-1].lower()

    # Twitch API: GET /channels/:channel/videos - http://git.io/Qjp4Tw
    _args = vars(args)
    _args['channel'] = channel
    URL = (
        'https://api.twitch.tv/kraken/channels/{channel}/videos?'
        'limit={limit}&'
        'offset={offset}&'
        'broadcasts={broadcasts}'
    ).format(**_args)

    with request.urlopen(URL) as f:
        videos = json.loads(f.read().decode('utf8'))['videos']

    for video in videos:
        channel = video['channel']
        video['title'] = RE_WHITESPACES.sub(' ', video['title'].strip())
        description = RE_WHITESPACES.sub(' ', (video['description'] or '').strip())
        video['description'] = description
        video['title_out'] = (
            '%s: %s' % (video['title'], description)
            if description else video['title']
        )
        video['login'] = channel['name']
        video['name'] = channel['display_name']
        video['views_str'] = '{:,}'.format(video['views'])

        if not args.one:
            video['wrapped_title'] = fill(
                text='%s' % video['title_out'],
                width=72,
                initial_indent=INDENT,
                subsequent_indent=INDENT,
            )

        recorded_at = parse(video['recorded_at'])
        recorded_at = recorded_at.astimezone(tzlocal())
        d = datetime.utcnow().replace(tzinfo=pytz.utc) - recorded_at
        video['recorded_at'] = recorded_at
        if args.one:
            d = relative_time(d, FMT='{d:d}d{h:02d}:{m:02d}:{s:02d}')
            hr_len = readable_length(video['length'], FMT='{h:d}:{m:02d}:{s:02d}')
        else:
            d = relative_time(d)
            hr_len = readable_length(video['length'])
        video['recorded_at_since'] = d
        video['readable_length'] = hr_len

    get_len = lambda key: max(len(v[key]) for v in videos)
    lens = {
        'name': get_len('name'),
        'readable_length': get_len('readable_length'),
        'recorded_at_since': get_len('recorded_at_since'),
        'url': get_len('url'),
        'views_str': get_len('views_str'),
    }
    if not args.one:
        lens['C1'] = (
            lens['name'] + 2 +
            lens['views_str'] + 2 +
            lens['readable_length'] + 2 +
            lens['recorded_at_since']
        )
        lens['LEFT_COLUMN'] = max(lens['url'], lens['C1'])

    for video in videos:
        if args.one:
            print(FMT_ONE.format(lens=lens, u=video['url'].split('/'), **video))
        else:
            print(FMT.format(
                lens=lens,
                PAD_C1=' ' * (lens['LEFT_COLUMN'] - lens['C1']),
                PAD_url=' ' * (lens['LEFT_COLUMN'] - lens['url']),
                **video
            ))


if __name__ == '__main__':
    main()

@the1ts

the1ts commented Sep 16, 2016

Loved this script, any way to add an OAuth key? Twitch now requires it for all API calls.

@livibetter
Author

Sorry, you'll have to find someone to do it. Please do share a link if you get a solution.
