Created
November 1, 2024 01:07
-
-
Save N8python/b31c84d04caca53e0de3361161646883 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard library
import os
import platform
import queue
import subprocess
import threading
import time
from datetime import datetime
import tkinter as tk

# Third-party
from pynput import keyboard

# ML imports
import mlx.core as mx
from mlx_vlm import load, generate
from mlx_vlm.prompt_utils import apply_chat_template
from mlx_vlm.utils import load_config, stream_generate
from mlx_lm import load as lm_load, generate as lm_generate, stream_generate as lm_stream_generate
# Directory for captured screenshots, next to this script.
# Created eagerly at import time so capture never fails on a missing folder.
_SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
SCREENSHOT_DIR = os.path.join(_SCRIPT_DIR, 'screenshots')
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
class StatusOverlay:
    """Small always-on-top Tk window showing pipeline status, a run timer,
    and (optionally) the streamed answer text.

    Worker threads report through ``update_status`` (queue-based, drained on
    the Tk main loop).  NOTE(review): ``update_answer`` touches widgets
    directly from worker threads, which Tk only tolerates on some
    platforms — confirm before relying on it elsewhere.
    """

    def __init__(self):
        self.root = tk.Tk()
        self.root.title("Status")
        # Platform-specific window flags to ensure it stays on top of everything.
        # BUG FIX: the original compared the *function object* ``os.system``
        # against 'Darwin' (always False), so the macOS branch never ran.
        # Use platform.system() for the intended OS check.
        if platform.system() == 'Darwin':  # macOS
            self.root.attributes('-alpha', 0.8, '-topmost', True, '-fullscreen', False)
            try:
                # Makes it float above all windows; not every macOS Tk build
                # supports '-float', so a TclError is ignored.
                self.root.attributes('-float', True)
            except tk.TclError:
                pass
            try:
                # Try to use PyObjC for additional window level control if available
                from Foundation import NSAppleScript
                script = '''
                tell application "System Events"
                    set frontmost of every process to false
                end tell
                '''
                NSAppleScript.alloc().initWithSource_(script).executeAndReturnError_(None)
            except ImportError:
                pass
        else:  # Linux/Windows
            try:
                # '-type dock' is an X11-only attribute; fall back gracefully
                # (e.g. on Windows) instead of crashing at startup.
                self.root.attributes('-alpha', 0.8, '-topmost', True, '-type', 'dock')
            except tk.TclError:
                self.root.attributes('-alpha', 0.8, '-topmost', True)
        self.root.lift()
        self.root.attributes('-topmost', True)  # Double-ensure topmost
        self.root.overrideredirect(True)  # Remove window decorations

        # Processing timer state: start_time is set when a screenshot is
        # taken, and the elapsed time is shown when we return to Idle.
        self.start_time = None
        self.processing = False

        # Ensure the window stays on top periodically (some WMs drop topmost).
        def keep_on_top():
            self.root.lift()
            self.root.attributes('-topmost', True)
            self.root.after(100, keep_on_top)  # Check every 100ms
        keep_on_top()

        # Calculate position (top right corner with padding)
        screen_width = self.root.winfo_screenwidth()

        # Create main frame with rounded corners effect
        self.main_frame = tk.Frame(
            self.root,
            bg='#1a1a1a',  # Dark background
            highlightbackground='#2a2a2a',  # Border color
            highlightthickness=2,
        )
        self.main_frame.pack(padx=2, pady=2, fill='both', expand=True)

        # Status label with icon
        self.status_frame = tk.Frame(self.main_frame, bg='#1a1a1a')
        self.status_frame.pack(fill='x', padx=10, pady=(10, 5))
        self.status_icon = tk.Label(
            self.status_frame,
            text="●",  # Dot icon
            font=("Arial", 14),
            bg='#1a1a1a',
            fg='#4CAF50'  # Green dot by default
        )
        self.status_icon.pack(side='left', padx=(0, 5))
        self.label = tk.Label(
            self.status_frame,
            text="Idle",
            font=("SF Pro Display", 12, "bold"),  # Modern font
            bg='#1a1a1a',
            fg='#ffffff',
        )
        self.label.pack(side='left')

        # Timer label (right-aligned)
        self.timer_label = tk.Label(
            self.status_frame,
            text="",
            font=("SF Pro Display", 10),
            bg='#1a1a1a',
            fg='#888888',
        )
        self.timer_label.pack(side='right')

        # Answer display (hidden by default)
        self.answer_frame = tk.Frame(self.main_frame, bg='#1a1a1a')
        self.answer_frame.pack(fill='x', padx=10, pady=(0, 10), expand=True)
        self.answer_label = tk.Label(
            self.answer_frame,
            text="",
            font=("SF Pro Display", 11),
            bg='#1a1a1a',
            fg='#cccccc',
            wraplength=200,
            justify='left'
        )
        self.answer_label.place(x=0, y=0, anchor='nw')  # Fixed position at top-left

        # Position window in top right with 20px padding
        self.root.geometry(f"250x0+{screen_width - 250}+20")

        # Add subtle fade animation for status changes
        self.fade_alpha = 0.0
        self.root.update()
        self.root.geometry(f"250x{int(self.root.winfo_reqheight() + 20)}")

        # Queue for thread-safe updates
        self.update_queue = queue.Queue()
        self.check_queue()

    def check_queue(self):
        """Drain pending status updates from worker threads.

        Runs on the Tk main loop and reschedules itself every 100ms.
        """
        try:
            while True:
                update = self.update_queue.get_nowait()
                if isinstance(update, tuple):  # It's a status+answer update
                    status, answer = update
                    self.update_display(status, answer)
                else:  # It's just a status update
                    self.update_display(update)
        except queue.Empty:
            pass
        self.root.after(100, self.check_queue)

    def update_display(self, status, answer=None):
        """Update the label, icon color, timer, and answer area for *status*.

        Color map: Idle=green, Error*=red, Taking Screenshot=orange,
        Speaking=blue, anything else=orange (generic "processing").
        """
        self.label.config(text=status)
        if status == "Idle":
            self.status_icon.config(fg='#4CAF50')  # Green
            self.answer_label.config(text="")
            self.answer_frame.pack_forget()  # Hide answer frame
            if self.processing:
                # Show total processing time for the run that just finished.
                end_time = time.time()
                processing_time = end_time - self.start_time
                self.timer_label.config(text=f"{processing_time:.1f}s")
                self.processing = False
        elif "Error" in status:
            self.status_icon.config(fg='#f44336')  # Red
            self.answer_frame.pack_forget()  # Hide answer frame
            self.processing = False
        elif status == "Taking Screenshot":
            self.status_icon.config(fg='#FFA726')  # Orange
            self.start_time = time.time()  # Start the processing timer
            self.processing = True
            self.timer_label.config(text="")
        elif status == "Speaking":
            self.status_icon.config(fg='#2196F3')  # Blue
            if answer:
                self.answer_label.config(text=answer)
                self.answer_frame.pack(fill='x', padx=10, pady=(0, 10), expand=True)  # Show answer
        else:
            self.status_icon.config(fg='#FFA726')  # Orange for processing
        # Add fade effect
        #self.fade_in()

    def update_answer(self, answer):
        """Update the answer text and grow the window to fit it.

        Window height is capped 100px short of the screen height.
        """
        self.answer_label.config(text=answer)
        # Don't pack the frame again if it's already packed
        if not self.answer_frame.winfo_ismapped():
            self.answer_frame.pack(fill='x', padx=10, pady=(0, 10), expand=True)
        self.root.update_idletasks()
        # Calculate new height based on text
        text_height = self.answer_label.winfo_reqheight()
        self.answer_frame.configure(height=text_height + 20)  # Add padding
        screen_height_limit = self.root.winfo_screenheight() - 100
        self.root.geometry(f"250x{min(int(self.root.winfo_reqheight()), screen_height_limit)}")

    def fade_in(self):
        """Create a subtle fade-in effect (currently unused — see update_display)."""
        self.fade_alpha = 0.7
        self.root.attributes('-alpha', self.fade_alpha)
        self.fade_step()

    def fade_step(self):
        """Gradually increase opacity until ~0.9 alpha."""
        if self.fade_alpha < 0.9:
            self.fade_alpha += 0.1
            self.root.attributes('-alpha', self.fade_alpha)
            self.root.after(20, self.fade_step)

    def update_status(self, status, answer=None):
        """Thread-safe method to update status and optional answer text."""
        self.update_queue.put((status, answer) if answer else status)

    def start(self):
        """Enter the Tk main loop (blocks the calling thread)."""
        self.root.mainloop()
class ScreenshotAnalyzer:
    """Glue between the hotkey, `screencapture`, the vision/LLM models,
    and the piper/play text-to-speech pipeline.

    Pipeline per hotkey press: interactive screenshot -> VLM extracts the
    problem as LaTeX -> math LLM solves it -> small LLM summarizes the
    answer for speech -> piper synthesizes audio piped into `play`.
    """

    def __init__(self):
        self.status_overlay = StatusOverlay()
        self.processing_thread = None
        print("Loading Qwen2-VL model...")
        self.status_overlay.update_status("Loading Models")
        model_path = "mlx-community/pixtral-12b-4bit"
        self.model, self.processor = load(model_path)
        self.config = load_config(model_path)
        # Math-tuned solver plus a small summarizer that rewrites the
        # solution into speech-friendly text.
        self.answer_model, self.answer_tokenizer = lm_load("mlx-community/Qwen2.5-Math-7B-Instruct-4bit")
        self.summarizer_model, self.summarizer_tokenizer = lm_load("mlx-community/Llama-3.2-3B-Instruct-4bit")
        print("Model loaded successfully!")
        self.status_overlay.update_status("Idle")

    def analyze_image(self, image_path):
        """Run the VLM over *image_path* and return the extracted LaTeX text.

        The prompt instructs the model to emit the sentinel ``\\skip`` when
        the image contains nothing math-related.
        """
        self.status_overlay.update_status("Analyzing Image")
        # BUG FIX: "\s" is an invalid escape sequence (DeprecationWarning,
        # SyntaxWarning in 3.12+); "\\skip" produces the identical string.
        prompt = "Extract all text, problem statements, and math notation from the image, and format the result in LaTeX. If there is a problem statement, you *must* extract it. Do NOT attempt to solve the problem. If the image doesn't contain anything math-related, output the latex \\skip."
        formatted_prompt = apply_chat_template(
            self.processor,
            self.config,
            prompt,
            num_images=1
        )
        output = ""
        # Stream tokens so the overlay shows the extraction as it happens.
        for token in stream_generate(
            self.model,
            self.processor,
            [image_path],
            formatted_prompt,
            verbose=False,
            max_tokens=4096,
            temp=0.0
        ):
            output += token
            self.status_overlay.update_answer(output)
        return output

    def solve_problem(self, problem_text):
        """Solve *problem_text* with the math model, then summarize the
        solution for speech; returns the summary string."""
        self.status_overlay.update_status("Solving Problem")
        prompt = f"Solve the following problem - ensure you simplify your answer as much as possible. Think step by step to get to your conclusion. The problem is below: \n{problem_text}"
        messages = [{"role": "user", "content": prompt}]
        chat_prompt = self.answer_tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        output = ""
        for token in lm_stream_generate(
            self.answer_model,
            self.answer_tokenizer,
            chat_prompt,
            max_tokens=4096,
            temp=0.0,
            min_p=0.05,
            repetition_penalty=1.1
        ):
            output += token
            self.status_overlay.update_answer(output)
        print(output)
        self.status_overlay.update_status("Summarizing")
        summarize_prompt = f"Below is a solved math problem - write out the answer presented without any fancy formatting: \n{output}"
        formatted_summarize_prompt = self.summarizer_tokenizer.apply_chat_template(
            [
                {"role": "system", "content": "You are an AI that summarizes solutions. When doing so, you write negative numbers as 'negative 2' instead of '-2'." },
                {"role": "user", "content": summarize_prompt},
            ], tokenize=False, add_generation_prompt=True
        )
        # Reuse `output` for the summary; it is the value spoken and shown.
        output = ""
        for token in lm_stream_generate(
            self.summarizer_model,
            self.summarizer_tokenizer,
            formatted_summarize_prompt,
            max_tokens=4096,
            temp=0.0,
            min_p=0.05
        ):
            output += token
            self.status_overlay.update_answer(output)
        return output

    def process_screenshot(self, filepath):
        """Process the screenshot in a worker thread: analyze, solve, speak."""
        try:
            analysis = self.analyze_image(filepath)
            # BUG FIX: "\skip" used the invalid escape "\s"; "\\skip" is the
            # same runtime string without the warning.
            if "\\skip" in analysis:
                self.status_overlay.update_status("No Math Found")
                time.sleep(2)  # Let the user read the status before Idle
            else:
                print("\n=== Problem ===")
                print(analysis)
                solution = self.solve_problem(analysis)
                print("\n=== AI Solution ===")
                print(solution)
                self.status_overlay.update_status("Speaking", solution)
                # Pipeline: echo solution | piper (TTS) | play (raw audio).
                process1 = subprocess.Popen(
                    ['echo', solution],
                    stdout=subprocess.PIPE
                )
                process2 = subprocess.Popen(
                    ['piper', '-m', 'en_US-lessac-high', '--output-raw'],
                    stdin=process1.stdout,
                    stdout=subprocess.PIPE
                )
                process3 = subprocess.Popen(
                    ['play', '-r', '22050', '-b', '16', '-e', 'signed', '-t', 'raw', '-'],
                    stdin=process2.stdout
                )
                # Close our copies of the pipes so SIGPIPE propagates.
                process1.stdout.close()
                process2.stdout.close()
                # Wait for the audio to finish, then reap the upstream
                # processes so they do not linger as zombies.
                process3.wait()
                process1.wait()
                process2.wait()
        except Exception as e:
            print(f"Error analyzing screenshot: {str(e)}")
            self.status_overlay.update_status(f"Error: {str(e)[:20]}...")
            time.sleep(3)
        finally:
            self.status_overlay.update_status("Idle")

    def on_activate(self):
        """Hotkey callback: capture an interactive screenshot and hand it to
        a daemon worker thread. Ignored while a previous run is in flight."""
        if self.processing_thread and self.processing_thread.is_alive():
            print("Still processing previous screenshot...")
            return
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'screenshot_{timestamp}.png'
        filepath = os.path.join(SCREENSHOT_DIR, filename)
        self.status_overlay.update_status("Taking Screenshot")
        # macOS screencapture: -i interactive selection, -x no camera sound.
        subprocess.run(['screencapture', '-i', '-x', filepath])
        # The user may cancel the interactive capture; only proceed if a
        # file was actually written.
        if os.path.exists(filepath):
            print(f'\nScreenshot saved to: {filepath}')
            self.processing_thread = threading.Thread(
                target=self.process_screenshot,
                args=(filepath,),
                daemon=True
            )
            self.processing_thread.start()
def main():
    """Load the models, register the Ctrl+M global hotkey, and run the
    status overlay's Tk event loop (blocks until the window closes)."""
    analyzer = ScreenshotAnalyzer()

    # Global hotkey: Ctrl+M triggers screenshot capture + analysis.
    hotkey = keyboard.HotKey(keyboard.HotKey.parse('<ctrl>+m'), analyzer.on_activate)

    def canonicalized(handler):
        # Normalize key events so modifier state doesn't confuse HotKey.
        return lambda key: handler(listener.canonical(key))

    # The listener runs in its own thread; Tk keeps the main thread.
    listener = keyboard.Listener(
        on_press=canonicalized(hotkey.press),
        on_release=canonicalized(hotkey.release),
    )
    listener.start()

    print('Screenshot analysis utility started!')
    print(f'Press Ctrl+M to take a screenshot. Files will be saved to: {SCREENSHOT_DIR}')
    print('Each screenshot will be automatically analyzed by AI')
    print('Press Ctrl+C to exit.')

    # Start the overlay (this will block in the main thread)
    analyzer.status_overlay.start()
# Standard entry-point guard: run the utility only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment