Last active
April 26, 2022 22:31
-
-
Save jb3/a4bc32d237975cd78cedc7e2ab594ae5 to your computer and use it in GitHub Desktop.
Crash file detector for Discord
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import sys | |
import pprint | |
loop = asyncio.get_event_loop() | |
# List of files to check | |
FILES = [ | |
"/Users/joseph/Movies/coursework_update.mp4", | |
"/Users/joseph/Movies/coursework_video.mp4", | |
"/Users/joseph/Movies/2020-06-09 18-34-35.mkv", | |
"/Users/joseph/video.mp4" | |
] | |
async def task(path): | |
proc = await asyncio.create_subprocess_exec( | |
# Call ffprobe, a video probing tool | |
"/usr/local/bin/ffprobe", | |
# Show errors | |
"-v", "error", | |
# Return each frame as a CSV entry including the time, width, height and pixel format | |
"-show_entries", "frame=pkt_pts_time,width,height,pix_fmt", | |
# Only process video | |
"-select_streams", "v", | |
# Set the output format to CSV | |
"-of", "csv=p=0", | |
# Provide the path to the video | |
path, | |
stdout=asyncio.subprocess.PIPE, | |
) | |
# Store the last frames width, height and pixel format | |
w, h, fmt = None, None, None | |
scanned_frames = 0 | |
safe = True | |
# Iterate over every frame in the video | |
async for line in proc.stdout: | |
scanned_frames += 1 | |
# Parse the line as CSV and unpack the values | |
line = line.decode().strip() | |
_pkt_time, frame_w, frame_h, frame_fmt = line.split(",") | |
# If we haven't seen a frame before, init some variables | |
if w is None: | |
w = frame_w | |
if h is None: | |
h = frame_h | |
if fmt is None: | |
fmt = frame_fmt | |
# If the height, width or pixel format have changed from the | |
# last frame to the current one then we know the video is unsafe | |
# and is intended to crash the Discord client. | |
if w != frame_w or h != frame_h or fmt != frame_fmt: | |
safe = False | |
print(f"ERR! Frame is different size/format in {path}") | |
# Wait for the process to exit | |
await proc.wait() | |
# Return some video metadata and whether it is safe or not | |
return { | |
"safe": safe, | |
"filename": path, | |
"scanned": scanned_frames, | |
"dimensions": [w, h], | |
"format": fmt | |
} | |
async def main(): | |
tasks = [] | |
for file in FILES: | |
# Create a new video search task for every file listed above | |
tasks.append(loop.create_task(task(file))) | |
# Run all searches in parallel and await the return of all | |
results = await asyncio.gather(*tasks) | |
# Print out each result in a pretty format | |
for result in results: | |
pprint.pprint(result) | |
# Start the program | |
loop.run_until_complete(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment