Skip to content

Instantly share code, notes, and snippets.

@jboone
Created May 10, 2014 18:47
Show Gist options
  • Save jboone/340abf06d38ee220b86f to your computer and use it in GitHub Desktop.
Save jboone/340abf06d38ee220b86f to your computer and use it in GitHub Desktop.
Gnarly Python code to pick through the wreckage of an MPEG-2 transport stream.
#!/usr/bin/env python
#
# Copyright (C) 2014 Jared Boone, ShareBrained Technology, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
import sys
import struct
import numpy
import operator
# Size in bytes of one transport stream packet; the packet-building helpers
# below use it to size their arrays.
packet_length = 188
# Whole input file (path taken from the first command-line argument), loaded
# as a flat array of unsigned bytes.
d = numpy.fromfile(sys.argv[1], dtype=numpy.uint8)
# Output file, opened for binary writing. The script code further down writes
# the repaired packets to it and closes it.
f = open('out.ts', 'wb')
# per_pid_output_files = {}
def make_empty_packet():
    """Return a new uint8 array of packet_length bytes, all set to 0xff."""
    return numpy.full((packet_length,), 0xff, dtype=numpy.uint8)
def make_packet(data):
    """Return a full-size packet that begins with `data`.

    The bytes after `data` keep the 0xff filler supplied by
    make_empty_packet().
    """
    filled = make_empty_packet()
    prefix_length = len(data)
    filled[0:prefix_length] = data
    return filled
def make_padding_packet(continuity_counter=0):
    """Return a full-size packet with the header 47 1f ff 1x.

    The low nibble of the fourth header byte is taken from the low four bits
    of `continuity_counter`; the rest of the packet is 0xff filler.
    """
    counter_nibble = continuity_counter & 0xf
    header = (0x47, 0x1f, 0xff, 0x10 | counter_nibble)
    return make_packet(header)
def packet_hamming_distance(data, test_value, test_mask):
    """Count the bits in which `data` differs from `test_value`.

    Only bit positions that are set in `test_mask` take part in the
    comparison. All three arguments are uint8 arrays of the same length
    (numpy.unpackbits only accepts uint8 input).

    Returns the number of differing bits as a plain Python int.
    """
    mismatches = (data ^ test_value) & test_mask
    # Let NumPy do the summation: the builtin sum() adds uint8 scalars one at
    # a time, which is slow and, under NumPy 2 promotion rules, keeps the
    # running total in uint8 so it wraps once more than 255 bits differ.
    return int(numpy.unpackbits(mismatches).sum())
def find_padding(data):
    """Print every candidate offset in `data` that looks like a padding packet.

    Offsets are tried in steps of 4 bytes. At each one the 4-byte header is
    compared with the header produced by make_padding_packet(), ignoring the
    continuity counter nibble. An offset is reported, as hex offset and bit
    distance, when fewer than 4 bits differ.
    """
    header_length = 4
    # Only the header takes part in the match (the original mask was zero for
    # every byte past the fourth), so compare just those bytes.
    test_value = make_padding_packet()[:header_length]
    test_mask = numpy.array((0xff, 0xff, 0xff, 0xf0), dtype=numpy.uint8)  # Ignore continuity counter
    # "+ 1" so that the last offset at which a whole packet still fits is
    # checked too; without it a buffer of exactly one packet is never tested.
    last_start = len(data) - packet_length
    for n in range(0, last_start + 1, 4):
        distance = packet_hamming_distance(
            data[n:n + header_length], test_value, test_mask)
        if distance < 4:
            print('%8x %d' % (n, distance))
def format_bytes(d):
    """Return the values in `d` as one lowercase hex string, two digits each."""
    hex_pairs = ('%02x' % value for value in d)
    return ''.join(hex_pairs)
def find_pes_headers(data):
    """Print PCR/PTS timing for packets that resemble a video PES start.

    `data` is walked in 188-byte steps. The first 21 bytes of each packet are
    compared, under a mask, with a reference header (transport header,
    adaptation field, PES start code and PES header flags). Packets within
    23 differing bits of the reference have their PCR decoded from the
    adaptation field. If the PES start code matches exactly and the PTS flag
    is set, the PTS is decoded as well and one report line is printed.

    Nothing is returned; all output goes to stdout.

    NOTE(review): the indentation of this function was lost in the copy this
    was restored from. The report line is placed inside the PTS branch (it
    needs `pts`), and `last_pcr` is updated for every matched packet --
    confirm against the author's original.
    """
    test_value = numpy.array((
        0x47, 0x43, 0xe8, 0x30,
        0x07, 0x10, 0x00, 0x37, 0xFA, 0xB4, 0x7E, 0x00,
        0x00, 0x00, 0x01, 0xE0,
        0x00, 0x00,
        0x81, 0x80,
        0x07 #[2101BF9CB1FFFF] 000001B6 53D7AE064298D7C32F86625AA6C12159786595C7C2B845DEF12D7864216E3A69C9A4E846ABFF4CEAA8BBFF540C012892F58FDE02586422B678D38E09C6F84479C165B738E3D8DCED28571F0A2FC43E059B8F3C4DBCE051CDC703271C1AB60E0CE1E8684544E20380E0CC2879C7AB38F1ED9F51A7805AE6E9DBA4A989438193B1C913871560654893694288EC64D6CA4E5C692B43565EA94827A423D0
    ), dtype=numpy.uint8)
    test_mask = numpy.array((
        0xff, 0x5f, 0xff, 0x30,
        0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00,
        0xff, 0xff, 0xff, 0xff,
        0xff, 0xff,
        0xff, 0xff,
        0xff
    ), dtype=numpy.uint8)
    last_pcr = None
    last_pts = None
    test_len = len(test_value)
    # Stop at the last offset that still holds a whole packet: a truncated
    # tail would make the masked comparison fail on mismatched array lengths.
    for n in range(0, len(data) - 188 + 1, 188):
        distance = packet_hamming_distance(data[n:n+test_len], test_value, test_mask)
        if distance >= 23:
            continue

        # Adaptation field
        # Convert to Python ints before shifting: uint8 scalars shifted left
        # by 8 or more overflow under NumPy 2 promotion rules.
        t = [int(b) for b in data[n+4:n+12]]
        pcr_base = (((((((t[2] << 8) | t[3]) << 8) | t[4]) << 8) | t[5]) << 1) | (t[6] >> 7)
        pcr_ext = ((t[6] & 1) << 8) | t[7]
        pcr = pcr_base * 300 + pcr_ext
        # Compare against None so that a previous PCR of 0 still counts.
        pcr_diff = (pcr - last_pcr) if last_pcr is not None else 0

        pes = data[n+12:n+18]
        video_0 = tuple(pes) == (0x00, 0x00, 0x01, 0xe0, 0x00, 0x00)
        if video_0:
            pes_extension = data[n+18:n+21]
            # 81 80 07 21 01 b7 94 69 ffff
            #...
            pts_dts_flags = (int(pes_extension[1]) >> 6) & 3
            if pts_dts_flags in (0b10, 0b11):
                pts_data = [int(b) for b in data[n+21:n+26]]
                # The value is spread over five bytes as 3 + 8 + 7 + 8 + 7
                # bits; the skipped low bit of bytes 0, 2 and 4 is not part
                # of it.
                pts = (pts_data[0] >> 1) & 0x7
                pts = (pts << 8) | pts_data[1]
                pts = (pts << 7) | ((pts_data[2] >> 1) & 0x7f)
                pts = (pts << 8) | pts_data[3]
                pts = (pts << 7) | ((pts_data[4] >> 1) & 0x7f)
                pts_diff = (pts - last_pts) if last_pts is not None else 0
                #print('%9x %10d %7d' % (pts, pts, pts_diff))
                last_pts = pts
                print('%8x %2d %s %s %s %s | pts %9x %7d | %7d | pcr %9x %3x %12x %d %f' % (
                    n, distance,
                    format_bytes(data[n:n+4]),
                    format_bytes(data[n+4:n+12]),
                    format_bytes(data[n+12:n+16]),
                    format_bytes(data[n+16:n+28]),
                    pts, pts_diff,
                    pcr_base - pts,
                    #format_bytes(data[n+28:n+64]),
                    pcr_base, pcr_ext, pcr, pcr_diff, pcr_diff / 1801800.0)
                )
        # vop = tuple(data[n+28:n+32]) == (0x00, 0x00, 0x01, 0xb0)
        # if vop:
        #     print('%8x %2d %s %s %s %s %s' % (
        #         n, d,
        #         format_bytes(data[n:n+4]),
        #         format_bytes(data[n+4:n+12]),
        #         format_bytes(data[n+12:n+16]),
        #         format_bytes(data[n+16:n+28]),
        #         format_bytes(data[n+28:n+96]),
        #     ))
        last_pcr = pcr
def encode_pts(pts):
    """Print the five-byte encoding of `pts` as ten lowercase hex digits.

    The low 33 bits of `pts` are split into groups of 3, 8, 7, 8 and 7 bits
    (most significant first). The 3-bit and 7-bit groups are shifted left by
    one and have their low bit set; the first byte also has bit 5 set.
    """
    top_3 = (pts >> 30) & 0x7
    upper_7 = (pts >> 15) & 0x7f
    lower_7 = pts & 0x7f
    encoded = [
        0b00100001 | (top_3 << 1),
        (pts >> 22) & 0xff,
        0b00000001 | (upper_7 << 1),
        (pts >> 7) & 0xff,
        0b00000001 | (lower_7 << 1),
    ]
    print(''.join('%02x' % byte for byte in encoded))
#find_padding(d)
# find_pes_headers(d)
# sys.exit(0)

# In the visible source these three names are only referred to from
# commented-out code.
continuity_counts = {}
desired_pids = frozenset((0, 32, 1000, 1001))
#pids_to_fix = frozenset((0x2c8, 0x343, 0x3e4, 0x3ee, 0x3f8, 0xfcd, 0xfe8))
# file_segments = (
#     (0, 0x37acc),
#     (0x39738, 0xd6e20),
#     (0xd7250, 0x215c58),
#     (0x215d98, 0x356ae0),
#     (0x358a3c, 0x3dc004),
#     (0x3dd16c, 0x4460f0),
# )
video_pid = 0x3e8

# PAT template: built once, written as the first packet of the output, and
# reused by the main loop with a fresh continuity counter each time.
pat_continuity_counter = 0
pat_data = (
    0x47, 0x40, 0x00, 0x10 | (pat_continuity_counter & 0xf),
    0x00,                    # Pointer field
    0x00,                    # Table ID
    0b10110000,
    0x11,                    # Section length LSB
    0x00, 0x00,              # TSID
    0b11000001,              # version = 0, data is current
    0x00,                    # Section number
    0x00,                    # Last section number
    0x00, 0x00, 0xe0, 0x10,  # Program 0x0000: PMT PID 0x0010
    0x00, 0x01, 0xe0, 0x20,  # Program 0x0001: PMT PID 0x0020
    0xd3, 0x6a, 0xf0, 0xac,  # CRC32
)
pat = make_packet(pat_data)
f.write(pat)
pat_continuity_counter += 1

# Header for PID 0x0010; defined but not used by the code below.
pmt_10_data = (0x47, 0x40, 0x10, 0x10)

# PMT template for PID 0x0020: written as the second packet of the output and
# reused by the main loop in the same way as the PAT.
pmt_20_continuity_counter = 0
pmt_20_data = (
    0x47, 0x40, 0x20, 0x10 | (pmt_20_continuity_counter & 0xf),
    0x00, 0x02, 0xB0, 0x1F, 0x00, 0x01, 0xC1, 0x00, 0x00, 0xE3, 0xE8, 0xF0, 0x00, 0x10, 0xE3, 0xE8, 0xF0,
    0x03, 0x1B, 0x01, 0xF5, 0x80, 0xE3, 0xE9, 0xF0, 0x00, 0x81, 0xE3, 0xF3, 0xF0, 0x00, 0x3F, 0x64, 0xF1, 0x15,
)
pmt_20 = make_packet(pmt_20_data)
f.write(pmt_20)
pmt_20_continuity_counter += 1

# Padding template: not written here, only kept for the main loop to emit.
pad_continuity_counter = 0
pad = make_padding_packet(continuity_counter=pad_continuity_counter)
pad_continuity_counter += 1

# Expected 4-byte header per PID as (value, mask); only bits set in the mask
# are compared. Entry order matters: it decides which PID wins a tie.
hamming_tests_header = {
    # Test sync byte, transport priority, PID, scrambling flags
    0x0000: (0x47400010, 0xfffffff0),  # PAT
    #0x0010: (0x47401010, 0xff7fffd0), # PMT
    0x0020: (0x47402010, 0xfffffff0),  # PMT
    0x03e8: (0x4703e800, 0xffbfffc0),  # Video
    0x1fff: (0x471fff10, 0xfffffff0),  # Padding
}

video_continuity_counter = 0
# Byte offset of the packet the main loop is looking at.
n = 0
def _write_replacement(offset, header_before, template, counter, different_bits_count):
    """Write `template` to the output in place of the input packet at `offset`.

    The low nibble of the template's fourth byte is set from `counter`, one
    log line showing the old and new header is printed, and the template is
    written to `f`.

    Returns the counter value to use for the next packet of this kind.
    """
    template[3] = (template[3] & 0xf0) | (counter & 0xf)
    header_after = struct.unpack('>I', template[0:4])[0]
    print('%8x %3d %8x -> %8x %2d %s' % (
        offset, offset % 188, header_before, header_after,
        different_bits_count, '*' * different_bits_count))
    f.write(template)
    return counter + 1


# Walk the input in 188-byte steps. Each header is matched against the known
# PIDs by bit distance. Video packets get a rebuilt header and keep their
# payload; everything else is replaced by a clean template packet.
# try/finally makes sure out.ts is closed even if the loop raises.
try:
    while n < (len(d) - 4):
        header_int = struct.unpack('>I', d[n:n+4])[0]

        # Number of masked header bits that differ from each expected header.
        hamming_distances = {
            test_pid: bin((header_int ^ value) & mask).count('1')
            for test_pid, (value, mask) in hamming_tests_header.items()
        }
        # Closest PID; on a tie the first entry of hamming_tests_header wins.
        best_pid = min(hamming_distances, key=hamming_distances.get)
        different_bits_count = hamming_distances[best_pid]

        if different_bits_count > 1:
            # No expected header is within one bit: emit padding instead.
            pad_continuity_counter = _write_replacement(
                n, header_int, pad, pad_continuity_counter, different_bits_count)
        elif best_pid == 0x03e8:
            #value, mask = hamming_tests_header[best_pid]
            #fixed_header = (header_int & (mask ^ 0xffffffff)) | value
            #fixed_header = (fixed_header & 0xfffffff0) | (video_continuity_counter & 0xf)
            payload_unit_start_indicator = (int(d[n+1]) >> 6) & 1
            #if payload_unit_start_indicator and different_bits_count > 1:
            #    # Clear the PUSI, as it's likely bunk.
            #    payload_unit_start_indicator = 0
            adaptation_field_present = (int(d[n+3]) >> 5) & 1
            # There's no reason why the video PID would not carry a payload,
            # so the flag is forced on whatever the input says.
            payload_present = 1
            # Rebuild the header in place, keeping only the two flags decoded
            # above and stamping our own continuity counter.
            d[n:n+4] = (
                0x47,
                (payload_unit_start_indicator << 6) | 0x3,
                0xe8,
                (adaptation_field_present << 5) | (payload_present << 4) | (video_continuity_counter & 0xf)
            )
            header_after = struct.unpack('>I', d[n:n+4])[0]
            print('%8x %3d %8x -> %8x %d %d %d %2d %s' % (
                n, n % 188, header_int, header_after,
                payload_unit_start_indicator, adaptation_field_present, payload_present,
                different_bits_count, '*' * different_bits_count))
            video_continuity_counter += 1
            f.write(d[n:n+188])
        elif best_pid == 0x0000:
            pat_continuity_counter = _write_replacement(
                n, header_int, pat, pat_continuity_counter, different_bits_count)
        elif best_pid == 0x0020:
            pmt_20_continuity_counter = _write_replacement(
                n, header_int, pmt_20, pmt_20_continuity_counter, different_bits_count)
        elif best_pid == 0x1fff:
            pad_continuity_counter = _write_replacement(
                n, header_int, pad, pad_continuity_counter, different_bits_count)
        else:
            # Not reachable with the current hamming_tests_header entries.
            print()
        n += 188
        # print(hex(header_int))
        # sync_byte = d[n]
        # no_sync = ' ' if sync_byte == 0x47 else '*'
        # pid = ((d[n+1] & 0x1f) << 8) | d[n+2]
        # video_pid_hamming_distance = sum(map(int, bin(pid ^ video_pid)[2:]))
        # continuity_counter = d[n+3] & 0xf
        # print('%s %8x %4x %13s %2d %s' % (no_sync, n, pid, bin(pid)[2:], continuity_counter, video_pid_hamming_distance))
        # d[n] = 0x47
        # # if pid in pids_to_fix:
        # #     pid = 0x3e8
        # #     d[n+1] = (d[n+1] & 0xe0) | (pid >> 8)
        # #     d[n+2] = pid & 0xff
        # if pid in desired_pids:
        #     f.write(d[n:n+188])
finally:
    f.close()
#d.tofile('out.ts')
sys.exit(0)
# NOTE: the script calls sys.exit(0) immediately before this point, so this
# scan never runs as written. It is an earlier experiment that looks at every
# byte offset and lists the ones whose PID bits read 0x3e8.
for n in range(len(d) - 188):
    # Work on Python ints: shifting a uint8 scalar left by 8 overflows under
    # NumPy 2 promotion rules and would lose the high PID bits.
    possible_pid = ((int(d[n+1]) & 0x1f) << 8) | int(d[n+2])
    #if d[n] == 0x47: # or possible_pid in desired_pids:
    if possible_pid != 0x3e8:
        continue
    #d[n] = 0x47
    #sync_byte = d[n]
    #transport_error_indicator = (d[n+1] >> 7) & 1
    #payload_unit_start_indicator = (d[n+1] >> 6) & 1
    #transport_priority = (d[n+1] >> 5) & 1
    pid = possible_pid
    #scrambling_control = (d[n+3] >> 6) & 3
    #adaptation_field_present = (d[n+3] >> 5) & 1
    #payload_present = (d[n+3] >> 4) & 1
    continuity_counter = int(d[n+3]) & 0xf
    # '*' marks offsets whose first byte is not 0x47.
    no_sync = ' ' if d[n] == 0x47 else '*'
    print('%s %8x %4x %2d' % (no_sync, n, pid, continuity_counter))
    # if pid in desired_pids:
    #     if pid in continuity_counts:
    #         expected_continuity_counter = (continuity_counts[pid] + 1) & 0xf
    #         if expected_continuity_counter != continuity_counter:
    #             print('%7d %4x %2d != %2d' % (n, pid, continuity_counter, expected_continuity_counter))
    #     if pid not in per_pid_output_files:
    #         per_pid_output_files[pid] = open('%04x.ts' % pid, "wb")
    #     per_pid_output_files[pid].write(d[n:n+188])
    #     #print(n, d[n:n+4])
    #     f.write(d[n:n+188])
    #     continuity_counts[pid] = continuity_counter
# for pid_f in per_pid_output_files.values():
#     pid_f.close()
# f.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment