Skip to content

Instantly share code, notes, and snippets.

@graylan0
Last active January 22, 2024 16:31
Show Gist options
  • Save graylan0/b33ab12eb67e48696cc279903f4a92d7 to your computer and use it in GitHub Desktop.
Save graylan0/b33ab12eb67e48696cc279903f4a92d7 to your computer and use it in GitHub Desktop.
import pennylane as qml
import numpy as np
import requests
import random
import sys
import io
import base64
from PIL import Image
import httpx
import concurrent.futures
import logging
import os
import json
from kivymd.app import MDApp
from kivymd.uix.screen import MDScreen
from kivymd.uix.button import MDRaisedButton
from kivymd.uix.boxlayout import MDBoxLayout
from kivymd.uix.textfield import MDTextField
from kivymd.uix.image import AsyncImage
from kivymd.uix.dialog import MDDialog
from kivy.uix.colorpicker import ColorPicker
# Load runtime configuration from the working directory.
with open('config.json', 'r') as f:
    config = json.load(f)

# The OPENAI_API_KEY environment variable, when set, overrides the value
# stored in config.json.  (The previous code assigned an undefined name
# `OPENAI_API_KEY` here, which raised NameError at import time.)
openai_api_key = os.environ.get('OPENAI_API_KEY') or config['openai_api_key']

logging.basicConfig(level=logging.INFO)

# Size of the simulated register; also used to size the random parameter
# vectors generated elsewhere in this file.
num_qubits = 6
dev = qml.device('default.qubit', wires=num_qubits)
@qml.qnode(dev)
def quantum_circuit(color_code, amplitude):
    """Encode a colour and an amplitude as rotations and return the state.

    Args:
        color_code: String whose characters 1-2, 3-4 and 5-6 are the
            hexadecimal red, green and blue channels (e.g. ``"#ff8800"``).
        amplitude: Number scaled by pi and used as the rotation angle on
            wire 3.

    Returns:
        The state produced by ``qml.state()`` on ``dev``.
    """
    # Each channel becomes a fraction of 255 and then a fraction of pi.
    channel_fractions = [
        int(color_code[start:start + 2], 16) / 255.0 for start in (1, 3, 5)
    ]

    # Wires 0-2 carry the colour channels, wire 3 carries the amplitude.
    for wire, fraction in enumerate(channel_fractions + [amplitude]):
        qml.RY(fraction * np.pi, wires=wire)

    # Chain the four rotated wires together: 0->1, 1->2, 2->3.
    for control in range(3):
        qml.CNOT(wires=[control, control + 1])

    return qml.state()
def mixed_state_to_color_code(mixed_state):
    """Collapse a state or probability vector into a ``#rrggbb`` colour code.

    Entries 0-1 feed the red channel, entries 2-3 the green channel and all
    remaining entries the blue channel.

    Args:
        mixed_state: Array-like of weights.  Complex input is treated as a
            vector of amplitudes and converted to probabilities with
            ``|a|**2``; real input is used as-is.

    Returns:
        A seven-character hex colour string.  A vector whose weights sum to
        zero (including an empty one) yields ``'#000000'``.
    """
    weights = np.asarray(mixed_state)
    if np.iscomplexobj(weights):
        # Amplitudes are not probabilities: summing them directly gives a
        # complex number that cannot be turned into a colour channel.
        weights = np.abs(weights) ** 2
    weights = weights.astype(float)

    total = np.sum(weights)
    if total == 0:
        # Nothing to normalise; avoid dividing by zero.
        return '#000000'
    norm_probs = weights / total

    def _channel(segment):
        # Clamp so the two-digit hex formatting below can never overflow.
        return int(min(max(np.sum(segment) * 255, 0), 255))

    r = _channel(norm_probs[:2])
    g = _channel(norm_probs[2:4])
    b = _channel(norm_probs[4:])
    return f'#{r:02x}{g:02x}{b:02x}'
class QuantumImageApp(MDApp):
    """KivyMD front end for the quantum image "telephone" pipeline.

    The screen holds Run / Reset / Randomize / Color Sense buttons, a text
    field, an image display and a colour picker.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.max_runs = 5
        self.dynamic_prompt = None
        self.root = MDScreen()
        self.image_display = AsyncImage(source="")
        self.color_picker = ColorPicker()
        self.create_gui()

    def create_gui(self):
        """Build the vertical layout and attach it to the root screen."""
        layout = MDBoxLayout(orientation="vertical")
        run_button = MDRaisedButton(text="Run", on_press=self.run_telephone_stack)
        reset_button = MDRaisedButton(text="Reset", on_press=self.reset_telephone_stack)
        randomize_button = MDRaisedButton(text="Randomize", on_press=self.randomize_telephone_stack)
        text_box = MDTextField(hint_text="Enter text for sentiment analysis")
        color_button = MDRaisedButton(text="Color Sense", on_press=self.get_quantum_state_from_color)
        widgets = (
            run_button,
            reset_button,
            randomize_button,
            text_box,
            color_button,
            self.image_display,
            self.color_picker,
        )
        for widget in widgets:
            layout.add_widget(widget)
        self.root.add_widget(layout)

    def run_telephone_stack(self, *args):
        """Run one analysis pass with random parameters and show the result.

        ``*args`` absorbs the widget instance Kivy passes to ``on_press``
        callbacks.
        """
        # Local import: asyncio is not among this file's top-level imports.
        import asyncio

        self.reset_telephone_stack()  # Reset before starting a new run
        params = np.random.random(num_qubits) * np.pi
        # The analysis pipeline is a module-level coroutine, not a method of
        # this class, so it has to be driven by an event loop.
        # NOTE(review): this blocks the caller until the pipeline finishes.
        result = asyncio.run(
            integrated_quantum_image_analysis(params, 1, self.dynamic_prompt)
        )
        self.display_result(result)

    def reset_telephone_stack(self, *args):
        """Clear the stored dynamic prompt and the displayed image."""
        self.dynamic_prompt = None
        self.image_display.source = ""

    def randomize_telephone_stack(self, *args):
        """Pick a random ``max_runs`` between 1 and 10, then run once."""
        self.reset_telephone_stack()
        self.max_runs = random.randint(1, 10)
        self.run_telephone_stack()

    def display_result(self, result):
        """Show the image referenced by *result* (if any) and log the outcome.

        Args:
            result: Dict with optional ``analysis_result``,
                ``dynamic_result`` and ``timestamp`` keys, or ``None`` when
                the pipeline produced nothing.
        """
        if result is None:
            # The pipeline returns None when any stage fails.
            logging.warning("No result to display.")
            return

        analysis_result = result.get("analysis_result")
        dynamic_result = result.get("dynamic_result")
        timestamp = result.get("timestamp")
        if analysis_result is not None:
            image_filename = analysis_result.get("image_filename")
            if image_filename is not None:
                self.image_display.source = image_filename
                logging.info(f"Saved image with quantum state at timestamp {timestamp} in folder {os.path.dirname(image_filename)}.")
        if dynamic_result is not None:
            logging.info(f"Analysis result: {analysis_result}, Dynamic result: {dynamic_result}")

    def get_quantum_state_from_color(self, *args):
        """Feed the picker's current colour through the circuit and display it."""
        # `ColorPicker.color` is an RGBA list of floats; the circuit expects
        # a "#rrggbb..." string, which `hex_color` provides.
        color = self.color_picker.hex_color
        quantum_state = self.get_quantum_state_from_color_code(color)
        logging.info(f"Quantum state from color {color}: {quantum_state}")
        self.display_quantum_state(quantum_state)

    def get_quantum_state_from_color_code(self, color_code):
        """Return the circuit state for *color_code*.

        The red channel (characters 1-2 of the hex string) doubles as the
        amplitude argument of ``quantum_circuit``.
        """
        red = int(color_code[1:3], 16)
        amplitude = red / 255.0
        return quantum_circuit(color_code, amplitude)

    def display_quantum_state(self, quantum_state):
        """Convert *quantum_state* to a colour and push it into the picker."""
        color_code = mixed_state_to_color_code(quantum_state)
        logging.info(f"Color code from quantum state: {color_code}")
        # Assign through `hex_color`: `color` itself holds a list of floats
        # and would reject a hex string.
        self.color_picker.hex_color = color_code
