Last active
September 22, 2025 23:41
-
-
Save elcritch/3ef18bbd1b718b9d6bc26774a870b1b8 to your computer and use it in GitHub Desktop.
Simple Pyauto Rest API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyautogui | |
import subprocess | |
import time | |
import logging | |
from datetime import datetime | |
# Create logger | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.ERROR) | |
# Create formatters | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
# # Create file handler | |
# file_handler = logging.FileHandler('app.log') | |
# file_handler.setLevel(logging.INFO) | |
# file_handler.setFormatter(formatter) | |
# Create console handler | |
# console_handler = logging.StreamHandler() | |
# console_handler.setLevel(logging.DEBUG) | |
# console_handler.setFormatter(formatter) | |
# Add handlers to logger | |
# logger.addHandler(file_handler) | |
# logger.addHandler(console_handler) | |
def launch_trinamix_app(): | |
"""Launch the trinamiX Spectroscopy UWP app""" | |
try: | |
# Method 1: Using subprocess.run (recommended) | |
result = subprocess.run([ | |
"explorer.exe", | |
"shell:AppsFolder\\trinamiXGmbH.trinamiXSpectroscopy_fsz33ynyg1jg0!App" | |
], capture_output=True, text=True) | |
if result.returncode == 0: | |
logging.info("trinamiX app launched successfully") | |
return True | |
else: | |
logging.error(f"Failed to launch app. Return code: {result.returncode}") | |
logging.error(f"Error output: {result.stderr}") | |
return False | |
except Exception as e: | |
logging.error(f"Exception launching app: {str(e)}") | |
return False | |
def setup_automation(): | |
try: | |
# Log script start | |
logging.info("Script started - waiting 60 seconds before beginning messages") | |
print("Script started - waiting 60 seconds before beginning messages") | |
# Disable pyautogui failsafe to prevent accidental termination | |
pyautogui.FAILSAFE = False | |
# Wait 60 seconds after startup before beginning | |
# time.sleep(6) | |
logging.info("Starting message display loop") | |
if launch_trinamix_app(): | |
logging.info("Initial app launch successful") | |
else: | |
logging.error("Initial app launch failed, trying alternative method") | |
except KeyboardInterrupt: | |
logging.info("Script stopped by user") | |
except Exception as e: | |
logging.error(f"Fatal error: {str(e)}") | |
# if __name__ == "__main__": | |
# setup_automation() | |
import asyncio | |
import json | |
import logging | |
from datetime import datetime | |
from typing import List, Optional | |
from contextlib import asynccontextmanager | |
from fastapi import FastAPI, HTTPException | |
from fastapi.responses import StreamingResponse | |
from pydantic import BaseModel | |
import uvicorn | |
import pyautogui | |
from io import BytesIO | |
import os | |
# Import our local automation module | |
import pyauto | |
# Set up logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
pyauto.setup_automation() | |
# Data models | |
class Message(BaseModel): | |
content: str | |
type: str = "message" | |
class BroadcastMessage(BaseModel): | |
message: str | |
type: str = "broadcast" | |
class PingMessage(BaseModel): | |
type: str = "ping" | |
class ClickRequest(BaseModel): | |
x: int | |
y: int | |
clicks: int = 1 | |
button: str = "left" # left, right, middle | |
interval: Optional[float] = None | |
class FindImageRequest(BaseModel): | |
image_path: str | |
confidence: Optional[float] = 0.8 # Confidence level for image matching | |
return_center: bool = True # Return center coordinates instead of top-left | |
class EnterTextRequest(BaseModel): | |
text: str | |
interval: Optional[float] = None # Seconds between keystrokes | |
presses: Optional[List[str]] = None # Optional sequence of keys to press after typing | |
# Global state for broadcasting | |
broadcast_queue: asyncio.Queue = asyncio.Queue() | |
polling_clients: List[asyncio.Queue] = [] | |
@asynccontextmanager | |
async def lifespan(app: FastAPI): | |
"""Lifespan context manager for FastAPI""" | |
logger.info("Starting simple REST server...") | |
try: | |
yield | |
finally: | |
logger.info("Shutting down simple REST server...") | |
app = FastAPI( | |
title="Simple Automation Server REST API", | |
description="Simplified HTTP REST API with broadcasting to all polling clients", | |
version="1.0.0", | |
lifespan=lifespan | |
) | |
@app.post("/api/message") | |
async def send_message(message: Message): | |
"""Send a message and get echo response""" | |
logger.info(f"Received message: {message.content}") | |
# Echo the message back | |
response = { | |
"type": "echo", | |
"original_message": message.content, | |
"timestamp": datetime.now().isoformat() | |
} | |
return response | |
@app.post("/api/ping") | |
async def ping(): | |
"""Handle ping request""" | |
logger.info("Received ping") | |
# Send pong response | |
response = { | |
"type": "pong", | |
"timestamp": datetime.now().isoformat() | |
} | |
return response | |
@app.post("/api/broadcast") | |
async def broadcast_message(broadcast_msg: BroadcastMessage): | |
"""Broadcast a message to all polling clients""" | |
logger.info(f"Broadcasting message: {broadcast_msg.message}") | |
# Create broadcast message | |
broadcast_data = { | |
"type": "broadcast", | |
"message": broadcast_msg.message, | |
"timestamp": datetime.now().isoformat() | |
} | |
# Send to all polling clients | |
clients_notified = 0 | |
for client_queue in polling_clients.copy(): # Use copy to avoid modification during iteration | |
try: | |
await client_queue.put(broadcast_data) | |
clients_notified += 1 | |
except Exception as e: | |
logger.error(f"Failed to send broadcast to client: {e}") | |
# Remove broken client queue | |
try: | |
polling_clients.remove(client_queue) | |
except ValueError: | |
pass | |
logger.info(f"Broadcast sent to {clients_notified} polling clients") | |
# Return confirmation | |
return { | |
"type": "broadcast_sent", | |
"message": f"Message broadcasted to {clients_notified} clients", | |
"timestamp": datetime.now().isoformat() | |
} | |
@app.post("/api/enter_text") | |
async def enter_text(req: EnterTextRequest): | |
"""Type text using the keyboard with optional interval and post-typing key presses""" | |
logger.info( | |
( | |
"Entering text of length %d with interval=%s, presses=%s" | |
% (len(req.text), str(req.interval), str(req.presses)) | |
) | |
) | |
try: | |
kwargs = {} | |
if req.interval is not None: | |
kwargs["interval"] = req.interval | |
pyautogui.write(req.text, **kwargs) | |
# Execute any requested key presses after typing | |
if req.presses: | |
for key in req.presses: | |
try: | |
pyautogui.press(key) | |
except Exception as e: | |
logger.error(f"Failed to press key '{key}': {e}") | |
raise | |
return { | |
"type": "enter_text_success", | |
"message": "Text entered successfully", | |
"chars": len(req.text), | |
"presses": req.presses or [], | |
"timestamp": datetime.now().isoformat(), | |
} | |
except Exception as e: | |
logger.error(f"Error entering text: {e}") | |
raise HTTPException(status_code=500, detail=f"Failed to enter text: {str(e)}") | |
@app.post("/api/click") | |
async def click_gui(click_request: ClickRequest): | |
"""Perform a GUI click at specified coordinates""" | |
logger.info(f"Performing GUI click at ({click_request.x}, {click_request.y}) with {click_request.clicks} clicks") | |
try: | |
# Perform the click using pyautogui | |
click_kwargs = { | |
"x": click_request.x, | |
"y": click_request.y, | |
"clicks": click_request.clicks, | |
"button": click_request.button | |
} | |
# Only add interval if it's not None | |
if click_request.interval is not None: | |
click_kwargs["interval"] = click_request.interval | |
pyautogui.click(**click_kwargs) | |
response = { | |
"type": "click_success", | |
"message": f"Successfully clicked at ({click_request.x}, {click_request.y})", | |
"coordinates": {"x": click_request.x, "y": click_request.y}, | |
"clicks": click_request.clicks, | |
"button": click_request.button, | |
"timestamp": datetime.now().isoformat() | |
} | |
logger.info(f"GUI click successful: {response}") | |
return response | |
except Exception as e: | |
logger.error(f"Error performing GUI click: {e}") | |
raise HTTPException( | |
status_code=500, | |
detail=f"Failed to perform GUI click: {str(e)}" | |
) | |
@app.post("/api/find_image") | |
async def find_image(find_request: FindImageRequest): | |
"""Find an image on the screen and return its coordinates""" | |
logger.info(f"Searching for image: {find_request.image_path}") | |
try: | |
image_path = os.path.join(os.path.dirname(__file__), find_request.image_path) | |
# Check if image file exists | |
if not os.path.exists(image_path): | |
logger.error(f"Image file not found: {image_path}") | |
raise HTTPException( | |
status_code=404, | |
detail=f"Image file not found: {image_path}" | |
) | |
# Try to find the image on screen | |
try: | |
if find_request.return_center: | |
# Find center coordinates | |
location = pyautogui.locateCenterOnScreen(image_path, confidence=find_request.confidence) | |
else: | |
# Find top-left coordinates | |
location = pyautogui.locateOnScreen(image_path, confidence=find_request.confidence) | |
except pyautogui.ImageNotFoundException: | |
location = None | |
if location is None: | |
# Image not found | |
response = { | |
"type": "image_not_found", | |
"message": f"Image not found on screen: {image_path}", | |
"image_path": image_path, | |
"confidence": find_request.confidence, | |
"timestamp": datetime.now().isoformat() | |
} | |
logger.info(f"Image not found: {image_path}") | |
return response | |
else: | |
# Image found | |
if find_request.return_center: | |
# location is a Point-like object with x, y coordinates | |
coordinates = {"x": int(location[0]), "y": int(location[1])} | |
else: | |
# location is a Box-like object with left, top, width, height | |
coordinates = { | |
"left": location.left, | |
"top": location.top, | |
"width": location.width, | |
"height": location.height, | |
"center_x": location.left + location.width // 2, | |
"center_y": location.top + location.height // 2 | |
} | |
response = { | |
"type": "image_found", | |
"message": f"Image found on screen: {find_request.image_path}", | |
"image_path": find_request.image_path, | |
"coordinates": coordinates, | |
"confidence": find_request.confidence, | |
"return_center": find_request.return_center, | |
"timestamp": datetime.now().isoformat() | |
} | |
logger.info(f"Image found: {image_path} at {coordinates}") | |
return response | |
except Exception as e: | |
logger.error(f"Error finding image: {e}") | |
raise HTTPException( | |
status_code=500, | |
detail=f"Failed to find image: {str(e)}" | |
) | |
@app.get("/api/screenshot") | |
async def take_screenshot(filename: str = "screenshot.png"): | |
"""Take a screenshot and return it as a downloadable file""" | |
logger.info(f"Taking screenshot with filename: {filename}") | |
try: | |
# Take screenshot using pyautogui | |
screenshot = pyautogui.screenshot() | |
# Save to an in-memory buffer as PNG | |
buffer = BytesIO() | |
screenshot.save(buffer, format="PNG") | |
buffer.seek(0) | |
logger.info(f"Screenshot generated in-memory ({buffer.getbuffer().nbytes} bytes)") | |
# Stream the PNG directly without writing to disk | |
return StreamingResponse( | |
buffer, | |
media_type="image/png", | |
) | |
except Exception as e: | |
logger.error(f"Error taking screenshot: {e}") | |
raise HTTPException( | |
status_code=500, | |
detail=f"Failed to take screenshot: {str(e)}" | |
) | |
@app.get("/api/poll") | |
async def poll_messages(timeout: int = 30): | |
"""Long polling endpoint for receiving broadcast messages""" | |
# Create a queue for this polling client | |
client_queue = asyncio.Queue() | |
polling_clients.append(client_queue) | |
try: | |
# Wait for a message with timeout | |
message = await asyncio.wait_for(client_queue.get(), timeout=timeout) | |
logger.info(f"Sending broadcast to polling client: {message}") | |
return message | |
except asyncio.TimeoutError: | |
# Return timeout response | |
return { | |
"type": "timeout", | |
"message": "No broadcasts received", | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
logger.error(f"Error in polling: {e}") | |
raise HTTPException(status_code=500, detail="Internal server error") | |
finally: | |
# Remove client queue when done | |
try: | |
polling_clients.remove(client_queue) | |
except ValueError: | |
pass | |
@app.get("/api/status") | |
async def get_server_status(): | |
"""Get server status""" | |
return { | |
"server_status": "running", | |
"polling_clients": len(polling_clients), | |
"timestamp": datetime.now().isoformat() | |
} | |
@app.get("/") | |
async def root(): | |
"""Root endpoint with API information""" | |
return { | |
"message": "Simple Automation Server REST API", | |
"version": "1.0.0", | |
"endpoints": { | |
"send_message": "POST /api/message", | |
"ping": "POST /api/ping", | |
"broadcast": "POST /api/broadcast", | |
"click": "POST /api/click", | |
"enter_text": "POST /api/enter_text", | |
"find_image": "POST /api/find_image", | |
"screenshot": "GET /api/screenshot?filename=screenshot.png", | |
"poll": "GET /api/poll?timeout=30", | |
"status": "GET /api/status" | |
}, | |
"documentation": "/docs", | |
"timestamp": datetime.now().isoformat() | |
} | |
if __name__ == "__main__": | |
try: | |
uvicorn.run( | |
app, | |
host="0.0.0.0", | |
port=8766, | |
log_level="info", | |
access_log=True | |
) | |
except KeyboardInterrupt: | |
logger.info("Server stopped by user") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment