Last active
January 22, 2024 16:31
-
-
Save graylan0/b33ab12eb67e48696cc279903f4a92d7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard library
import asyncio
import base64
import concurrent.futures
import io
import json
import logging
import os
import random
import sys

# Third-party
import httpx
import numpy as np
import pennylane as qml
import requests
from PIL import Image
from kivy.uix.colorpicker import ColorPicker
from kivymd.app import MDApp
from kivymd.uix.boxlayout import MDBoxLayout
from kivymd.uix.button import MDRaisedButton
from kivymd.uix.dialog import MDDialog
from kivymd.uix.image import AsyncImage
from kivymd.uix.screen import MDScreen
from kivymd.uix.textfield import MDTextField
# --- Application configuration and quantum device setup ---

# Load the OpenAI API key from the local config file.
# NOTE(review): the original code immediately re-assigned
# `openai_api_key = OPENAI_API_KEY`; `OPENAI_API_KEY` is never defined in
# this file, so that line raised NameError at import time — removed.
with open('config.json', 'r') as f:
    config = json.load(f)
openai_api_key = config['openai_api_key']

logging.basicConfig(level=logging.INFO)

# Number of wires on the simulated quantum device used by quantum_circuit.
num_qubits = 6
dev = qml.device('default.qubit', wires=num_qubits)
@qml.qnode(dev)
def quantum_circuit(color_code, amplitude):
    """Encode an '#rrggbb' color and an amplitude into a quantum state.

    The red/green/blue channels (scaled to [0, 1]) drive RY rotations on
    wires 0-2, `amplitude` drives wire 3, and a chain of CNOTs entangles
    wires 0-3.  Returns the device's full state vector.
    """
    # Parse each two-digit hex channel and normalize to [0, 1].
    channels = [int(color_code[pos:pos + 2], 16) / 255.0 for pos in (1, 3, 5)]
    for wire, value in enumerate(channels):
        qml.RY(value * np.pi, wires=wire)
    qml.RY(amplitude * np.pi, wires=3)
    for control in range(3):
        qml.CNOT(wires=[control, control + 1])
    return qml.state()
def mixed_state_to_color_code(mixed_state):
    """Map a quantum state (or probability vector) to an '#rrggbb' color.

    Groups the normalized probabilities into three buckets — entries
    [0:2] -> red, [2:4] -> green, [4:] -> blue — and scales each bucket's
    total to an 8-bit channel value.

    Args:
        mixed_state: array-like of real probabilities, or the complex
            state vector returned by ``qml.state()`` (amplitudes are
            converted to probabilities via |amplitude|**2).

    Returns:
        Hex color string of the form '#rrggbb'.
    """
    state = np.asarray(mixed_state)
    # qml.state() returns complex amplitudes; the original summed them
    # directly, making int() fail on complex values.  Convert to real
    # probabilities first.
    if np.iscomplexobj(state):
        state = np.abs(state) ** 2
    norm_probs = state / np.sum(state)
    # Clamp to the valid 8-bit range to guard against rounding artifacts.
    r = min(255, max(0, int(np.sum(norm_probs[:2]) * 255)))
    g = min(255, max(0, int(np.sum(norm_probs[2:4]) * 255)))
    b = min(255, max(0, int(np.sum(norm_probs[4:]) * 255)))
    return f'#{r:02x}{g:02x}{b:02x}'
class QuantumImageApp(MDApp):
    """KivyMD GUI that links a color picker to the quantum pipeline.

    "Run"/"Reset"/"Randomize" drive the quantum -> image -> analysis
    pipeline; "Color Sense" round-trips the picker color through the
    quantum circuit.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.max_runs = 5           # pipeline iterations per randomized run
        self.dynamic_prompt = None  # prompt carried between iterations
        self.root = MDScreen()
        self.image_display = AsyncImage(source="")
        self.color_picker = ColorPicker()
        self.create_gui()

    def create_gui(self):
        """Assemble the vertical layout of buttons, text field and widgets."""
        layout = MDBoxLayout(orientation="vertical")
        run_button = MDRaisedButton(text="Run", on_press=self.run_telephone_stack)
        reset_button = MDRaisedButton(text="Reset", on_press=self.reset_telephone_stack)
        randomize_button = MDRaisedButton(text="Randomize", on_press=self.randomize_telephone_stack)
        text_box = MDTextField(hint_text="Enter text for sentiment analysis")
        color_button = MDRaisedButton(text="Color Sense", on_press=self.get_quantum_state_from_color)
        for widget in (run_button, reset_button, randomize_button, text_box,
                       color_button, self.image_display, self.color_picker):
            layout.add_widget(widget)
        self.root.add_widget(layout)

    def run_telephone_stack(self, *args):
        """Run one iteration of the quantum->image->analysis pipeline."""
        self.reset_telephone_stack()  # Reset before starting a new run
        params = np.random.random(num_qubits) * np.pi
        # integrated_quantum_image_analysis is a module-level coroutine, not
        # a method; the original called self.integrated_quantum_image_analysis
        # (AttributeError) and never awaited the coroutine.  Run it here.
        result = asyncio.run(
            integrated_quantum_image_analysis(params, 1, self.dynamic_prompt))
        # The pipeline returns None on failure; guard before rendering.
        if result is not None:
            self.display_result(result)

    def reset_telephone_stack(self, *args):
        """Clear the carried prompt and the displayed image."""
        self.dynamic_prompt = None
        self.image_display.source = ""

    def randomize_telephone_stack(self, *args):
        """Reset, pick a random run count, then start a new run."""
        self.reset_telephone_stack()
        self.max_runs = random.randint(1, 10)
        self.run_telephone_stack()

    def display_result(self, result):
        """Show the generated image and log the analysis results."""
        analysis_result = result.get("analysis_result")
        dynamic_result = result.get("dynamic_result")
        timestamp = result.get("timestamp")
        if analysis_result is not None:
            image_filename = analysis_result.get("image_filename")
            if image_filename is not None:
                self.image_display.source = image_filename
                logging.info(f"Saved image with quantum state at timestamp {timestamp} in folder {os.path.dirname(image_filename)}.")
        if dynamic_result is not None:
            logging.info(f"Analysis result: {analysis_result}, Dynamic result: {dynamic_result}")

    def get_quantum_state_from_color(self, *args):
        """Feed the picker's current color through the quantum circuit."""
        # ColorPicker.color is a list of RGBA floats in [0, 1]; the circuit
        # expects an '#rrggbb' hex string, so convert first (the original
        # passed the raw list, which crashed in the hex parser).
        rgba = self.color_picker.color
        color = '#{:02x}{:02x}{:02x}'.format(
            *(int(round(channel * 255)) for channel in rgba[:3]))
        quantum_state = self.get_quantum_state_from_color_code(color)
        logging.info(f"Quantum state from color {color}: {quantum_state}")
        self.display_quantum_state(quantum_state)

    def get_quantum_state_from_color_code(self, color_code):
        """Return the circuit state for an '#rrggbb' string.

        The red channel doubles as the amplitude for wire 3.
        """
        r, g, b = [int(color_code[i:i+2], 16) for i in (1, 3, 5)]
        amplitude = r / 255.0
        return quantum_circuit(color_code, amplitude)

    def display_quantum_state(self, quantum_state):
        """Map the state back to a color and push it into the picker."""
        color_code = mixed_state_to_color_code(quantum_state)
        logging.info(f"Color code from quantum state: {color_code}")
        # NOTE(review): Kivy's ColorProperty accepts hex strings from
        # Kivy 2.0 onward — confirm against the installed Kivy version.
        self.color_picker.color = color_code
def generate_image_from_quantum_data(quantum_data, timestamp):
    """Generate an image via a local Stable Diffusion txt2img endpoint.

    Builds a color-themed prompt from the quantum state and POSTs it to
    the AUTOMATIC1111-style API at 127.0.0.1:7860.

    Args:
        quantum_data: state vector / probabilities accepted by
            mixed_state_to_color_code.
        timestamp: run index, embedded in the prompt for traceability.

    Returns:
        A PIL Image on success, or None on any request/decoding failure.
    """
    color_code = mixed_state_to_color_code(quantum_data)
    prompt = f"Generate an image with predominant color {color_code} at timestamp {timestamp}"
    url = 'http://127.0.0.1:7860/sdapi/v1/txt2img'
    # Use native JSON types; the original sent booleans and numbers as
    # strings (e.g. "false", "0.7") and relied on server-side coercion.
    payload = {
        "prompt": prompt,
        "steps": 121,
        "seed": random.randrange(sys.maxsize),
        "enable_hr": False,
        "denoising_strength": 0.7,
        "cfg_scale": 7,
        "width": 366,
        "height": 856,
        "restore_faces": True,
    }
    try:
        response = requests.post(url, json=payload)
        response.raise_for_status()
    except requests.exceptions.RequestException as err:
        logging.error(f"Error generating image: {err}")
        return None
    try:
        r = response.json()
        image_data = r['images'][0]
        # The API may return either plain base64 or a "data:...;base64,xxx"
        # URI; split(...)[-1] handles both (the original's [1] raised
        # IndexError on plain base64).
        b64_payload = image_data.split(",", 1)[-1]
        return Image.open(io.BytesIO(base64.b64decode(b64_payload)))
    except Exception as e:
        logging.error(f"Error processing image data: {e}")
        return None
def encode_image_to_base64(image):
    """Serialize a PIL image to a base64-encoded JPEG string.

    Returns None (and logs the error) on failure instead of raising,
    matching the best-effort style of the other pipeline helpers.
    """
    try:
        # JPEG cannot store an alpha channel; convert modes like RGBA/P
        # first (the original raised OSError for such images).
        if image.mode not in ("RGB", "L"):
            image = image.convert("RGB")
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except Exception as e:
        logging.error(f"Error encoding image to base64: {e}")
        return None
async def analyze_image_with_gpt4_vision(base64_image, timestamp):
    """Send a base64 JPEG to the GPT-4 Vision chat completions endpoint.

    Args:
        base64_image: base64-encoded JPEG from encode_image_to_base64.
        timestamp: run index, echoed back in the result dict.

    Returns:
        {"result": <API JSON>, "timestamp": timestamp} on success,
        or None on an HTTP error.
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                # The original sent the literal string "Bearer $OPENAI_API_KEY";
                # interpolate the key loaded from config.json instead.
                headers={"Authorization": f"Bearer {openai_api_key}"},
                json={
                    "model": "gpt-4-vision-preview",
                    "messages": [
                        {
                            "role": "user",
                            "content": [
                                # Vision requests use "image_url" content
                                # parts with a data URI; the original's
                                # {"type": "image", "data": ...} shape is
                                # rejected by the API.
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/jpeg;base64,{base64_image}"
                                    },
                                }
                            ],
                        }
                    ],
                }
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as err:
            logging.error(f"Error from GPT-4 Vision API: {err}")
            return None
    return {"result": response.json(), "timestamp": timestamp}
async def generate_dynamic_prompt(previous_result, timestamp):
    """Ask the chat completions API for the next iteration's prompt.

    Args:
        previous_result: dict from analyze_image_with_gpt4_vision; its
            "timestamp" key is referenced in the instruction text.
        timestamp: index of the upcoming iteration.

    Returns:
        The raw API JSON response, or None on an HTTP error.
    """
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                # Interpolate the real key (the original sent a literal
                # "$OPENAI_API_KEY" placeholder in the header).
                headers={"Authorization": f"Bearer {openai_api_key}"},
                # /chat/completions requires "model" and "messages"; the
                # original sent legacy-completions fields ("prompt" plus
                # "max_tokens" alone), which this endpoint rejects.
                json={
                    "model": "gpt-3.5-turbo",
                    "messages": [
                        {
                            "role": "user",
                            "content": (
                                f"Based on the analysis result at timestamp "
                                f"{previous_result['timestamp']}, generate a "
                                f"dynamic prompt for timestamp {timestamp}"
                            ),
                        }
                    ],
                    "max_tokens": 50,
                }
            )
            response.raise_for_status()
        except httpx.HTTPStatusError as err:
            logging.error(f"Error from GPT API: {err}")
            return None
    return response.json()
async def integrated_quantum_image_analysis(params, timestamp, dynamic_prompt=None):
    """Run one quantum -> image -> vision-analysis iteration.

    Args:
        params: array of num_qubits random angles in [0, pi).
        timestamp: 1-based iteration index.
        dynamic_prompt: prompt carried from the previous iteration; when
            present, a follow-up prompt is generated for the next one.

    Returns:
        A dict with "analysis_result" (plus "dynamic_result" or
        "timestamp"), or None if any stage fails.
    """
    # quantum_circuit expects ('#rrggbb', amplitude); the original passed
    # the raw params array (TypeError: missing positional argument).
    # Derive a color from the first three angles and use the fourth as the
    # amplitude, preserving the circuit's interface.
    r, g, b = (int((angle / np.pi) * 255) for angle in params[:3])
    color_code = f'#{r:02x}{g:02x}{b:02x}'
    amplitude = float(params[3] / np.pi)
    quantum_state = quantum_circuit(color_code, amplitude)
    logging.info(f"Quantum state: {quantum_state}")
    image = generate_image_from_quantum_data(quantum_state, timestamp)
    if image is None:
        return None
    logging.info(f"Generated image from quantum state at timestamp {timestamp}.")
    base64_image = encode_image_to_base64(image)
    if base64_image is None:
        return None
    logging.info("Encoded image to base64.")
    analysis_result = await analyze_image_with_gpt4_vision(base64_image, timestamp)
    if analysis_result is None:
        return None
    logging.info(f"Analyzed image with GPT-4 Vision at timestamp {timestamp}.")
    if dynamic_prompt is not None:
        dynamic_result = await generate_dynamic_prompt(analysis_result, timestamp)
        if dynamic_result is not None:
            logging.info(f"Generated dynamic prompt for timestamp {timestamp}.")
            return {"analysis_result": analysis_result, "dynamic_result": dynamic_result}
    return {"analysis_result": analysis_result, "timestamp": timestamp}
def run_telephone_stack(max_runs):
    """Drive max_runs pipeline iterations on a thread pool.

    Each iteration's coroutine is executed to completion on a worker
    thread via asyncio.run — the original submitted the coroutine object
    itself to the executor, so future.result() returned an un-awaited
    coroutine and nothing actually ran.

    Args:
        max_runs: number of pipeline iterations to execute.
    """
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        dynamic_prompt = None
        for i in range(max_runs):
            timestamp = i + 1
            params = np.random.random(num_qubits) * np.pi
            futures.append(executor.submit(
                asyncio.run,
                integrated_quantum_image_analysis(params, timestamp, dynamic_prompt)))
            if i >= 1:
                # Blocking on the previous result preserves the original
                # "telephone" chaining: each prompt builds on the last run.
                previous = futures[i - 1].result()
                if previous is not None:
                    prompt_response = asyncio.run(
                        generate_dynamic_prompt(previous, timestamp))
                    if prompt_response is not None:
                        choice = prompt_response["choices"][0]
                        # Support both legacy-completions ("text") and chat
                        # ("message.content") response shapes.
                        dynamic_prompt = (
                            choice.get("text")
                            or choice.get("message", {}).get("content", "")
                        ).strip()
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is not None:
                print(result)
if __name__ == "__main__": | |
QuantumImageApp().run() | |
max_runs = 5 | |
run_telephone_stack(max_runs) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment