Created
May 10, 2014 18:47
-
-
Save jboone/340abf06d38ee220b86f to your computer and use it in GitHub Desktop.
Gnarly Python code to pick through the wreckage of an MPEG-2 transport stream.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
#
# Copyright (C) 2014 Jared Boone, ShareBrained Technology, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
import sys | |
import struct | |
import numpy | |
import operator | |
# Size in bytes of one transport stream packet; every helper below builds
# and slices packets of this length.
packet_length = 188

# The input capture is named by the first command-line argument and is read
# into memory in one go as a flat array of bytes.
if len(sys.argv) < 2:
    # Fail with a usage line instead of a bare IndexError traceback.
    sys.exit('usage: %s <input.ts>' % sys.argv[0])
d = numpy.fromfile(sys.argv[1], dtype=numpy.uint8)

# The repaired stream is always written to out.ts in the current directory.
f = open('out.ts', 'wb')

# per_pid_output_files = {}
def make_empty_packet():
    """Return a fresh packet_length-byte uint8 array with every byte 0xff."""
    return numpy.full((packet_length,), 0xff, dtype=numpy.uint8)
def make_packet(data):
    """Return a full-size packet that starts with data and is 0xff-filled after.

    data is any sequence of byte values; it is copied to the front of a new
    packet obtained from make_empty_packet().
    """
    filled = make_empty_packet()
    prefix_length = len(data)
    filled[0:prefix_length] = data
    return filled
def make_padding_packet(continuity_counter=0):
    """Return a padding packet (header 47 1f ff 1x) with the given counter.

    Only the low four bits of continuity_counter are used; they become the
    low nibble of the fourth header byte. The rest of the packet is 0xff.
    """
    counter_nibble = continuity_counter & 0xf
    header = (0x47, 0x1f, 0xff, 0x10 | counter_nibble)
    return make_packet(header)
def packet_hamming_distance(data, test_value, test_mask):
    """Count the bits in which data differs from test_value under test_mask.

    All three arguments are uint8 arrays of the same length. Bits that are
    zero in test_mask are ignored. Returns a plain Python int.
    """
    mismatches = (data ^ test_value) & test_mask
    # Use the array's own sum rather than the builtin sum(): adding uint8
    # scalars one by one can keep the running total in uint8 (NumPy 2
    # promotion rules) and wrap once more than 255 bits differ.
    return int(numpy.unpackbits(mismatches).sum())
def find_padding(data):
    """Print every 4-byte-aligned offset in data that resembles a padding packet.

    Only the four header bytes are compared, and the continuity counter
    nibble is ignored. An offset is reported, as hex offset plus bit
    distance, when fewer than 4 header bits differ from the padding header.
    """
    test_value = make_padding_packet()
    # Mask selects the header only: bytes 0-2 in full, the high nibble of
    # byte 3 (continuity counter ignored), nothing from the payload.
    test_mask = numpy.zeros((packet_length,), dtype=numpy.uint8)
    test_mask[0:3] = 0xff
    test_mask[3] = 0xf0
    # Include the final offset at which a whole packet still fits; the
    # exclusive range() bound would otherwise skip it (and would scan
    # nothing at all when data is exactly one packet long).
    last_offset = len(data) - packet_length
    for n in range(0, last_offset + 1, 4):
        distance = packet_hamming_distance(data[n:n+packet_length], test_value, test_mask)
        if distance < 4:
            print('%8x %d' % (n, distance))
def format_bytes(d):
    """Return the byte values in d as one lowercase hex string, two digits each."""
    hex_pairs = ('%02x' % octet for octet in d)
    return ''.join(hex_pairs)
def _decode_pcr(adaptation_field):
    """Return (pcr_base, pcr_ext) from the first 8 adaptation field bytes.

    adaptation_field is a list of Python ints: length byte, flags byte, then
    the six PCR bytes (33-bit base, 6 reserved bits, 9-bit extension).
    """
    af = adaptation_field
    pcr_base = (af[2] << 25) | (af[3] << 17) | (af[4] << 9) | (af[5] << 1) | (af[6] >> 7)
    pcr_ext = ((af[6] & 1) << 8) | af[7]
    return pcr_base, pcr_ext


def _decode_pts(pts_bytes):
    """Return the 33-bit PTS packed into five bytes (the inverse of encode_pts)."""
    return (
        (((pts_bytes[0] >> 1) & 0x7) << 30)
        | (pts_bytes[1] << 22)
        | (((pts_bytes[2] >> 1) & 0x7f) << 15)
        | (pts_bytes[3] << 7)
        | ((pts_bytes[4] >> 1) & 0x7f)
    )


def find_pes_headers(data):
    """Print PCR and PTS timing for packets that resemble a video PES start.

    data is a uint8 array holding the stream. Each packet_length-aligned
    packet is compared with a template (TS header, 8-byte adaptation field
    carrying a PCR, PES start code for stream 0xE0, PES flags). When fewer
    than 23 masked bits differ, the PCR is decoded. If the PES start code
    matches exactly and a PTS is flagged, one line is printed with the
    offset, raw header bytes, PTS, PCR and their deltas.

    A trailing partial packet is ignored.
    """
    test_value = numpy.array((
        0x47, 0x43, 0xe8, 0x30,
        0x07, 0x10, 0x00, 0x37, 0xFA, 0xB4, 0x7E, 0x00,
        0x00, 0x00, 0x01, 0xE0,
        0x00, 0x00,
        0x81, 0x80,
        0x07,
        #[2101BF9CB1FFFF] 000001B6 53D7AE064298D7C32F86625AA6C12159786595C7C2B845DEF12D7864216E3A69C9A4E846ABFF4CEAA8BBFF540C012892F58FDE02586422B678D38E09C6F84479C165B738E3D8DCED28571F0A2FC43E059B8F3C4DBCE051CDC703271C1AB60E0CE1E8684544E20380E0CC2879C7AB38F1ED9F51A7805AE6E9DBA4A989438193B1C913871560654893694288EC64D6CA4E5C692B43565EA94827A423D0
    ), dtype=numpy.uint8)
    test_mask = numpy.array((
        0xff, 0x5f, 0xff, 0x30,
        0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, 0x00,
        0xff, 0xff, 0xff, 0xff,
        0xff, 0xff,
        0xff, 0xff,
        0xff,
    ), dtype=numpy.uint8)
    last_pcr = None
    last_pts = None
    test_len = len(test_value)
    # Visit whole packets only: a truncated tail would make the template
    # comparison fail on mismatched lengths.
    for n in range(0, len(data) - packet_length + 1, packet_length):
        distance = packet_hamming_distance(data[n:n+test_len], test_value, test_mask)
        if distance >= 23:
            continue

        # Adaptation field. Convert to Python ints before shifting so the
        # arithmetic cannot be truncated to 8 bits by numpy scalar rules.
        adaptation_field = [int(b) for b in data[n+4:n+12]]
        pcr_base, pcr_ext = _decode_pcr(adaptation_field)
        pcr = pcr_base * 300 + pcr_ext
        # Compare against None so a genuine PCR of 0 still counts as "seen".
        pcr_diff = (pcr - last_pcr) if last_pcr is not None else 0

        pes = tuple(int(b) for b in data[n+12:n+18])
        video_0 = pes == (0x00, 0x00, 0x01, 0xe0, 0x00, 0x00)
        if video_0:
            pes_extension = [int(b) for b in data[n+18:n+21]]
            # 81 80 07 21 01 b7 94 69 ffff
            pts_dts_flags = (pes_extension[1] >> 6) & 3
            if pts_dts_flags in (0b10, 0b11):
                pts = _decode_pts([int(b) for b in data[n+21:n+26]])
                pts_diff = (pts - last_pts) if last_pts is not None else 0
                last_pts = pts
                print('%8x %2d %s %s %s %s | pts %9x %7d | %7d | pcr %9x %3x %12x %d %f' % (
                    n, distance,
                    format_bytes(data[n:n+4]),
                    format_bytes(data[n+4:n+12]),
                    format_bytes(data[n+12:n+16]),
                    format_bytes(data[n+16:n+28]),
                    pts, pts_diff,
                    pcr_base - pts,
                    pcr_base, pcr_ext, pcr, pcr_diff, pcr_diff / 1801800.0)
                )
        last_pcr = pcr
def encode_pts(pts):
    """Print the five-byte encoding of a 33-bit PTS as lowercase hex.

    The value is split 3 + 15 + 15 bits; the first byte carries the fixed
    0b0010 prefix and every odd byte ends in a 1 marker bit. Nothing is
    returned.
    """
    pts_32_30 = (pts >> 30) & 0x7
    pts_29_15 = (pts >> 15) & 0x7fff
    pts_14_0 = pts & 0x7fff
    encoded = (
        0b00100001 | (pts_32_30 << 1),
        pts_29_15 >> 7,
        0b00000001 | ((pts_29_15 & 0x7f) << 1),
        pts_14_0 >> 7,
        0b00000001 | ((pts_14_0 & 0x7f) << 1),
    )
    print(''.join('%02x' % octet for octet in encoded))
# Alternative one-shot analyses, left disabled. Enable one of the calls
# together with the exit to run it instead of the rest of the script.
#find_padding(d)
# find_pes_headers(d)
# sys.exit(0)

# NOTE(review): the three names below appear to be referenced only from
# commented-out code in this file.
continuity_counts = dict()
desired_pids = frozenset([0x0000, 0x0020, 0x03e8, 0x03e9])
#pids_to_fix = frozenset((0x2c8, 0x343, 0x3e4, 0x3ee, 0x3f8, 0xfcd, 0xfe8))
# file_segments = (
#     (0, 0x37acc),
#     (0x39738, 0xd6e20),
#     (0xd7250, 0x215c58),
#     (0x215d98, 0x356ae0),
#     (0x358a3c, 0x3dc004),
#     (0x3dd16c, 0x4460f0),
# )
video_pid = 0x03e8
# Canned PAT packet: built once, written as the first packet of the output,
# and kept in `pat` for reuse.
pat_continuity_counter = 0
pat_data = (
    # Transport packet header, PID 0x0000.
    0x47, 0x40, 0x00, 0x10 | (pat_continuity_counter & 0xf),
    0x00,  # Pointer field
    0x00,  # Table ID
    0b10110000,
    0x11,  # Section length LSB
    0x00, 0x00,  # TSID
    0b11000001,  # version = 0, data is current
    0x00,  # Section number
    0x00,  # Last section number
    0x00, 0x00, 0xe0, 0x10,  # Program 0x0000: PMT PID 0x0010
    0x00, 0x01, 0xe0, 0x20,  # Program 0x0001: PMT PID 0x0020
    0xd3, 0x6a, 0xf0, 0xac,  # CRC32
)
# make_packet() pads the table out to a full packet with 0xff bytes.
pat = make_packet(pat_data)
f.write(pat)
pat_continuity_counter += 1
# Header only; no packet is ever built from this tuple in the visible code.
pmt_10_data = (
    0x47, 0x40, 0x10, 0x10,
)

# Canned PMT packet on PID 0x0020: built once, written as the second packet
# of the output, and kept in `pmt_20` for reuse.
# NOTE(review): the per-field grouping below follows the usual PMT section
# layout and is an annotation only -- confirm against ISO/IEC 13818-1.
pmt_20_continuity_counter = 0
pmt_20_data = (
    0x47, 0x40, 0x20, 0x10 | (pmt_20_continuity_counter & 0xf),
    0x00,  # pointer field
    0x02,  # table id
    0xB0, 0x1F,  # flags + section length
    0x00, 0x01,  # program number
    0xC1,  # version / current
    0x00, 0x00,  # section number, last section number
    0xE3, 0xE8,  # PCR PID
    0xF0, 0x00,  # program info length
    0x10, 0xE3, 0xE8, 0xF0, 0x03, 0x1B, 0x01, 0xF5,  # stream entry, PID 0x3e8
    0x80, 0xE3, 0xE9, 0xF0, 0x00,  # stream entry, PID 0x3e9
    0x81, 0xE3, 0xF3, 0xF0, 0x00,  # stream entry, PID 0x3f3
    0x3F, 0x64, 0xF1, 0x15,  # CRC32
)
# make_packet() pads the table out to a full packet with 0xff bytes.
pmt_20 = make_packet(pmt_20_data)
f.write(pmt_20)
pmt_20_continuity_counter += 1
# Padding packet written in place of any input packet whose header cannot be
# matched. The counter is bumped once here without a write, so the first
# padding packet actually emitted carries continuity counter 1.
pad_continuity_counter = 0
pad = make_padding_packet(continuity_counter=pad_continuity_counter)
pad_continuity_counter += 1

# Expected 32-bit header (value, mask) for each PID the output keeps. Bits
# cleared in the mask are not compared.
hamming_tests_header = {
    # Test sync byte, transport priority, PID, scrambling flags
    0x0000: (0x47400010, 0xfffffff0),  # PAT
    #0x0010: (0x47401010, 0xff7fffd0),  # PMT
    0x0020: (0x47402010, 0xfffffff0),  # PMT
    0x03e8: (0x4703e800, 0xffbfffc0),  # Video
    0x1fff: (0x471fff10, 0xfffffff0),  # Padding
}


def _count_set_bits(value):
    """Return the number of 1 bits in a non-negative integer."""
    return bin(value).count('1')


def _write_regenerated_packet(packet, continuity_counter, offset, header_before, distance):
    """Stamp the counter into a canned packet, log the substitution, write it.

    packet is one of the prebuilt arrays (pat, pmt_20, pad); only the low
    nibble of its fourth byte is changed. The caller advances its counter.
    """
    packet[3] = (packet[3] & 0xf0) | (continuity_counter & 0xf)
    header_after = struct.unpack('>I', packet[0:4])[0]
    print('%8x %3d %8x -> %8x %2d %s' % (offset, offset % packet_length, header_before, header_after, distance, '*' * distance))
    f.write(packet)


video_continuity_counter = 0
n = 0
try:
    # Walk the input one whole packet at a time. A truncated tail is skipped
    # so that the output never contains a short packet.
    while n + packet_length <= len(d):
        header_int = struct.unpack('>I', d[n:n+4])[0]

        # Bit distance from this header to each known PID's expected header.
        hamming_distances = {}
        for test_pid, (value, mask) in hamming_tests_header.items():
            hamming_distances[test_pid] = _count_set_bits((header_int ^ value) & mask)
        best_pid = min(hamming_distances, key=hamming_distances.get)
        different_bits_count = hamming_distances[best_pid]

        if different_bits_count > 1 or best_pid == 0x1fff:
            # Too damaged to attribute to a PID (or genuinely padding):
            # replace the whole packet with padding.
            _write_regenerated_packet(pad, pad_continuity_counter, n, header_int, different_bits_count)
            pad_continuity_counter += 1
        elif best_pid == 0x03e8:
            # Keep the payload, rebuild the header around the two flags taken
            # from the input and a locally generated continuity counter.
            payload_unit_start_indicator = (int(d[n+1]) >> 6) & 1
            adaptation_field_present = (int(d[n+3]) >> 5) & 1
            # There's no reason why the video PID would not carry a payload.
            payload_present = 1
            d[n:n+4] = (
                0x47,
                (payload_unit_start_indicator << 6) | 0x3,
                0xe8,
                (adaptation_field_present << 5) | (payload_present << 4) | (video_continuity_counter & 0xf)
            )
            header_after = struct.unpack('>I', d[n:n+4])[0]
            print('%8x %3d %8x -> %8x %d %d %d %2d %s' % (n, n % packet_length, header_int, header_after, payload_unit_start_indicator, adaptation_field_present, payload_present, different_bits_count, '*' * different_bits_count))
            video_continuity_counter += 1
            f.write(d[n:n+packet_length])
        elif best_pid == 0x0000:
            _write_regenerated_packet(pat, pat_continuity_counter, n, header_int, different_bits_count)
            pat_continuity_counter += 1
        elif best_pid == 0x0020:
            _write_regenerated_packet(pmt_20, pmt_20_continuity_counter, n, header_int, different_bits_count)
            pmt_20_continuity_counter += 1
        else:
            # Only reachable if hamming_tests_header gains a PID that has no
            # branch above; such packets are dropped.
            print()
        n += packet_length
finally:
    # Close the output even if the loop raises.
    f.close()
#d.tofile('out.ts')
sys.exit(0)
# NOTE: this code follows the unconditional sys.exit(0) above and therefore
# never runs. It slides over the input one byte at a time and prints every
# offset whose bytes 1-2 decode to PID 0x3e8, flagging missing sync bytes.
for n in range(len(d) - 188):
    possible_pid = ((d[n+1] & 0x1f) << 8) | d[n+2]
    #if d[n] == 0x47: # or possible_pid in desired_pids:
    if possible_pid != 0x3e8:
        continue
    pid = ((d[n+1] & 0x1f) << 8) | d[n+2]
    continuity_counter = d[n+3] & 0xf
    # '*' marks an offset where the sync byte is not 0x47.
    no_sync = ' ' if d[n] == 0x47 else '*'
    if pid == 0x3e8:
        print('%s %8x %4x %2d' % (no_sync, n, pid, continuity_counter))
    # if pid in desired_pids:
    #     if pid in continuity_counts:
    #         expected_continuity_counter = (continuity_counts[pid] + 1) & 0xf
    #         if expected_continuity_counter != continuity_counter:
    #             print('%7d %4x %2d != %2d' % (n, pid, continuity_counter, expected_continuity_counter))
    #     if pid not in per_pid_output_files:
    #         per_pid_output_files[pid] = open('%04x.ts' % pid, "wb")
    #     per_pid_output_files[pid].write(d[n:n+188])
    #     #print(n, d[n:n+4])
    #     f.write(d[n:n+188])
    #     continuity_counts[pid] = continuity_counter
# for pid_f in per_pid_output_files.values():
#     pid_f.close()
# f.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment