Created
July 8, 2025 12:05
-
-
Save Rishav159/3f2ab03daf322d1afb833ad3d4f7e022 to your computer and use it in GitHub Desktop.
Utility functions for working with the Sentry HTTP API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'net/http' | |
| require 'json' | |
| require 'uri' | |
| require 'thread' | |
| require 'queue' | |
require 'time' # Time#iso8601 comes from the 'time' stdlib extension on Rubies where it is not built in

# Helpers for driving the Sentry HTTP API from scripts: paging through issues,
# enriching them with tags/details (sequentially or with worker threads and
# checkpoints), post-processing the enriched data, and (re)assigning issues.
#
# Every step reads from and writes to JSON files so a long run can be resumed.
# Progress and errors are reported with `puts`; most failures are reported and
# turned into a nil / false / [] return value rather than raised.
class SentryUtils
  # Defaults merged under the hash passed to #initialize.
  DEFAULT_CONFIG = {
    auth_token: ENV['SENTRY_API_TOKEN'],
    org_slug: ENV['SENTRY_ORG_SLUG'] || 'apolloio',
    base_url: 'https://sentry.io/api/0',
    max_threads: 5,
    batch_size: 100,
    retry_attempts: 3,
    delay_between_requests: 0.05,
    limit: 100
  }.freeze

  # HTTP verbs accepted by make_sentry_request, mapped to their request classes.
  REQUEST_CLASSES = {
    'GET' => Net::HTTP::Get,
    'POST' => Net::HTTP::Post,
    'PUT' => Net::HTTP::Put,
    'PATCH' => Net::HTTP::Patch
  }.freeze
  private_constant :REQUEST_CLASSES

  attr_reader :config, :http_pool, :mutex

  # @param config [Hash] overrides for DEFAULT_CONFIG (symbol keys)
  # @raise [RuntimeError] when the auth token or organization slug is missing
  def initialize(config = {})
    @config = DEFAULT_CONFIG.merge(config)
    @http_pool = {}
    @mutex = Mutex.new
    validate_config!
  end

  # === Issue Fetching Methods ===

  # Pages through the organization's issues matching +query+ and writes the
  # accumulated list to +output_file+.
  #
  # Paging stops at the first page that fails to load or when the response
  # advertises no further page; whatever was collected so far is still saved.
  #
  # @param query [String, nil] Sentry search query; defaults to "assigned:#native-data"
  # @param output_file [String] path of the JSON file to write
  # @return [Array] the issues collected
  def fetch_all_issues(query: nil, output_file: 'sentry_issues.json')
    encoded_query = URI.encode_www_form_component(query || "assigned:#native-data")
    next_url = "#{config[:base_url]}/organizations/#{config[:org_slug]}/issues/?query=#{encoded_query}&limit=#{config[:limit]}"
    issues = []

    puts "π Fetching issues from Sentry..."
    while next_url
      page_data, next_url = fetch_issues_page(next_url)
      break unless page_data

      puts "π₯ Loaded #{page_data.size} issues"
      issues.concat(page_data)
    end

    # Only claim success when the file was actually written.
    puts "β Saved #{issues.size} issues to #{output_file}" if save_to_file(issues, output_file)
    issues
  end

  # Fetches a single page of issues.
  #
  # @param url [String] absolute URL of the page
  # @return [Array] `[page_data, next_link]`; both are nil when the request
  #   fails or raises, and next_link is nil on the last page
  def fetch_issues_page(url)
    uri = URI.parse(url)
    response = get_http_client(uri).request(authorized_get(uri))

    unless response.is_a?(Net::HTTPSuccess)
      puts "β Request failed: #{response.code} #{response.message}"
      return [nil, nil]
    end

    [JSON.parse(response.body), parse_next_link(response['Link'])]
  rescue StandardError => e
    puts "β Error fetching page: #{e.message}"
    [nil, nil]
  end

  # === Issue Enrichment Methods ===

  # Enriches every issue in +input_file+ one after another (tags plus issue
  # details) and writes the result to +output_file+.
  #
  # @return [Array, nil] the enriched issues, or nil when the input cannot be loaded
  def enrich_issues_simple(input_file: 'sentry_issues.json', output_file: 'enriched_sentry_issues.json')
    issues = load_from_file(input_file)
    return unless issues

    total_issues = issues.size
    puts "π Processing #{total_issues} issues sequentially..."

    enriched_issues = issues.each_with_index.map do |issue, index|
      position = index + 1
      show_progress(position, total_issues) if (position % 10).zero? || position == total_issues
      enriched = enrich_single_issue(issue)
      pause_between_requests
      enriched
    end

    save_to_file(enriched_issues, output_file)
    print_enrichment_summary(enriched_issues, output_file)
    enriched_issues
  end

  # Enriches issues with tags using `config[:max_threads]` worker threads,
  # checkpointing progress to +checkpoint_file+ so an interrupted run can resume.
  #
  # Unlike #enrich_issues_simple this path only fetches tags, not issue details.
  #
  # @return [Array, nil] enriched issues in input order, or nil when the input
  #   cannot be loaded
  def enrich_issues_concurrent(input_file: 'sentry_issues.json', output_file: 'enriched_sentry_issues.json', checkpoint_file: 'sentry_enrichment_checkpoint.json')
    issues = load_from_file(input_file)
    return unless issues

    checkpoint = load_checkpoint(checkpoint_file)
    if checkpoint
      puts "π Resuming from checkpoint..."
      enriched_issues = checkpoint['enriched_issues']
      start_index = checkpoint['processed_count']
    else
      puts "π Starting fresh processing..."
      enriched_issues = []
      start_index = 0
    end

    # drop yields [] (not nil) when the checkpoint points past the end of the input.
    remaining_issues = issues.drop(start_index)
    return enriched_issues if remaining_issues.empty?

    # At least one worker is required, otherwise the collector below would wait forever.
    thread_count = [config[:max_threads].to_i, 1].max

    work_queue = Queue.new
    result_queue = Queue.new
    remaining_issues.each_with_index do |issue, offset|
      work_queue.push([issue, start_index + offset])
    end
    # One nil sentinel per worker tells it to stop.
    thread_count.times { work_queue.push(nil) }

    puts "π Starting concurrent processing with #{thread_count} threads..."
    threads = Array.new(thread_count) do |i|
      Thread.new { worker_thread(work_queue, result_queue, i + 1) }
    end

    results = collect_enrichment_results(result_queue, remaining_issues.size, enriched_issues, start_index, issues.size, checkpoint_file)
    threads.each(&:join)

    # Results arrive in completion order; restore the input order before merging.
    ordered = results.sort_by { |_, index| index }.map { |item, _| item }
    final_enriched_issues = enriched_issues + ordered

    save_to_file(final_enriched_issues, output_file)
    print_enrichment_summary(final_enriched_issues, output_file)

    if File.exist?(checkpoint_file)
      File.delete(checkpoint_file)
      puts "π§Ή Checkpoint file cleaned up"
    end

    final_enriched_issues
  end

  # === Issue Processing Methods ===

  # Derives "area", "path_owner" and "last_assigned" for every issue in
  # +input_file+, strips the bulky "tags", "seenBy" and "stats" fields, and
  # writes the result to +output_file+.
  #
  # @return [Array, nil] the processed issues, or nil when the input cannot be loaded
  def process_issues(input_file: 'enriched_sentry_issues.json', output_file: 'processed_sentry_issues.json')
    issues = load_from_file(input_file)
    return unless issues

    puts "π Processing #{issues.length} issues..."

    issues.each do |issue|
      tags = issue["tags"]
      issue["area"] = get_value_from_tags(tags, "area")
      issue["path_owner"] = get_value_from_tags(tags, "pathname_owner")
      issue["last_assigned"] = get_last_assigned_from_activity(issue["activity"])
      puts "β οΈ Issue #{issue["id"]} has no last assigned" if issue["last_assigned"].nil?

      # Clean up unnecessary fields
      %w[tags seenBy stats].each { |field| issue.delete(field) }
    end

    puts "β Processed issues saved to #{output_file}" if save_to_file(issues, output_file)
    issues
  end

  # @param issues [Array<Hash>] issues carrying an "area" key
  # @return [Hash] area => issues with that area, in first-seen order
  def group_issues_by_area(issues)
    issues.group_by { |issue| issue["area"] }
  end

  # === Team Assignment Methods ===

  # @return [Array] the organization's teams, or [] when the request fails
  def get_teams
    make_sentry_request('GET', "/organizations/#{config[:org_slug]}/teams/") || []
  end

  # @param team_name [String] team slug to look up
  # @return [String, nil] the team's id, or nil when no team has that slug
  def get_team_id(team_name)
    team = get_teams.find { |candidate| candidate['slug'] == team_name }
    team && team['id']
  end

  # Assigns an issue to a team or user.
  #
  # @param assignee_type [String] "team" assigns to a team; any other value
  #   is treated as a user
  # @return [Boolean] whether the API call succeeded
  def assign_issue(issue_id, assignee_id, assignee_type = 'team')
    actor_kind = assignee_type == "team" ? "team" : "user"
    result = make_sentry_request('PUT', "/issues/#{issue_id}/", { assignedTo: "#{actor_kind}:#{assignee_id}" })

    if result
      puts " β Assigned issue #{issue_id} to #{assignee_type} #{assignee_id}"
      true
    else
      puts " β Failed to assign issue #{issue_id}"
      false
    end
  end

  # Clears the assignee of an issue.
  #
  # @return [Boolean] whether the API call succeeded
  def unassign_issue(issue_id)
    body = { assignedTo: "", assignedBy: "assignee_selector" }
    result = make_sentry_request('PUT', "/issues/#{issue_id}/", body)

    if result
      puts " β Unassigned issue #{issue_id}"
      true
    else
      puts " β Failed to unassign issue #{issue_id}"
      false
    end
  end

  # Re-applies each issue's "last_assigned" owner on Sentry (or unassigns the
  # issue when there is none), recording completion on the issue itself.
  #
  # Issues already marked finished in an existing +output_file+ are not sent
  # again. Progress is flushed to +output_file+ after every 20 issues.
  #
  # @return [Array, nil] the issues, or nil when the input cannot be loaded
  def assign_issues_to_teams(input_file: 'processed_sentry_issues.json', output_file: 'final_sentry_issues.json')
    issues = load_from_file(input_file)
    return unless issues

    finished_assignments = get_finished_assignments(output_file)
    total_stats = { success: 0, failed: 0, unmapped: 0 }

    puts "π Processing #{issues.length} issues for team assignment..."

    issues.each_with_index do |issue, index|
      outcome = process_single_assignment(issue, finished_assignments)
      total_stats[outcome] += 1

      # Count completed issues (1-based) so the first flush happens after 20, not after 1.
      processed = index + 1
      next unless (processed % 20).zero?

      puts "π Processed #{processed} issues"
      save_to_file(issues, output_file)
    end

    save_to_file(issues, output_file)
    print_assignment_summary(total_stats)
    issues
  end

  # === Utility Methods ===

  # Fetches the tags of an issue.
  #
  # @param retries [Integer, nil] retries allowed after the first attempt;
  #   defaults to `config[:retry_attempts]`
  # @return [Object] the parsed JSON body, or [] when all attempts fail
  def fetch_issue_tags(issue_id, retries = nil)
    fetch_issue_collection(issue_id, 'tags', 'issue', retries)
  end

  # Fetches the activity log of an issue.
  #
  # @param retries [Integer, nil] retries allowed after the first attempt;
  #   defaults to `config[:retry_attempts]`
  # @return [Object] the parsed JSON body, or [] when all attempts fail
  def fetch_issue_activities(issue_id, retries = nil)
    fetch_issue_collection(issue_id, 'activities', 'issue activities', retries)
  end

  # Fetches the detail record of an issue (single attempt, no retries).
  #
  # @return [Object, nil] the parsed JSON body, or nil on any failure
  def fetch_issue_details(issue_id)
    uri = URI.parse("#{config[:base_url]}/issues/#{issue_id}/")
    response = get_http_client(uri).request(authorized_get(uri))
    return JSON.parse(response.body) if response.is_a?(Net::HTTPSuccess)

    puts "β οΈ Failed to fetch details for issue #{issue_id}: #{response.code}"
    nil
  rescue StandardError => e
    puts "β οΈ Error fetching details for issue #{issue_id}: #{e.message}"
    nil
  end

  # === File I/O Methods ===

  # @param filename [String] path of a JSON file
  # @return [Object, nil] the parsed content, or nil when the file is missing
  #   or is not valid JSON
  def load_from_file(filename)
    unless File.exist?(filename)
      puts "β Error: #{filename} not found!"
      return nil
    end

    JSON.parse(File.read(filename))
  rescue JSON::ParserError => e
    puts "β Error parsing JSON in #{filename}: #{e.message}"
    nil
  end

  # Writes +data+ to +filename+ as pretty-printed JSON.
  #
  # @return [Integer, false] what File.write returned on success, false on failure
  def save_to_file(data, filename)
    File.write(filename, JSON.pretty_generate(data))
  rescue StandardError => e
    puts "β Error saving to #{filename}: #{e.message}"
    false
  end

  private

  # Rejects a configuration that lacks credentials; blank strings count as missing.
  def validate_config!
    if config[:auth_token].to_s.strip.empty?
      raise "SENTRY_API_TOKEN is required. Set it as an environment variable or pass it in config."
    end
    if config[:org_slug].to_s.strip.empty?
      raise "SENTRY_ORG_SLUG is required. Set it as an environment variable or pass it in config."
    end
  end

  # Returns the Net::HTTP client for +uri+'s host, creating it on first use.
  #
  # Clients are cached per calling thread: a Net::HTTP object carries
  # per-connection state, so the enrichment workers must not share one.
  # TLS is enabled only when the URI scheme is https, which keeps a plain-http
  # `base_url` override usable.
  def get_http_client(uri)
    key = "#{Thread.current.object_id}@#{uri.hostname}:#{uri.port}"
    mutex.synchronize do
      http_pool[key] ||= Net::HTTP.new(uri.hostname, uri.port).tap do |http|
        http.use_ssl = uri.scheme == 'https'
        http.keep_alive_timeout = 30
        http.read_timeout = 30
        http.open_timeout = 10
      end
    end
  end

  # Builds a GET request for +uri+ carrying the bearer token.
  def authorized_get(uri)
    Net::HTTP::Get.new(uri).tap do |request|
      request["Authorization"] = "Bearer #{config[:auth_token]}"
    end
  end

  # Sleeps for the configured inter-request delay; a nil or non-positive delay is a no-op.
  def pause_between_requests
    delay = config[:delay_between_requests].to_f
    sleep(delay) if delay.positive?
  end

  # Shared GET-with-retry used by fetch_issue_tags / fetch_issue_activities.
  #
  # A 429 response waits one second before the next attempt; an exception is
  # retried immediately; any other non-success status gives up at once.
  #
  # @param resource [String] path segment under /issues/<id>/ ("tags", "activities")
  # @param rate_limit_label [String] wording used in the rate-limit message
  # @param retries [Integer, nil] retries allowed after the first attempt
  # @return [Object] the parsed JSON body, or [] when nothing could be fetched
  def fetch_issue_collection(issue_id, resource, rate_limit_label, retries)
    attempts_left = retries || config[:retry_attempts]
    uri = URI.parse("#{config[:base_url]}/issues/#{issue_id}/#{resource}/")

    loop do
      begin
        response = get_http_client(uri).request(authorized_get(uri))
        return JSON.parse(response.body) if response.is_a?(Net::HTTPSuccess)

        unless response.code.to_i == 429
          puts "β οΈ Failed to fetch #{resource} for issue #{issue_id}: #{response.code}"
          return []
        end

        puts "β³ Rate limited for #{rate_limit_label} #{issue_id}, retrying in 1 second..."
        sleep(1)
      rescue StandardError => e
        puts "β οΈ Error fetching #{resource} for issue #{issue_id}: #{e.message}"
      end

      return [] unless attempts_left.positive?

      attempts_left -= 1
    end
  end

  # Extracts the next-page URL from a Link header.
  #
  # @return [String, nil] the URL of the first entry marked rel="next" and
  #   results="true", or nil when there is none
  def parse_next_link(link_header)
    return nil unless link_header

    url_pattern = /<([^>]+)>/
    next_part = link_header.split(',').find do |part|
      part.include?('rel="next"') && part.include?('results="true"') && part.match?(url_pattern)
    end
    next_part && next_part[url_pattern, 1]
  end

  # Returns a shallow copy of +issue+ that is guaranteed to carry tags,
  # fetching them when the issue has none, and flags where they came from.
  def attach_tags(issue)
    enriched = issue.dup
    existing_tags = issue['tags']
    needs_fetch = existing_tags.nil? || existing_tags.empty?
    enriched['tags'] = fetch_issue_tags(issue['id']) if needs_fetch
    enriched['tags_fetched_from_api'] = needs_fetch
    enriched
  end

  # Sequential enrichment: tags, then the issue detail record merged on top.
  def enrich_single_issue(issue)
    enriched_issue = attach_tags(issue)

    unless enriched_issue['details_fetched_from_api']
      detailed_issue = fetch_issue_details(issue['id'])
      if detailed_issue
        enriched_issue.merge!(detailed_issue)
        enriched_issue['details_fetched_from_api'] = true
      end
    end

    enriched_issue['processed_at'] = Time.now.iso8601
    enriched_issue
  end

  # Worker loop: pops `[issue, index]` pairs until it sees a nil sentinel and
  # pushes exactly one `[enriched_issue, index]` per pair.
  def worker_thread(work_queue, result_queue, thread_id)
    while (work_item = work_queue.pop)
      issue, index = work_item
      result_queue.push([enrich_in_worker(issue, index, thread_id), index])
      pause_between_requests
    end
  end

  # Tag-only enrichment used by the workers. It never raises: the collector
  # counts results, so a lost result would leave it waiting forever. A failed
  # issue is returned with an "enrichment_error" entry instead.
  def enrich_in_worker(issue, index, thread_id)
    enriched_issue = attach_tags(issue)
    enriched_issue['processed_at'] = Time.now.iso8601
    enriched_issue['processed_by_thread'] = thread_id
    enriched_issue
  rescue StandardError => e
    puts "β οΈ Error enriching issue at index #{index}: #{e.message}"
    fallback = issue.is_a?(Hash) ? issue.dup : {}
    fallback['enrichment_error'] = e.message
    fallback
  end

  # Drains +total_to_process+ results from +result_queue+, reporting progress
  # and checkpointing after every `config[:batch_size]` results.
  #
  # Workers finish out of order, and a resume restarts at
  # `issues[processed_count]`, so a checkpoint only ever contains the unbroken
  # run of results beginning at +start_index+. Results beyond a gap stay out
  # until the gap is filled.
  #
  # @return [Array<Array>] `[enriched_issue, original_index]` pairs in arrival order
  def collect_enrichment_results(result_queue, total_to_process, enriched_issues, start_index, total_issues, checkpoint_file)
    results = []
    by_index = {}
    contiguous = 0   # length of the gap-free run starting at start_index
    checkpointed = 0 # value of `contiguous` at the last checkpoint written

    total_to_process.times do |n|
      received = n + 1
      enriched_issue, original_index = result_queue.pop
      results << [enriched_issue, original_index]
      by_index[original_index] = enriched_issue
      contiguous += 1 while by_index.key?(start_index + contiguous)

      show_progress(received, total_to_process) if (received % 10).zero? || received == total_to_process

      next unless (received % config[:batch_size]).zero? && contiguous > checkpointed

      prefix = Array.new(contiguous) { |offset| by_index[start_index + offset] }
      save_checkpoint(enriched_issues + prefix, start_index + contiguous, total_issues, checkpoint_file)
      checkpointed = contiguous
    end

    results
  end

  # Writes a checkpoint; +processed_count+ must equal `enriched_issues.size`
  # for a later resume to line up with the input file.
  def save_checkpoint(enriched_issues, processed_count, total_count, checkpoint_file)
    checkpoint_data = {
      processed_count: processed_count,
      total_count: total_count,
      timestamp: Time.now.iso8601,
      enriched_issues: enriched_issues
    }
    return unless save_to_file(checkpoint_data, checkpoint_file)

    puts "πΎ Checkpoint saved: #{processed_count}/#{total_count} issues processed"
  end

  # @return [Hash, nil] the checkpoint (string keys), or nil when the file is
  #   absent, unreadable, or not shaped like a checkpoint
  def load_checkpoint(checkpoint_file)
    return nil unless File.exist?(checkpoint_file)

    data = load_from_file(checkpoint_file)
    usable = data.is_a?(Hash) &&
             data['processed_count'].is_a?(Integer) &&
             !data['processed_count'].negative? &&
             data['enriched_issues'].is_a?(Array)
    unless usable
      puts "β οΈ Error loading checkpoint: unexpected checkpoint format"
      return nil
    end

    puts "π Found checkpoint: #{data['processed_count']}/#{data['total_count']} issues previously processed"
    data
  rescue StandardError => e
    puts "β οΈ Error loading checkpoint: #{e.message}"
    nil
  end

  # @return [Object] the "value" of the first entry in the "topValues" of the
  #   tag whose "key" is +key+, or "unknown" when any link in that chain is missing
  def get_value_from_tags(tags, key)
    return "unknown" if tags.nil? || tags.empty?

    tag = tags.find { |candidate| candidate["key"] == key }
    (tag && tag.dig("topValues", 0, "value")) || "unknown"
  end

  # Finds the assignment recorded after (by list position) the first
  # "assigned" entry made by the hard-coded user below to "native-data".
  # NOTE(review): presumably the activity list is newest-first, making this
  # the owner before the bulk reassignment; confirm against the API.
  #
  # @return [Hash, nil] assignee fields of that entry, or nil when either
  #   entry cannot be found
  def get_last_assigned_from_activity(activity)
    return nil if activity.nil? || activity.empty?

    # Find assignment by [email protected] to native-data team
    marker_index = activity.index do |act|
      act["type"] == "assigned" &&
        act.dig("user", "email") == "[email protected]" &&
        act.dig("data", "assigneeName") == "native-data"
    end
    return nil if marker_index.nil?

    earlier = activity.drop(marker_index + 1).find { |act| act["type"] == "assigned" }
    return nil unless earlier

    details = earlier["data"] || {} # tolerate an entry without a data payload
    {
      "assignee" => details["assignee"],
      "assigneeEmail" => details["assigneeEmail"],
      "assigneeName" => details["assigneeName"],
      "assigneeType" => details["assigneeType"],
      "dateCreated" => earlier["dateCreated"]
    }
  end

  # Sends an authenticated JSON request to `base_url + endpoint`.
  #
  # @param method [String] GET, POST, PUT or PATCH (case-insensitive)
  # @param body [Object, nil] serialized with to_json when given
  # @return [Object, nil] parsed JSON; the raw body when it is not JSON; {} for
  #   a successful response without a body; nil on a non-2xx status, an
  #   unsupported method, or any exception (all reported via puts)
  def make_sentry_request(method, endpoint, body = nil)
    uri = URI("#{config[:base_url]}#{endpoint}")
    request_class = REQUEST_CLASSES[method.upcase]
    raise "Unsupported HTTP method: #{method}" unless request_class

    request = request_class.new(uri)
    request['Authorization'] = "Bearer #{config[:auth_token]}"
    request['Content-Type'] = 'application/json'
    request.body = body.to_json if body

    response = get_http_client(uri).request(request)
    unless response.code.to_i.between?(200, 299)
      puts "β οΈ API request failed: #{response.code} - #{response.body}"
      return nil
    end

    payload = response.body
    # A body-less success must stay truthy: callers treat nil as failure.
    return {} if payload.nil? || payload.strip.empty?

    begin
      JSON.parse(payload)
    rescue JSON::ParserError
      payload
    end
  rescue StandardError => e
    puts "β οΈ Error making request: #{e.message}"
    nil
  end

  # @return [Hash] issue id => true for every issue in +filename+ already
  #   flagged "assignment_finished_on_sentry"; {} when the file is missing or unreadable
  def get_finished_assignments(filename)
    return {} unless File.exist?(filename)

    issues = load_from_file(filename)
    return {} unless issues

    issues.each_with_object({}) do |issue, finished|
      finished[issue["id"]] = true if issue["assignment_finished_on_sentry"]
    end
  end

  # Applies the desired assignment for one issue and flags it as finished.
  #
  # Without a previous assignee the issue is unassigned. With one, the issue is
  # skipped if already finished or if the previous owner was "data-catalog";
  # otherwise it is assigned, falling back to unassigning when that fails.
  #
  # @return [Symbol] :success or :failed
  def process_single_assignment(issue, finished_assignments)
    issue_id = issue["id"]
    assignee_id = issue.dig("last_assigned", "assignee")
    assignee_type = issue.dig("last_assigned", "assigneeType")
    assignee_name = issue.dig("last_assigned", "assigneeName")

    done =
      if !assignee_id
        unassign_issue(issue_id)
      elsif finished_assignments[issue_id]
        puts " β Issue #{issue["id"]} already assigned to #{assignee_id}"
        true
      elsif assignee_name == "data-catalog"
        puts " β Issue #{issue["id"]} is supposed to be with native-data team"
        true
      else
        assign_issue(issue_id, assignee_id, assignee_type) || unassign_issue(issue_id)
      end
    return :failed unless done

    issue["assignment_finished_on_sentry"] = true
    :success
  end

  # Prints "current/total" with the percentage rounded to one decimal.
  def show_progress(current, total)
    percentage = ((current * 100.0) / total).round(1)
    puts "π Progress: #{current}/#{total} (#{percentage}%)"
  end

  # Prints tag statistics for an enrichment run.
  def print_enrichment_summary(enriched_issues, output_file)
    issue_count = enriched_issues.size
    issues_with_tags = enriched_issues.count { |issue| issue['tags'] && !issue['tags'].empty? }
    api_fetched_tags = enriched_issues.count { |issue| issue['tags_fetched_from_api'] == true }

    puts "\nπ Enrichment Summary:"
    puts "β Successfully saved #{issue_count} enriched issues to #{output_file}"
    puts "π Total issues: #{issue_count}"
    puts "π·οΈ Issues with tags: #{issues_with_tags}"
    puts "π Tags fetched from API: #{api_fetched_tags}"
    puts "π¦ Tags already present: #{issues_with_tags - api_fetched_tags}"
    return unless enriched_issues.any?

    total_tags = enriched_issues.sum { |issue| issue['tags']&.size || 0 }
    puts "π’ Total tags: #{total_tags}"
    puts "π Average tags per issue: #{(total_tags.to_f / issue_count).round(2)}"
  end

  # Prints the success / failed / unmapped counters of an assignment run.
  def print_assignment_summary(stats)
    rule = "=" * 50
    puts "\n" + rule
    puts "π― ASSIGNMENT SUMMARY"
    puts rule
    puts "β Successfully assigned: #{stats[:success]} issues"
    puts "β Failed to assign: #{stats[:failed]} issues"
    puts "β Unmapped areas: #{stats[:unmapped]} issues"
    puts "π Total processed: #{stats.values.sum} issues"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment