@rockwotj
Last active March 26, 2025 16:38
#edit
# Chat endpoint: answers POSTed questions, letting the model call a tool
# that searches the pgvector document store for relevant content.
input:
  http_server:
    path: /post/chat
    allowed_verbs:
      - POST
pipeline:
  processors:
    - openai_chat_completion:
        api_key: "${USER_CONFIGURED_OPENAI_API_KEY}"
        model: "${USER_CONFIGURED_OPENAI_MODEL:gpt-4o}"
        system_prompt: |
          You are a helpful question-answering AI agent.
          You have a tool available that searches a document store for
          semantically relevant content to help answer questions.
        # "${! ... }" is a Bloblang interpolation; a plain "${...}" would be
        # treated as an environment variable lookup.
        prompt: "${! this.question }"
        response_format: json
        json_schema:
          name: ""
          schema: ""
        tools:
          - name: SearchVectorDB
            description: 'Retrieve documents from the vector database to help answer questions'
            parameters:
              required: ["question"]
              properties:
                question:
                  type: string
                  description: "the text to compute embeddings for and search for similar vectors"
            processors:
              # Embed the tool's question with the same model and dimensions
              # used when the documents were indexed.
              - openai_embeddings:
                  api_key: "${USER_CONFIGURED_OPENAI_API_KEY}"
                  model: text-embedding-3-small
                  text_mapping: root = this.question
                  dimensions: 768
              # Fetch the five nearest documents (<-> is pgvector's L2 distance).
              - sql_raw:
                  driver: "postgresql"
                  dsn: "${USER_CONFIGURED_POSTGRES_DSN}"
                  query: |
                    SELECT document FROM ${USER_CONFIGURED_TOPIC} ORDER BY embeddings <-> $1 LIMIT 5
                  args_mapping: root = [ this.vector() ]
              # Turn the result rows into plain text for the model.
              - mapping: |
                  root = "Searching the vector database resulted in the following results:\n" + this.map_each(row -> row.document).join("\n\n")
output:
  reject_errored:
    drop: {}
# Indexing pipeline (a separate config from the chat endpoint above): embeds
# each record from the topic and stores it in Postgres using pgvector.
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["${USER_CONFIGURED_TOPIC}"]
    consumer_group: "${USER_CONFIGURED_TOPIC}-ai-pipeline"
pipeline:
  processors:
    # Wrap the raw record in a structured document.
    - mapping: |
        root.document = content().string()
    # Compute the embedding in a branch so the original document is kept.
    - label: embeddings
      branch:
        processors:
          - openai_embeddings:
              api_key: "${USER_CONFIGURED_OPENAI_API_KEY}"
              model: text-embedding-3-small
              text_mapping: root = this.document
              dimensions: 768
        result_map: |
          root.embeddings = this
output:
  sql_insert:
    driver: "postgresql"
    dsn: "${USER_CONFIGURED_POSTGRES_DSN}"
    table: "${USER_CONFIGURED_TOPIC}"
    # IF NOT EXISTS keeps the init statement safe to run on every start.
    init_statement: |
      CREATE EXTENSION IF NOT EXISTS vector;
      CREATE TABLE IF NOT EXISTS ${USER_CONFIGURED_TOPIC} (key text PRIMARY KEY, document text, embeddings vector(768));
    columns: [key, document, embeddings]
    # Use the Kafka key as the primary key, or a random UUID if it is empty.
    args_mapping: root = [ @kafka_key.not_empty() | uuid_v4(), this.document, this.embeddings.vector() ]
    max_in_flight: 8
    batching:
      count: 64
      period: "30s"
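
The SearchVectorDB tool relies on pgvector's `<->` operator, which orders rows by Euclidean (L2) distance to the query embedding. As a rough illustration of that ranking step only, here is a minimal Python sketch. The function names, the toy 3-dimensional vectors, and the sample documents are all made up for the example; the real pipeline uses 768-dimensional embeddings and runs the search inside Postgres.

```python
import math


def l2_distance(a, b):
    """Euclidean distance between two equal-length vectors (what <-> computes)."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def search(rows, query_embedding, limit=5):
    """Return the documents of the `limit` rows nearest to the query embedding.

    Mirrors: SELECT document FROM <table> ORDER BY embeddings <-> $1 LIMIT 5
    """
    ranked = sorted(
        rows, key=lambda row: l2_distance(row["embeddings"], query_embedding)
    )
    return [row["document"] for row in ranked[:limit]]


# Hypothetical rows standing in for the table the indexing pipeline fills.
rows = [
    {"document": "doc about streaming", "embeddings": [0.9, 0.1, 0.0]},
    {"document": "doc about databases", "embeddings": [0.0, 1.0, 0.0]},
    {"document": "doc about brokers", "embeddings": [0.8, 0.2, 0.1]},
]

results = search(rows, [1.0, 0.0, 0.0], limit=2)

# Join the hits the same way the tool's final mapping step does.
context = "\n\n".join(results)
```

Both configs have to use the same embedding model and `dimensions` value, since distances between vectors from different embedding spaces are not meaningful.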