Skip to content

Instantly share code, notes, and snippets.

@8parth
Last active August 9, 2017 06:52
Show Gist options
  • Select an option

  • Save 8parth/fa543111092e54ff1a597ca8ae88c91b to your computer and use it in GitHub Desktop.

Select an option

Save 8parth/fa543111092e54ff1a597ca8ae88c91b to your computer and use it in GitHub Desktop.
pagination with dynamodb
# frozen_string_literal: true
class AWSConfigs
include Singleton
attr_reader :ddb_client, :sqs_client, :iot_client, :configs, :ddb_configs
def initialize
@configs ||= {
access_key_id: Rails.application.secrets.aws_access_key_id,
secret_access_key: Rails.application.secrets.aws_secret_access_key,
region: Rails.application.secrets.aws_region
}
@ddb_configs ||=
begin
ddb_configs = configs.dup
case Rails.env
when 'test', 'development'
ddb_configs[:endpoint] = 'http://localhost:8000'
ddb_configs[:logger] = Rails.logger
when 'staging'
ddb_configs[:logger] = Rails.logger
else
ddb_configs
end
ddb_configs
end
@ddb_client ||= Aws::DynamoDB::Client.new(ddb_configs)
@sqs_client ||= Aws::SQS::Client.new(configs)
@iot_client ||= Aws::IoT::Client.new(configs)
end
end
# frozen_string_literal: true
class DynamoDBAdapter
OPERATION_TYPES = %w[query scan].freeze
class << self
def update_provisioned_throughput(table_name, primry_index_options: {}, secondary_index_options: {})
update_params = {
table_name: table_name
}
update_params[:provisioned_throughput] = primry_index_options[:provisioned_throughput] if primry_index_options.present?
if secondary_index_options.present?
update_params[:global_secondary_index_updates] = [
{
update: secondary_index_options
}
]
end
begin
resp = AWSConfigs.instance.ddb_client.update_table(update_params)
return resp.successful?
rescue Aws::DynamoDB::Errors::ServiceError => e
TaggedLogger.error(e.message, self.class.name)
return false
end
end
def put_item(table_name, item_hash)
response = AWSConfigs.instance.ddb_client.put_item(
table_name: table_name,
return_consumed_capacity: 'TOTAL',
return_values: 'ALL_OLD',
item: item_hash
)
return response.successful?
rescue => _e
return false
end
def put_item!(table_name, item_hash)
response = AWSConfigs.instance.ddb_client.put_item(
table_name: table_name,
return_consumed_capacity: 'TOTAL',
return_values: 'ALL_OLD',
item: item_hash
)
return response.successful?
rescue => e
TaggedLogger.error(e.message, self.class.name)
raise e
end
def batch_write_item(table_name, items)
batch_items =
items.map do |item|
{ put_request: { item: item } }
end
params = {
request_items: {
table_name.to_s => batch_items
},
return_consumed_capacity: 'TOTAL'
}
AWSConfigs.instance.ddb_client.batch_write_item(params)
rescue Aws::DynamoDB::Errors::ServiceError => e
TaggedLogger.error(e.message, 'error', self.class.name)
false
end
def query(table_name, primary_key_name, primary_key_value, options = {}, range_options = {})
sort_order = options[:sort_order].present? && %w[desc descending].include?(options[:sort_order].downcase) ? false : true
begin
query_params = {
table_name: table_name,
expression_attribute_names: {
'#id' => primary_key_name
},
expression_attribute_values: {
':id' => primary_key_value
},
key_condition_expression: '#id = :id'
}
query_params[:scan_index_forward] = sort_order
if range_options.present?
query_params[:expression_attribute_names]['#range_key'] = range_options[:range_key_name]
case range_options[:operator_type].downcase
when 'between'
query_params[:expression_attribute_values][':range_val1'] = range_options[:range_val1]
query_params[:expression_attribute_values][':range_val2'] = range_options[:range_val2]
query_params[:key_condition_expression] = '#id = :id AND #range_key BETWEEN :range_val1 AND :range_val2'
when '=', '<', '<=', '>=', '>'
query_params[:expression_attribute_values][':range_val'] = range_options[:range_val]
query_params[:key_condition_expression] = "#id = :id AND #range_key #{range_options[:operator_type]} :range_val"
else
raise Exception, 'Invalid Query Operator'
end
end
if options[:index_name].present?
query_params[:index_name] = options[:index_name]
end
if options[:filters].present?
options[:filters].each_with_index do |filter_hash, index|
query_params[:expression_attribute_names]["#filter_key_#{index}"] = filter_hash[:key_name]
case filter_hash[:operator_type].downcase
when 'between'
query_params[:expression_attribute_values][":filter_val1_#{index}"] = filter_hash[:value1]
query_params[:expression_attribute_values][":filter_val2_#{index}"] = filter_hash[:value2]
filter_expression = "#filter_key_#{index} BETWEEN :filter_val1_#{index} AND :filter_val2_#{index}"
when '=', '<', '<=', '>=', '>'
query_params[:expression_attribute_values][":filter_val_#{index}"] = filter_hash[:value]
filter_expression = "#filter_key_#{index} #{filter_hash[:operator_type]} :filter_val_#{index}"
else
raise Exception, 'Invalid Query Operator'
end
if index == 0
query_params[:filter_expression] = filter_expression
else
query_params[:filter_expression] += " OR #{filter_expression}"
end
end
end
max_records = options[:limit].present? && options[:limit].positive? ? options[:limit] : nil
batch_size = options[:batch_size].present? ? options[:batch_size] : nil
query_params[:exclusive_start_key] = options[:last_evaluated_key] if options[:last_evaluated_key].present?
return query_paginated(query_params, max_records, batch_size)
rescue => e
raise e
end
end
def scan(table_name, options = {})
query_params = { table_name: table_name }
max_records = options[:limit].present? && options[:limit].positive? ? options[:limit] : nil
batch_size = options[:batch_size].present? ? options[:batch_size] : nil
begin
return query_paginated(query_params, max_records, batch_size, 'scan')
rescue => e
TaggedLogger.error("DDB scan Error: #{e.message}", name)
return []
end
end
def all(table_name)
scan(table_name)
end
private
def query_paginated(query_params, max_records, batch_size, operation_type = 'query')
raise Exception, "Invalid Operation Type, #{operation_type}" unless OPERATION_TYPES.include?(operation_type)
query_params[:limit] = max_records if max_records.present?
query_params[:limit] = batch_size if batch_size.present?
items = []
key_after_limit = nil
begin
loop do
result = AWSConfigs.instance.ddb_client.public_send(operation_type, query_params)
items << result.items
items.flatten!
if max_records.present?
if items.length < max_records
if result.last_evaluated_key.present?
query_params[:exclusive_start_key] = result.last_evaluated_key
else
break
end
else
# limit applied
key_after_limit = result.last_evaluated_key
break
end
else
# no limit given to data
if result.last_evaluated_key.present?
query_params[:exclusive_start_key] = result.last_evaluated_key
else
break
end
end
end
return items.flatten, key_after_limit
rescue Aws::DynamoDB::Errors::ServiceError => e
TaggedLogger.log("DDB #{operation_type} Error: #{e.message}", 'error', name)
return []
end
end
def query_without_pagination(query_params, limit = 0)
query_params[:limit] = limit if limit.positive?
AWSConfigs.instance.ddb_client.query(query_params).items
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment