# frozen_string_literal: true
|
|
|
require 'dspy'
|
|
|
module UniversalFlow
  # Collects request/response pairs so you can print them like the original gist.
  #
  # On construction the subscriber registers handlers for the
  # `universal_flow.call.started`, `universal_flow.call.completed`,
  # `universal_flow.call.failed` and `lm.tokens` events. Every completed or
  # failed call is turned into a {Pair} and appended to {#pairs}.
  #
  # NOTE(review): token-usage events are matched to calls strictly in arrival
  # order (first usage seen goes to the first call that finishes), not by
  # execution id. Confirm that calls are not interleaved where this is used.
  class CapturedRequests < DSPy::Events::BaseSubscriber
    # One captured call: the request hash, the response hash, the token usage
    # hash (or nil when no usage was recorded) and the elapsed milliseconds.
    Pair = Struct.new(:request, :response, :usage, :duration_ms, keyword_init: true)

    # @return [Array<Pair>] captured pairs, in the order the calls finished
    attr_reader :pairs

    def initialize
      super()
      @pairs = []
      @pending = {}     # execution key => { started_clock:, request: }
      @usage_queue = [] # usage entries waiting to be attached to a finished call
      subscribe
    end

    # Registers the event handlers. Called automatically from #initialize.
    def subscribe
      add_subscription('universal_flow.call.started') { |_event, attrs| handle_start(attrs) }
      add_subscription('universal_flow.call.completed') { |_event, attrs| handle_complete(attrs) }
      add_subscription('universal_flow.call.failed') { |_event, attrs| handle_failed(attrs) }
      add_subscription('lm.tokens') { |_event, attrs| handle_usage(attrs) }
    end

    # Prints every captured pair to standard output.
    def print_requests
      puts '=== CAPTURED REQUESTS ==='
      pairs.each_with_index do |pair, index|
        puts "Pair #{index + 1}:"
        puts ' Request:'
        puts " Provider: #{pair.request[:provider]}"
        puts " Model: #{pair.request[:model]}"
        puts " Messages: #{pair.request[:messages]}"
        puts " Timestamp: #{pair.request[:timestamp]}"
        puts ' Response:'
        puts " Content: #{pair.response[:content]}"
        puts " Usage: #{pair.usage}"
        puts " Timestamp: #{pair.response[:timestamp]}"
        puts " Duration: #{pair.duration_ms}ms"
        puts
      end
    end

    private

    # Remembers the request for a call that has just started, keyed by its
    # execution id, until the matching completed/failed event arrives.
    def handle_start(attrs)
      @pending[execution_key(attrs)] = {
        started_clock: monotonic_now,
        request: {
          # Provider and model are not known yet; they are backfilled from
          # the usage entry when the call finishes.
          provider: nil,
          model: nil,
          messages: attrs[:params],
          timestamp: Time.now
        }
      }
    end

    # Records a pair whose response content is the call's output.
    def handle_complete(attrs)
      record_pair(attrs, attrs[:output])
    end

    # Records a pair whose response content is the error, prefixed with "ERROR: ".
    def handle_failed(attrs)
      record_pair(attrs, "ERROR: #{attrs[:error]}")
    end

    # Queues a usage entry built from an `lm.tokens` event. Provider and model
    # are read from string keys, token counts from symbol keys.
    def handle_usage(attrs)
      @usage_queue << {
        provider: attrs['gen_ai.system'],
        model: attrs['gen_ai.request.model'],
        usage: {
          input_tokens: attrs[:input_tokens],
          output_tokens: attrs[:output_tokens],
          total_tokens: attrs[:total_tokens]
        },
        timestamp: Time.now
      }
    end

    # Builds a Pair for a finished call and appends it to @pairs.
    #
    # Shared by the completed and failed paths so both backfill provider and
    # model from the usage entry in the same way. A finish event with no
    # matching start yields an empty request and a duration of 0.0.
    def record_pair(attrs, content)
      entry = @pending.delete(execution_key(attrs)) || {}
      finished_at = Time.now
      finished_clock = monotonic_now
      started_clock = entry[:started_clock] || finished_clock
      usage_entry = next_usage

      request = entry[:request] || {}
      if usage_entry
        request[:provider] ||= usage_entry[:provider]
        request[:model] ||= usage_entry[:model]
      end

      @pairs << Pair.new(
        request: request,
        response: { content: content, timestamp: finished_at },
        usage: usage_entry && usage_entry[:usage],
        duration_ms: ((finished_clock - started_clock) * 1000).round(2)
      )
    end

    # Removes and returns the oldest queued usage entry, or nil if none is queued.
    def next_usage
      @usage_queue.shift
    end

    # Key under which a call's pending entry is stored.
    def execution_key(attrs)
      attrs[:execution_id]
    end

    # Seconds from the monotonic clock; used for durations so wall-clock
    # adjustments cannot skew them.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end