# Source: GitHub gist graylan0/792b293dd5be91c395d04b5ff5cee864
# Created: December 8, 2023 04:30
# Note: the gist page warned that this file contains hidden or bidirectional
# Unicode text that may be interpreted or compiled differently than it
# appears. Review it in an editor that reveals hidden Unicode characters.
import ast
import asyncio
import base64
import contextlib
import curses
import datetime
import json
import logging
import threading
import time
from io import BytesIO

import aiohttp
import aiosqlite
import pyautogui
from PIL import Image
# Root logger setup: INFO and above, each record prefixed with a timestamp
# and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class MultiverseCodeTester:
    """Run a GPT-assisted analysis of a source file and record the results.

    A run (see ``run_tests``) captures a screenshot, sends it to the vision
    model, sends an AST dump of the target file to the chat model for
    analysis and test-case generation, stores the generated test cases in a
    SQLite database, and writes a Markdown report.

    Attributes:
        coordinates: Fixed dict shown by the TUI.
        stop_requested: Flag set by the UI; checked between pipeline stages.
        db_path: Path of the SQLite database holding ``test_results``.
        api_key: Value of ``api_key`` from the config file (may be ``None``).
        prompts: Prompt templates loaded from the prompts JSON file.
    """

    CHAT_COMPLETIONS_URL = "https://api.openai.com/v1/chat/completions"

    def __init__(self, prompts_file, config_file):
        """Load the API key from *config_file* and prompts from *prompts_file*.

        Both files must contain JSON; errors from ``open`` / ``json.load``
        propagate to the caller.
        """
        self.coordinates = {"X": 34, "Y": 76, "Z": 12, "T": 5633}
        self.stop_requested = False
        self.db_path = "test_results.db"
        self.api_key = self.load_api_key(config_file)
        with open(prompts_file, 'r', encoding='utf-8') as file:
            self.prompts = json.load(file)

    def load_api_key(self, config_file):
        """Return the ``api_key`` entry of the JSON *config_file*, or ``None``."""
        with open(config_file, 'r', encoding='utf-8') as file:
            config = json.load(file)
        api_key = config.get('api_key')
        if not api_key:
            # Keep the original best-effort behaviour (return None) but make
            # the misconfiguration visible instead of sending "Bearer None".
            logging.warning("No 'api_key' entry found in %s", config_file)
        return api_key

    async def initialize_db(self):
        """Create the ``test_results`` table if it does not exist yet."""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute('''CREATE TABLE IF NOT EXISTS test_results (
                filename TEXT,
                test_case TEXT,
                result TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )''')
            await db.commit()

    def parse_code(self, file_path):
        """Return an ``ast.dump`` string for *file_path*, or ``None`` on error.

        Any failure (unreadable file, syntax error, ...) is logged and
        reported as ``None`` rather than raised.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                code = file.read()
            return ast.dump(ast.parse(code), annotate_fields=True, include_attributes=True)
        except Exception as e:
            logging.error(f"Error parsing code from {file_path}: {e}")
            return None

    @staticmethod
    def _build_vision_messages(text, encoded_image=None):
        """Build the chat ``messages`` list for a text prompt plus optional image.

        With an image, the user message carries a list of content parts
        (one text part, one ``image_url`` part holding a PNG data URL).
        Without an image, the content stays a plain string.
        """
        if not encoded_image:
            return [{"role": "user", "content": text}]
        return [{
            "role": "user",
            "content": [
                {"type": "text", "text": text},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{encoded_image}"},
                },
            ],
        }]

    @staticmethod
    def _extract_message_text(response):
        """Return the stripped text of the first choice in a completion response.

        Reads ``choices[0].message.content`` and falls back to
        ``choices[0].text``. Returns ``""`` when the response has no usable
        text (for example an error payload or an empty ``choices`` list).
        """
        if not isinstance(response, dict):
            return ""
        choices = response.get("choices") or []
        if not choices or not isinstance(choices[0], dict):
            return ""
        first = choices[0]
        message = first.get("message")
        content = message.get("content") if isinstance(message, dict) else None
        if content is None:
            content = first.get("text")
        return content.strip() if isinstance(content, str) else ""

    @classmethod
    def _normalize_test_cases(cls, test_cases):
        """Turn *test_cases* into a list of non-empty strings.

        Accepts a raw completion response (dict), a block of text (one test
        case per non-blank line), or any iterable of items.
        """
        if isinstance(test_cases, dict):
            test_cases = cls._extract_message_text(test_cases)
        if isinstance(test_cases, str):
            return [line.strip() for line in test_cases.splitlines() if line.strip()]
        return [str(case) for case in (test_cases or [])]

    async def _post_chat_completion(self, payload):
        """POST *payload* to the chat completions endpoint and return the JSON body."""
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(self.CHAT_COMPLETIONS_URL, headers=headers, json=payload) as response:
                if response.status >= 400:
                    logging.error("Chat completion request failed with HTTP %s", response.status)
                return await response.json()

    async def send_request_to_gpt4_vision(self, text, encoded_image=None):
        """Send *text* (and an optional base64 PNG) to the vision model.

        Returns the decoded JSON response.
        """
        payload = {
            "model": "gpt-4-vision-preview",
            "messages": self._build_vision_messages(text, encoded_image),
            "max_tokens": 300,
        }
        return await self._post_chat_completion(payload)

    async def send_request_to_gpt(self, prompt):
        """Send *prompt* to the chat model and return the decoded JSON response."""
        payload = {
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 1500,
        }
        return await self._post_chat_completion(payload)

    def capture_screenshot(self):
        """Return a PNG screenshot in a rewound ``BytesIO``, or ``None`` on error."""
        try:
            screenshot_buffer = BytesIO()
            screenshot = pyautogui.screenshot()
            screenshot.save(screenshot_buffer, format="PNG")
            screenshot_buffer.seek(0)  # Reset buffer position to the beginning
            return screenshot_buffer
        except Exception as e:
            logging.error(f"Error capturing screenshot: {e}")
            return None

    def encode_image(self, image_buffer):
        """Return the base64 text of *image_buffer*'s contents, or ``None``.

        ``None`` is returned (and an error logged) when the buffer is missing
        or cannot be read.
        """
        if image_buffer is None:
            logging.error("No image buffer to encode")
            return None
        try:
            return base64.b64encode(image_buffer.getvalue()).decode('utf-8')
        except Exception as e:
            logging.error(f"Error encoding image: {e}")
            return None

    async def analyze_code_with_gpt(self, code):
        """Ask the chat model to analyse *code*; returns the raw JSON response."""
        return await self.send_request_to_gpt(self.prompts.get("code_analysis_prompt", "Analyze this code:") + "\n\n" + code)

    async def generate_test_cases_with_gpt(self, code):
        """Ask the chat model for test cases for *code*; returns the raw JSON response."""
        return await self.send_request_to_gpt(self.prompts.get("test_case_generation_prompt", "Generate test cases for this code:") + "\n\n" + code)

    async def execute_tests(self, test_cases, filename):
        """Record each test case for *filename* in the database.

        NOTE(review): nothing is actually executed here; every test case is
        stored with the result ``"pass"``, exactly as in the original code.
        """
        cases = self._normalize_test_cases(test_cases)
        async with aiosqlite.connect(self.db_path) as db:
            for test in cases:
                await db.execute("INSERT INTO test_results (filename, test_case, result) VALUES (?, ?, ?)", (filename, test, "pass"))
            await db.commit()

    async def generate_code_summary(self, filename):
        """Return the chat model's summary of the source in *filename*.

        Returns ``"Failed to generate summary"`` when the response carries no
        text.
        """
        with open(filename, "r", encoding="utf-8") as file:
            code = file.read()
        template = self.prompts.get("code_summary_prompt", "Summarize this code:\n\n{code}")
        response = await self.send_request_to_gpt(template.format(code=code))
        return self._extract_message_text(response) or "Failed to generate summary"

    async def generate_testing_summary(self):
        """Return one ``"<file>: <passed>/<total> tests passed"`` line per file."""
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute("SELECT filename, COUNT(*) as test_count, SUM(CASE WHEN result = 'pass' THEN 1 ELSE 0 END) as pass_count FROM test_results GROUP BY filename")
            rows = await cursor.fetchall()
        return "\n".join(f"{row[0]}: {row[2]}/{row[1]} tests passed" for row in rows)

    async def export_to_markdown(self, filename, test_result, code_analysis, test_cases, code_summary, testing_summary):
        """Write the Markdown report to *filename*.

        *test_result* is rendered as indented JSON; a list or tuple of test
        cases is rendered one per line; the other arguments are written as
        given.
        """
        if isinstance(test_cases, (list, tuple)):
            test_cases = "\n".join(str(case) for case in test_cases)
        sections = [
            "# Multiverse Code Testing Report\n\n",
            "## GUI Analysis\n",
            f"```\n{json.dumps(test_result, indent=4)}\n```\n\n",
            "## Code Analysis\n",
            f"```\n{code_analysis}\n```\n\n",
            "## Test Cases\n",
            f"```\n{test_cases}\n```\n\n",
            "## Code Summary\n",
            f"{code_summary}\n\n",
            "## Testing Summary\n",
            f"{testing_summary}\n",
        ]
        with open(filename, 'w', encoding='utf-8') as md_file:
            md_file.writelines(sections)

    def _stop_pending(self, next_stage):
        """Return True (and log it) when a stop was requested before *next_stage*."""
        if self.stop_requested:
            logging.info("Stop requested; skipping %s", next_stage)
            return True
        return False

    async def run_tests(self, stdscr, file_path):
        """Run the full pipeline for *file_path* and report completion on *stdscr*.

        The run is abandoned (with a log message) when the screenshot, its
        encoding, or parsing of *file_path* fails, or when ``stop_requested``
        is set between stages.
        """
        screenshot_buffer = self.capture_screenshot()
        if screenshot_buffer is None:
            logging.error("Failed to capture screenshot")
            return
        encoded_image = self.encode_image(screenshot_buffer)
        if encoded_image is None:
            logging.error("Failed to encode image")
            return
        # Parse before any network call so an unreadable or invalid file does
        # not cost an API request and never reaches string concatenation as None.
        code_dump = self.parse_code(file_path)
        if code_dump is None:
            logging.error("Failed to parse %s; aborting test run", file_path)
            return

        gui_prompt = self.prompts.get("gui_analysis_prompt", "Analyze this GUI screenshot:")
        test_result = await self.send_request_to_gpt4_vision(gui_prompt, encoded_image)
        if self._stop_pending("code analysis"):
            return

        analysis_response = await self.analyze_code_with_gpt(code_dump)
        # Fall back to the raw response so API errors stay visible in the report.
        analyzed_code = self._extract_message_text(analysis_response) or json.dumps(analysis_response, indent=4)
        if self._stop_pending("test case generation"):
            return

        test_cases = self._normalize_test_cases(await self.generate_test_cases_with_gpt(code_dump))
        if self._stop_pending("test recording"):
            return

        await self.execute_tests(test_cases, file_path)
        code_summary = await self.generate_code_summary(file_path)
        testing_summary = await self.generate_testing_summary()
        await self.export_to_markdown("test_report.md", test_result, analyzed_code, test_cases, code_summary, testing_summary)
        stdscr.addstr(12, 0, "Tests completed and report generated.")
        stdscr.refresh()
def run_tui(tester):
    """Run the blocking curses front end for *tester*.

    Keys:
        q - leave the interface.
        r - prompt for a file path and run ``tester.run_tests`` on it.
        s - while a run is in progress, set ``tester.stop_requested`` and
            cancel the run.

    A failing run is logged and shown on screen instead of ending the
    interface.
    """

    async def run_until_done_or_stopped(stdscr, file_path):
        """Drive one test run while polling the keyboard for 's'."""
        run = asyncio.ensure_future(tester.run_tests(stdscr, file_path))
        stdscr.nodelay(True)  # getch() now returns -1 instead of blocking
        try:
            while not run.done():
                if stdscr.getch() == ord('s'):
                    tester.stop_requested = True
                    stdscr.addstr(10, 0, "Stopping tests... ")
                    stdscr.refresh()
                    run.cancel()
                    break
                # Yield to the event loop so the test run can make progress.
                await asyncio.sleep(0.05)
            # Re-raises a failure from the run; a cancellation requested
            # above is expected and swallowed.
            with contextlib.suppress(asyncio.CancelledError):
                await run
        finally:
            stdscr.nodelay(False)

    def start_tui(stdscr):
        """Main key loop; receives the screen from ``curses.wrapper``."""
        # Not every terminal supports changing cursor visibility.
        with contextlib.suppress(curses.error):
            curses.curs_set(0)
        stdscr.clear()
        stdscr.addstr("Multiverse Code Tester\n", curses.A_BOLD)
        stdscr.addstr("Press 'q' to exit, 'r' to run code tests, 's' to stop\n\n")
        stdscr.refresh()

        while True:
            stdscr.addstr(4, 0, f"Current Coordinates: {tester.coordinates} ")
            stdscr.refresh()
            key = stdscr.getch()
            if key == ord('q'):
                break
            if key != ord('r'):
                continue

            stdscr.addstr(6, 0, "Enter file path: ")
            curses.echo()
            file_path = stdscr.getstr(7, 0, 60).decode('utf-8').strip()
            curses.noecho()
            if not file_path:
                continue

            # Clear status lines left over from a previous run.
            for row in (10, 12):
                stdscr.move(row, 0)
                stdscr.clrtoeol()
            stdscr.addstr(9, 0, f"Running code tests on {file_path}... ")
            stdscr.refresh()

            tester.stop_requested = False
            try:
                # asyncio.run creates a fresh loop for this thread and always
                # closes it, even when the run raises.
                asyncio.run(run_until_done_or_stopped(stdscr, file_path))
            except Exception as e:
                logging.error(f"Test run failed: {e}")
                _, width = stdscr.getmaxyx()
                # Truncate so addstr never writes past the right edge.
                stdscr.addstr(12, 0, f"Test run failed: {e}"[:max(width - 1, 0)])
                stdscr.refresh()

    curses.wrapper(start_tui)
async def main():
    """Create the tester, prepare its database, and run the TUI until it exits.

    The tester is built from ``prompts.json`` and ``config.json`` in the
    current working directory.
    """
    code_tester = MultiverseCodeTester("prompts.json", "config.json")
    await code_tester.initialize_db()

    # The curses interface lives on its own daemon thread; joining it keeps
    # this coroutine (and the process) alive until the user quits.
    worker = threading.Thread(daemon=True, target=run_tui, args=(code_tester,))
    worker.start()
    worker.join()


if __name__ == "__main__":
    asyncio.run(main())
# (GitHub page footer: sign-up / sign-in prompt for commenting on the gist; not part of the program.)