Last active
April 20, 2020 21:45
-
-
Save hohonuuli/1ed2976977f675674e177e9c2f7c72b8 to your computer and use it in GitHub Desktop.
Script to automate extraction of annotated video frames from VARS/M3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
from typing import List, Dict | |
import iso8601 | |
import requests | |
import subprocess | |
import sys | |
__author__ = "Brian Schlining" | |
__copyright__ = "Copyright 2018, Monterey Bay Aquarium Research Institute" | |
JsonArray = List[Dict] | |
class VampireSquid(object): | |
""" Encapsulate REST calls to the video-asset manager | |
Some terms: | |
A `video sequence` is essentially a camera/rov/auv deployment | |
A `video` is a segment of video in video seqeunce. Often a video | |
may have several representations: e.g. master, mezzanine and proxy | |
versions. A video will contain one or more video references. | |
A `video reference` is information about a single video file. Usually, | |
it has a URI that is a URL to a video file on a web server. | |
A `media` is a simple, yet complete, metadata view of a video reference. Media | |
are often the easiest unit to work with. | |
Typical usage: | |
1. Find the name of the video sequence of interest. (list_video_sequence_names) | |
2. Look up the video sequence by name (find_video_sequence_by_name) | |
- This returns all media that compose a single video sequence. | |
- This might be all you need. | |
- If you want a simpiler view, grab the video_reference.uuid and use | |
`find_media_by_uuid` to get a media view of a video reference. | |
""" | |
def __init__(self, base_url: str): | |
self.base_url = base_url | |
def list_video_sequence_names(self) -> List[str]: | |
"""List all available video sequence names""" | |
url = "{}/videosequences/names".format(self.base_url) | |
return requests.get(url).json() | |
def find_media_by_video_sequence_name(self, video_sequence_name: str) -> JsonArray: | |
"""Find all media belonging to a video sequence""" | |
url = "{}/media/videosequence/{}".format( | |
self.base_url, video_sequence_name) | |
return requests.get(url).json() | |
def find_media_by_video_reference_uuid(self, video_reference_uuid: str) -> Dict: | |
"""Find a single media by its video_reference_uuid""" | |
url = "{}/media/videoreference/{}".format( | |
self.base_url, video_reference_uuid) | |
return requests.get(url).json() | |
def find_concurrent_media(self, video_reference_uuid: str) -> JsonArray: | |
"""Find all media in the same video sequence that overlap in time | |
with the one whos primary key you provide""" | |
url = "{}/media/concurrent/{}".format( | |
self.base_url, video_reference_uuid) | |
return requests.get(url).json() | |
class Annosaurus(object): | |
"""Encapsulate REST calls to the annotation service | |
""" | |
def __init__(self, base_url: str): | |
self.base_url = base_url | |
def find_annotations(self, video_reference_uuid) -> JsonArray: | |
"""Find all annotations for a specific media/video""" | |
url = "{}/annotations/videoreference/{}".format( | |
self.base_url, video_reference_uuid) | |
return requests.get(url).json() | |
def format_millis(millis: int) -> str: | |
"""Format elapse_time_millis as something ffmpeg likes""" | |
seconds = (millis / 1000) % 60 | |
minutes = int((millis / (1000 * 60)) % 60) | |
hours = int((millis / (1000 * 60 * 60)) % 24) | |
return "{:02d}:{:02d}:{:02.3f}".format(hours, minutes, seconds) | |
def find_concurrent_annotations(video_reference_uuid: str, vampire_squid: VampireSquid, annosaurus: Annosaurus) -> JsonArray: | |
"""Find annotations from other media in the same video sequence that occur | |
within the time bounds of your chosen media""" | |
# --- 1. Get the time bounds of your video | |
media = vampire_squid.find_media_by_video_reference_uuid( | |
video_reference_uuid) | |
start = iso8601.parse_date(media['start_timestamp']) | |
end = start + timedelta(milliseconds=media['duration_millis']) | |
concurrent_media = vampire_squid.find_concurrent_media( | |
video_reference_uuid) | |
annotations = annosaurus.find_annotations(video_reference_uuid) | |
for c in concurrent_media: | |
new_annos = annosaurus.find_annotations(c['video_reference_uuid']) | |
for a in new_annos: | |
if 'recorded_timestamp' in a: | |
t = iso8601.parse_date(a['recorded_timestamp']) | |
if t >= start and t <= end: | |
# Calculate elapsed time relative to our chosen media | |
dt = t - start | |
millis = dt.total_seconds() * 1000 | |
a['elapsed_time_millis'] = int(millis) | |
# print("ET={}\tT={}".format( | |
# a['elapsed_time_millis'], a['recorded_timestamp'])) | |
annotations.append(a) | |
return annotations | |
def remove_duplicate_elapsed_times(annotations) -> JsonArray: | |
"""There's often more than one observation in the same frame. Skip duplicate times""" | |
checked_elapsed_times = set() | |
checked_annotations = list() | |
for a in annotations: | |
t = a['elapsed_time_millis'] | |
if t not in checked_elapsed_times: | |
checked_elapsed_times.add(t) | |
checked_annotations.append(a) | |
return checked_annotations | |
def main(video_reference_uuid: str, vampire_squid: VampireSquid, annosaurus: Annosaurus): | |
media = vampire_squid.find_media_by_video_reference_uuid( | |
video_reference_uuid) | |
if media is None: | |
print("No media was found with a video_reference_uuid = {}".format( | |
video_reference_uuid)) | |
else: | |
end = iso8601.parse_date( | |
media['start_timestamp']) + timedelta(milliseconds=media['duration_millis']) | |
print(""" | |
Using: | |
video_reference_uuid: {} | |
uri: {} | |
start: {} | |
end: {} | |
duration (millis): {} | |
""".format(video_reference_uuid, media['uri'], media['start_timestamp'], end, media['duration_millis'])) | |
annotations = find_concurrent_annotations( | |
video_reference_uuid, vampire_squid, annosaurus) | |
annotations = remove_duplicate_elapsed_times(annotations) | |
url = media_of_interest["uri"] | |
for a in annotations: | |
dt = a['elapsed_time_millis'] | |
concept = a['concept'].replace(" ", "-") | |
jpg = "{}_{}_{}.jpg".format(dt, concept, a['observation_uuid']) | |
t = format_millis(dt) | |
print("Extracting {}".format(jpg)) | |
cmd = "ffmpeg -ss {} -i {} -vframes 1 {}".format(t, url, jpg) | |
# print(cmd) | |
subprocess.call(cmd, shell=True) | |
if __name__ == "__main__": | |
# This is a working mockup. Not meant for production, but gives an idea | |
# of how to get started. If you need assistant to make this into a | |
# product, talk to Brian Schlining ([email protected]) for help. | |
# --- 1. Initialize stuff we need | |
# Initialize REST APIs with correct endpoints | |
vampire_squid = VampireSquid("http://m3.shore.mbari.org/vam/v1") | |
annosaurus = Annosaurus("http://m3.shore.mbari.org/anno/v1") | |
# You could use vampire_squid.list_video_sequence_names() or browse to | |
# http://m3.shore.mbari.org/vam/v1/videosequences/names to find acceptable names | |
# I'm just hard-coding one here. | |
video_sequence_name = "Doc Ricketts 0953" | |
# --- 2. Find the primary key (video_reference_uuid) for the video that we want | |
# to extract image from. There's many ways to skin this cat. | |
# Find all media in a dive | |
media = vampire_squid.find_media_by_video_sequence_name( | |
video_sequence_name) | |
# Here we'll filter so we're only working with MP4 videos as they work | |
# waaaaay better than ProRes for this application | |
mp4s = [m for m in media if m['container'] == "video/mp4"] | |
# We need a media's `video_reference_uuid` to look up annotations for | |
# a specific video. | |
media_of_interest = mp4s[0] | |
video_reference_uuid = media_of_interest['video_reference_uuid'] | |
# --- 3. Run extraction. As written they are extracted to your current directory | |
main(video_reference_uuid, vampire_squid, annosaurus) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment