Skip to content

Instantly share code, notes, and snippets.

@hohonuuli
Last active April 20, 2020 21:45
Show Gist options
  • Save hohonuuli/1ed2976977f675674e177e9c2f7c72b8 to your computer and use it in GitHub Desktop.
Save hohonuuli/1ed2976977f675674e177e9c2f7c72b8 to your computer and use it in GitHub Desktop.
Script to automate extraction of annotated video frames from VARS/M3
from datetime import datetime, timedelta
from typing import List, Dict
import iso8601
import requests
import subprocess
import sys
__author__ = "Brian Schlining"
__copyright__ = "Copyright 2018, Monterey Bay Aquarium Research Institute"
JsonArray = List[Dict]
class VampireSquid(object):
    """Thin client for the video-asset manager REST API.

    Terminology:

    - A `video sequence` is essentially a camera/rov/auv deployment.
    - A `video` is a segment of video in a video sequence. A video often
      has several representations (e.g. master, mezzanine and proxy
      versions), so it contains one or more video references.
    - A `video reference` is information about a single video file. It
      usually has a URI that is a URL to a video file on a web server.
    - A `media` is a simple, yet complete, metadata view of a video
      reference. Media are often the easiest unit to work with.

    Typical usage:

    1. Find the name of the video sequence of interest
       (`list_video_sequence_names`).
    2. Look up the media of that sequence by name
       (`find_media_by_video_sequence_name`).
       - This returns all media that compose a single video sequence.
       - This might be all you need.
       - For the media view of one particular video reference, take its
         `video_reference_uuid` and call
         `find_media_by_video_reference_uuid`.

    Every public method performs one HTTP GET through `requests` and
    returns the decoded JSON body of the response.
    """

    def __init__(self, base_url: str):
        # Prefix shared by every endpoint; used verbatim when building URLs.
        self.base_url = base_url

    def _get_json(self, url_template, *path_args):
        """Fill `url_template` with the base URL followed by `path_args`,
        GET the resulting URL and return the decoded JSON body."""
        endpoint = url_template.format(self.base_url, *path_args)
        response = requests.get(endpoint)
        return response.json()

    def list_video_sequence_names(self) -> List[str]:
        """Return the names of all available video sequences."""
        return self._get_json("{}/videosequences/names")

    def find_media_by_video_sequence_name(self, video_sequence_name: str) -> JsonArray:
        """Return every media that belongs to the named video sequence."""
        return self._get_json("{}/media/videosequence/{}", video_sequence_name)

    def find_media_by_video_reference_uuid(self, video_reference_uuid: str) -> Dict:
        """Return the single media identified by `video_reference_uuid`."""
        return self._get_json("{}/media/videoreference/{}", video_reference_uuid)

    def find_concurrent_media(self, video_reference_uuid: str) -> JsonArray:
        """Return all media in the same video sequence that overlap in time
        with the media whose primary key is `video_reference_uuid`."""
        return self._get_json("{}/media/concurrent/{}", video_reference_uuid)
class Annosaurus(object):
    """Thin client for the annotation service REST API.

    Each lookup performs one HTTP GET through `requests` and returns the
    decoded JSON body of the response.
    """

    def __init__(self, base_url: str):
        # Prefix shared by every endpoint; used verbatim when building URLs.
        self.base_url = base_url

    def find_annotations(self, video_reference_uuid) -> JsonArray:
        """Return all annotations recorded against one media/video,
        identified by its `video_reference_uuid`."""
        endpoint = "{}/annotations/videoreference/{}".format(
            self.base_url, video_reference_uuid)
        response = requests.get(endpoint)
        return response.json()
def format_millis(millis: int) -> str:
    """Format an elapsed time in milliseconds as `HH:MM:SS.mmm` for ffmpeg.

    The value is split with integer arithmetic so that:

    - seconds are always zero-padded to two digits (5000 -> "00:00:05.000"),
    - no float rounding can produce an invalid "60.000" seconds field,
    - hours are not wrapped at 24, so offsets of a day or more still
      point at the right position.

    Args:
        millis: Elapsed time in milliseconds. Non-integer values are
            truncated to whole milliseconds.

    Returns:
        The time formatted as "HH:MM:SS.mmm".
    """
    total_seconds, ms = divmod(int(millis), 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return "{:02d}:{:02d}:{:02d}.{:03d}".format(hours, minutes, seconds, ms)
def find_concurrent_annotations(video_reference_uuid: str, vampire_squid: VampireSquid, annosaurus: Annosaurus) -> JsonArray:
    """Collect annotations for a media, including those made on other media
    of the same video sequence that fall within its time bounds.

    The chosen media's own annotations are returned as the annotation
    service delivers them. Annotations from concurrent media are kept only
    when they carry a `recorded_timestamp` inside [start, end] of the chosen
    media; each kept annotation has its `elapsed_time_millis` overwritten
    (in place) with the offset from the chosen media's start.
    """
    # --- 1. Time window covered by the chosen media
    target_media = vampire_squid.find_media_by_video_reference_uuid(
        video_reference_uuid)
    window_start = iso8601.parse_date(target_media['start_timestamp'])
    window_end = window_start + timedelta(
        milliseconds=target_media['duration_millis'])

    # --- 2. Start from the chosen media's own annotations
    overlapping_media = vampire_squid.find_concurrent_media(
        video_reference_uuid)
    collected = annosaurus.find_annotations(video_reference_uuid)

    # --- 3. Add annotations from overlapping media that fall in the window
    for other in overlapping_media:
        candidates = annosaurus.find_annotations(other['video_reference_uuid'])
        for annotation in candidates:
            if 'recorded_timestamp' not in annotation:
                continue
            recorded = iso8601.parse_date(annotation['recorded_timestamp'])
            if not (window_start <= recorded <= window_end):
                continue
            # Re-express the time relative to the chosen media's start
            offset = recorded - window_start
            annotation['elapsed_time_millis'] = int(
                offset.total_seconds() * 1000)
            collected.append(annotation)
    return collected
def remove_duplicate_elapsed_times(annotations) -> JsonArray:
    """Keep only the first annotation seen for each `elapsed_time_millis`.

    A frame often carries more than one observation; extracting it once is
    enough. The relative order of the surviving annotations is unchanged.
    """
    seen_times = set()
    unique = []
    for annotation in annotations:
        elapsed = annotation['elapsed_time_millis']
        if elapsed in seen_times:
            continue
        seen_times.add(elapsed)
        unique.append(annotation)
    return unique
def main(video_reference_uuid: str, vampire_squid: VampireSquid, annosaurus: Annosaurus):
    """Extract one JPEG per annotated frame of a media into the current directory.

    Looks up the media for `video_reference_uuid`, gathers its annotations
    (including in-window annotations from concurrent media), drops repeated
    elapsed times, and runs ffmpeg once per remaining annotation to grab a
    single frame. Each image is named
    `<elapsed_time_millis>_<concept>_<observation_uuid>.jpg`, with spaces in
    the concept replaced by dashes.

    Args:
        video_reference_uuid: Primary key of the media to extract frames from.
        vampire_squid: Client for the video-asset manager.
        annosaurus: Client for the annotation service.
    """
    media = vampire_squid.find_media_by_video_reference_uuid(
        video_reference_uuid)
    if media is None:
        print("No media was found with a video_reference_uuid = {}".format(
            video_reference_uuid))
        return

    end = iso8601.parse_date(
        media['start_timestamp']) + timedelta(milliseconds=media['duration_millis'])
    print("""
Using:
video_reference_uuid: {}
uri: {}
start: {}
end: {}
duration (millis): {}
""".format(video_reference_uuid, media['uri'], media['start_timestamp'], end, media['duration_millis']))

    annotations = find_concurrent_annotations(
        video_reference_uuid, vampire_squid, annosaurus)
    annotations = remove_duplicate_elapsed_times(annotations)

    # Use the media fetched above, not a module-level global, so this
    # function also works when called from another module.
    url = media['uri']
    for a in annotations:
        dt = a['elapsed_time_millis']
        concept = a['concept'].replace(" ", "-")
        jpg = "{}_{}_{}.jpg".format(dt, concept, a['observation_uuid'])
        t = format_millis(dt)
        print("Extracting {}".format(jpg))
        # Pass arguments as a list (no shell) so URIs and concept names
        # with spaces or shell metacharacters reach ffmpeg unmodified.
        cmd = ["ffmpeg", "-ss", t, "-i", url, "-vframes", "1", jpg]
        subprocess.call(cmd)
if __name__ == "__main__":
    # This is a working mockup. Not meant for production, but gives an idea
    # of how to get started. If you need assistance to make this into a
    # product, talk to Brian Schlining ([email protected]) for help.

    # --- 1. Initialize stuff we need
    # Initialize REST APIs with correct endpoints
    vampire_squid = VampireSquid("http://m3.shore.mbari.org/vam/v1")
    annosaurus = Annosaurus("http://m3.shore.mbari.org/anno/v1")

    # You could use vampire_squid.list_video_sequence_names() or browse to
    # http://m3.shore.mbari.org/vam/v1/videosequences/names to find acceptable names
    # I'm just hard-coding one here.
    video_sequence_name = "Doc Ricketts 0953"

    # --- 2. Find the primary key (video_reference_uuid) for the video that we want
    # to extract images from. There are many ways to do this.
    # Find all media in a dive
    media = vampire_squid.find_media_by_video_sequence_name(
        video_sequence_name)

    # Here we'll filter so we're only working with MP4 videos as they work
    # waaaaay better than ProRes for this application. Media without a
    # `container` entry are simply not MP4s.
    mp4s = [m for m in media if m.get('container') == "video/mp4"]
    if not mp4s:
        # Fail with a readable message instead of an IndexError below.
        sys.exit("No video/mp4 media found for video sequence '{}'".format(
            video_sequence_name))

    # We need a media's `video_reference_uuid` to look up annotations for
    # a specific video.
    media_of_interest = mp4s[0]
    video_reference_uuid = media_of_interest['video_reference_uuid']

    # --- 3. Run extraction. As written they are extracted to your current directory
    main(video_reference_uuid, vampire_squid, annosaurus)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment