Created
December 8, 2023 05:43
-
-
Save graylan0/ea4df29f79d1e5377cc0bd009c58f93f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
import numpy as np | |
import asyncio | |
import threading | |
import aiosqlite | |
from io import BytesIO | |
import logging | |
import curses | |
from llama_cpp import Llama | |
from llama_cpp.llama_chat_format import Llava15ChatHandler | |
import base64 | |
import json | |
import datetime | |
# Root logger: INFO and above, each record rendered as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class CodeContext:
    """Track whether the lines fed to it are currently inside a function body.

    Lines are supplied one at a time through :meth:`update_context`.

    Attributes:
        indent_level: Width of the leading whitespace of the most recent
            function header seen.
        in_function: True from a function header until a non-blank line
            appears at the header's indentation or shallower.
    """

    # Both plain and coroutine function headers open a function body.
    _DEF_PREFIXES = ('def ', 'async def ')

    def __init__(self):
        self.indent_level = 0
        self.in_function = False

    def update_context(self, line):
        """Update the tracked state from one source line.

        Args:
            line: A single line of source text, with or without its
                trailing newline.
        """
        content = line.strip()
        if not content:
            # Blank lines carry no indentation information and must not
            # terminate the function they appear in.
            return

        # Measure only the *leading* whitespace; trailing spaces or the
        # newline are not part of the indentation.
        indent = len(line) - len(line.lstrip())

        if content.startswith(self._DEF_PREFIXES) and content.endswith(':'):
            self.in_function = True
            self.indent_level = indent
        elif self.in_function and indent <= self.indent_level:
            # Dedented back to (or past) the header: the body has ended.
            self.in_function = False
class MultiverseCodeTester:
    """Grab a camera frame, look for a colour target and log simulated shots.

    The instance also owns a Llama/LLaVA model handle, the prompts loaded
    from JSON, an API key read from a JSON config file, and the path of an
    SQLite database used to record test results.
    """

    def __init__(self, prompts_file, config_file, llama_model_path, llama_clip_model_path):
        """Load configuration and prompts, then construct the LLM.

        Args:
            prompts_file: Path to a JSON file; its parsed content is stored
                in ``self.prompts``.
            config_file: Path to a JSON file whose ``api_key`` entry (if
                any) is stored in ``self.api_key``.
            llama_model_path: Model path handed to ``Llama``.
            llama_clip_model_path: CLIP model path handed to
                ``Llava15ChatHandler``.
        """
        self.coords = {"X": 34, "Y": 76, "Z": 12, "T": 5633}
        self.stop_requested = False
        self.db_path = "test_results.db"
        self.api_key = self.load_api_key(config_file)
        self.shot_log = []  # Log for paintball shots
        with open(prompts_file, 'r', encoding='utf-8') as file:
            self.prompts = json.load(file)
        chat_handler = Llava15ChatHandler(clip_model_path=llama_clip_model_path)
        self.llm = Llama(model_path=llama_model_path, chat_handler=chat_handler, n_ctx=2048)

    def load_api_key(self, config_file):
        """Return the ``api_key`` value from a JSON config file, or None if absent."""
        with open(config_file, 'r', encoding='utf-8') as file:
            config = json.load(file)
        return config.get('api_key')

    async def initialize_db(self):
        """Create the ``test_results`` table in ``self.db_path`` if it is missing."""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute('''CREATE TABLE IF NOT EXISTS test_results (
                filename TEXT,
                test_case TEXT,
                result TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )''')
            await db.commit()

    def capture_image(self):
        """Read one frame from capture device 0.

        On success the frame is also written to ``captured_frame.jpg``.

        Returns:
            The frame returned by ``cap.read()``, or None when the read
            reports failure.
        """
        cap = cv2.VideoCapture(0)
        try:
            ret, frame = cap.read()
        finally:
            # Release on every path; previously the success path returned
            # before reaching release() and left the device open.
            cap.release()
        if not ret:
            return None
        cv2.imwrite("captured_frame.jpg", frame)
        return frame

    def process_image_for_target(self, image):
        """Mask ``image`` down to the pixels inside a fixed HSV range.

        Args:
            image: Frame to filter; it is converted with ``COLOR_BGR2HSV``.

        Returns:
            ``image`` with every pixel outside the HSV range zeroed.
        """
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        # NOTE(review): the names say "red", but the range starts at hue 30
        # and ends at 255. Values kept as-is; confirm the intended colour
        # band against OpenCV's hue scale for 8-bit images.
        lower_red = np.array([30, 150, 50])
        upper_red = np.array([255, 255, 180])
        mask = cv2.inRange(hsv, lower_red, upper_red)
        return cv2.bitwise_and(image, image, mask=mask)

    def simulate_paintball_shot(self, target_coords):
        """Log a simulated shot and append it to ``self.shot_log``.

        Args:
            target_coords: Coordinates to record; stored unchanged.
        """
        logging.info(f"Paintball shot simulated at coordinates: {target_coords}")
        self.shot_log.append({
            'timestamp': datetime.datetime.now().isoformat(),
            'coordinates': target_coords
        })

    def extract_target_coordinates(self, image):
        """Return the centroid of the first contour with non-zero area.

        Args:
            image: Processed image. A multi-channel image is collapsed to a
                single-channel foreground mask first, because
                ``cv2.findContours`` does not accept multi-channel input.

        Returns:
            ``(x, y)`` as ints for the first contour whose ``m00`` moment is
            non-zero, or None when there is no such contour.
        """
        if image.ndim == 3:
            # A pixel is foreground when any of its channels is non-zero.
            image = image.any(axis=2).astype(np.uint8)
        # findContours returns (contours, hierarchy) or
        # (image, contours, hierarchy) depending on the OpenCV major
        # version; the contour list is second-to-last in both layouts.
        contours = cv2.findContours(image, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[-2]
        for contour in contours:
            M = cv2.moments(contour)
            if M["m00"] != 0:
                cX = int(M["m10"] / M["m00"])
                cY = int(M["m01"] / M["m00"])
                return (cX, cY)  # Coordinates of the center of the contour
        return None  # No target found

    async def run_tests(self, stdscr, file_path):
        """Capture, detect, log a shot if a target is found, then write the report.

        Args:
            stdscr: Curses window used for the completion message.
            file_path: Accepted for the caller's benefit; not used here.
        """
        captured_image = self.capture_image()
        if captured_image is not None:
            processed_image = self.process_image_for_target(captured_image)
            target_coords = self.extract_target_coordinates(processed_image)
            if target_coords:
                self.simulate_paintball_shot(target_coords)
        target_summary = await self.generate_target_summary()
        await self.export_to_markdown("target_report.md", target_summary)
        stdscr.addstr(12, 0, "Tests completed and report generated.")
        stdscr.refresh()

    async def generate_target_summary(self):
        """Generate a summary of the targets hit, one line per logged shot."""
        return "\n".join(
            f"Shot at {log['timestamp']}: {log['coordinates']}" for log in self.shot_log
        )

    async def export_to_markdown(self, filename, target_summary):
        """Write the report headings followed by ``target_summary`` to ``filename``.

        Any existing file at ``filename`` is overwritten.
        """
        with open(filename, 'w', encoding='utf-8') as md_file:
            md_file.write("# Multiverse Code Testing Report\n\n")
            md_file.write("## Target Summary\n")
            md_file.write(f"{target_summary}\n")

    async def execute_tests(self, test_cases, filename):
        """Execute the simulated tests and log results.

        Every entry of ``test_cases`` is inserted into ``test_results`` with
        the result ``"pass"``.
        """
        async with aiosqlite.connect(self.db_path) as db:
            for test in test_cases:
                result = "pass"  # Placeholder for actual test result logic
                await db.execute(
                    "INSERT INTO test_results (filename, test_case, result) VALUES (?, ?, ?)",
                    (filename, test, result),
                )
            await db.commit()
def run_tui(tester):
    """Run the curses front end for ``tester`` until the user presses 'q'.

    Keys:
        q -- leave the loop.
        r -- prompt for a file path and run ``tester.run_tests`` to completion.
        s -- set ``tester.stop_requested`` (see the note in the loop).

    Args:
        tester: Object providing ``coords``, ``stop_requested`` and the
            coroutine method ``run_tests(stdscr, file_path)``.
    """
    def start_tui(stdscr):
        try:
            curses.curs_set(0)
        except curses.error:
            # Some terminals cannot change cursor visibility; that is
            # cosmetic and must not prevent the UI from starting.
            pass
        stdscr.clear()
        stdscr.addstr("Multiverse Code Tester\n", curses.A_BOLD)
        stdscr.addstr("Press 'q' to exit, 'r' to run code tests, 's' to stop\n\n")
        stdscr.refresh()
        is_running = False
        while True:
            stdscr.addstr(4, 0, f"Current Coordinates: {tester.coords} ")
            stdscr.refresh()
            key = stdscr.getch()
            if key == ord('q'):
                break
            elif key == ord('r') and not is_running:
                stdscr.addstr(6, 0, "Enter file path: ")
                curses.echo()
                try:
                    raw_path = stdscr.getstr(7, 0, 60)
                finally:
                    # Always restore no-echo mode, even if input fails.
                    curses.noecho()
                # Typed bytes are not guaranteed to be valid UTF-8.
                file_path = raw_path.decode('utf-8', errors='replace')
                stdscr.addstr(9, 0, f"Running code tests on {file_path}... ")
                stdscr.refresh()
                is_running = True
                try:
                    # asyncio.run creates a fresh loop and closes it on
                    # every exit path, including when run_tests raises.
                    asyncio.run(tester.run_tests(stdscr, file_path))
                finally:
                    is_running = False
            elif key == ord('s') and is_running:
                # NOTE(review): the run above is synchronous, so is_running
                # is always False by the time getch() is reached again and
                # this branch cannot currently fire.
                tester.stop_requested = True
                is_running = False
                stdscr.addstr(10, 0, "Stopping tests... ")
                stdscr.refresh()
    curses.wrapper(start_tui)
async def main():
    """Build the tester, prepare its database, then run the TUI in a thread.

    The coroutine waits for the TUI thread to finish before returning.
    """
    tester = MultiverseCodeTester(
        "prompts.json",
        "config.json",
        "path/to/llama-model.gguf",
        "path/to/llava/mmproj.bin",
    )
    await tester.initialize_db()

    # The UI runs on its own daemon thread; join() holds main() open until
    # the user quits the interface.
    ui_worker = threading.Thread(target=run_tui, args=(tester,), daemon=True)
    ui_worker.start()
    ui_worker.join()


# Script entry point.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment