Created
April 30, 2025 15:28
-
-
Save StonkGodCapital/fe0f980c7b210221d5181a87c832501e to your computer and use it in GitHub Desktop.
GPT-Image-1 Conversational OpenWebUI Function
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
title: GPT-Image-1 Conversational Image Generation | |
description: Pipe to enable conversational image generation and editing with gpt-image-1 | |
author: This version, StonkGodCapital, previous versions credit to original author MorningBean.ai, contributions by MichaelMKenny, Coofdy | |
author_url: https://github.com/StonkGodCapital | |
funding_url: FREE | |
version: 0.5.0 | |
license: MIT | |
requirements: typing, pydantic, openai | |
environment_variables: | |
disclaimer: This pipe is provided as is without any guarantees. | |
Please ensure that it meets your requirements. | |
0.5.0 Add conversational support, changed image generating icon to something with an outline that is recognizable | |
0.4.0 Added support for multiple images in a single message | |
0.3.2 Logic fix to only invoke editing when latest user message (prompt) contains an image. | |
0.3.0 BugFix move to Non-Blocking | |
""" | |
import json | |
import random | |
import base64 | |
import asyncio | |
import re | |
import tempfile | |
import os | |
import logging | |
from typing import List, AsyncGenerator, Callable, Awaitable | |
from pydantic import BaseModel, Field | |
from openai import OpenAI | |
class Pipe:
    """Conversational image generation/editing pipe for OpenWebUI using
    OpenAI's ``gpt-image-1`` model.

    Acts as a "manifold" pipe: ``pipes()`` advertises the available model and
    ``pipe()`` handles each chat request.  If the conversation contains any
    images (structured ``image_url`` parts or base64 markdown embedded in
    earlier responses) the request is routed to the image *edit* endpoint;
    otherwise plain generation is used.
    """

    class Valves(BaseModel):
        # Admin-configurable settings surfaced in the OpenWebUI panel.
        OPENAI_API_KEYS: str = Field(
            default="", description="OpenAI API Keys, comma-separated"
        )
        IMAGE_NUM: int = Field(default=2, description="Number of images (1-10)")
        IMAGE_SIZE: str = Field(
            default="1024x1024",
            description="Image size: 1024x1024, 1536x1024, 1024x1536, auto",
        )
        IMAGE_QUALITY: str = Field(
            default="auto", description="Image quality: high, medium, low, auto"
        )
        MODERATION: str = Field(
            default="auto", description="Moderation strictness: auto (default) or low"
        )
        # Proxy-related fields have been removed

    def __init__(self):
        self.type = "manifold"
        self.name = "ChatGPT: "
        self.valves = self.Valves()
        # Populated per-request in pipe(); consumed by emit_status().
        self.emitter: Callable[[dict], Awaitable[None]] | None = None

    def _get_proxy_url(self) -> str | None:
        """Proxy logic has been removed; kept for backward compatibility."""
        return None

    def _pick_api_key(self) -> str:
        """Return one randomly chosen non-empty API key, or '' if none are set.

        Empty entries (e.g. from a trailing comma in the valve) are skipped so
        a stray comma cannot cause an authentication failure.
        """
        keys = [k.strip() for k in self.valves.OPENAI_API_KEYS.split(",") if k.strip()]
        return random.choice(keys) if keys else ""

    async def emit_status(self, message: str = "", done: bool = False):
        """Send a status event to the UI, if an emitter was provided."""
        if self.emitter:
            await self.emitter(
                {"type": "status", "data": {"description": message, "done": done}}
            )

    async def pipes(self) -> List[dict]:
        """Advertise the model(s) this manifold provides."""
        return [{"id": "gpt-image-1", "name": "GPT Image 1"}]

    # Supports conversational editing: every prior message's text and images
    # are folded into the next request.  Avoid very long edit chains — start a
    # new thread or edit old messages to keep excessive images out of the
    # request.
    def convert_message_to_prompt(self, messages: List[dict]) -> tuple[str, List[dict]]:
        """Flatten a chat history into ``(prompt, images)``.

        Returns the role-labelled conversation text joined with newlines, and
        a list of ``{"mimeType": ..., "data": ...}`` dicts for every base64
        image found either in structured content parts or embedded as
        markdown inside string content.
        """
        all_text_lines = []
        all_images = []
        # Matches markdown images whose URL is an inline base64 data URI
        # (the same shape generate_image/edit_image emit below).
        pattern = r"!\[[^\]]*\]\(data:([^;]+);base64,([^)]+)\)"
        for msg in messages:
            # Process all messages, so only the role is needed for history.
            role = msg.get("role")
            content = msg.get("content")
            text_parts = []
            if isinstance(content, list):
                # Mixed content list: collect text parts and data-URI images.
                for part in content:
                    if part.get("type") == "text":
                        text = part.get("text", "")
                        if text:
                            text_parts.append(text)
                    elif part.get("type") == "image_url":
                        url = part.get("image_url", {}).get("url", "")
                        if url.startswith("data:"):
                            header, data = url.split(";base64,", 1)
                            mime = header.split("data:")[-1]
                            all_images.append({"mimeType": mime, "data": data})
                full_text = " ".join(text_parts).strip()
            elif isinstance(content, str):
                # Extract any base64 images embedded in the string ...
                matches = re.findall(pattern, content)
                for m, d in matches:
                    all_images.append({"mimeType": m, "data": d})
                # ... then strip the image markdown from the prompt text.
                full_text = re.sub(pattern, "", content).strip()
            else:
                # Fix: previously full_text was left undefined for None or
                # unexpected content types, raising NameError below.
                full_text = ""
            if full_text:
                # Capitalize the role for display purposes; default to "user"
                # if the message carries no role.
                all_text_lines.append(f"{(role or 'user').capitalize()}: {full_text}")
        prompt = (
            "\n".join(all_text_lines)
            if all_text_lines
            else "Please edit the provided image(s)"
        )
        return prompt, all_images

    async def _run_blocking(self, fn: Callable, *args, **kwargs):
        """Run a blocking callable in the default executor so the event loop
        stays responsive while the OpenAI SDK performs network I/O."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))

    async def generate_image(
        self,
        prompt: str,
        model: str,
        n: int,
        size: str,
        quality: str,
    ) -> AsyncGenerator[str, None]:
        """Generate ``n`` images from ``prompt`` and yield them as inline
        base64 markdown images (one chunk per image)."""
        await self.emit_status("🧑‍🎨 Generating images...")
        key = self._pick_api_key()
        if not key:
            yield "Error: OPENAI_API_KEYS not set"
            return
        client = OpenAI(api_key=key)

        def _call_gen():
            return client.images.generate(
                model=model,
                prompt=prompt,
                n=n,
                size=size,
                quality=quality,
                moderation=self.valves.MODERATION,
            )

        try:
            resp = await self._run_blocking(_call_gen)
            for i, img in enumerate(resp.data, 1):
                # gpt-image-1 returns base64 payloads (b64_json); embed them
                # as data-URI markdown the UI can render and that
                # convert_message_to_prompt() can later parse back out.
                yield f"![image_{i}](data:image/png;base64,{img.b64_json})\n"
            await self.emit_status("🎉 Image generation successful", done=True)
        except Exception as e:
            yield f"Error during image generation: {e}"
            await self.emit_status("❌ Image generation failed", done=True)

    async def edit_image(
        self,
        base64_images: List[dict],
        prompt: str,
        model: str,
        n: int,
        size: str,
        quality: str,
    ) -> AsyncGenerator[str, None]:
        """Edit the supplied base64 images according to ``prompt`` and yield
        the results as inline base64 markdown images.

        Raises ValueError if any input image cannot be decoded, exceeds the
        25MB API limit, or has an unsupported MIME type.
        """
        await self.emit_status("✍️ Editing images...")
        key = self._pick_api_key()
        if not key:
            yield "Error: OPENAI_API_KEYS not set"
            return
        client = OpenAI(api_key=key)
        images_array = []
        for i, img_dict in enumerate(base64_images, start=1):
            try:
                data = base64.b64decode(img_dict["data"])
                # The images API rejects files larger than 25MB.
                if len(data) > 25 * 1024 * 1024:
                    raise ValueError("Image exceeds 25MB limit")
                suffix = {
                    "image/png": ".png",
                    "image/jpeg": ".jpg",
                    "image/webp": ".webp",
                }.get(img_dict["mimeType"])
                if not suffix:
                    raise ValueError(f"Unsupported format: {img_dict['mimeType']}")
                # Fix: include the extension in the upload filename — suffix
                # was previously computed and validated but never used.
                images_array.append((f"file{i}{suffix}", data, img_dict["mimeType"]))
            except Exception as e:
                # Preserve the original cause for debugging.
                raise ValueError(f"Error decoding image: {e}") from e

        def _call_edit(images):
            return client.images.edit(
                model=model,
                image=images,
                prompt=prompt,
                n=n,
                size=size,
                # quality/moderation go via extra_body: not all SDK versions
                # accept them as first-class kwargs on the edit endpoint.
                extra_body={
                    "quality": quality,
                    "moderation": self.valves.MODERATION,
                },
            )

        try:
            resp = await self._run_blocking(_call_edit, images_array)
            for i, img in enumerate(resp.data, 1):
                # Same inline-markdown shape as generate_image().
                yield f"![image_{i}](data:image/png;base64,{img.b64_json})\n"
            await self.emit_status("🎉 Image edit successful", done=True)
        except Exception as e:
            yield f"Error during image edit: {e}"
            await self.emit_status("❌ Image edit failed", done=True)

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> AsyncGenerator[str, None]:
        """Entry point invoked by OpenWebUI for each chat request.

        Streams markdown image chunks (or error strings) back to the UI.
        """
        self.emitter = __event_emitter__
        msgs = body.get("messages", [])
        # Clamp the image count to the API's 1-10 range.
        model_id, n = "gpt-image-1", min(max(1, self.valves.IMAGE_NUM), 10)
        size, quality = self.valves.IMAGE_SIZE, self.valves.IMAGE_QUALITY
        prompt, imgs = self.convert_message_to_prompt(msgs)
        if imgs:
            # Any image anywhere in the thread means this is an edit request.
            async for out in self.edit_image(
                base64_images=imgs,
                prompt=prompt,
                model=model_id,
                n=n,
                size=size,
                quality=quality,
            ):
                yield out
        else:
            async for out in self.generate_image(
                prompt=prompt,
                model=model_id,
                n=n,
                size=size,
                quality=quality,
            ):
                yield out
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment