|
from __future__ import unicode_literals |
|
|
|
import re |
|
import requests |
|
import sys |
|
|
|
# Parser states for the interleaved audio/metadata stream.
_STATE_AUDIO = 0     # skipping audio payload until the next metadata block
_STATE_LENGTH = 1    # the next byte is the metadata length, in 16-byte units
_STATE_METADATA = 2  # accumulating the bytes of the metadata block

# The value is matched lazily up to the first "';" so that titles containing
# an apostrophe (e.g. "Don't Stop") are still recognised.
_STREAM_TITLE_RE = re.compile(br"StreamTitle='(.*?)';", re.DOTALL)


def _extract_stream_title(metadata_bytes, encoding):
    """Return the decoded StreamTitle value of a metadata block, or None."""
    m = _STREAM_TITLE_RE.search(metadata_bytes)
    if m is None:
        return None
    return m.group(1).decode(encoding, errors='replace')


def icy_monitor(stream_url, callback=None):
    """Follow a stream and report each StreamTitle found in its metadata.

    The stream is requested with the ``Icy-MetaData: 1`` header.  The
    ``icy-metaint`` response header gives the number of audio bytes between
    metadata blocks; each block starts with one length byte (block length
    divided by 16) followed by that many bytes of metadata.

    Args:
        stream_url: URL of the stream to request.
        callback: Optional callable invoked with the decoded title (text)
            every time a metadata block containing a StreamTitle is read.

    Returns:
        None, once the response body is exhausted.

    Raises:
        ValueError: If the response has no ``icy-metaint`` header, or the
            header is not a positive integer.
    """
    r = requests.get(stream_url, headers={'Icy-MetaData': '1'}, stream=True)
    try:
        if r.encoding is None:
            r.encoding = 'utf-8'

        metaint = r.headers.get('icy-metaint')
        if metaint is None:
            raise ValueError(
                "response has no 'icy-metaint' header; "
                "the stream does not provide ICY metadata"
            )
        metadata_size = int(metaint)
        if metadata_size <= 0:
            # With a non-positive interval the audio state below would never
            # end and the loop would consume the stream forever.
            raise ValueError(
                "invalid 'icy-metaint' header value: {!r}".format(metaint)
            )

        bytecount = 0
        state = _STATE_AUDIO
        metadata_length = 0
        metadata_bytes = bytearray()

        # The body is consumed one byte at a time so that the position
        # inside the audio/metadata framing is always known exactly.
        for chunk in r.iter_content(1):
            bytecount += 1

            if state == _STATE_AUDIO:
                if bytecount == metadata_size:
                    state = _STATE_LENGTH
                    bytecount = 0

            elif state == _STATE_LENGTH:
                length_byte = int.from_bytes(
                    chunk, byteorder='big', signed=False
                )
                metadata_length = length_byte * 16
                bytecount = 0
                # A zero length byte means there is no metadata this time.
                state = _STATE_METADATA if metadata_length else _STATE_AUDIO

            elif state == _STATE_METADATA:
                metadata_bytes += chunk
                if bytecount == metadata_length:
                    title = _extract_stream_title(metadata_bytes, r.encoding)
                    if title is not None and callback:
                        callback(title)
                    # Reset for the next audio segment.
                    bytecount = 0
                    state = _STATE_AUDIO
                    metadata_length = 0
                    metadata_bytes = bytearray()
    finally:
        # Always release the streaming connection, including on errors
        # raised by the callback.
        r.close()
|
|
|
def _write_text(path, text):
    """Write *text* to *path* as UTF-8, replacing any previous contents."""
    # The context manager closes the file even if the write fails; the
    # explicit encoding keeps non-ASCII titles from depending on the locale.
    with open(path, 'w', encoding='utf-8') as handle:
        handle.write(text)


def print_title(title):
    """Print the current track and store its title and artist in two files.

    *title* is split at the first ``" - "`` into an artist part (before) and
    a track part (after).  The track part is written to the file named by
    ``sys.argv[2]`` and the artist part to the file named by ``sys.argv[3]``.
    When *title* does not contain such a separator, the whole string is
    written as the track and ``'Unknown'`` as the artist.

    Args:
        title: The track description text, e.g. ``"Artist - Song"``.

    Raises:
        IndexError: If ``sys.argv`` does not hold the two output paths.
    """
    print('Current track: {}'.format(title))

    meta = re.search(r"^([\s\S]+?) - ([\s\S]+)$", title)
    if meta:
        artist, track = meta.group(1), meta.group(2)
    else:
        artist, track = 'Unknown', title

    _write_text(sys.argv[2], track)
    _write_text(sys.argv[3], artist)
|
|
|
if __name__ == '__main__':
    # Expected arguments: STREAM_URL TITLE_FILE ARTIST_FILE.
    # The two file paths are read from sys.argv by print_title, so check for
    # them up front instead of failing when the first title arrives.
    if len(sys.argv) < 4:
        sys.exit(
            'usage: {} STREAM_URL TITLE_FILE ARTIST_FILE'.format(sys.argv[0])
        )

    stream_url = sys.argv[1]
    icy_monitor(stream_url, callback=print_title)