def generate_image_from_quantum_data(quantum_data, timestamp):
    """Request an image from the local txt2img endpoint for a quantum state.

    Args:
        quantum_data: State vector passed to ``mixed_state_to_color_code``
            to derive the colour mentioned in the prompt.
        timestamp: Value embedded in the prompt text.

    Returns:
        The decoded ``PIL.Image`` on success, or ``None`` if the request
        fails or the response cannot be decoded (the error is logged).
    """
    color_code = mixed_state_to_color_code(quantum_data)
    prompt = f"Generate an image with predominant color {color_code} at timestamp {timestamp}"
    url = 'http://127.0.0.1:7860/sdapi/v1/txt2img'
    payload = {
        "prompt": prompt,
        "steps": 121,
        "seed": random.randrange(sys.maxsize),
        # Send real JSON booleans/numbers rather than their string spellings.
        "enable_hr": False,
        "denoising_strength": 0.7,
        "cfg_scale": 7,
        "width": 366,
        "height": 856,
        "restore_faces": True,
    }

    try:
        response = requests.post(url, json=payload)
        response.raise_for_status()
    except requests.exceptions.RequestException as err:
        logging.error(f"Error generating image: {err}")
        return None

    try:
        image_data = response.json()['images'][0]
        # The payload may be bare base64 or a "data:...;base64,<payload>"
        # URI.  Only strip a prefix when a comma is actually present;
        # indexing the split result unconditionally raised IndexError for
        # bare base64.
        _, separator, remainder = image_data.partition(",")
        encoded = remainder if separator else image_data
        return Image.open(io.BytesIO(base64.b64decode(encoded)))
    except (AttributeError, IndexError, KeyError, OSError, TypeError, ValueError) as e:
        logging.error(f"Error processing image data: {e}")
        return None
def encode_image_to_base64(image):
    """Serialise *image* as JPEG and return it base64-encoded.

    Args:
        image: Object exposing ``mode``, ``convert(mode)`` and
            ``save(fp, format=...)`` (a ``PIL.Image`` in this file).

    Returns:
        The base64 text of the JPEG bytes, or ``None`` if encoding fails
        (the error is logged).
    """
    try:
        # JPEG has no alpha or palette support, so images in modes such as
        # RGBA or P must be converted before saving.
        if image.mode not in ("RGB", "L", "CMYK"):
            image = image.convert("RGB")
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except Exception as e:
        logging.error(f"Error encoding image to base64: {e}")
        return None
async def analyze_image_with_gpt4_vision(base64_image, timestamp):
    """Send a base64 JPEG to the chat completions endpoint for analysis.

    Args:
        base64_image: Base64 text of a JPEG image.
        timestamp: Value echoed back in the returned dict.

    Returns:
        ``{"result": <decoded JSON response>, "timestamp": timestamp}`` on
        success, or ``None`` if the request fails (the error is logged).
    """
    # Interpolate the configured key; the literal "$OPENAI_API_KEY" is never
    # expanded inside a Python string.
    headers = {"Authorization": f"Bearer {openai_api_key}"}
    payload = {
        "model": "gpt-4-vision-preview",
        "messages": [
            {
                "role": "user",
                "content": [
                    # Images are passed as an image_url part; inline data
                    # goes in a data: URI.
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        },
                    }
                ],
            }
        ],
    }

    # Allow more time than httpx's short default timeout.
    async with httpx.AsyncClient(timeout=60.0) as client:
        try:
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
        except httpx.HTTPError as err:
            # HTTPError covers both bad status codes and transport failures
            # (connection errors, timeouts).
            logging.error(f"Error from GPT-4 Vision API: {err}")
            return None
        return {"result": response.json(), "timestamp": timestamp}
async def generate_dynamic_prompt(previous_result, timestamp):
    """Ask the completions API for a follow-up prompt.

    Args:
        previous_result: Dict with a ``'timestamp'`` key describing the
            previous analysis, or ``None``.
        timestamp: Timestamp the new prompt is generated for.

    Returns:
        The decoded JSON response (generated text under
        ``choices[0]["text"]``), or ``None`` if there is no previous result
        or the request fails.
    """
    if previous_result is None:
        # Nothing to base a prompt on; indexing None below would raise.
        logging.error("Error from GPT API: no previous result available")
        return None

    # Interpolate the configured key; the literal "$OPENAI_API_KEY" is never
    # expanded inside a Python string.
    headers = {"Authorization": f"Bearer {openai_api_key}"}
    # A "prompt"/"max_tokens" body is the text-completions request shape, and
    # the caller in this file reads choices[0]["text"], so target that
    # endpoint and name a model.
    payload = {
        "model": "gpt-3.5-turbo-instruct",
        "prompt": f"Based on the analysis result at timestamp {previous_result.get('timestamp')}, generate a dynamic prompt for timestamp {timestamp}",
        "max_tokens": 50,
    }

    async with httpx.AsyncClient(timeout=60.0) as client:
        try:
            response = await client.post(
                "https://api.openai.com/v1/completions",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
        except httpx.HTTPError as err:
            # HTTPError covers both bad status codes and transport failures.
            logging.error(f"Error from GPT API: {err}")
            return None
        return response.json()
def _params_to_circuit_inputs(params):
    """Map rotation angles in [0, pi] to a ``(color_code, amplitude)`` pair."""
    # Only the first four angles are used: three colour channels plus the
    # amplitude, matching the four rotations applied by `quantum_circuit`.
    fractions = np.clip(np.asarray(params, dtype=float)[:4] / np.pi, 0.0, 1.0)
    r, g, b = (int(round(fraction * 255)) for fraction in fractions[:3])
    return f'#{r:02x}{g:02x}{b:02x}', float(fractions[3])


async def integrated_quantum_image_analysis(params, timestamp, dynamic_prompt=None):
    """Run the circuit -> image -> vision-analysis pipeline once.

    Args:
        params: Sequence of at least four angles in ``[0, pi]``.
        timestamp: Identifier threaded through logs, prompts and the result.
        dynamic_prompt: When not ``None``, a follow-up prompt is also
            requested from ``generate_dynamic_prompt``.

    Returns:
        ``{"analysis_result": ..., "timestamp": ...}``, plus a
        ``"dynamic_result"`` key when a follow-up prompt was generated, or
        ``None`` if any stage fails.
    """
    # `quantum_circuit` takes (color_code, amplitude), not a raw parameter
    # vector, so translate the angles first.
    color_code, amplitude = _params_to_circuit_inputs(params)
    quantum_state = quantum_circuit(color_code, amplitude)
    logging.info(f"Quantum state: {quantum_state}")

    image = generate_image_from_quantum_data(quantum_state, timestamp)
    if image is None:
        return None
    logging.info(f"Generated image from quantum state at timestamp {timestamp}.")

    base64_image = encode_image_to_base64(image)
    if base64_image is None:
        return None
    logging.info(f"Encoded image to base64.")

    analysis_result = await analyze_image_with_gpt4_vision(base64_image, timestamp)
    if analysis_result is None:
        return None
    logging.info(f"Analyzed image with GPT-4 Vision at timestamp {timestamp}.")

    # Always include the timestamp so every successful result has the same
    # base shape.
    result = {"analysis_result": analysis_result, "timestamp": timestamp}
    if dynamic_prompt is not None:
        dynamic_result = await generate_dynamic_prompt(analysis_result, timestamp)
        if dynamic_result is not None:
            logging.info(f"Generated dynamic prompt for timestamp {timestamp}.")
            result["dynamic_result"] = dynamic_result
    return result
def run_telephone_stack(max_runs):
    """Run the analysis pipeline *max_runs* times and print each result.

    From the second run onward, a dynamic prompt is generated from the
    previous run's result and passed to the following run.

    Args:
        max_runs: Number of pipeline iterations to execute.
    """
    # Local import: asyncio is not among this file's top-level imports.
    import asyncio

    async def _run_chain():
        # Both pipeline functions are coroutines and must be awaited.
        # Submitting them to a thread pool only creates coroutine objects
        # that never execute.
        results = []
        dynamic_prompt = None
        for run_index in range(max_runs):
            timestamp = run_index + 1
            params = np.random.random(num_qubits) * np.pi
            results.append(
                await integrated_quantum_image_analysis(params, timestamp, dynamic_prompt)
            )

            previous = results[run_index - 1] if run_index >= 1 else None
            if previous is None:
                continue
            response = await generate_dynamic_prompt(previous, timestamp)
            try:
                dynamic_prompt = response["choices"][0]["text"].strip()
            except (AttributeError, IndexError, KeyError, TypeError):
                # Keep the last good prompt if the call failed or the
                # response lacks the expected shape.
                logging.warning(f"No dynamic prompt produced for timestamp {timestamp}.")
        return results

    for result in asyncio.run(_run_chain()):
        if result is not None:
            print(result)
if __name__ == "__main__":
    # Number of iterations for the console run that follows the GUI session.
    max_runs = 5
    # `run()` returns once the application exits; the console run starts
    # only after that.
    QuantumImageApp().run()
    run_telephone_stack(max_runs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment