Skip to content

Instantly share code, notes, and snippets.

@ishan123456789
Last active February 20, 2025 13:34
Show Gist options
  • Save ishan123456789/b37c508c41ec17ea7e07092b91adaa75 to your computer and use it in GitHub Desktop.
Save ishan123456789/b37c508c41ec17ea7e07092b91adaa75 to your computer and use it in GitHub Desktop.
Match SNS, SQS subscription existence
# To run this script:
# ruby 'match_sns_sqs_subscription.rb' --profile log-stg --region eu-west-1 --countries vn --env staging
# ruby 'match_sns_sqs_subscription.rb' --profile log-prd --region eu-west-1 --countries gv-ba,gv-pt
require 'aws-sdk-sns'
require 'aws-sdk-sqs'
require 'json'
require 'optparse'
require 'logger'
# Define queue templates
QUEUE_TEMPLATES = {
geo_object_updated: "{{environment}}-{{region_short}}_geo_object_updated_{{country}}_dashboard_queue",
courier_location_changed: "{{environment}}-{{region_short}}_courier_location_changed_mobile_{{country}}_dashboard_queue",
delivery_dispatched: "{{environment}}-{{region_short}}_delivery_dispatched_{{country}}_dashboard_queue",
delivery_undispatched: "{{environment}}-{{region_short}}_delivery_undispatched_{{country}}_dashboard_queue",
early_arrival_deliveries: "{{environment}}-{{region_short}}_early_arrival_deliveries_{{country}}_dashboard_queue",
geofence_health_check_failure: "{{environment}}_{{region_short}}_geofence_health_check_failure_{{country}}_dashboard",
route_updated: "{{environment}}-{{region_short}}_route_updated_{{country}}_dashboard_queue",
courier_balance_limit: "{{environment}}-{{region_short}}_courier_balance_limit_{{country}}_dashboard_queue",
vehicle_type_updated: "{{environment}}-{{region_short}}_vehicle_type_updated_{{country}}_dashboard_queue",
bag_type_updated: "{{environment}}-{{region_short}}_bag_type_updated_{{country}}_dashboard_queue",
issue_updated: "{{environment}}-{{region_short}}_issue_updated_{{country}}_dashboard_queue",
issue_created: "{{environment}}-{{region_short}}_issue_created_{{country}}_dashboard_queue",
shift_updated: "{{environment}}-{{region_short}}_shift_updated_{{country}}_dashboard_queue",
issue_creation_param_updated: "{{environment}}-{{region_short}}_issue_creation_param_updated_{{country}}_dashboard_queue",
issue_rule_template_updated: "{{environment}}-{{region_short}}_issue_rule_template_updated_{{country}}_dashboard_queue",
courier_all_status_changed: "{{environment}}-{{region_short}}_courier_all_status_changed_{{country}}_dashboard_queue",
courier_current_shift_updated: "{{environment}}-{{region_short}}_courier_current_shift_updated_{{country}}_dashboard_queue",
courier_state_transition_updated: "{{environment}}-{{region_short}}_courier_state_transition_updated_{{country}}_dashboard_queue",
send_to_vendor: "send-to-vendor-response_{{country}}",
courier_updated: "{{environment}}-{{region_short}}_courier_updated_{{country}}_dashboard_queue",
delay_updated: "{{environment}}-{{region_short}}_delay_updated_{{country}}_dashboard_queue",
courier_delivery_distance_updated: "{{environment}}-{{region_short}}_courier_delivery_distance_updated_{{country}}_dashboard_queue",
send_to_vendor_recommendation: "{{environment}}-{{region_short}}_send_to_vendor_recommendation_{{country}}_dashboard_queue"
}
# Parse command-line arguments
options = {}
OptionParser.new do |opts|
opts.banner = "Usage: ruby validate_sns.rb [options]"
opts.on("-r", "--env env", "AWS Region (e.g., production|staging)") do |env|
options[:env] = env
end
opts.on("-r", "--region REGION", "AWS Region (e.g., eu-west-1)") do |region|
options[:region] = region
end
opts.on("-p", "--profile PROFILE", "AWS Profile (e.g., default)") do |profile|
options[:profile] = profile
end
opts.on("-c", "--countries COUNTRIES", "Comma-separated list of country codes (e.g., gv-ba,gv-pt)") do |countries|
options[:countries] = countries.split(',')
end
end.parse!
# Ensure required options are provided
unless options[:region] && options[:profile] && options[:countries]
puts "Usage: ruby validate_sns.rb --region REGION --profile PROFILE --countries COUNTRIES"
exit 1
end
# Initialize logger
logger = Logger.new(STDOUT)
logger.level = Logger::INFO
# Populate the environment and region prefix dynamically
environment = options[:env]
region_short = options[:region].split('-').first # Use the first part of the region (e.g., 'eu' from 'eu-west-1')
# Generate all queue names
def generate_queue_names(environment, region_short, countries, templates)
queue_names = []
countries.each do |country|
templates.each_value do |template|
queue_names << template
.gsub('{{environment}}', environment)
.gsub('{{region_short}}', region_short)
.gsub('{{country}}', country)
end
end
queue_names
end
# Fetch SNS subscriptions for a topic
def fetch_subscriptions_for_topic(sns_client, topic_arn, logger)
subscriptions = []
next_token = nil
loop do
response = sns_client.list_subscriptions_by_topic(
topic_arn: topic_arn,
next_token: next_token
)
subscriptions.concat(response.subscriptions)
next_token = response.next_token
break unless next_token
end
subscriptions
end
# Validate SQS subscriptions match SNS topics
def validate_sqs_sns_relationship(sns_client, sqs_client, queue_name, queue_arn, logger)
begin
# Fetch queue attributes
attributes = sqs_client.get_queue_attributes(
queue_url: queue_name,
attribute_names: ['Policy']
).attributes
if attributes['Policy']
policy = JSON.parse(attributes['Policy'])
# Extract SNS topics from the policy using `aws:SourceArn`
referenced_topics = policy['Statement']
.flat_map { |statement| statement.dig('Condition', 'ArnEquals', 'aws:SourceArn') }
.compact
referenced_topics.each do |topic_arn|
logger.info("SNS topic found in policy: #{topic_arn}")
logger.info("Validating SNS topic: #{topic_arn} for SQS queue: #{queue_name}")
# Fetch subscriptions for the topic
subscriptions = fetch_subscriptions_for_topic(sns_client, topic_arn, logger)
# Check if the queue ARN is subscribed to the topic
subscribed_endpoints = subscriptions.select { |sub| sub.protocol == 'sqs' }.map(&:endpoint)
if subscribed_endpoints.include?(queue_arn)
logger.info("Validation OK: Queue #{queue_name} is properly subscribed to #{topic_arn}.")
else
logger.warn("---------ALERT-------------")
logger.error("Validation failed: Queue #{queue_name} references #{topic_arn} but is not subscribed.")
logger.warn("---------ALERT-------------")
end
end
else
logger.warn("No policy found for queue: #{queue_name}. Skipping validation.")
end
rescue StandardError => e
logger.error("Error validating SQS and SNS relationship for #{queue_name}: #{e.message}")
end
end
# Main validation logic
def validate_queues(region, profile, queue_names, logger)
sns_client = Aws::SNS::Client.new(region: region, profile: profile)
sqs_client = Aws::SQS::Client.new(region: region, profile: profile)
queue_names.each do |queue_name|
begin
logger.info("Validating queue: #{queue_name}")
# Fetch queue URL
response = sqs_client.list_queues(queue_name_prefix: queue_name)
queue_urls = response.queue_urls
if queue_urls.empty?
logger.warn("---------ALERT-------------")
logger.warn("Queue not found: #{queue_name}")
logger.warn("---------ALERT-------------")
next
end
queue_url = queue_urls.first
logger.info("Queue found: #{queue_url}")
# Fetch queue ARN
queue_arn = sqs_client.get_queue_attributes(
queue_url: queue_url,
attribute_names: ['QueueArn']
).attributes['QueueArn']
# Validate SQS subscriptions with SNS topics
validate_sqs_sns_relationship(sns_client, sqs_client, queue_url, queue_arn, logger)
rescue StandardError => e
logger.error("Error processing queue #{queue_name}: #{e.message}")
end
end
end
# Execute the validation
queue_names = generate_queue_names(environment, region_short, options[:countries], QUEUE_TEMPLATES)
validate_queues(options[:region], options[:profile], queue_names, logger)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment