@Darkflib
Created April 15, 2025 01:12

Conversation with gpt-4.1

Extracted on 2025-04-15 02:09:53

System

Use Python code where applicable. Follow best practice: exceptions, inline documentation, etc. If there are multiple options, please outline them before committing to one.

User

I need a python cloud run worker to accept pubsub push requests. It will use the data inside the request to run a google genai (gemini) api call to generate a response based on a template where the topic, audience, and tone from the request will be filled. Response from gemini should be structured data: an array of objects with title, summary, tags, and suggested hero image. The request should then pass the output back along with additional fields (global, not per array element).

The goal is to take a topic (for example 'Neurodivergence - ADHD/ASD') and produce a list of potential articles (title, summary, tags etc) for the topic.

Assistant

Let's break down your requirements and provide a best-practice, scalable design for a Cloud Run worker in Python that does the following:

Requirements Recap

  1. Accept Google Pub/Sub push requests (HTTP POST, JWT-verified).
  2. Extract topic, audience, tone from the request.
  3. Call Google GenAI (Gemini) API with a template prompt using those fields.
  4. Parse GenAI response to structured output, an array of {title, summary, tags, suggested hero image}.
  5. Respond with this array plus additional global fields.
  6. Robust and secure code (handle errors, logging, etc).

Solution Architecture

  • Cloud Run: Flask-based Python app, accepts HTTP POST from Pub/Sub (docs).
    • Consider using FastAPI if you want async support, but Flask is widely used for this pattern.
  • Gemini API call: Use Google genai client or REST.
  • Templated Prompt: Use Python's str.format() for prompt filled with topic, audience, tone.
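
As a minimal sketch of the templated-prompt bullet, the snippet below fills a shortened, hypothetical template with str.format(). The names TEMPLATE and render_prompt are illustrative assumptions, not part of the worker. The point it shows is that literal JSON braces in the template must be doubled so that str.format() does not treat them as placeholders.

```python
# Hypothetical, shortened template. Doubled braces survive str.format()
# as single literal braces; single braces mark placeholders.
TEMPLATE = (
    'Topic: "{topic}". Audience: "{audience}". Tone: "{tone}".\n'
    'Return a JSON array such as [{{"title": "..."}}]'
)


def render_prompt(topic: str, audience: str, tone: str) -> str:
    """Fill the three placeholders of TEMPLATE."""
    return TEMPLATE.format(topic=topic, audience=audience, tone=tone)


prompt = render_prompt("Neurodivergence - ADHD/ASD", "parents", "supportive")
print(prompt)
```

Values passed to format() are inserted verbatim, so braces inside a topic string are not re-interpreted as placeholders.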

1. Pub/Sub Push Request Verification

Google's Pub/Sub push will send:

  • Content-Type: application/json
  • Data in JSON: { "message": { "data": "base64-encoded, user-supplied json" }, ... }
  • JWT Bearer header for authentication.

You'll want to:

  • Verify the JWT. Cloud Run can restrict callers through IAM, but validating the token in the app as well is good practice.
  • Decode base64 in message.data to get your real input (topic, etc.)
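
The decoding step can be sketched in isolation with the standard library only. The helper name decode_push_envelope and the sample envelope values are assumptions made for illustration; the envelope shape follows the description above (a "message" object whose "data" field is base64-encoded JSON).

```python
import base64
import json


def decode_push_envelope(envelope: dict) -> dict:
    """Return the JSON payload carried in message.data of a push body."""
    try:
        raw = base64.b64decode(envelope["message"]["data"])
        return json.loads(raw.decode("utf-8"))
    except (KeyError, TypeError, ValueError) as exc:
        # binascii.Error, UnicodeDecodeError and JSONDecodeError are all
        # ValueError subclasses, so one clause covers every decoding failure.
        raise ValueError("Malformed Pub/Sub push envelope") from exc


# Hand-built sample envelope (illustrative values only).
sample_payload = {
    "topic": "Neurodivergence - ADHD/ASD",
    "audience": "parents",
    "tone": "supportive",
}
sample_envelope = {
    "message": {
        "data": base64.b64encode(
            json.dumps(sample_payload).encode("utf-8")
        ).decode("ascii"),
        "messageId": "1",
    },
    "subscription": "projects/example/subscriptions/example-sub",
}

decoded = decode_push_envelope(sample_envelope)
print(decoded["topic"])
```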

2. Gemini (Google GenAI) Integration

Gemini API (Google GenAI) Python Client

  • Make an API call using the prompt, receive the result.

3. Response Parsing

  • The LLM call returns text, so we'll obtain structured data via:
    • Prompt engineering: "Output as JSON list of ..."
    • Or parse regular text to list of dicts if JSON parsing fails.
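
One possible fallback, sketched under the assumption that the most common failure is a model wrapping its JSON in a Markdown code fence. The helper name parse_json_array is hypothetical; it strips such a fence before parsing and rejects anything that is not a JSON array.

```python
import json

FENCE = "`" * 3  # Markdown code-fence marker, built this way to keep the source readable


def parse_json_array(text: str) -> list:
    """Parse LLM output as a JSON array, tolerating a surrounding code fence."""
    cleaned = text.strip()
    if cleaned.startswith(FENCE):
        # Drop the opening fence line (it may carry a language tag such as "json").
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else ""
        cleaned = cleaned.rstrip()
        # Drop the closing fence if present.
        if cleaned.endswith(FENCE):
            cleaned = cleaned[: -len(FENCE)]
    data = json.loads(cleaned)  # raises a ValueError subclass on bad JSON
    if not isinstance(data, list):
        raise ValueError("Expected a JSON array")
    return data


plain = parse_json_array('[{"title": "A"}]')
fenced = parse_json_array(FENCE + "json\n" + '[{"title": "A"}]' + "\n" + FENCE)
print(plain == fenced)
```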

4. Return Output

  • Respond with the generated articles plus any extra fields (as required).

Implementation Plan

Let’s outline the code.

Options for web server: Flask is standard, but FastAPI is nice for validation & async. I'll start with Flask (most common for Google Cloud Run Python skeletons) unless you prefer otherwise.


Flask App Skeleton

from flask import Flask, request, jsonify
import base64
import json
import logging
from google.oauth2 import id_token
from google.auth.transport import requests as google_requests
from genai.client import Client  # assuming we use google/genai Python client
import os

app = Flask(__name__)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Gemini API Setup (ensure GOOGLE_API_KEY env var is set in Cloud Run)
GENAI_API_KEY = os.environ.get("GOOGLE_API_KEY")
genai_client = Client(api_key=GENAI_API_KEY)

# Your template for prompting Gemini
PROMPT_TEMPLATE = """
You are an expert content strategist. Given the topic: "{topic}" and writing for the audience: "{audience}" in a "{tone}" tone:

Generate a list of 5 potential article ideas. 
For each, output a JSON object with these fields:
- title: Concise, engaging article headline
- summary: 1-2 sentence summary of what the article covers
- tags: a list of 3-5 relevant tags
- suggested_hero_image: A suggestion for the type of main illustrative image

Output an array (JSON) of these objects. Example: [{{"title": "...", ...}}]
"""

# If you want JWT verification, set this to the audience configured on the push subscription
AUDIENCE = os.environ.get("PUBSUB_AUDIENCE", "")  # e.g., https://worker-YOUR_REGION.run.app

def verify_pubsub_jwt(jwt_token):
    try:
        id_info = id_token.verify_oauth2_token(
            jwt_token, google_requests.Request(), AUDIENCE
        )
        return True
    except Exception as e:
        logger.warning(f"JWT validation failed: {str(e)}")
        return False

def parse_pubsub_request(req):
    try:
        envelope = req.get_json()
        # Pub/Sub message envelope
        msg = envelope['message']
        # Data is base64-encoded
        payload = base64.b64decode(msg['data'])
        data = json.loads(payload.decode('utf-8'))
        return data
    except Exception as e:
        logger.exception("Failed to parse Pub/Sub request")
        raise

def build_prompt(data: dict):
    # Ensure required fields exist
    for field in ["topic", "audience", "tone"]:
        if field not in data:
            raise ValueError(f"Missing required field: {field}")
    return PROMPT_TEMPLATE.format(
        topic=data["topic"], audience=data["audience"], tone=data["tone"]
    )

def call_gemini(prompt: str):
    try:
        # Modify as per actual google genai client usage!
        result = genai_client.generate_content(prompt, model="gemini-pro")
        # result.candidates, result.text, etc. depending on actual response
        return result.text  # or however the text is returned
    except Exception as e:
        logger.exception("Gemini API error")
        raise

def parse_gemini_response(response_str: str):
    # Try to parse JSON directly
    try:
        articles = json.loads(response_str)
        # Validate structure
        for art in articles:
            if not all(k in art for k in ["title", "summary", "tags", "suggested_hero_image"]):
                raise ValueError("Malformed article entry")
        return articles
    except Exception:
        # No fallback here: log the failure and re-raise so the caller returns an error
        logger.error("Failed to parse Gemini response as JSON array.")
        raise

@app.route("/", methods=["POST"])
def main_entrypoint():
    # If you want to verify the JWT header:
    jwt_token = request.headers.get("Authorization", "").replace("Bearer ", "")
    if AUDIENCE and not verify_pubsub_jwt(jwt_token):
        return "Unauthorized", 401

    try:
        data = parse_pubsub_request(request)
        prompt = build_prompt(data)
        gemini_response = call_gemini(prompt)
        articles = parse_gemini_response(gemini_response)

        # Merge in extra global fields specified in the original request
        out = {
            "articles": articles,
            **{k: v for k, v in data.items() if k not in ("topic", "audience", "tone")}
        }
        return jsonify(out), 200

    except Exception as e:
        logger.exception("Worker error")
        return jsonify({"error": str(e)}), 500

if __name__ == "__main__":
    # For local debugging only
    app.run(debug=True, port=8080, host='0.0.0.0')

Customization/Options

  • Framework:
    • Flask vs FastAPI: FastAPI gives stricter input validation, async support, and great docs, but Flask is the more common choice in Cloud Run Python samples. Happy to provide FastAPI if you prefer!
  • Gemini API Client:
    • Adjust for your actual GenAI client version (I drafted as per docs, but you may need to pip install and tweak for real API structure).
  • Pub/Sub JWT verification:
    • Cloud Run can be protected by IAM, but JWT check is extra hardening.
  • Prompting:
    • Tweak the prompt for output shape/length as needed for best results.

Next Steps

  • Gemini API credentials: You’ll need to set up auth and pass the API key/credentials to the worker (as environment variables in Cloud Run).
  • Testing: Deploy to Cloud Run, configure Pub/Sub push subscription to hit your endpoint.

Let me know if you want this in FastAPI, with unit tests, or with Dockerfile/Cloud Build steps!
Also: specify the additional global fields if you have a schema in mind.


Quick Requirements Followup:

  • Do you want the LLM prompt output as JSON array only? (recommended)
  • What extra 'global' fields should be included in output?
  • Is Flask okay, or do you want FastAPI?

Let me know your preferences and I'll refine, or clarify any part!

User

The Pub/Sub message should probably also have a UUID for the request (we can dedup based on this). I believe Flask should be fine initially: it can support concurrency, but FastAPI might be a better choice later with its async support. The LLM response needs to be validated using structured output with a JSON Schema (so yes, just the array). JWT verification is preferred. The prompt is fine for testing. API credentials for Gemini are passed via the environment as expected. Global fields should contain topic, tone, the request UUID, and a timestamp. Perhaps also the environment (dev/staging/prod), if set.

Assistant

Excellent feedback! Here’s how we’ll address each requirement:


Design/Implementation Updates

1. Request UUID

  • We'll expect uuid in the incoming Pub/Sub message payload.
  • If missing, reject the request with an error (deduplication requires it).
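
The deduplication check itself could look roughly like the sketch below. SeenRequests is a hypothetical, in-memory helper used only to show the logic: Pub/Sub delivers at least once, so the same uuid may arrive more than once. Cloud Run instances do not share memory, so a real deployment would presumably need a shared store instead.

```python
import threading


class SeenRequests:
    """In-memory record of processed request UUIDs (illustrative only)."""

    def __init__(self) -> None:
        self._seen = set()
        self._lock = threading.Lock()

    def first_time(self, request_uuid: str) -> bool:
        """Record the UUID and return True if new; return False if already seen."""
        with self._lock:
            if request_uuid in self._seen:
                return False
            self._seen.add(request_uuid)
            return True


seen = SeenRequests()
for incoming in ["a1", "b2", "a1"]:
    action = "process" if seen.first_time(incoming) else "skip duplicate"
    print(incoming, action)
```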

2. Flask

  • We'll continue with Flask for the MVP.

3. LLM Response Validation

  • We'll use jsonschema to validate the Gemini response against a strict JSON Schema.
  • Only JSON array is accepted from the LLM; if not valid, log and raise.

4. JWT Verification

  • Implemented as above, verifying the token against the configured audience; an issuer check can be added for extra hardening.
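
Before the token is verified it has to be pulled out of the Authorization header. A small sketch of a stricter extraction than a plain string replace follows; extract_bearer_token is a hypothetical helper, and the returned value would then be passed to the verification function.

```python
from typing import Optional


def extract_bearer_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, or None."""
    if not authorization_header:
        return None
    # Split on the first space only: scheme on the left, token on the right.
    scheme, _, token = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
```

Returning None for a missing or non-Bearer header lets the route answer 401 without calling the verifier at all.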

5. Environment Variables

  • GOOGLE_API_KEY
  • ENVIRONMENT (optional: dev/staging/prod).

6. Response

Include: topic, tone, uuid, timestamp, environment alongside articles.

7. Timestamp

  • Will use UTC ISO8601 format (datetime.utcnow().isoformat() + "Z").
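
datetime.utcnow() is deprecated as of Python 3.12. If that matters for your runtime, a timezone-aware variant that keeps the same trailing "Z" might look like this; utc_timestamp is a hypothetical helper name.

```python
from datetime import datetime, timezone


def utc_timestamp() -> str:
    """ISO 8601 UTC timestamp ending in 'Z', built from an aware datetime."""
    # An aware UTC datetime renders with a "+00:00" offset; swap it for "Z".
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


stamp = utc_timestamp()
print(stamp)
```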

Updated Code (with inline explanations)

from flask import Flask, request, jsonify
import base64
import json
import logging
import os
from datetime import datetime
from google.oauth2 import id_token
from google.auth.transport import requests as google_requests
import jsonschema

# Placeholder: You'll need to adjust to your Gemini client library or REST client.
from genai.client import Client  # Hypothetical, adapt to your package.

app = Flask(__name__)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Config
GENAI_API_KEY = os.environ.get("GOOGLE_API_KEY")
ENVIRONMENT = os.environ.get("ENVIRONMENT", "")  # dev/staging/prod
AUDIENCE = os.environ.get("PUBSUB_AUDIENCE", "")
genai_client = Client(api_key=GENAI_API_KEY)

# Strict JSON Schema for each article
ARTICLE_SCHEMA = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "summary": {"type": "string"},
        "tags": {
            "type": "array",
            "items": {"type": "string"},
            "minItems": 1,
            "maxItems": 5
        },
        "suggested_hero_image": {"type": "string"}
    },
    "required": ["title", "summary", "tags", "suggested_hero_image"]
}

ARTICLES_ARRAY_SCHEMA = {
    "type": "array",
    "items": ARTICLE_SCHEMA,
    "minItems": 1,
    "maxItems": 10
}

PROMPT_TEMPLATE = """
You are an expert content strategist. Given the topic: "{topic}" and writing for the audience: "{audience}" in a "{tone}" tone:

Generate a list of 5 potential article ideas.
For each, output a JSON object with these fields:
- title: Concise, engaging article headline
- summary: 1-2 sentence summary of what the article covers
- tags: a list of 3-5 relevant tags
- suggested_hero_image: A suggestion for the type of main illustrative image

Output an array (JSON) of these objects. Example: [{{"title": "...", ...}}]
"""

def verify_pubsub_jwt(jwt_token):
    """Verify the JWT from Pub/Sub."""
    try:
        id_info = id_token.verify_oauth2_token(
            jwt_token,
            google_requests.Request(),
            AUDIENCE
        )
        # check issuer if desired: id_info['iss'] == "https://accounts.google.com"
        return True
    except Exception as e:
        logger.warning(f"JWT validation failed: {str(e)}")
        return False

def parse_pubsub_request(req):
    """Decode Pub/Sub push, ensure required fields."""
    try:
        envelope = req.get_json(force=True)
        msg = envelope['message']
        payload = base64.b64decode(msg['data'])
        data = json.loads(payload.decode('utf-8'))
    except Exception as e:
        logger.exception("Failed to parse/decode Pub/Sub message.")
        raise ValueError("Bad message body or data.")

    # Validate essential fields
    required_fields = ["topic", "audience", "tone", "uuid"]
    for f in required_fields:
        if f not in data or not data[f]:
            raise ValueError(f"Missing required field in message: {f}")
    return data

def build_prompt(data):
    return PROMPT_TEMPLATE.format(
        topic=data["topic"], audience=data["audience"], tone=data["tone"]
    )

def call_gemini(prompt: str):
    try:
        # Adjust to your actual Gemini client REST/Python usage
        result = genai_client.generate_content(prompt, model="gemini-pro")
        # Hypothetical: result.text contains LLM output
        return result.text
    except Exception as e:
        logger.exception("Failed to call Gemini API.")
        raise RuntimeError("Gemini API call failed.")

def parse_gemini_and_validate(llm_response: str):
    try:
        data = json.loads(llm_response)
    except Exception as e:
        logger.error("Gemini did not return JSON-parsable output")
        raise ValueError("Gemini response not JSON.")

    try:
        # Validate against the array structure
        jsonschema.validate(data, ARTICLES_ARRAY_SCHEMA)
    except jsonschema.ValidationError as e:
        logger.error(f"Gemini output failed schema validation: {str(e)}")
        raise ValueError("Gemini response failed JSON schema validation.")

    return data

@app.route("/", methods=["POST"])
def main_entrypoint():
    # JWT check (unless AUDIENCE deliberately unset for local testing)
    jwt_token = request.headers.get("Authorization", "").replace("Bearer ", "")
    if AUDIENCE and not verify_pubsub_jwt(jwt_token):
        return "Unauthorized", 401

    try:
        data = parse_pubsub_request(request)
        prompt = build_prompt(data)
        llm_response = call_gemini(prompt)
        articles = parse_gemini_and_validate(llm_response)

        # Build the response (global)
        response = {
            "uuid": data["uuid"],
            "topic": data["topic"],
            "tone": data["tone"],
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "articles": articles
        }
        if ENVIRONMENT:
            response["environment"] = ENVIRONMENT

        return jsonify(response), 200

    except Exception as e:
        logger.exception("Worker error")
        return jsonify({"error": str(e)}), 400

if __name__ == "__main__":
    app.run(debug=True, port=8080, host='0.0.0.0')

Requirements Checklist/Notes

  • UUID: Required, passed through all logic.
  • LLM output: Must be JSON list matching schema, validated.
  • Fields: topic, tone, uuid, timestamp, environment, and structured articles.
  • Security: JWT verification (against the configured audience).
  • Error handling: Returns errors with useful messages.
  • Logging: For troubleshooting (logs to stdout/Cloud Logging by default).
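  • Concurrency: The handlers keep no per-request state, so requests can run concurrently; how many depends on the WSGI server (workers/threads) and the Cloud Run concurrency setting.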
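
On the error-handling item: a push subscription redelivers a message when the endpoint does not answer with a success status, so the status code decides whether a failed message is retried. One possible policy is sketched below. The exception classes and the status_for helper are hypothetical, and acknowledging a permanently bad message with 204 is a design choice (it should be logged or routed elsewhere), not a requirement.

```python
class PermanentError(Exception):
    """Hypothetical: the message can never succeed (e.g. missing fields)."""


class TransientError(Exception):
    """Hypothetical: a later retry might succeed (e.g. upstream API outage)."""


def status_for(exc: Exception) -> int:
    """Map an exception to the HTTP status the worker would return."""
    if isinstance(exc, PermanentError):
        return 204  # acknowledge, so the message is not redelivered forever
    if isinstance(exc, TransientError):
        return 503  # not acknowledged, so delivery is retried
    return 500  # unknown failure: also retried


for err in (PermanentError("no uuid"), TransientError("timeout"), KeyError("x")):
    print(type(err).__name__, status_for(err))
```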

Next Steps

  • Test Locally: With test JWT or with AUDIENCE unset.
  • Set Env Vars: GOOGLE_API_KEY, ENVIRONMENT, PUBSUB_AUDIENCE.
  • Deploy to Cloud Run: Attach to Pub/Sub "Push" subscription.
  • Update Gemini Client: Adjust genai_client call to match your import/library usage.

If you later want this in FastAPI, want Dockerfile+deployment samples, or want JSONSchema moved to a central location/tested, let me know!


The code is ready to use once the placeholder Gemini client import and call are replaced with your actual library. Let me know if you run into edge cases or want further refinement.

User

That timestamp is fine. Automatically parsable and human readable.

This is a script from the google code generator with the above schema. Can you complete the gemini functions?

import base64
import os
from google import genai
from google.genai import types


def generate():
    client = genai.Client(
        api_key=os.environ.get("GEMINI_API_KEY"),
    )

    model = "gemini-2.0-flash"
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text="""INSERT_INPUT_HERE"""),
            ],
        ),
    ]
    generate_content_config = types.GenerateContentConfig(
        response_mime_type="application/json",
        response_schema=genai.types.Schema(
                        type = genai.types.Type.ARRAY,
                        items = genai.types.Schema(
                            type = genai.types.Type.OBJECT,
                            required = ["title", "summary", "tags", "suggested_hero_image"],
                            properties = {
                                "title": genai.types.Schema(
                                    type = genai.types.Type.STRING,
                                ),
                                "summary": genai.types.Schema(
                                    type = genai.types.Type.STRING,
                                ),
                                "tags": genai.types.Schema(
                                    type = genai.types.Type.ARRAY,
                                    items = genai.types.Schema(
                                        type = genai.types.Type.STRING,
                                    ),
                                ),
                                "suggested_hero_image": genai.types.Schema(
                                    type = genai.types.Type.STRING,
                                ),
                            },
                        ),
                    ),
    )

    for chunk in client.models.generate_content_stream(
        model=model,
        contents=contents,
        config=generate_content_config,
    ):
        print(chunk.text, end="")

if __name__ == "__main__":
    generate()

Assistant

Absolutely! Since you’re using the google.genai client, the functions are a bit different than generic HTTP or legacy Vertex code.

You have a great base — your configuration already instructs Gemini to use a strict output schema, so parsing/validation is less critical (schema is enforced server-side).
I'll provide:

  • a reusable call_gemini function, which returns data (as list[dict])
  • tweak it so you can pass the prompt string as a parameter
  • handle error cases and return parsed list

Core Gemini API Function

import os
from google import genai
from google.genai import types

def call_gemini(prompt: str, model: str = "gemini-2.0-flash") -> list:
    """
    Calls Gemini API with a prompt and fetches a structured list as per schema.
    Returns: list of dicts (articles)
    Raises: RuntimeError on error
    """

    client = genai.Client(
        api_key=os.environ.get("GEMINI_API_KEY"),
    )

    # Single user turn containing the prompt
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=prompt),
            ],
        ),
    ]
    
    # Define structured schema constraint (enforced by Gemini server)
    generate_content_config = types.GenerateContentConfig(
        response_mime_type="application/json",
        response_schema=genai.types.Schema(
            type=genai.types.Type.ARRAY,
            items=genai.types.Schema(
                type=genai.types.Type.OBJECT,
                required=["title", "summary", "tags", "suggested_hero_image"],
                properties={
                    "title": genai.types.Schema(
                        type=genai.types.Type.STRING,
                    ),
                    "summary": genai.types.Schema(
                        type=genai.types.Type.STRING,
                    ),
                    "tags": genai.types.Schema(
                        type=genai.types.Type.ARRAY,
                        items=genai.types.Schema(
                            type=genai.types.Type.STRING,
                        ),
                    ),
                    "suggested_hero_image": genai.types.Schema(
                        type=genai.types.Type.STRING,
                    ),
                },
            ),
        ),
    )

    try:
        # This generator returns streaming objects, assemble full result:
        resp_chunks = client.models.generate_content_stream(
            model=model,
            contents=contents,
            config=generate_content_config,
        )

        full_text = ""
        for chunk in resp_chunks:
            # chunk.text should be a fragment of the JSON output
            if getattr(chunk, "text", None):
                full_text += chunk.text

        # Parse fully assembled JSON
        import json
        articles = json.loads(full_text)

        # Optionally assert structure: it *should* be valid per schema
        if not (isinstance(articles, list) and all(
            isinstance(article, dict) for article in articles
        )):
            raise ValueError("Response is not a list of dicts")

        return articles

    except Exception as e:
        # Chain the original exception so the traceback is preserved in logs
        raise RuntimeError(f"Gemini API error: {e}") from e

Integration

  • Use call_gemini(prompt) in your Flask route.
  • It will raise if Gemini fails or the output isn't valid JSON.
  • Schema is already enforced, so you can relax double validation if you want (but it's fine to keep as a defense-in-depth layer).

Example usage

# In your Flask endpoint or business logic:
try:
    articles = call_gemini(prompt)
except Exception as e:
    # log and raise/return error
    ...

Notes

  • This uses generate_content_stream to handle streaming responses, which are reassembled into a usable JSON string.
  • You can switch to generate_content (non-streaming) if you want, see docs.
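  • Streaming is not required for response_schema; it is used here only because the generated script streams. The non-streaming generate_content call returns the complete JSON text in a single response.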
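
The reassembly step from the first note can be exercised without any network call. The sketch below uses SimpleNamespace objects as stand-ins for streamed chunks; assemble_json_stream is a hypothetical helper, and the only assumption about a chunk is that it exposes a text attribute that may be None.

```python
import json
from types import SimpleNamespace
from typing import Iterable


def assemble_json_stream(chunks: Iterable) -> list:
    """Concatenate each chunk's text and parse the result as a JSON array."""
    # Skip chunks whose text is missing or empty.
    parts = [chunk.text for chunk in chunks if getattr(chunk, "text", None)]
    data = json.loads("".join(parts))
    if not isinstance(data, list):
        raise ValueError("Expected a JSON array")
    return data


# Stand-in chunks: the JSON is split mid-object, and one chunk has no text.
fake_chunks = [
    SimpleNamespace(text='[{"title": "A", '),
    SimpleNamespace(text=None),
    SimpleNamespace(text='"tags": ["x"]}]'),
]
articles = assemble_json_stream(fake_chunks)
print(articles)
```

Individual fragments are not valid JSON on their own, which is why parsing happens only after the full text has been joined.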

Paste this function into your app, and call with call_gemini(prompt) as written!
Let me know if you want the non-streaming version, custom error classes, or further logging.
