# frozen_string_literal: true

require 'dspy'

module UniversalFlow
  # Collects request/response pairs so you can print them like the original gist.
  #
  # The subscriber listens to three call lifecycle events
  # ('universal_flow.call.started', '.completed', '.failed') plus 'lm.tokens',
  # and turns each finished call into a Pair stored in #pairs.
  #
  # Started calls are remembered under attrs[:execution_id] until the matching
  # completed/failed event arrives. Token usage is matched to calls in FIFO
  # order: every finished call consumes the oldest queued 'lm.tokens' entry.
  # NOTE(review): this assumes the usage event of a call is published before
  # that call's completed/failed event and that calls do not interleave —
  # confirm against the event publisher.
  class CapturedRequests < DSPy::Events::BaseSubscriber
    # One captured call.
    #   request     - Hash with :provider, :model, :messages, :timestamp
    #                 (empty Hash when no start event was seen)
    #   response    - Hash with :content and :timestamp
    #   usage       - Hash with :input_tokens, :output_tokens, :total_tokens,
    #                 or nil when no usage entry was queued
    #   duration_ms - Float, milliseconds between start and finish
    Pair = Struct.new(:request, :response, :usage, :duration_ms, keyword_init: true)

    # @return [Array<Pair>] captured pairs in the order the calls finished
    attr_reader :pairs

    # Sets up empty state and registers the event subscriptions right away.
    def initialize
      super()
      @pairs = []
      @pending = {}
      @usage_queue = []
      subscribe
    end

    # Registers one handler per event of interest. Already invoked by
    # #initialize, so calling it again adds a second set of subscriptions.
    def subscribe
      add_subscription('universal_flow.call.started') { |_event, attrs| handle_start(attrs) }
      add_subscription('universal_flow.call.completed') { |_event, attrs| handle_complete(attrs) }
      add_subscription('universal_flow.call.failed') { |_event, attrs| handle_failed(attrs) }
      add_subscription('lm.tokens') { |_event, attrs| handle_usage(attrs) }
    end

    # Writes a human-readable dump of every captured pair.
    #
    # @param io [#puts] destination stream; defaults to $stdout, which is
    #   where the bare `puts` calls used to write
    def print_requests(io: $stdout)
      io.puts '=== CAPTURED REQUESTS ==='
      pairs.each_with_index do |pair, index|
        io.puts "Pair #{index + 1}:"
        io.puts '  Request:'
        io.puts "    Provider: #{pair.request[:provider]}"
        io.puts "    Model: #{pair.request[:model]}"
        io.puts "    Messages: #{pair.request[:messages]}"
        io.puts "    Timestamp: #{pair.request[:timestamp]}"
        io.puts '  Response:'
        io.puts "    Content: #{pair.response[:content]}"
        io.puts "    Usage: #{pair.usage}"
        io.puts "    Timestamp: #{pair.response[:timestamp]}"
        io.puts "  Duration: #{pair.duration_ms}ms"
        io.puts
      end
    end

    private

    # Remembers a started call until its completed/failed event shows up.
    # Provider and model are unknown at this point; they are filled in later
    # from the usage entry.
    def handle_start(attrs)
      @pending[execution_key(attrs)] = {
        started_clock: monotonic_now,
        request: {
          provider: nil,
          model: nil,
          messages: attrs[:params],
          timestamp: Time.now
        }
      }
    end

    # Records a successful call using attrs[:output] as the response content.
    def handle_complete(attrs)
      record_pair(attrs, attrs[:output])
    end

    # Records a failed call; the response content is the error prefixed with
    # "ERROR: ".
    def handle_failed(attrs)
      record_pair(attrs, "ERROR: #{attrs[:error]}")
    end

    # Builds the Pair for a finished call and appends it to #pairs.
    #
    # Shared by the completed and failed paths so both consume one usage
    # entry and both back-fill provider/model on the request.
    def record_pair(attrs, content)
      finished_clock = monotonic_now
      finished_at = Time.now
      entry = @pending.delete(execution_key(attrs)) || {}
      # Without a matching start event there is nothing to measure: duration is 0.
      started_clock = entry[:started_clock] || finished_clock
      usage = next_usage

      request = entry[:request] || {}
      if usage
        request[:provider] ||= usage[:provider]
        request[:model] ||= usage[:model]
      end

      @pairs << Pair.new(
        request: request,
        response: {
          content: content,
          timestamp: finished_at
        },
        usage: usage && usage[:usage],
        duration_ms: ((finished_clock - started_clock) * 1000).round(2)
      )
    end

    # Queues token usage reported by an 'lm.tokens' event.
    # Provider and model are read from string keys, token counts from symbol keys.
    def handle_usage(attrs)
      @usage_queue << {
        provider: attrs['gen_ai.system'],
        model: attrs['gen_ai.request.model'],
        usage: {
          input_tokens: attrs[:input_tokens],
          output_tokens: attrs[:output_tokens],
          total_tokens: attrs[:total_tokens]
        },
        timestamp: Time.now
      }
    end

    # Oldest queued usage entry, or nil when the queue is empty.
    def next_usage
      @usage_queue.shift
    end

    # Key that ties a started event to its completed/failed counterpart.
    def execution_key(attrs)
      attrs[:execution_id]
    end

    # Seconds on the monotonic clock; unaffected by wall-clock adjustments,
    # so differences are safe to use as elapsed time.
    def monotonic_now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end