Last active
February 20, 2025 13:34
-
-
Save ishan123456789/b37c508c41ec17ea7e07092b91adaa75 to your computer and use it in GitHub Desktop.
Match SNS, SQS subscription existence
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# To run this script (--env is needed: it is substituted into most queue names):
#   ruby 'match_sns_sqs_subscription.rb' --profile log-stg --region eu-west-1 --countries vn --env staging
#   ruby 'match_sns_sqs_subscription.rb' --profile log-prd --region eu-west-1 --countries gv-ba,gv-pt --env production
require 'aws-sdk-sns' | |
require 'aws-sdk-sqs' | |
require 'json' | |
require 'optparse' | |
require 'logger' | |
# Queue-name templates, keyed by a symbolic event name.
#
# Placeholders replaced when names are generated:
#   {{environment}}  - value of --env
#   {{region_short}} - first dash-separated segment of --region
#   {{country}}      - one entry of --countries
#
# The hash is frozen so the templates cannot be modified by accident at runtime.
#
# NOTE(review): two entries deviate from the common
# "{{environment}}-{{region_short}}_<event>_{{country}}_dashboard_queue" shape:
#   * geofence_health_check_failure uses "_" after the environment and has no
#     "_queue" suffix;
#   * send_to_vendor uses neither environment nor region.
# They are kept exactly as found; confirm they match the real queue names.
QUEUE_TEMPLATES = {
  geo_object_updated: "{{environment}}-{{region_short}}_geo_object_updated_{{country}}_dashboard_queue",
  courier_location_changed: "{{environment}}-{{region_short}}_courier_location_changed_mobile_{{country}}_dashboard_queue",
  delivery_dispatched: "{{environment}}-{{region_short}}_delivery_dispatched_{{country}}_dashboard_queue",
  delivery_undispatched: "{{environment}}-{{region_short}}_delivery_undispatched_{{country}}_dashboard_queue",
  early_arrival_deliveries: "{{environment}}-{{region_short}}_early_arrival_deliveries_{{country}}_dashboard_queue",
  geofence_health_check_failure: "{{environment}}_{{region_short}}_geofence_health_check_failure_{{country}}_dashboard",
  route_updated: "{{environment}}-{{region_short}}_route_updated_{{country}}_dashboard_queue",
  courier_balance_limit: "{{environment}}-{{region_short}}_courier_balance_limit_{{country}}_dashboard_queue",
  vehicle_type_updated: "{{environment}}-{{region_short}}_vehicle_type_updated_{{country}}_dashboard_queue",
  bag_type_updated: "{{environment}}-{{region_short}}_bag_type_updated_{{country}}_dashboard_queue",
  issue_updated: "{{environment}}-{{region_short}}_issue_updated_{{country}}_dashboard_queue",
  issue_created: "{{environment}}-{{region_short}}_issue_created_{{country}}_dashboard_queue",
  shift_updated: "{{environment}}-{{region_short}}_shift_updated_{{country}}_dashboard_queue",
  issue_creation_param_updated: "{{environment}}-{{region_short}}_issue_creation_param_updated_{{country}}_dashboard_queue",
  issue_rule_template_updated: "{{environment}}-{{region_short}}_issue_rule_template_updated_{{country}}_dashboard_queue",
  courier_all_status_changed: "{{environment}}-{{region_short}}_courier_all_status_changed_{{country}}_dashboard_queue",
  courier_current_shift_updated: "{{environment}}-{{region_short}}_courier_current_shift_updated_{{country}}_dashboard_queue",
  courier_state_transition_updated: "{{environment}}-{{region_short}}_courier_state_transition_updated_{{country}}_dashboard_queue",
  send_to_vendor: "send-to-vendor-response_{{country}}",
  courier_updated: "{{environment}}-{{region_short}}_courier_updated_{{country}}_dashboard_queue",
  delay_updated: "{{environment}}-{{region_short}}_delay_updated_{{country}}_dashboard_queue",
  courier_delivery_distance_updated: "{{environment}}-{{region_short}}_courier_delivery_distance_updated_{{country}}_dashboard_queue",
  send_to_vendor_recommendation: "{{environment}}-{{region_short}}_send_to_vendor_recommendation_{{country}}_dashboard_queue"
}.freeze
# Parse command-line arguments into `options`.
#
# Recognised keys: :env, :region, :profile and :countries (an Array of codes).
# The environment switch uses the short flag -e; it previously also claimed -r,
# which collides with --region.
options = {}
parser = OptionParser.new do |opts|
  opts.banner = "Usage: ruby #{File.basename($PROGRAM_NAME)} [options]"

  opts.on("-e", "--env ENV", "Environment used in queue names (e.g., production|staging)") do |env|
    options[:env] = env
  end

  opts.on("-r", "--region REGION", "AWS Region (e.g., eu-west-1)") do |region|
    options[:region] = region
  end

  opts.on("-p", "--profile PROFILE", "AWS Profile (e.g., default)") do |profile|
    options[:profile] = profile
  end

  opts.on("-c", "--countries COUNTRIES", "Comma-separated list of country codes (e.g., gv-ba,gv-pt)") do |countries|
    # Tolerate spaces and stray commas ("vn, th," -> ["vn", "th"]).
    options[:countries] = countries.split(',').map(&:strip).reject(&:empty?)
  end
end
parser.parse!
# Ensure required options are provided.
# --env is required too: it is substituted into the queue-name templates, and a
# missing value would otherwise surface later as a TypeError from String#gsub.
# Empty values (e.g. an empty country list) are treated as missing.
missing_options = %i[env region profile countries].select do |key|
  value = options[key]
  value.nil? || value.empty?
end

unless missing_options.empty?
  # abort writes the message to stderr and exits with status 1.
  abort "Usage: ruby #{File.basename($PROGRAM_NAME)} --env ENV --region REGION --profile PROFILE --countries COUNTRIES\n" \
        "Missing required option(s): #{missing_options.map { |key| "--#{key}" }.join(', ')}"
end
# Log to standard output, emitting INFO and more severe messages.
logger = Logger.new(STDOUT, level: Logger::INFO)
# Values substituted into the queue-name templates.
environment = options[:env]
# Keep only the first dash-separated segment of the region (e.g., 'eu' from 'eu-west-1').
region_short, = options[:region].split('-')
# Expand every template for every country.
#
# @param environment [String] replacement for {{environment}}
# @param region_short [String] replacement for {{region_short}}
# @param countries [Array<String>] each entry replaces {{country}} in turn
# @param templates [Hash] template strings as values; keys are ignored
# @return [Array<String>] names grouped by country, in template order
def generate_queue_names(environment, region_short, countries, templates)
  countries.flat_map do |country|
    replacements = [
      ['{{environment}}', environment],
      ['{{region_short}}', region_short],
      ['{{country}}', country]
    ]

    templates.each_value.map do |template|
      # Apply the substitutions one after another, in the order listed above.
      replacements.reduce(template) { |name, (placeholder, value)| name.gsub(placeholder, value) }
    end
  end
end
# Collect all subscriptions of an SNS topic, following pagination tokens.
#
# @param sns_client [#list_subscriptions_by_topic] SNS client
# @param topic_arn [String] ARN of the topic to inspect
# @param logger [Logger] accepted for signature parity; not used here
# @return [Array] subscriptions from every page, in the order received
def fetch_subscriptions_for_topic(sns_client, topic_arn, logger)
  collected = []
  cursor = nil # the first request is sent with a nil token

  loop do
    page = sns_client.list_subscriptions_by_topic(topic_arn: topic_arn, next_token: cursor)
    collected.concat(page.subscriptions)

    cursor = page.next_token
    break unless cursor # no token means this was the last page
  end

  collected
end
# Check that a queue is subscribed to every SNS topic its access policy references.
#
# The queue's 'Policy' attribute is parsed, the topic ARNs found under
# Condition -> ArnEquals -> aws:SourceArn are collected, and for each topic the
# SQS-protocol subscriptions are searched for `queue_arn`. Results are only
# logged; any StandardError is caught and logged, never raised.
#
# @param sns_client [#list_subscriptions_by_topic] SNS client
# @param sqs_client [#get_queue_attributes] SQS client
# @param queue_name [String] sent as `queue_url` to SQS (the caller in this
#   file passes the queue URL) and used in log messages
# @param queue_arn [String] ARN expected among the topic's SQS endpoints
# @param logger [Logger] destination for results
# @return [void]
def validate_sqs_sns_relationship(sns_client, sqs_client, queue_name, queue_arn, logger)
  policy_json = sqs_client.get_queue_attributes(
    queue_url: queue_name,
    attribute_names: ['Policy']
  ).attributes['Policy']

  if policy_json.nil? || policy_json.strip.empty?
    logger.warn("No policy found for queue: #{queue_name}. Skipping validation.")
    return
  end

  topic_arns = extract_referenced_topic_arns(JSON.parse(policy_json))
  if topic_arns.empty?
    # Make it visible that nothing was checked instead of passing silently.
    logger.warn("Policy for queue #{queue_name} references no SNS topic via ArnEquals/aws:SourceArn. Nothing to validate.")
    return
  end

  topic_arns.each do |topic_arn|
    logger.info("SNS topic found in policy: #{topic_arn}")
    logger.info("Validating SNS topic: #{topic_arn} for SQS queue: #{queue_name}")

    subscriptions = fetch_subscriptions_for_topic(sns_client, topic_arn, logger)
    sqs_endpoints = subscriptions.select { |sub| sub.protocol == 'sqs' }.map(&:endpoint)

    if sqs_endpoints.include?(queue_arn)
      logger.info("Validation OK: Queue #{queue_name} is properly subscribed to #{topic_arn}.")
    else
      logger.warn("---------ALERT-------------")
      logger.error("Validation failed: Queue #{queue_name} references #{topic_arn} but is not subscribed.")
      logger.warn("---------ALERT-------------")
    end
  end
rescue StandardError => e
  logger.error("Error validating SQS and SNS relationship for #{queue_name}: #{e.message}")
end

# Return the unique topic ARNs named by ArnEquals/aws:SourceArn conditions in a parsed policy.
#
# 'Statement' may be a single object or an array of objects, and
# 'aws:SourceArn' may be a string or an array of strings; both forms are handled.
def extract_referenced_topic_arns(policy)
  statements = policy['Statement']
  # Wrap by hand: Array(hash) would turn the hash into key/value pairs.
  statements = [statements] if statements.is_a?(Hash)

  Array(statements)
    .flat_map { |statement| statement.dig('Condition', 'ArnEquals', 'aws:SourceArn') }
    .compact
    .uniq
end
# Main validation logic: validate each named queue against SNS.
#
# @param region [String] AWS region passed to both clients
# @param profile [String] AWS profile passed to both clients
# @param queue_names [Array<String>] exact queue names to validate
# @param logger [Logger] destination for progress and results
# @return [Array<String>] `queue_names`, as returned by #each
def validate_queues(region, profile, queue_names, logger)
  sns_client = Aws::SNS::Client.new(region: region, profile: profile)
  sqs_client = Aws::SQS::Client.new(region: region, profile: profile)

  queue_names.each do |queue_name|
    validate_single_queue(sns_client, sqs_client, queue_name, logger)
  end
end

# Resolve one queue by name, look up its ARN and run the SNS validation.
# Errors are logged so that one failing queue does not stop the whole run.
def validate_single_queue(sns_client, sqs_client, queue_name, logger)
  logger.info("Validating queue: #{queue_name}")

  queue_url = find_exact_queue_url(sqs_client, queue_name)
  unless queue_url
    logger.warn("---------ALERT-------------")
    logger.warn("Queue not found: #{queue_name}")
    logger.warn("---------ALERT-------------")
    return
  end
  logger.info("Queue found: #{queue_url}")

  queue_arn = sqs_client.get_queue_attributes(
    queue_url: queue_url,
    attribute_names: ['QueueArn']
  ).attributes['QueueArn']

  validate_sqs_sns_relationship(sns_client, sqs_client, queue_url, queue_arn, logger)
rescue StandardError => e
  logger.error("Error processing queue #{queue_name}: #{e.message}")
end

# Return the URL of the queue named exactly `queue_name`, or nil.
#
# list_queues is queried with the name as a prefix, so its result may also hold
# queues whose names merely start with `queue_name` (e.g. "<name>_dlq"). Only a
# URL whose last path segment equals the name is accepted.
def find_exact_queue_url(sqs_client, queue_name)
  candidate_urls = sqs_client.list_queues(queue_name_prefix: queue_name).queue_urls
  Array(candidate_urls).find { |url| url.split('/').last == queue_name }
end
# Entry point: expand the templates for the requested countries, then validate each queue.
all_queue_names = generate_queue_names(
  environment,
  region_short,
  options[:countries],
  QUEUE_TEMPLATES
)
validate_queues(options[:region], options[:profile], all_queue_names, logger)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment