Created
November 18, 2020 22:57
-
-
Save ximion/8457efada8299e20a2d5cc792fe670a1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# SPDX-License-Identifier: LGPL-3.0-or-later | |
'''
MKV to .mat conversion script.
Extracted from our bigger pipeline for Miniscope data analysis
using the MIN1PIPE pipeline by Lu et al.
@author: Matthias Klumpp
'''
import os | |
import sys | |
import argparse | |
import cv2 as cv | |
import numpy as np | |
import h5py | |
import logging as log | |
from datetime import datetime | |
from contextlib import contextmanager | |
from natsort import natsorted | |
@contextmanager
def open_hdf5_with_matlab_header(fname, **kwargs):
    '''
    Create an HDF5 file with a fake MATLAB 7.3 header and yield it for writing.

    The file is first created empty with a 512-byte userblock. The first
    128 bytes of that userblock are then overwritten with a MATLAB-style
    header, and the file is reopened in append mode for the caller.

    fname: path of the file to create (an existing file is overwritten).
    kwargs: extra keyword arguments forwarded to the final h5py.File() call.

    Yields the open h5py.File. It is flushed when the with-block finishes
    normally and is always closed, even if the with-block raises.

    Raises ValueError if the encoded header text does not fit into the
    116 bytes available for it.
    '''
    now = datetime.now()

    # fake MATLAB HDF5 header string
    s = 'MATLAB 7.3 MAT-file, Platform: unknown-any ' \
        + '0.1' + ', Created on: ' \
        + now.strftime('%a %b %d %H:%M:%S %Y') \
        + ' HDF5 schema 1.00 .'

    # The header is 128 bytes, of which the last 12 are special, so the
    # text may use at most 128-12 bytes. Measure the *encoded* length:
    # day/month names are locale-dependent and may not be plain ASCII.
    text_size = 128 - 12
    header_text = s.encode('utf-8')
    if len(header_text) > text_size:
        raise ValueError('MATLAB header text is too long ({} bytes, maximum is {}).'.format(
            len(header_text), text_size))

    # Pad the text with spaces up to 128-12 bytes.
    b = bytearray(header_text.ljust(text_size, b' '))
    # Add 8 nulls (0) and the magic number that MATLAB uses.
    b.extend(bytearray.fromhex('00000000 00000000 0002494D'))

    # create HDF5 file template
    hf = h5py.File(fname, mode='w', userblock_size=512)
    hf.close()
    hf = None

    # write MATLAB userblock
    with open(fname, 'r+b') as f:
        f.write(b)

    # reopen the file in append-mode, skipping userblock
    hf = h5py.File(fname,
                   mode='a',
                   libver=('earliest', 'v110'),
                   **kwargs)
    try:
        yield hf
        hf.flush()
    finally:
        # always release the file handle, also when the caller's block raised
        hf.close()
def videos_to_mat(vid_fnames, mat_fname):
    '''
    Convert list of videos into a single MATLAB-compatible HDF5 file.

    Every frame of every video is converted to grayscale, transposed and
    stored as single-precision float in one array of shape
    (frames_total, width, height). That array is written to the dataset
    'frame_all' of the output file.

    vid_fnames: list of video filenames, processed in the given order.
    mat_fname: filename of the HDF5 file to create.

    Returns the tuple ((width, height), fps), as read from the first video.

    Raises Exception if no videos were passed, if a video can not be
    opened, or if the number of frames read does not match the number
    the video file announced. Prints a message and calls sys.exit(2) if
    a video's width, height or framerate differs from the first video's.
    '''
    if not vid_fnames:
        raise Exception('No videos to analyze have been passed.')

    # get common video properties
    vreader = cv.VideoCapture(vid_fnames[0])
    try:
        if not vreader.isOpened():
            raise Exception('Unable to read from video file!')
        width = int(vreader.get(cv.CAP_PROP_FRAME_WIDTH))
        height = int(vreader.get(cv.CAP_PROP_FRAME_HEIGHT))
        fps = int(vreader.get(cv.CAP_PROP_FPS))
    finally:
        vreader.release()

    # validate all videos
    frames_total = 0
    for v_fname in vid_fnames:
        # Open the video currently being validated (not always the first
        # one), so its properties are really checked and its own frame
        # count is added to the total.
        vreader = cv.VideoCapture(v_fname)
        try:
            if not vreader.isOpened():
                raise Exception('Unable to read from video file!')
            if width != int(vreader.get(cv.CAP_PROP_FRAME_WIDTH)):
                print('Video {} has invalid width (expected {}px)'.format(v_fname, width))
                sys.exit(2)
            if height != int(vreader.get(cv.CAP_PROP_FRAME_HEIGHT)):
                print('Video {} has invalid height (expected {}px)'.format(v_fname, height))
                sys.exit(2)
            if fps != int(vreader.get(cv.CAP_PROP_FPS)):
                print('Video {} has invalid framerate (expected {}fps)'.format(v_fname, fps))
                sys.exit(2)
            frames_total += int(vreader.get(cv.CAP_PROP_FRAME_COUNT))
        finally:
            vreader.release()

    # read all videos
    frames_all = np.zeros((frames_total, width, height), dtype=np.single)
    cur_total_frame_n = 0
    for v_fname in vid_fnames:
        vreader = cv.VideoCapture(v_fname)
        try:
            if not vreader.isOpened():
                raise Exception('Unable to read from video file!')

            fmt = int(vreader.get(cv.CAP_PROP_FORMAT))
            expected_frames_n = int(vreader.get(cv.CAP_PROP_FRAME_COUNT))
            log.info('Reading video: {}'.format(v_fname))

            frame_n = 0
            while True:
                ret, mat = vreader.read()
                if not ret:
                    break
                frame_n += 1
                # checked before storing the frame, so we never write
                # past the space reserved for this video
                if frame_n > expected_frames_n:
                    raise Exception('Read more frames than the expected number ({})'.format(expected_frames_n))

                # we should already have an 8-bit grayscale image, but we convert it just in case
                gray_mat = cv.cvtColor(mat, cv.COLOR_BGR2GRAY)
                # frames are (height, width); transpose to match the (width, height) storage
                frames_all[cur_total_frame_n, :, :] = gray_mat.T.astype(np.single)
                cur_total_frame_n += 1
        finally:
            vreader.release()

        if expected_frames_n != frame_n:
            raise Exception('Read an unexpected amount of frames (expected {}, got {})'.format(expected_frames_n, frame_n))
        log.info('Video file contained {} frames @ {}fps {}x{}px, fmt-{}; now converted to 8bit gray type:single matrices'.format(frame_n, fps, width, height, fmt))

    if frames_total != cur_total_frame_n:
        raise Exception('Read an invalid amount of frames: {} instead of {}'.format(cur_total_frame_n, frames_total))
    log.info('Total number of frames to process: {}'.format(frames_total))

    with open_hdf5_with_matlab_header(mat_fname) as f:
        log.info('Saving raw video data to MATLAB-compatible HDF5 file...')
        # direct assignment is *much* faster than writing to the HDF5 file in chunks
        f.create_dataset('frame_all', data=frames_all)
    log.info('Video conversion for mmap done.')

    return (width, height), fps
def main(options):
    '''
    Check the parsed command-line options and run the conversion.

    options: object with the attributes 'videos' (list of video
             filenames) and 'out_fname' (destination filename).

    Prints a message and exits with status 1 if either of them is unset.
    The videos are converted in natural sort order of their filenames.
    '''
    video_paths = options.videos
    if not video_paths:
        print('No video files set.')
        sys.exit(1)

    destination = options.out_fname
    if not destination:
        print('No destination for filename set.')
        sys.exit(1)

    ordered_videos = natsorted(video_paths)
    videos_to_mat(ordered_videos, destination)
if __name__ == '__main__':
    # Script entry point: describe the command line, parse it and hand
    # the resulting options over to main().
    arg_parser = argparse.ArgumentParser(
        description='Convert MKV video to MATLAB .mat file')
    arg_parser.add_argument(
        '-o', '--output',
        dest='out_fname',
        help='Output filename.')
    arg_parser.add_argument(
        'videos',
        nargs='+',
        help='The Miniscope video files to analyze.')

    # without explicit arguments, parse_args() reads sys.argv[1:]
    main(arg_parser.parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment