Last active
October 28, 2025 14:12
-
-
Save thedumbtechguy/b5185433538bb1e9f53c7b0b5b6ac89e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# DSPy call wrapper with comprehensive tracking.
#
# Runs `agent.call(**params)` while a DSPy::MessageCapturingLogger is active and
# records what happens on the execution object returned by
# `dspy_create_agent_execution`:
#
# * every message of every captured request becomes one
#   UniversalFlow::AgentExecutionMessage record (role/content taken from the message);
# * every captured response becomes an "assistant" message record, and its token
#   usage (when present) is added to the execution's running totals;
# * on success the execution is marked "completed" and its metadata is extended with
#   the agent details, the captured request/response pairs and the call outputs;
# * on failure the execution is marked "failed" with the error message and the
#   original exception is re-raised.
#
# @param agent [#call, #signature_class, #stored_program_id] the DSPy program to run
# @param params [Hash] keyword inputs forwarded unchanged to `agent.call`
# @return [Object] whatever `agent.call` returned
# @raise [StandardError] re-raises any error raised while running the agent
def dspy_call_with_tracking(agent, **params)
  agent_execution = dspy_create_agent_execution
  logger = DSPy::MessageCapturingLogger.new

  # Built lazily and freshly on every use: each record gets its own hash, and the
  # agent is only queried at the points where the values are actually stored.
  tracking_metadata = lambda do
    {
      signature_class: agent.signature_class.name,
      program_class: agent.class.name,
      stored_program_id: agent.stored_program_id
    }
  end

  # Create one message record per entry of the captured prompt, as it happens.
  logger.on_request do |request_data|
    request_data[:messages].each do |msg|
      UniversalFlow::AgentExecutionMessage.create!(
        agent_execution: agent_execution,
        # Messages normalised through `to_h` may use string keys, so accept both.
        role: msg[:role] || msg["role"],
        content: msg[:content] || msg["content"],
        metadata: tracking_metadata.call
      )
    end
  end

  logger.on_response do |response_data|
    UniversalFlow::AgentExecutionMessage.create!(
      agent_execution: agent_execution,
      role: "assistant",
      content: response_data[:content],
      metadata: tracking_metadata.call
    )

    # Accumulate token usage if the provider reported it.
    usage = response_data[:usage]
    if usage
      agent_execution.update!(
        input_tokens: (agent_execution.input_tokens || 0) + (usage[:input_tokens] || 0),
        output_tokens: (agent_execution.output_tokens || 0) + (usage[:output_tokens] || 0)
      )
    end
  end

  begin
    result = logger.with_capture { agent.call(**params) }

    # Outputs are the result fields that were not supplied as inputs.
    completion_details = tracking_metadata.call.merge(
      captured_requests: logger.captured_requests,
      outputs: result.to_h.except(*params.keys)
    )

    agent_execution.update!(
      status: "completed",
      completed_at: Time.current,
      # A freshly created execution may not have any metadata yet.
      metadata: (agent_execution.metadata || {}).merge(completion_details)
    )
    result
  rescue StandardError => e
    agent_execution.update!(status: "failed", completed_at: Time.current, error_message: e.message)
    raise
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "json"

module DSPy
  # Permanent monkey patches installed once at startup.
  #
  # `install!` wraps the `chat` method of each known LM adapter so that, whenever a
  # logger is active for the current fiber (see `with_logger`), the request and the
  # response of every chat call are handed to that logger.
  module MessageCapturing
    # Fiber-storage key under which the active logger is kept.
    LOGGER_KEY = :dspy_message_logger

    class << self
      # Patches every adapter class that is defined at the time of the call.
      # Calling it again is a no-op.
      #
      # @return [true, nil] true after patching, nil when already installed
      def install!
        return if @installed

        # NOTE(review): the Gemini adapter is looked up directly under DSPy::LM while the
        # other two live under DSPy::LM::Adapters — confirm against the dspy version in use.
        patch_adapter(DSPy::LM::Adapters::OpenAIAdapter, "openai") if defined?(DSPy::LM::Adapters::OpenAIAdapter)
        patch_adapter(DSPy::LM::GeminiAdapter, "gemini") if defined?(DSPy::LM::GeminiAdapter)
        patch_adapter(DSPy::LM::Adapters::AnthropicAdapter, "anthropic") if defined?(DSPy::LM::Adapters::AnthropicAdapter)
        @installed = true
      end

      # @return [Object, nil] the logger stored for the current fiber, if any
      def current_logger
        Fiber[LOGGER_KEY]
      end

      # Makes `logger` the current logger for the duration of the block and restores
      # the previous one afterwards, even when the block raises.
      #
      # @param logger [#capture_request, #capture_response, nil]
      # @return [Object] the block's return value
      def with_logger(logger)
        previous = Fiber[LOGGER_KEY]
        Fiber[LOGGER_KEY] = logger
        yield
      ensure
        Fiber[LOGGER_KEY] = previous
      end

      private

      # Replaces `adapter_class#chat` with a wrapper that reports the call to the
      # current logger (when there is one) before and after delegating to the
      # original implementation. The original return value is passed through.
      #
      # @param adapter_class [Class] adapter whose `chat` method is wrapped
      # @param provider [String] provider label passed to the logger
      def patch_adapter(adapter_class, provider)
        original_chat = adapter_class.instance_method(:chat)

        adapter_class.define_method(:chat) do |messages:, **kwargs, &block|
          logger = MessageCapturing.current_logger
          logger&.capture_request(provider, @model, messages, kwargs)
          result = original_chat.bind_call(self, messages: messages, **kwargs, &block)
          logger&.capture_response(provider, @model, result)
          result
        end
      end
    end
  end

  # Collects request/response pairs of LM calls and notifies registered callbacks
  # as each request and each response is captured.
  class MessageCapturingLogger
    # @return [Array<Hash>] completed pairs: `{ request:, response:, duration_ms: }`
    attr_reader :captured_requests

    def initialize
      @captured_requests = []
      @current_request = nil
      @request_start_time = nil
      @request_callbacks = []
      @response_callbacks = []
    end

    # Registers a block called with the request hash on every captured request.
    def on_request(&block)
      @request_callbacks << block
    end

    # Registers a block called with the response hash on every captured response.
    def on_response(&block)
      @response_callbacks << block
    end

    # Forgets all captured pairs and any pending request. Callbacks are kept.
    def clear!
      @captured_requests.clear
      @current_request = nil
      @request_start_time = nil
    end

    # Records an outgoing request as the pending request and runs the request
    # callbacks with it.
    #
    # @param provider [String]
    # @param model [Object] model identifier as supplied by the adapter
    # @param messages [Array, Object] prompt messages; converted with `to_h` where possible
    # @param kwargs [Hash] remaining chat arguments (stored as a shallow copy)
    def capture_request(provider, model, messages, kwargs)
      plain_messages =
        begin
          normalize_messages(messages)
        rescue StandardError
          # Best effort: keep a printable representation rather than failing the call.
          messages.inspect
        end

      @request_start_time = Time.current
      @current_request = {
        provider: provider,
        model: model,
        messages: plain_messages,
        kwargs: kwargs.dup,
        timestamp: @request_start_time
      }

      @request_callbacks.each { |callback| callback.call(@current_request) }
    end

    # Builds the response hash, runs the response callbacks with it and, when a
    # request is pending, stores the correlated pair together with its duration.
    #
    # @param provider [String]
    # @param model [Object]
    # @param result [#content, #usage, #metadata] value returned by the adapter
    def capture_response(provider, model, result)
      response_time = Time.current
      response = {
        provider: provider,
        model: model,
        content: result.content,
        usage: result.usage&.to_h,
        metadata: result.metadata&.to_h,
        timestamp: response_time
      }

      @response_callbacks.each { |callback| callback.call(response) }

      # Without a pending request there is nothing to correlate the response with.
      return unless @current_request

      duration_ms = @request_start_time ? ((response_time - @request_start_time) * 1000).round(2) : nil
      @captured_requests << {
        request: @current_request,
        response: response,
        duration_ms: duration_ms
      }
      @current_request = nil
      @request_start_time = nil
    end

    # Runs the block with this logger installed as the current logger.
    #
    # @return [Object] the block's return value
    def with_capture(&block)
      MessageCapturing.with_logger(self, &block)
    end

    # Prints the number of captured pairs followed by every pair.
    def print_summary
      puts "=== MESSAGE CAPTURE SUMMARY ==="
      puts "Total captured requests: #{@captured_requests.size}"
      puts
      print_requests
    end

    # Prints every captured pair to standard output.
    def print_requests
      puts "=== CAPTURED REQUESTS ==="
      @captured_requests.each.with_index(1) do |pair, number|
        request = pair[:request]
        response = pair[:response]

        puts "Pair #{number}:"
        puts " Request:"
        puts " Provider: #{request[:provider]}"
        puts " Model: #{request[:model]}"
        puts " Messages: #{request[:messages]}"
        puts " Timestamp: #{request[:timestamp]}"
        puts " Response:"
        puts " Content: #{response[:content]}"
        puts " Usage: #{response[:usage]}"
        puts " Timestamp: #{response[:timestamp]}"
        puts " Duration: #{pair[:duration_ms]}ms"
        puts
      end
    end

    # Writes the captured pairs and the current time to `filename` as pretty JSON.
    #
    # @param filename [String] path of the file to (over)write
    def save_to_file(filename)
      data = {
        captured_requests: @captured_requests,
        captured_at: Time.current
      }
      File.write(filename, JSON.pretty_generate(data))
      puts "Captured requests saved to #{filename}"
    end

    private

    # Converts messages to plain values: each element of an Array (or a single
    # non-Array value) is replaced by its `to_h` when it responds to it.
    # Errors raised by `to_h` propagate to the caller.
    def normalize_messages(messages)
      case messages
      when Array
        messages.map { |message| message.respond_to?(:to_h) ? message.to_h : message }
      else
        messages.respond_to?(:to_h) ? messages.to_h : messages
      end
    end
  end

  # Live logging wrapper that shows each call as it happens.
  #
  # NOTE(review): the leading glyphs in the printed labels below ("π", "β", "π―")
  # look like emoji damaged by a wrong text decoding — restore from the original
  # source if available. They are reproduced unchanged here.
  class LiveLogger < MessageCapturingLogger
    def initialize
      super
      @call_count = 0
      @start_time = Time.current
    end

    # Prints call count, elapsed time, token totals and call rate so far.
    def show_progress_summary
      elapsed = Time.current - @start_time
      total_tokens = @captured_requests.sum { |pair| pair.dig(:response, :usage, :total_tokens) || 0 }

      puts "\nπ PROGRESS SUMMARY"
      puts " Calls made: #{@call_count}"
      puts " Time elapsed: #{elapsed.round(1)}s"
      puts " Total tokens: #{total_tokens}"
      puts " Avg tokens/call: #{total_tokens / @call_count if @call_count > 0}"
      puts " Rate: #{(@call_count / elapsed).round(2)} calls/sec" if elapsed > 0
    end

    # Prints a header and a prompt preview for the call, then stores the request
    # via the parent class with the running call number added to `kwargs`.
    def capture_request(provider, model, messages, kwargs)
      @call_count += 1
      call_num = @call_count

      puts "\nπ LLM Call ##{call_num} (#{Time.current.strftime("%H:%M:%S")})"
      puts " Model: #{model}"

      begin
        plain_messages = normalize_messages(messages)
        first_message = plain_messages.first if plain_messages.is_a?(Array)

        if first_message.is_a?(Hash)
          content = first_message[:content] || first_message["content"]
          # Preview is the tail of the first message, flattened onto one line.
          puts " π Prompt: #{content.last(2000).tr("\n", " ")}" if content
        end
      rescue StandardError
        puts " π Prompt: [Complex message structure]"
      end

      super(provider, model, messages, kwargs.merge(call_number: call_num))
    end

    # Prints the response content and token usage, then stores the response via
    # the parent class.
    def capture_response(provider, model, result)
      puts " β Response: #{result.content}"

      usage = result.usage
      if usage
        puts " π― Tokens: #{usage.total_tokens} (in: #{usage.input_tokens}, out: #{usage.output_tokens})"
      end

      super
    end
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example usage: load the intent-recognition program and run it through the
# tracking wrapper, passing the three inputs as keyword arguments.
intent_recognizer = dspy_load_program(UniversalFlow::RecognizeIntent, DSPy::ChainOfThought)

recognition_inputs = {
  user_input: user_input,
  available_intents: available_intents,
  intent_descriptions: intent_descriptions
}
result = dspy_call_with_tracking(intent_recognizer, **recognition_inputs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For future reference: