Skip to content

Instantly share code, notes, and snippets.

@amattu2
Forked from jakecrowley/blinkpoc.py
Last active November 9, 2024 15:55
Show Gist options
  • Save amattu2/89c81a29a573d3cb0546d33506f87522 to your computer and use it in GitHub Desktop.
Save amattu2/89c81a29a573d3cb0546d33506f87522 to your computer and use it in GitHub Desktop.
Blink IMMIS live viewing PoC
import socket
import ssl
import subprocess
import threading
from time import sleep, time
from types import SimpleNamespace

import requests
# Replace these with your own values.
# A SimpleNamespace (rather than a plain dict) is used because the rest of
# this script reads the settings as attributes, e.g. ``config.token`` and
# ``config.account_id``; a dict would raise AttributeError on those reads.
config = SimpleNamespace(
    region="",
    token="",
    account_id=0,
    network_id=0,
    camera_id=0,
)

# Base URL of the REST API, built once from ``config.region``.
BASE_API_URL = f"https://rest-{config.region}.immedia-semi.com"
def get_liveview():
    """Ask the REST API to start a live view for the configured camera.

    Sends a POST with ``intent: liveview`` to the camera's ``liveview``
    endpoint (account, network and camera ids come from ``config``), prints
    the raw response body, and returns the decoded JSON.

    Returns:
        The parsed JSON response body. This script later reads the
        ``server``, ``command_id`` and ``polling_interval`` keys from it.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        requests.RequestException: on connection failure or timeout.
    """
    headers = {
        'app-build': 'ANDROID_28799573',
        'user-agent': '37.0ANDROID_28799573',
        'locale': 'en_US',
        'x-blink-time-zone': 'America/New_York',
        'token-auth': config.token,
        'content-type': 'application/json; charset=UTF-8',
    }
    json_data = {
        'intent': 'liveview',
    }
    response = requests.post(
        f'{BASE_API_URL}/api/v2/accounts/{config.account_id}/networks/{config.network_id}/owls/{config.camera_id}/liveview',
        headers=headers,
        json=json_data,
        # Without a timeout requests can block forever on a stalled server.
        timeout=10,
    )
    print(f'Initiated live view command: {response.text}')
    # Fail here with a clear HTTP error instead of a KeyError later when the
    # caller looks up keys that an error response does not contain.
    response.raise_for_status()
    return response.json()
def poll(network_id, command_id, interval):
    """Poll a command's status endpoint forever, pausing between requests.

    This function never returns; this script runs it as the target of a
    background thread. Each iteration prints the HTTP status code of the
    poll request.
    NOTE(review): presumably the polling keeps the live view session
    alive on the server side -- confirm against the API behaviour.

    Args:
        network_id: Network id placed in the request URL.
        command_id: Command id placed in the request URL.
        interval: Seconds to sleep between two poll requests.
    """
    headers = {
        'app-build': 'ANDROID_28799573',
        'user-agent': '37.0ANDROID_28799573',
        'locale': 'en_US',
        'x-blink-time-zone': 'America/New_York',
        'token-auth': config.token,
        'content-type': 'application/json; charset=UTF-8',
    }
    # The URL does not change between iterations, so build it once.
    url = f'{BASE_API_URL}/network/{network_id}/command/{command_id}'
    while True:
        try:
            # Without a timeout a stalled server would block this loop forever.
            response = requests.get(url, headers=headers, timeout=10)
            print(f"Polling command status: {response.status_code}")
        except requests.RequestException as err:
            # A transient network error must not end the polling loop.
            print(f"Polling command status failed: {err}")
        sleep(interval)
# Start a live view session; the response carries the stream server URL.
liveview = get_liveview()
immis = liveview["server"]

# The parsing below assumes a URL shaped like
#   <scheme>://<host>[:<port>]/.../<conn_id>_<rest>?client_id=<integer>
host = immis.split('/')[2].split(':')[0]
conn_id = immis.split('/')[-1].split('_')[0]
client_id = int(immis.split('?client_id=')[1])

# NOTE(review): certificate and hostname verification are disabled here.
# Confirm whether the stream server's certificate really cannot be verified
# before relying on this outside of a proof of concept.
context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

with socket.create_connection((host, 443)) as sock:
    # poll() loops forever, so the thread must be a daemon: a non-daemon
    # thread would keep the interpreter alive after the stream has ended or
    # after Ctrl-C.
    command_poll = threading.Thread(
        target=poll,
        args=(config.network_id, liveview["command_id"], liveview["polling_interval"]),
        daemon=True,
    )
    command_poll.start()

    # Opaque handshake bytes; only client_id (4 bytes, big-endian) and
    # conn_id (ASCII) vary between sessions.
    conn_header = bytes([
        0x00, 0x00, 0x00, 0x28, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] + list(client_id.to_bytes(4, 'big')) + [0x01, 0x08, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x10] + list(bytes(conn_id, 'ASCII')) + [0x00, 0x00, 0x00, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00,
    ])

    # Fixed message re-sent roughly once per second while the stream runs.
    keep_alive = bytes([
        0x12, 0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
        0x00,
    ])

    with context.wrap_socket(sock, server_hostname=host) as ssock:
        start = time()
        # sendall: send() may transmit only part of the buffer.
        ssock.sendall(conn_header)

        # ffplay reads the MPEG-TS stream from its standard input.
        ffplay_process = subprocess.Popen(
            ['ffplay', '-f', 'mpegts', '-err_detect', 'ignore_err', '-'],
            stdin=subprocess.PIPE
        )
        try:
            data = ssock.recv(64)
            while data:
                ffplay_process.stdin.write(data)
                data = ssock.recv(64)
                # Send a keep-alive once more than a second has passed
                # since the previous one (or since the handshake).
                if time() - start > 1:
                    ssock.sendall(keep_alive)
                    start = time()
        except KeyboardInterrupt:
            # Ctrl-C: stop the player immediately; the script then ends.
            ffplay_process.kill()
        except BrokenPipeError:
            # ffplay was closed, so there is nobody left to feed.
            print("ffplay exited; stopping stream")
        finally:
            # Closing stdin lets ffplay see end-of-stream. The close flushes
            # buffered data, which fails if ffplay is already gone.
            try:
                ffplay_process.stdin.close()
            except OSError:
                pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment