Created
November 11, 2020 13:50
-
-
Save ZackAkil/559a7ec1498e04801b0752126e1da13c to your computer and use it in GitHub Desktop.
Google Cloud Function to Generate a tracking gif of a body part using Video Intelligence API and Storage Triggers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
import os

import cv2
import imageio
import numpy as np

from google.cloud import storage
from google.cloud import videointelligence_v1p3beta1 as videointelligence
# Module-level clients are created once per function instance and reused
# across warm invocations (Cloud Functions best practice).
storage_client = storage.Client()
client = videointelligence.VideoIntelligenceServiceClient()

# Destination bucket for the generated gifs.
OUTPUT_BUCKET_NAME = 'YOUR BUCKET NAME' # NO gs://, e.g 'my-output-bucket'
# Pose landmark name to track (one of the Video Intelligence body landmarks).
LANDMARK_TO_TRACK = 'right_wrist'

# configure person detection
config = videointelligence.types.PersonDetectionConfig(
    include_bounding_boxes=True, # include bounding box around whole body
    include_attributes=False, # include clothing info
    include_pose_landmarks=True, # include body joints info
)
video_context = videointelligence.types.VideoContext(person_detection_config=config)
def hello_gcs(event, context):
    """Storage-triggered Cloud Function: build a gif tracking one body part.

    Runs Video Intelligence person detection on the uploaded video, crops a
    fixed-size square around the tracked landmark (LANDMARK_TO_TRACK) in every
    confident frame, and uploads the resulting animated gif to
    OUTPUT_BUCKET_NAME.

    Args:
        event (dict): GCS object-finalize payload; 'bucket' and 'name' are used.
        context (google.cloud.functions.Context): event metadata (unused).
    """
    print(event)
    video_bucket = storage_client.get_bucket(event['bucket'])
    gs_video_blob = video_bucket.blob(event['name'])

    # Download to /tmp (the only writable path in Cloud Functions).
    # basename() guards against object names containing '/', which would
    # otherwise make open() fail on a non-existent /tmp subdirectory.
    local_file_name = '/tmp/' + os.path.basename(event['name'])
    with open(local_file_name, 'wb') as file_obj:
        gs_video_blob.download_to_file(file_obj)

    gcs_uri = 'gs://' + event['bucket'] + '/' + event['name']
    operation = client.annotate_video(
        input_uri=gcs_uri,
        features=[videointelligence.enums.Feature.PERSON_DETECTION],
        video_context=video_context,
    )
    print("\nProcessing video for person detection annotations.")
    result = operation.result(timeout=300)
    print("Finished processing")

    # Only the first detected person's first track is used.
    person_track = (result.annotation_results[0]
                    .person_detection_annotations[0]
                    .tracks[0]
                    .timestamped_objects)

    # Half-size (in pixels) of the square cropped around the landmark.
    square_padding = 100
    # Cropped landmark images collected for gif playback.
    hand_images = []

    video, fps, frame_count, duration = load_video(local_file_name)
    print('video loaded, fps:', fps, ' frames:', frame_count, ' duration:', duration)

    for body_frame in person_track:
        frame_time_millis = body_frame.time_offset.ToMilliseconds()
        # Find the video frame that matches the annotation timestamp.
        image = seek_video_to_time(video, fps, frame_time_millis)
        if image is None:
            # Seek past end of video or a decode failure; skip this frame.
            continue
        height, width, channels = image.shape

        # Get just the landmark we are tracking (e.g. the right wrist).
        landmark = get_landmark(body_frame.landmarks, LANDMARK_TO_TRACK)
        # Skip low-confidence detections to avoid jumpy crops.
        if landmark and landmark.confidence > .3:
            print(frame_time_millis, 'ms')
            # Landmark coordinates are normalized [0, 1]; scale to pixels.
            landmark_x = landmark.point.x * width
            landmark_y = landmark.point.y * height
            crop_box = (landmark_x - square_padding,
                        landmark_y - square_padding,
                        square_padding * 2,
                        square_padding * 2)
            hand_images.append(get_image_crop(image, crop_box))

    if not hand_images:
        # imageio.mimsave raises on an empty frame list; nothing to upload.
        print('No confident', LANDMARK_TO_TRACK, 'detections found.')
        return

    file_name = os.path.basename(event['name']).split('.')[0]
    gif_file_name = '/tmp/' + file_name + '.gif'
    # NOTE(review): frames come from cv2 and are BGR; the gif's channels will
    # be swapped relative to the source video — confirm if colors matter.
    imageio.mimsave(gif_file_name, hand_images)

    # Upload the gif to the output bucket.
    output_video_bucket = storage_client.get_bucket(OUTPUT_BUCKET_NAME)
    new_blob_name = file_name + '_' + LANDMARK_TO_TRACK + '.gif'
    new_blob = output_video_bucket.blob(new_blob_name)
    new_blob.upload_from_filename(gif_file_name)
# helper: pull a fixed-size crop out of an image, zero-padding at the edges
def get_image_crop(image, box):
    """Return the region ``box`` = (x, y, width, height) of ``image``.

    The image is zero-padded by the crop size on every spatial edge first,
    so crops that fall partially (or wholly) outside the frame still come
    back at the full requested size, filled with black.
    """
    left, top, crop_w, crop_h = map(int, box)
    # Pad rows by crop_h, columns by crop_w; leave channels untouched.
    padded = np.pad(image, ((crop_h, crop_h), (crop_w, crop_w), (0, 0)),
                    constant_values=0)
    # Shift the coordinates into the padded frame.
    col = left + crop_w
    row = top + crop_h
    return padded[row:row + crop_h, col:col + crop_w]
# helper: open a video file and report its basic playback stats
def load_video(file_path):
    """Open ``file_path`` with OpenCV.

    Returns a tuple ``(capture, fps, frame_count, duration_seconds)``.
    """
    capture = cv2.VideoCapture(file_path)
    frames_per_second = capture.get(cv2.CAP_PROP_FPS)
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    duration_seconds = total_frames / frames_per_second
    return (capture, frames_per_second, total_frames, duration_seconds)
# helper: pick one named body part out of the landmark list
def get_landmark(landmarks, body_part_name):
    """Return the first landmark whose ``name`` is ``body_part_name``, else None."""
    return next(
        (candidate for candidate in landmarks if candidate.name == body_part_name),
        None,
    )
# function to seek video to a specific time
def seek_video_to_time(video, fps, frame_time):
    """Return the decoded frame nearest ``frame_time`` (milliseconds).

    Args:
        video: an opened ``cv2.VideoCapture``.
        fps: the capture's frames-per-second rate.
        frame_time: target timestamp in milliseconds.

    Returns:
        The frame as a numpy array, or None if it could not be read.
    """
    target_frame_index = int((frame_time / 1000.0) * fps)
    current_index = int(video.get(cv2.CAP_PROP_POS_FRAMES))

    if target_frame_index == 0:
        # First frame requested: rewind if anything was already read.
        if current_index > 0:
            video.set(cv2.CAP_PROP_POS_FRAMES, 0)
        success, image = video.read()
        return image if success else None

    # Bug fix: the original only ever read forward, so when two annotation
    # timestamps mapped to the same (or an earlier) frame index the while
    # loop never ran and the function returned None, crashing the caller.
    # Rewind so the target frame can be (re-)decoded.
    if current_index >= target_frame_index:
        video.set(cv2.CAP_PROP_POS_FRAMES, target_frame_index - 1)
        current_index = target_frame_index - 1

    image = None
    while current_index < target_frame_index:
        success, image = video.read()
        if not success:
            # Past end of video / decode error: report what we have (or None).
            break
        current_index += 1
    return image
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
google-cloud-storage | |
google-cloud-videointelligence | |
opencv-python | |
imageio |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment