@thedumbtechguy
Last active October 28, 2025 14:12
# DSPy call wrapper with comprehensive tracking
def dspy_call_with_tracking(agent, **params)
  agent_execution = dspy_create_agent_execution
  logger = DSPy::MessageCapturingLogger.new

  # Set up callbacks to create messages as they happen
  logger.on_request do |request_data|
    # Create separate message records for each role in the DSPy prompt
    request_data[:messages].each_with_index do |msg, index|
      UniversalFlow::AgentExecutionMessage.create!(
        agent_execution: agent_execution,
        role: msg[:role],
        content: msg[:content],
        metadata: {
          signature_class: agent.signature_class.name,
          program_class: agent.class.name,
          stored_program_id: agent.stored_program_id
        }
      )
    end
  end

  logger.on_response do |response_data|
    UniversalFlow::AgentExecutionMessage.create!(
      agent_execution: agent_execution,
      role: "assistant",
      content: response_data[:content],
      metadata: {
        signature_class: agent.signature_class.name,
        program_class: agent.class.name,
        stored_program_id: agent.stored_program_id
      }
    )

    # Update token usage if available
    if response_data[:usage]
      usage = response_data[:usage]
      agent_execution.update!(
        input_tokens: (agent_execution.input_tokens || 0) + (usage[:input_tokens] || 0),
        output_tokens: (agent_execution.output_tokens || 0) + (usage[:output_tokens] || 0)
      )
    end
  end

  begin
    result = logger.with_capture { agent.call(**params) }

    # Update execution with success
    agent_execution.update!(
      status: "completed",
      completed_at: Time.current,
      metadata: agent_execution.metadata.merge({
        signature_class: agent.signature_class.name,
        program_class: agent.class.name,
        stored_program_id: agent.stored_program_id,
        captured_requests: logger.captured_requests,
        outputs: result.to_h.except(*params.keys)
      })
    )

    result
  rescue => e
    agent_execution.update!(status: "failed", completed_at: Time.current, error_message: e.message)
    raise e
  end
end
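The wrapper assumes app-side ActiveRecord models for executions and their messages. A minimal sketch of what that schema could look like, with column names inferred from the calls above (table names, types, and the jsonb/Postgres choice are assumptions, not part of the gist):

class CreateAgentExecutionTracking < ActiveRecord::Migration[7.1]
  def change
    create_table :universal_flow_agent_executions do |t|
      t.string   :status
      t.datetime :completed_at
      t.integer  :input_tokens
      t.integer  :output_tokens
      t.string   :error_message
      t.jsonb    :metadata, default: {}
      t.timestamps
    end

    create_table :universal_flow_agent_execution_messages do |t|
      t.references :agent_execution
      t.string :role
      t.text   :content
      t.jsonb  :metadata, default: {}
      t.timestamps
    end
  end
end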
module DSPy
  # Permanent monkey patches installed once at startup
  module MessageCapturing
    class << self
      def install!
        return if @installed

        patch_openai_adapter if defined?(DSPy::LM::Adapters::OpenAIAdapter)
        patch_gemini_adapter if defined?(DSPy::LM::GeminiAdapter)
        patch_anthropic_adapter if defined?(DSPy::LM::Adapters::AnthropicAdapter)

        @installed = true
      end

      def current_logger
        Fiber[:dspy_message_logger]
      end

      def with_logger(logger)
        old_logger = Fiber[:dspy_message_logger]
        Fiber[:dspy_message_logger] = logger
        yield
      ensure
        Fiber[:dspy_message_logger] = old_logger
      end

      private

      def patch_openai_adapter
        return unless defined?(DSPy::LM::Adapters::OpenAIAdapter)

        original_method = DSPy::LM::Adapters::OpenAIAdapter.instance_method(:chat)
        DSPy::LM::Adapters::OpenAIAdapter.define_method(:chat) do |messages:, **kwargs, &block|
          logger = MessageCapturing.current_logger
          logger&.capture_request("openai", @model, messages, kwargs)
          result = original_method.bind_call(self, messages: messages, **kwargs, &block)
          logger&.capture_response("openai", @model, result)
          result
        end
      end

      def patch_gemini_adapter
        return unless defined?(DSPy::LM::GeminiAdapter)

        original_method = DSPy::LM::GeminiAdapter.instance_method(:chat)
        DSPy::LM::GeminiAdapter.define_method(:chat) do |messages:, **kwargs, &block|
          logger = MessageCapturing.current_logger
          logger&.capture_request("gemini", @model, messages, kwargs)
          result = original_method.bind_call(self, messages: messages, **kwargs, &block)
          logger&.capture_response("gemini", @model, result)
          result
        end
      end

      def patch_anthropic_adapter
        return unless defined?(DSPy::LM::Adapters::AnthropicAdapter)

        original_method = DSPy::LM::Adapters::AnthropicAdapter.instance_method(:chat)
        DSPy::LM::Adapters::AnthropicAdapter.define_method(:chat) do |messages:, **kwargs, &block|
          logger = MessageCapturing.current_logger
          logger&.capture_request("anthropic", @model, messages, kwargs)
          result = original_method.bind_call(self, messages: messages, **kwargs, &block)
          logger&.capture_response("anthropic", @model, result)
          result
        end
      end
    end
  end
  class MessageCapturingLogger
    attr_reader :captured_requests

    def initialize
      @captured_requests = []
      @current_request = nil
      @request_start_time = nil
      @request_callbacks = []
      @response_callbacks = []
    end

    def on_request(&block)
      @request_callbacks << block
    end

    def on_response(&block)
      @response_callbacks << block
    end

    def clear!
      @captured_requests.clear
      @current_request = nil
      @request_start_time = nil
    end

    def capture_request(provider, model, messages, kwargs)
      begin
        dup_messages = case messages
                       when Array
                         messages.map { |m| m.respond_to?(:to_h) ? m.to_h : m }
                       else
                         messages.respond_to?(:to_h) ? messages.to_h : messages
                       end
      rescue
        dup_messages = messages.inspect
      end

      @request_start_time = Time.current
      @current_request = {
        provider: provider,
        model: model,
        messages: dup_messages,
        kwargs: kwargs.dup,
        timestamp: @request_start_time
      }

      # Call request callbacks
      @request_callbacks.each { |callback| callback.call(@current_request) }
    end

    def capture_response(provider, model, result)
      response_time = Time.current
      response = {
        provider: provider,
        model: model,
        content: result.content,
        usage: result.usage&.to_h,
        metadata: result.metadata&.to_h,
        timestamp: response_time
      }

      # Call response callbacks
      @response_callbacks.each { |callback| callback.call(response) }

      # Create correlated pair if we have a current request
      if @current_request
        duration_ms = @request_start_time ? ((response_time - @request_start_time) * 1000).round(2) : nil
        @captured_requests << {
          request: @current_request,
          response: response,
          duration_ms: duration_ms
        }
        @current_request = nil
        @request_start_time = nil
      end
    end

    def with_capture(&block)
      MessageCapturing.with_logger(self, &block)
    end

    def print_summary
      puts "=== MESSAGE CAPTURE SUMMARY ==="
      puts "Total captured requests: #{@captured_requests.size}"
      puts
      print_requests
    end

    def print_requests
      puts "=== CAPTURED REQUESTS ==="
      @captured_requests.each_with_index do |pair, i|
        puts "Pair #{i + 1}:"
        puts "  Request:"
        puts "    Provider: #{pair[:request][:provider]}"
        puts "    Model: #{pair[:request][:model]}"
        puts "    Messages: #{pair[:request][:messages]}"
        puts "    Timestamp: #{pair[:request][:timestamp]}"
        puts "  Response:"
        puts "    Content: #{pair[:response][:content]}"
        puts "    Usage: #{pair[:response][:usage]}"
        puts "    Timestamp: #{pair[:response][:timestamp]}"
        puts "  Duration: #{pair[:duration_ms]}ms"
        puts
      end
    end

    def save_to_file(filename)
      data = {
        captured_requests: @captured_requests,
        captured_at: Time.current
      }
      File.write(filename, JSON.pretty_generate(data))
      puts "Captured requests saved to #{filename}"
    end
  end
  # Live logging wrapper that shows each call as it happens
  class LiveLogger < MessageCapturingLogger
    def initialize
      super
      @call_count = 0
      @start_time = Time.current
    end

    def show_progress_summary
      elapsed = Time.current - @start_time
      total_tokens = @captured_requests.map { |p| p[:response][:usage][:total_tokens] if p[:response][:usage] }.compact.sum

      puts "\nπŸ“Š PROGRESS SUMMARY"
      puts "  Calls made: #{@call_count}"
      puts "  Time elapsed: #{elapsed.round(1)}s"
      puts "  Total tokens: #{total_tokens}"
      puts "  Avg tokens/call: #{total_tokens / @call_count if @call_count > 0}"
      puts "  Rate: #{(@call_count / elapsed).round(2)} calls/sec" if elapsed > 0
    end

    def capture_request(provider, model, messages, kwargs)
      @call_count += 1
      call_num = @call_count

      # Show what we're about to send
      puts "\nπŸ”„ LLM Call ##{call_num} (#{Time.current.strftime("%H:%M:%S")})"
      puts "  Model: #{model}"

      # Extract and show prompt preview
      begin
        dup_messages = case messages
                       when Array
                         messages.map { |m| m.respond_to?(:to_h) ? m.to_h : m }
                       else
                         messages.respond_to?(:to_h) ? messages.to_h : messages
                       end

        # Show first message content
        if dup_messages.is_a?(Array) && dup_messages.first.is_a?(Hash)
          content = dup_messages.first[:content] || dup_messages.first["content"]
          if content
            preview = content.last(2000).tr("\n", " ")
            puts "  πŸ“ Prompt: #{preview}"
          end
        end
      rescue
        puts "  πŸ“ Prompt: [Complex message structure]"
      end

      # Call parent to store the request
      super(provider, model, messages, kwargs.merge(call_number: call_num))
    end

    def capture_response(provider, model, result)
      # Show response immediately
      response_preview = result.content
      puts "  βœ… Response: #{response_preview}"

      if result.usage
        puts "  🎯 Tokens: #{result.usage.total_tokens} (in: #{result.usage.input_tokens}, out: #{result.usage.output_tokens})"
      end

      # Call parent to store the response
      super
    end
  end
end
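The patches are inert until install! has run, and they only capture anything while a logger is active for the current fiber. A minimal way to wire that up, assuming a Rails app (the initializer path and the require are illustrative, not part of the gist):

# config/initializers/dspy_message_capturing.rb (illustrative path)
require "dspy"

# Install the adapter patches once at boot. install! is idempotent, and the
# patched #chat methods are pass-throughs until a MessageCapturingLogger is
# activated via MessageCapturing.with_logger / MessageCapturingLogger#with_capture.
DSPy::MessageCapturing.install!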
intent_recognizer = dspy_load_program(UniversalFlow::RecognizeIntent, DSPy::ChainOfThought)

result = dspy_call_with_tracking(
  intent_recognizer,
  user_input: user_input,
  available_intents: available_intents,
  intent_descriptions: intent_descriptions
)
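For interactive debugging without the database records, the LiveLogger defined above can be used directly. A short sketch using only methods from this gist (assumes DSPy::MessageCapturing.install! has already run):

logger = DSPy::LiveLogger.new
result = logger.with_capture do
  intent_recognizer.call(
    user_input: user_input,
    available_intents: available_intents,
    intent_descriptions: intent_descriptions
  )
end

logger.show_progress_summary
logger.save_to_file("captured_llm_calls.json")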
@vicentereig

For future reference:

I think you can use the new event bus, without adapters or monkey patches, just subscribers. Is there a reason that doesn't match your use case? lm.tokens already gives you token counts with relevant context. Plus you can (and should) emit your own events too!
https://gist.github.com/vicentereig/08538c26df02981a1e8c3bb03fe9138e
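
For reference, a rough, untested sketch of what that subscriber approach could look like: only the lm.tokens event name comes from the comment above; the DSPy.events.subscribe call and the payload shape are assumptions to verify against the linked gist and dspy.rb's event bus documentation.

# Sketch only, not tested: listen for token usage via the event bus instead of
# patching adapter #chat methods. Check the subscribe signature and payload
# keys against dspy.rb's docs before relying on this.
DSPy.events.subscribe('lm.tokens') do |_event_name, attributes|
  Rails.logger.info("lm.tokens payload: #{attributes.inspect}")
  # From here the payload could be folded into the AgentExecution token
  # counters the same way the on_response callback does above.
end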
