Skip to content

Instantly share code, notes, and snippets.

@vicentereig
Last active October 28, 2025 14:18
Show Gist options
  • Save vicentereig/08538c26df02981a1e8c3bb03fe9138e to your computer and use it in GitHub Desktop.
Save vicentereig/08538c26df02981a1e8c3bb03fe9138e to your computer and use it in GitHub Desktop.
DSPy.rb event-bus alternative to manual message capture
# frozen_string_literal: true
require 'dspy'
module UniversalFlow
  # Collects request/response pairs so you can print them like the original gist.
  #
  # The subscriber listens to the custom `universal_flow.call.started`,
  # `universal_flow.call.completed` and `universal_flow.call.failed` events and
  # to `lm.tokens`, then combines them into {Pair} records. Start and finish
  # events are matched through the `:execution_id` attribute.
  class CapturedRequests < DSPy::Events::BaseSubscriber
    # One captured call: the request hash, the response hash, the token usage
    # hash (or nil when no usage event was queued) and the elapsed time in ms.
    Pair = Struct.new(:request, :response, :usage, :duration_ms, keyword_init: true)

    # @return [Array<Pair>] pairs in the order their completed/failed events arrived
    attr_reader :pairs

    # Sets up empty state and registers the event subscriptions immediately.
    def initialize
      super()
      @pairs = []
      @pending = {}
      @usage_queue = []
      subscribe
    end

    # Registers one handler per event this collector cares about.
    def subscribe
      add_subscription('universal_flow.call.started') { |_event, attrs| handle_start(attrs) }
      add_subscription('universal_flow.call.completed') { |_event, attrs| handle_complete(attrs) }
      add_subscription('universal_flow.call.failed') { |_event, attrs| handle_failed(attrs) }
      add_subscription('lm.tokens') { |_event, attrs| handle_usage(attrs) }
    end

    # Prints every captured pair to stdout.
    def print_requests
      puts '=== CAPTURED REQUESTS ==='
      pairs.each.with_index(1) do |pair, number|
        puts "Pair #{number}:"
        puts ' Request:'
        puts " Provider: #{pair.request[:provider]}"
        puts " Model: #{pair.request[:model]}"
        puts " Messages: #{pair.request[:messages]}"
        puts " Timestamp: #{pair.request[:timestamp]}"
        puts ' Response:'
        puts " Content: #{pair.response[:content]}"
        puts " Usage: #{pair.usage}"
        puts " Timestamp: #{pair.response[:timestamp]}"
        puts " Duration: #{pair.duration_ms}ms"
        puts
      end
    end

    private

    # Remembers the request until the matching completed/failed event arrives.
    # Provider and model are unknown at this point and are filled in later from
    # the usage event, if one was seen.
    def handle_start(attrs)
      @pending[execution_key(attrs)] = {
        started_at: monotonic_now,
        request: {
          provider: nil,
          model: nil,
          messages: attrs[:params],
          timestamp: Time.now
        }
      }
    end

    # Closes the pending request with the call's output as response content.
    def handle_complete(attrs)
      record_pair(attrs, attrs[:output])
    end

    # Closes the pending request with an "ERROR: ..." response content.
    def handle_failed(attrs)
      record_pair(attrs, "ERROR: #{attrs[:error]}")
    end

    # Builds and stores a Pair for a finished call (successful or not).
    #
    # When no start event was recorded for this execution the request is an
    # empty hash and the duration is 0.0.
    def record_pair(attrs, content)
      entry = @pending.delete(execution_key(attrs)) || {}
      finished_at = monotonic_now
      started_at = entry[:started_at] || finished_at
      usage = next_usage
      request = entry[:request] || {}

      if usage
        request[:provider] ||= usage[:provider]
        request[:model] ||= usage[:model]
      end

      @pairs << Pair.new(
        request: request,
        response: { content: content, timestamp: Time.now },
        usage: usage && usage[:usage],
        duration_ms: ((finished_at - started_at) * 1000).round(2)
      )
    end

    # Queues token usage until the next completed/failed event consumes it.
    # Provider and model are read from the string keys 'gen_ai.system' and
    # 'gen_ai.request.model'; token counts are read from symbol keys.
    def handle_usage(attrs)
      @usage_queue << {
        provider: attrs['gen_ai.system'],
        model: attrs['gen_ai.request.model'],
        usage: {
          input_tokens: attrs[:input_tokens],
          output_tokens: attrs[:output_tokens],
          total_tokens: attrs[:total_tokens]
        },
        timestamp: Time.now
      }
    end

    # Pops the oldest queued usage entry, or nil when the queue is empty.
    #
    # NOTE(review): exactly one entry is consumed per finished call. If a single
    # call produces several `lm.tokens` events, the extra entries stay queued
    # and would be attributed to later pairs - confirm against how the agent
    # emits usage.
    def next_usage
      @usage_queue.shift
    end

    # Key used to match start and finish events of the same call.
    def execution_key(attrs)
      attrs[:execution_id]
    end

    # Monotonic seconds; unlike Time.now it is unaffected by wall-clock
    # adjustments, so elapsed-time arithmetic cannot go negative or jump.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end

DSPy.rb Event Bus Instrumentation Proposal

This snippet shows how to migrate the original message-capturing gist to the official DSPy.rb event system. It removes adapter monkey patches and relies on structured events that already include module ancestry and token usage.

Original gist for reference: https://gist.github.com/thedumbtechguy/b5185433538bb1e9f53c7b0b5b6ac89e

Files

  • execution_monitor.rb – listens to built-in lm.tokens events for token accounting.
  • execution_transcriber.rb – persists prompts/responses by subscribing to custom universal_flow.* events.
  • captured_requests.rb – rebuilds the original print_requests view by combining the event payloads.
  • dspy_wrapper.rb – wraps an agent call, emits those events, and keeps your execution record up to date.

See docs/src/core-concepts/events.md for the API and lib/dspy/lm.rb to confirm where lm.tokens comes from. The new CapturedRequests#print_requests method mirrors the gist’s console output using the same data, just sourced from event subscribers.

# frozen_string_literal: true
require 'dspy'
require_relative 'execution_monitor'
require_relative 'execution_transcriber'
module UniversalFlow
  # Wraps an agent call so that it emits `universal_flow.call.*` events and
  # keeps the execution record's status up to date.
  module DSPyWrapper
    module_function

    # Runs `agent.call(**params)` with an ExecutionMonitor and an
    # ExecutionTranscriber attached for the duration of the call.
    #
    # @param agent [#call] object invoked with the remaining keyword arguments
    # @param execution [#id, #update!] record that receives status updates
    # @param params [Hash] keyword arguments forwarded to the agent
    # @return [Object] whatever `agent.call` returned
    # @raise [StandardError] re-raises any error after marking the execution
    #   as failed and emitting `universal_flow.call.failed`
    def call_with_tracking(agent, execution:, **params)
      monitor = UniversalFlow::ExecutionMonitor.new(execution: execution)
      transcriber = UniversalFlow::ExecutionTranscriber.new(execution: execution)

      # Built directly rather than through event_payload so that agent inputs
      # that happen to be named :result or :error are still reported as params.
      started = base_payload(agent, execution).merge(params: DSPy::TypeSerializer.serialize(params))
      DSPy.event('universal_flow.call.started', started)

      result = agent.call(**params)
      DSPy.event('universal_flow.call.completed', event_payload(agent, execution, result: result))
      execution.update!(status: 'completed', outputs: result.to_h)
      result
    rescue => error
      execution.update!(status: 'failed', error_message: error.message)
      DSPy.event('universal_flow.call.failed', event_payload(agent, execution, error: error.message))
      raise
    ensure
      # Either subscriber may be nil when its constructor raised; safe
      # navigation keeps the original error from being masked by NoMethodError.
      monitor&.unsubscribe
      transcriber&.unsubscribe
    end

    # Builds an event payload from the agent/execution identity plus one of:
    # `output:` (when params has :result), `error:` (when params has :error),
    # or `params:` (everything else).
    #
    # @param agent [Object]
    # @param execution [#id]
    # @param params [Hash]
    # @return [Hash]
    def event_payload(agent, execution, params = {})
      base = base_payload(agent, execution)

      if params.key?(:result)
        base.merge(output: DSPy::TypeSerializer.serialize(params[:result]))
      elsif params.key?(:error)
        base.merge(error: params[:error])
      else
        base.merge(params: DSPy::TypeSerializer.serialize(params))
      end
    end

    # Identity fields shared by every event: agent class name, signature class
    # name (nil when the agent has no #signature_class) and the execution id.
    def base_payload(agent, execution)
      {
        agent_class: agent.class.name,
        signature: agent.respond_to?(:signature_class) ? agent.signature_class.name : nil,
        execution_id: execution.id
      }
    end
  end
end
# frozen_string_literal: true
require 'dspy'
module UniversalFlow
  # Listens to DSPy.rb's built-in lm.tokens events and updates an execution record.
  #
  # Each event adds its input and output token counts to the record's
  # `input_tokens` and `output_tokens` counters via `increment!`.
  class ExecutionMonitor < DSPy::Events::BaseSubscriber
    # @param execution [#increment!] record whose token counters are bumped
    def initialize(execution:)
      super()
      @execution = execution
      subscribe
    end

    # Registers the `lm.tokens` handler.
    def subscribe
      add_subscription('lm.tokens') do |_event, attrs|
        # Input first, then output; a missing count becomes 0 through #to_i.
        %i[input_tokens output_tokens].each do |column|
          @execution.increment!(column, attrs[column].to_i)
        end
      end
    end
  end
end
# frozen_string_literal: true
require 'dspy'
module UniversalFlow
  # Persists prompts/responses for an execution by listening to custom events.
  #
  # Every `universal_flow.call.*` event becomes one AgentExecutionMessage row
  # linked to the execution given at construction time.
  class ExecutionTranscriber < DSPy::Events::BaseSubscriber
    # @param execution [Object] record passed as `agent_execution:` to each message
    def initialize(execution:)
      super()
      @execution = execution
      subscribe
    end

    # Registers handlers for the started, completed and failed events.
    def subscribe
      # NOTE(review): the started message is stored with role 'system' although
      # its content is the call input - confirm this is intended.
      add_subscription('universal_flow.call.started') do |_event, attrs|
        record_message('system', attrs, preferred_content(attrs[:params], 'user_input'))
      end

      add_subscription('universal_flow.call.completed') do |_event, attrs|
        record_message('assistant', attrs, preferred_content(attrs[:output], 'response'))
      end

      add_subscription('universal_flow.call.failed') do |_event, attrs|
        record_message('error', attrs, attrs[:error])
      end
    end

    private

    # Creates one message row; metadata carries the agent class and signature.
    def record_message(role, attrs, content)
      UniversalFlow::AgentExecutionMessage.create!(
        agent_execution: @execution,
        role: role,
        content: content,
        metadata: attrs.slice(:agent_class, :signature)
      )
    end

    # Returns payload[field] when payload is a Hash holding a truthy value
    # under that key, otherwise the payload itself.
    #
    # Guarding on Hash avoids the TypeError that Hash#dig raises when the
    # intermediate value (e.g. a String output) does not implement #dig.
    #
    # NOTE(review): the field is looked up as a String key; verify that the
    # serialized payload really uses string keys.
    def preferred_content(payload, field)
      nested = payload.is_a?(Hash) ? payload[field] : nil
      nested || payload
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment