Last active
November 20, 2024 18:19
-
-
Save victorypoint/69b47099f177ad7a9cae819908b24ea7 to your computer and use it in GitHub Desktop.
Retrieve iFit2 Workout Metrics From Screen via OCR
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# iFit2-Metrics-OCR - 'Retrieve iFit2 Workout Metrics via OCR | |
# Author: Al Udell | |
# Revised: November 20, 2024 | |
# --- functions --- | |
# Validate RPM values function | |
def validate_rpm(value): | |
try: | |
if '/' in value: | |
value = value.split('/')[0] | |
return value | |
except (ValueError, TypeError): | |
return value | |
# --- main --- | |
import subprocess | |
from PIL import Image | |
import requests | |
import io | |
import os | |
import socket | |
from datetime import datetime | |
from io import BytesIO | |
import gzip | |
# File paths | |
ocrlogFile = 'ocr-logfile.txt' | |
# Setup UDP socket for broadcast | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | |
broadcast_address = ('255.255.255.255', 8002) | |
# Define equipment and OCR slot metrics | |
equipment = "bike" | |
slot_labels = { | |
"bike": ["kilometers", "elapsed", "rpm", "watts", "calories", "incline", "resistance"], | |
"treadmill": ["kilometers", "elapsed", "pace/km", "calories", "vertical gain", "incline", "speed"], | |
}[equipment] | |
# Define crop coordinates | |
crop_coordinates = [ | |
(420, 30, 590, 68), # Slot 1 | |
(675, 30, 845, 68), # Slot 2 | |
(925, 30, 1095, 68), # Slot 3 | |
(1180, 30, 1350, 68), # Slot 4 | |
(1435, 30, 1605, 68), # Slot 5 | |
(560, 1015, 645, 1048), # Slot 6 | |
(1270, 1015, 1355, 1048), # Slot 7 | |
] | |
new_height = 100 # Target height for resized images | |
try: | |
# Loop to capture, process, and OCR the screenshot repeatedly | |
while True: | |
# --- Screenshot Section --- | |
# Run adb command to capture and compress screenshot | |
# Use subprocess.Popen and exec-out for streaming output, suppress errors for speed, highest gzip compression level -9 | |
process = subprocess.Popen(["adb", "exec-out", "screencap -p | gzip -9"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) | |
# Read the output directly as it streams | |
compressed_data = process.stdout.read() | |
process.wait() | |
# Decompress the gzip data | |
decompressed_data = gzip.decompress(compressed_data) | |
# Load the screenshot into memory | |
image_data = BytesIO(decompressed_data) | |
image = Image.open(image_data) | |
# --- Image Processing Section --- | |
slot_values = [] | |
for i, crop in enumerate(crop_coordinates): | |
# Crop and resize the image | |
slot_image = image.crop(crop) | |
height_percent = new_height / float(slot_image.height) | |
new_width = int((float(slot_image.width) * height_percent)) | |
resized_image = slot_image.resize((new_width, new_height), Image.LANCZOS) | |
# Perform OCR directly on the resized image in memory | |
with io.BytesIO() as buffer: | |
resized_image.save(buffer, format="PNG") | |
buffer.seek(0) | |
image_data = buffer.read() | |
ocr_response = requests.post( | |
"http://localhost:32168/v1/vision/ocr", files={"image": image_data} | |
).json() | |
# Extract and validate OCR result | |
labels = [prediction['label'] for prediction in ocr_response.get('predictions', [])] | |
value = ''.join(labels) | |
# Validate RPM specifically for slot 3 | |
if i == 2: # Slot 3 is for RPM | |
value = validate_rpm(value) | |
if value is None: | |
value = "Invalid RPM" | |
slot_values.append(value) | |
# --- Output Section --- | |
message = " | ".join(f"{label}: {value}" for label, value in zip(slot_labels, slot_values)) | |
# Print results | |
os.system('cls' if os.name == 'nt' else 'clear') | |
print(f"\r{message}\n", end="") | |
# Write OCR text to log file | |
dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
with open(ocrlogFile, "a") as file: | |
file.write(f"{dt}, {message}\n") | |
# Encode and send UDP messages | |
if equipment == "bike": | |
udp_messages = [ | |
f"Changed RPM {slot_values[2]}", | |
f"Changed Watts {slot_values[3]}", | |
f"Changed Grade {slot_values[5]}", | |
f"Changed Resistance {slot_values[6]}", | |
] | |
elif equipment == "treadmill": | |
udp_messages = [ | |
f"Changed Grade {slot_values[5]}", | |
f"Changed KPH {slot_values[6]}", | |
] | |
for udp_message in udp_messages: | |
sock.sendto(udp_message.encode('utf-8'), broadcast_address) | |
# Wait before the next iteration | |
# time.sleep(5) | |
except KeyboardInterrupt: | |
print("\nLoop interrupted.") | |
finally: | |
sock.close() | |
subprocess.run(["adb", "disconnect"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment