@afilgueira
Last active April 30, 2023 20:54
Example of an aws-sdk-cloudwatchlogs client wrapper for EB

This is a simple client wrapper for the aws-sdk-cloudwatchlogs gem. The goal is to run queries that show which IPs a user has used over the last X days. This is very useful for fraud detection: if a user made requests from several IPs in a short period of time, or some of those IPs are fraudulent, we can do something about it.

The CloudWatch Logs flow is simple:

  • You run the query, and receive a query_id
  • You wait a bit
  • After X time you can get the partial results using the query_id
  • After Y time the query is over and you get the complete results
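The flow above can be sketched as a small polling loop. This is only an illustration: `FakeLogsClient`, `wait_for_query`, `interval` and `max_attempts` are hypothetical names that are not part of the gist, and the fake client stands in for the real one so the snippet runs without AWS credentials.

```ruby
# Hypothetical stand-in for the real client: it reports "Running" twice and
# then "Complete". Only the method names mirror the wrapper in this gist.
class FakeLogsClient
  Response = Struct.new(:status, :results)

  def initialize
    @polls = 0
  end

  def start_query(_options)
    'fake-query-id'
  end

  def get_query_results(_query_id)
    @polls += 1
    status = @polls < 3 ? 'Running' : 'Complete'
    Response.new(status, [%w[ip requests], ['192.168.1.1', '10']])
  end
end

# Statuses after which polling again cannot change the outcome.
TERMINAL_STATUSES = %w[Complete Failed Cancelled Timeout].freeze

# Polls until the query reaches a terminal status or the attempts run out.
# `interval` and `max_attempts` are illustrative defaults, not tuned values.
def wait_for_query(client, query_id, interval: 0.01, max_attempts: 10)
  max_attempts.times do
    response = client.get_query_results(query_id)
    return response if TERMINAL_STATUSES.include?(response.status)

    sleep(interval)
  end
  nil # still running after max_attempts polls
end

client = FakeLogsClient.new
query_id = client.start_query({})
final = wait_for_query(client, query_id)
puts final.status
```

In the admin panel described here no loop is needed, because the admin simply reloads the page later; a loop like this is more likely to be useful in a background job.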

This example service populates a field in the user profile in an admin panel. When admins need it, they click a button to trigger a new query. Queries and their results are cached in Redis for X time, and we show the admin when the check was last performed, since they probably won't need to run it again for a few days. The results are presented in a "CSV-friendly" format, so they can be displayed in a table or exported and checked against third-party services.
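As a rough illustration of why the "header row plus value rows" format is convenient, here is a minimal sketch that turns such rows into CSV text and into a list of hashes for a table view. The sample rows are made up, and the snippet uses Ruby's `csv` library rather than anything from the gist.

```ruby
require 'csv'

# Rows in the shape the client returns: a header row, then one row per result.
rows = [
  %w[ip requests],
  ['192.168.1.1', '10'],
  ['192.168.2.1', '20']
]

# CSV.generate builds the CSV text in memory and takes care of quoting.
csv_text = CSV.generate do |csv|
  rows.each { |row| csv << row }
end

# For a table view, pair each value row with the headers.
headers, *records = rows
table = records.map { |record| headers.zip(record).to_h }

puts csv_text
```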

Although my example service only uses the cached version of the results, this client can also be used to always get the results in real time. For simplicity, this example uses $cloudwatch_logs and $redis_client as global variables; they hold the connections to the CloudWatch Logs gem client and to Redis.

I also included a spec.

module CloudwatchLogs
  class Client
    EXPIRY_IN_SECONDS = Settings.redis_cloudwatch_logs.expiry_in_seconds
    KEY_PREFIX = Settings.redis_cloudwatch_logs.key_prefix

    class << self
      # Starts a query and returns its query_id
      def start_query(options)
        $cloudwatch_logs.start_query(options).query_id
      end

      # Fetches the current (possibly partial) results straight from CloudWatch
      def get_query_results(query_id)
        response = $cloudwatch_logs.get_query_results(query_id: query_id)
        { status: response.status, results: parsed_query_results(response), timestamp: Time.now.to_i }.with_indifferent_access
      end

      # Returns the cached results when available; otherwise fetches them
      def get_cached_query_results(query_id)
        key = "#{KEY_PREFIX}/#{query_id}"
        cached_result = $redis_client.get(key)
        # Same hash type as the uncached path, so result[:status] works in both cases
        return JSON.parse(cached_result).with_indifferent_access if cached_result.present?

        result = get_query_results(query_id)
        if result[:status] == 'Complete'
          # Let's avoid storing partial results
          $redis_client.set(key, result.to_json, ex: EXPIRY_IN_SECONDS)
        end
        result
      end

      def generate_options(log_group_name, start_time, end_time, query_string, limit = 1000)
        {
          log_group_name: log_group_name,
          start_time: start_time,
          end_time: end_time,
          query_string: query_string,
          limit: limit
        }
      end

      # The idea of using this format is being able to display a table or write a CSV easily
      # So it will go from:
      # [
      #   [ { field: 'header1', value: 'value 11' }, { field: 'header2', value: 'value 21' } ],
      #   [ { field: 'header1', value: 'value 12' }, { field: 'header2', value: 'value 22' } ]
      # ]
      #
      # To:
      # [
      #   [ header1, header2 ],
      #   [ value 11, value 21 ],
      #   [ value 12, value 22 ]
      # ]
      def parsed_query_results(response)
        headers = response.results.first&.map(&:field) # Headers
        fields = response.results.map { |array| array.map(&:value) } # Fields
        result = []
        result += [headers] if headers.present?
        result += fields
        result
      end
    end
  end
end
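The caching rule in get_cached_query_results (serve from the cache when possible, and only store results once the query is "Complete") can be tried in isolation. The sketch below is an assumption-laden illustration, not the gist's code: `FakeRedis`, `fetch_results` and the `fetcher` lambda are hypothetical names, and the fake store accepts `ex:` but never expires anything.

```ruby
require 'json'

# Hypothetical in-memory stand-in for Redis; it accepts `ex:` but ignores it.
class FakeRedis
  def initialize
    @store = {}
  end

  def get(key)
    @store[key]
  end

  def set(key, value, ex: nil)
    @store[key] = value
  end
end

# Returns the cached value when present; otherwise calls the block and
# caches its result only when the status is "Complete".
def fetch_results(redis, key, ttl)
  cached = redis.get(key)
  return JSON.parse(cached) if cached

  result = yield
  redis.set(key, JSON.generate(result), ex: ttl) if result['status'] == 'Complete'
  result
end

redis = FakeRedis.new
calls = 0
statuses = %w[Running Complete]
# Pretends to be the remote query: "Running" on the first call, then "Complete".
fetcher = lambda do
  calls += 1
  { 'status' => statuses[calls - 1] || 'Complete', 'results' => [] }
end

first  = fetch_results(redis, 'logs/q1', 60, &fetcher) # Running, not cached
second = fetch_results(redis, 'logs/q1', 60, &fetcher) # Complete, now cached
third  = fetch_results(redis, 'logs/q1', 60, &fetcher) # served from the cache
```

Skipping the cache for partial results means a "Running" query is simply fetched again on the next call, at the cost of one extra request per check.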
require "rails_helper"

RSpec.describe CloudwatchLogs::Client, type: :client do
  let(:query_id) { 'query_id' }
  let(:query_status) { 'Complete' }
  let(:response) { double(:response, status: query_status, results: results) }
  let(:results) do
    [
      [ logs_struct_for('ip', '192.168.1.1'), logs_struct_for('user_id', '1'), logs_struct_for('count', '10') ],
      [ logs_struct_for('ip', '192.168.2.1'), logs_struct_for('user_id', '2'), logs_struct_for('count', '20') ],
      [ logs_struct_for('ip', '192.168.2.1'), logs_struct_for('user_id', '1'), logs_struct_for('count', '30') ],
    ]
  end
  let(:parsed_results) do
    [
      ['ip', 'user_id', 'count'],
      ['192.168.1.1', '1', '10'],
      ['192.168.2.1', '2', '20'],
      ['192.168.2.1', '1', '30'],
    ]
  end
  let(:query_results) { { status: query_status, results: parsed_results, timestamp: Time.now.to_i }.with_indifferent_access }

  around do |example|
    Timecop.freeze(Time.current, &example)
  end

  # Warning: only use `scan_each` in controlled environments. Never in prod
  before(:each) { $redis_client.scan_each(match: "#{described_class::KEY_PREFIX}*") { |key| $redis_client.del(key) } }

  describe '#start_query' do
    let(:options) { { options: '' } }
    let(:query_response) { double(:response, query_id: query_id) }

    it 'sends the options to the original client' do
      expect($cloudwatch_logs).to receive(:start_query).with(options).and_return(query_response)
      expect(described_class.start_query(options)).to eq(query_id)
    end
  end

  describe '#get_query_results' do
    it 'queries the original client and parses the results' do
      expect($cloudwatch_logs).to receive(:get_query_results).with(query_id: query_id).and_return(response)
      expect(described_class.get_query_results(query_id)).to eq(query_results)
    end
  end

  describe '#get_cached_query_results' do
    let(:cache_key) { "#{described_class::KEY_PREFIX}/#{query_id}" }

    context 'when it is not cached yet' do
      it 'caches the results if the status is "Complete"' do
        expect($redis_client.get(cache_key)).to eq(nil)
        expect($cloudwatch_logs).to receive(:get_query_results).with(query_id: query_id).and_return(response)
        expect(described_class.get_cached_query_results(query_id)).to eq(query_results)
        expect($redis_client.get(cache_key)).to eq(query_results.to_json)
      end

      context 'when the status is "Running"' do
        let(:query_status) { 'Running' }

        it 'does not cache the results' do
          expect($redis_client.get(cache_key)).to eq(nil)
          expect($cloudwatch_logs).to receive(:get_query_results).with(query_id: query_id).and_return(response)
          expect(described_class.get_cached_query_results(query_id)).to eq(query_results)
          expect($redis_client.get(cache_key)).to eq(nil)
        end
      end
    end

    context 'when it is already cached' do
      it 'returns the cached result' do
        $redis_client.set(cache_key, query_results.to_json)
        expect($cloudwatch_logs).not_to receive(:get_query_results)
        expect(described_class.get_cached_query_results(query_id)).to eq(query_results)
        expect($redis_client.get(cache_key)).to eq(query_results.to_json)
      end
    end
  end

  def logs_struct_for(field, value)
    double('ResultField', field: field, value: value)
  end
end
module Users
  class ExampleService
    attr_reader :user

    EXPIRY_IN_SECONDS = Settings.redis_cloudwatch_logs.expiry_in_seconds
    KEY_PREFIX = Settings.redis_cloudwatch_logs.key_prefix

    def initialize(user)
      @user = user
    end

    # Returns the results of the last query, or nil if no query was triggered yet
    def ips_info
      if cached_query_id.present?
        query_id = JSON.parse(cached_query_id)['query_id']
        client.get_cached_query_results(query_id)
      end
    end

    # Triggers a new query and remembers its query_id for this user
    def refresh
      query_id = client.start_query(options)
      $redis_client.set(key, { 'query_id' => query_id }.to_json, ex: EXPIRY_IN_SECONDS)
    end

    def cached_query_id
      @cached_query_id ||= $redis_client.get(key)
    end

    def client
      CloudwatchLogs::Client
    end

    private

    def key
      "#{KEY_PREFIX}/#{user.id}"
    end

    def options
      env = Rails.env
      log_group = "/our/logs-#{env}.log"
      # The filter goes first: after `stats`, only the aggregated fields
      # (requests and ip) are left, so filtering by user.id would match nothing
      query = <<~HEREDOC
        filter user.id = #{user.id}
        | stats count(*) as requests by params.ip as ip
        | sort requests desc
        | limit 1000
      HEREDOC
      client.generate_options(log_group, 1.week.ago.to_i, 1.second.ago.to_i, query)
    end
  end
end
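Since the query string is built by interpolating the user id, it may be worth coercing the id to an integer first. The sketch below is a hypothetical helper (`ips_query_for` is not part of the gist) and assumes that `user.id` is a numeric field in the logs. It also places the filter before `stats`, on the assumption that the aggregated output no longer contains `user.id`.

```ruby
# Builds the Logs Insights query for one user. Integer() raises ArgumentError
# for non-numeric strings, so arbitrary text cannot reach the query string.
def ips_query_for(user_id)
  <<~QUERY
    filter user.id = #{Integer(user_id)}
    | stats count(*) as requests by params.ip as ip
    | sort requests desc
    | limit 1000
  QUERY
end

puts ips_query_for(42)
```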