Created
October 4, 2024 15:02
-
-
Save dat-boris/713fa21edfdef0a677621a002d486203 to your computer and use it in GitHub Desktop.
Example code for creating keyframe from screen capture
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""A simple script that captures the screen and posts key frames to the server.

Each key frame is sent as a base64-encoded image in an HTTP POST to the
server's ``/video_frame`` endpoint.

Captured data is logged to disk so that a session can be traced afterwards.
The folder structure is:

- logs/video/<yyyymmdd-hhmmss>.avi
- logs/keyframes/<yyyymmdd-hhmmss>_<frame>.png
"""
import argparse | |
import logging | |
import os | |
import shutil | |
import time | |
from typing import Iterable | |
import mss | |
import cv2 | |
import requests | |
from arapp.utils import convert_img_to_base64_url | |
from artools.custom_types import Image | |
from artools.keyframes import extract_key_frame | |
VIDEO_FOLDER = "./logs/video/" | |
KEYFRAME_FOLDER = "./logs/keyframes/" | |
def capture_screen(monitor=1, display=True) -> Image:
    """Capture one screenshot of a monitor and return it as an OpenCV image.

    Args:
        monitor: Monitor number handed to ``mss`` as ``mon``.
        display: When true, show the captured image in an OpenCV window
            named "image".

    Returns:
        The image loaded by ``cv2.imread`` from the screenshot file.

    Raises:
        RuntimeError: If the screenshot file could not be read back.
    """
    with mss.mss() as sct:
        # ``shot`` saves the screenshot to disk and returns the file name,
        # which is why the image is loaded back with ``cv2.imread`` below.
        screenshot_path = sct.shot(mon=monitor)

    img = cv2.imread(screenshot_path)
    if img is None:
        # ``cv2.imread`` signals failure by returning None rather than
        # raising; fail here with a clear message instead of letting
        # ``imshow`` or a later ``.shape`` access fail obscurely.
        raise RuntimeError(f"Could not read screenshot file: {screenshot_path}")

    if display:
        cv2.imshow("image", img)
        # A short waitKey lets the HighGUI event loop actually draw the window.
        cv2.waitKey(1)
    return img
def send_to_server(
    images: Iterable[Image],
    host: str,
    timeout: float = 30.0,
) -> None:
    """POST every image from ``images`` to ``http://<host>/video_frame``.

    Each image is converted with ``convert_img_to_base64_url`` and sent as a
    JSON body with the keys ``event`` and ``image_base64``.

    Failures are best-effort: the error is printed, the function sleeps for
    three seconds and then continues with the *next* image. A failed image is
    not sent again.

    Args:
        images: Iterable of images to send; it is consumed lazily.
        host: Host (and optional port) of the receiving server.
        timeout: Seconds to wait for the server before giving up on a single
            request. Without a timeout a stalled server would block the whole
            capture pipeline indefinitely.
    """
    post_url = f"http://{host}/video_frame"
    for img in images:
        try:
            # NOTE: we intentionally reconnect every time since we have seen
            # that the connection can be closed by the server
            print(f"Sending image to server: {post_url}")
            payload = {
                "event": "video_message",
                "image_base64": convert_img_to_base64_url(img),
            }
            response = requests.post(post_url, json=payload, timeout=timeout)
            if response.status_code != 200:
                raise RuntimeError(
                    f"Failed to send data to server: {response.status_code}, {response.text}"
                )
            print("Done!")
        except Exception as ex:
            # Deliberately broad: one bad frame or a network hiccup must not
            # stop the stream of subsequent frames.
            print(f"Error and Retrying: {ex}")
            time.sleep(3)
def capture_loop(
    interval_in_secs=30,
    keyframe_count=3,
    monitor=1,
    verbose=False,
    debug=False,
    fps=4,
) -> Iterable[Image]:
    """Record the screen in fixed-length clips and yield key frames from each.

    For every clip the loop records ``interval_in_secs`` seconds of screen
    captures at ``fps`` frames per second into
    ``VIDEO_FOLDER/<yyyymmdd-hhmmss>.avi``. It then passes that file to
    ``extract_key_frame`` and yields every frame it produces.

    NOTE(review): the recording step is reconstructed from the folder layout
    described in the module docstring. Confirm that ``extract_key_frame``
    expects the path of an ``.avi`` file as its first argument.

    Args:
        interval_in_secs: Length of each recorded clip, in seconds.
        keyframe_count: Forwarded to ``extract_key_frame``.
        monitor: Monitor number forwarded to ``capture_screen``.
        verbose: When true, each captured frame is displayed.
        debug: When true, stop after the first clip that produced key frames.
        fps: Frames captured per second while recording.

    Yields:
        The frames produced by ``extract_key_frame`` for each clip.

    Raises:
        ValueError: If ``fps`` or ``interval_in_secs`` is not positive.
    """
    if fps <= 0 or interval_in_secs <= 0:
        raise ValueError("fps and interval_in_secs must be positive")

    # Probe capture to learn the frame size. The image shape is
    # (rows, cols, channels), so height comes before width.
    img = capture_screen(monitor=monitor, display=verbose)
    height, width = img.shape[:2]

    # Ensure that the output folders exist.
    os.makedirs(os.path.dirname(VIDEO_FOLDER), exist_ok=True)
    os.makedirs(os.path.dirname(KEYFRAME_FOLDER), exist_ok=True)

    frames_per_clip = max(1, int(interval_in_secs * fps))
    frame_period = 1.0 / fps
    fourcc = cv2.VideoWriter_fourcc(*"XVID")

    while True:
        try:
            # One timestamped session per clip; the same id prefixes the
            # key-frame files so video and key frames can be matched up.
            session_id = time.strftime("%Y%m%d-%H%M%S")
            video_filename = os.path.join(VIDEO_FOLDER, f"{session_id}.avi")

            # VideoWriter takes the frame size as (width, height).
            writer = cv2.VideoWriter(video_filename, fourcc, fps, (width, height))
            try:
                for _ in range(frames_per_clip):
                    started = time.monotonic()
                    img = capture_screen(monitor=monitor, display=verbose)
                    writer.write(img)
                    # Sleep only for what is left of this frame's time slot
                    # so that capture time does not stretch the clip.
                    remaining = frame_period - (time.monotonic() - started)
                    if remaining > 0:
                        time.sleep(remaining)
            finally:
                # Always finalize the file, even if a capture failed midway.
                writer.release()

            has_keyframe = False
            for frame in extract_key_frame(
                video_filename,
                KEYFRAME_FOLDER,
                img_prefix=session_id,
                keyframe_count=keyframe_count,
            ):
                yield frame
                has_keyframe = True
            if not has_keyframe:
                logging.error("No keyframe found")
                continue
            print("Done processing keyframes")
            if debug:
                break
        except Exception as ex:
            # Best-effort loop: log, back off briefly, then record a new clip.
            logging.error(f"Error and Retrying: {ex}")
            time.sleep(3)
if __name__ == "__main__":
    # Command-line entry point: parse the options, then stream key frames
    # from the capture loop straight to the server.
    cli = argparse.ArgumentParser()
    # --monitor selects which monitor is captured.
    cli.add_argument("--monitor", type=int, default=1)
    cli.add_argument("--verbose", "-v", action="store_true")
    cli.add_argument(
        "--interval",
        type=int,
        default=30,
        help="Interval in seconds between screenshots",
    )
    cli.add_argument("host", type=str)
    opts = cli.parse_args()

    # capture_loop is a generator, so frames are produced lazily while
    # send_to_server consumes them.
    send_to_server(
        capture_loop(
            interval_in_secs=opts.interval,
            monitor=opts.monitor,
            verbose=opts.verbose,
        ),
        opts.host,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment