Last active
August 9, 2017 06:52
-
-
Save 8parth/fa543111092e54ff1a597ca8ae88c91b to your computer and use it in GitHub Desktop.
Pagination with DynamoDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # frozen_string_literal: true | |
# Singleton that holds the AWS credential hash read from the Rails secrets and
# the DynamoDB, SQS and IoT clients built from it. All clients are created
# once, when the singleton instance is first constructed.
class AWSConfigs
  include Singleton

  attr_reader :ddb_client, :sqs_client, :iot_client, :configs, :ddb_configs

  def initialize
    secrets = Rails.application.secrets
    @configs = {
      access_key_id: secrets.aws_access_key_id,
      secret_access_key: secrets.aws_secret_access_key,
      region: secrets.aws_region
    }
    @ddb_configs = build_ddb_configs
    @ddb_client = Aws::DynamoDB::Client.new(@ddb_configs)
    @sqs_client = Aws::SQS::Client.new(@configs)
    @iot_client = Aws::IoT::Client.new(@configs)
  end

  private

  # Copy of the shared settings with the DynamoDB-only extras applied:
  # test/development point at http://localhost:8000 and log through
  # Rails.logger, staging only adds the logger, any other environment
  # uses the shared settings untouched.
  def build_ddb_configs
    settings = configs.dup
    environment = Rails.env
    settings[:endpoint] = 'http://localhost:8000' if %w[test development].include?(environment)
    settings[:logger] = Rails.logger if %w[test development staging].include?(environment)
    settings
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # frozen_string_literal: true | |
# Class-level helpers around the DynamoDB client exposed by
# AWSConfigs.instance.ddb_client: table throughput updates, single and batch
# writes, and query/scan with automatic page following.
#
# Paginated reads (+query+, +scan+, +all+) return a two-element array
# <tt>[items, key_after_limit]</tt>; +key_after_limit+ is non-nil only when a
# +:limit+ stopped the read early and can be passed back as
# <tt>options[:last_evaluated_key]</tt> to continue from that point.
class DynamoDBAdapter
  OPERATION_TYPES = %w[query scan].freeze
  COMPARISON_OPERATORS = %w[= < <= >= >].freeze
  DESCENDING_ORDERS = %w[desc descending].freeze

  class << self
    # Updates provisioned throughput of a table and/or one global secondary index.
    #
    # @param table_name [String]
    # @param primry_index_options [Hash] (keyword spelling kept for existing
    #   callers) its +:provisioned_throughput+ value is sent for the table
    # @param secondary_index_options [Hash] sent verbatim as the +update+ entry
    #   of +global_secondary_index_updates+
    # @return [Boolean] the response's +successful?+, or false when the service
    #   raised an Aws::DynamoDB::Errors::ServiceError (which is logged)
    def update_provisioned_throughput(table_name, primry_index_options: {}, secondary_index_options: {})
      update_params = { table_name: table_name }
      if primry_index_options.present?
        update_params[:provisioned_throughput] = primry_index_options[:provisioned_throughput]
      end
      if secondary_index_options.present?
        update_params[:global_secondary_index_updates] = [{ update: secondary_index_options }]
      end
      ddb_client.update_table(update_params).successful?
    rescue Aws::DynamoDB::Errors::ServiceError => e
      # `name` is the adapter's class name; `self.class.name` here would be "Class".
      TaggedLogger.error(e.message, name)
      false
    end

    # Best-effort variant of {put_item!}: never raises.
    #
    # @param table_name [String]
    # @param item_hash [Hash] the item to store
    # @return [Boolean] the response's +successful?+, or false on any
    #   StandardError (the failure is logged by {put_item!})
    def put_item(table_name, item_hash)
      put_item!(table_name, item_hash)
    rescue StandardError
      false
    end

    # Writes one item, logging and re-raising any failure.
    #
    # @param table_name [String]
    # @param item_hash [Hash] the item to store
    # @return [Boolean] the response's +successful?+
    # @raise [StandardError] whatever the client raised
    def put_item!(table_name, item_hash)
      response = ddb_client.put_item(
        table_name: table_name,
        return_consumed_capacity: 'TOTAL',
        return_values: 'ALL_OLD',
        item: item_hash
      )
      response.successful?
    rescue StandardError => e
      TaggedLogger.error(e.message, name)
      raise
    end

    # Sends all +items+ as put requests in a single BatchWriteItem call.
    # The items are not chunked and unprocessed items in the response are not
    # retried here; the caller has to stay within the service's per-request
    # limits and inspect the returned response.
    #
    # @param table_name [String, Symbol]
    # @param items [Array<Hash>]
    # @return [Object, false] the client response, or false when the service
    #   raised an Aws::DynamoDB::Errors::ServiceError (which is logged)
    def batch_write_item(table_name, items)
      put_requests = items.map { |item| { put_request: { item: item } } }
      ddb_client.batch_write_item(
        request_items: { table_name.to_s => put_requests },
        return_consumed_capacity: 'TOTAL'
      )
    rescue Aws::DynamoDB::Errors::ServiceError => e
      # Two-argument form, matching every other TaggedLogger.error call in this class.
      TaggedLogger.error(e.message, name)
      false
    end

    # Queries a table (or index) by partition key, optionally constrained by a
    # sort-key condition and filters, following result pages automatically.
    #
    # @param table_name [String]
    # @param primary_key_name [String] partition key attribute name
    # @param primary_key_value [Object] partition key value
    # @param options [Hash]
    #   - +:sort_order+ 'desc'/'descending' (any case, String or Symbol) reads
    #     backwards; anything else reads forwards
    #   - +:index_name+ index to query
    #   - +:filters+ array of <tt>{ key_name:, operator_type:, value: }</tt> or,
    #     for 'between', <tt>{ key_name:, operator_type:, value1:, value2: }</tt>;
    #     multiple filters are joined with OR
    #   - +:limit+ stop once at least this many items were collected
    #   - +:batch_size+ per-request page size (takes precedence over +:limit+
    #     as the request's +limit+)
    #   - +:last_evaluated_key+ key to resume from
    # @param range_options [Hash] <tt>{ range_key_name:, operator_type:, range_val: }</tt>
    #   or, for 'between', +range_val1+/+range_val2+ instead of +range_val+
    # @return [Array] <tt>[items, key_after_limit]</tt>; a bare +[]+ when the
    #   service raised (see {query_paginated})
    # @raise [ArgumentError] for an operator other than between, =, <, <=, >=, >
    def query(table_name, primary_key_name, primary_key_value, options = {}, range_options = {})
      query_params = {
        table_name: table_name,
        expression_attribute_names: { '#id' => primary_key_name },
        expression_attribute_values: { ':id' => primary_key_value },
        key_condition_expression: '#id = :id',
        scan_index_forward: !descending?(options[:sort_order])
      }
      apply_range_condition(query_params, range_options) if range_options.present?
      query_params[:index_name] = options[:index_name] if options[:index_name].present?
      apply_filters(query_params, options[:filters]) if options[:filters].present?
      query_params[:exclusive_start_key] = options[:last_evaluated_key] if options[:last_evaluated_key].present?

      batch_size = options[:batch_size] if options[:batch_size].present?
      query_paginated(query_params, positive_limit(options[:limit]), batch_size)
    end

    # Scans a table, following result pages automatically.
    #
    # @param table_name [String]
    # @param options [Hash] +:limit+, +:batch_size+ and +:last_evaluated_key+,
    #   with the same meaning as in {query}
    # @return [Array] <tt>[items, key_after_limit]</tt>, or +[]+ when anything
    #   raised a StandardError (which is logged)
    def scan(table_name, options = {})
      scan_params = { table_name: table_name }
      # Lets a limited scan be continued with the key returned by the previous call.
      scan_params[:exclusive_start_key] = options[:last_evaluated_key] if options[:last_evaluated_key].present?
      max_records = positive_limit(options[:limit])
      batch_size = options[:batch_size] if options[:batch_size].present?
      begin
        query_paginated(scan_params, max_records, batch_size, 'scan')
      rescue StandardError => e
        TaggedLogger.error("DDB scan Error: #{e.message}", name)
        []
      end
    end

    # Unbounded {scan} of the whole table.
    #
    # @param table_name [String]
    # @return [Array] same shape as {scan}
    def all(table_name)
      scan(table_name)
    end

    private

    # The shared DynamoDB client.
    def ddb_client
      AWSConfigs.instance.ddb_client
    end

    # True when +sort_order+ asks for descending order (String or Symbol, any case).
    def descending?(sort_order)
      DESCENDING_ORDERS.include?(sort_order.to_s.downcase)
    end

    # +limit+ when it is present and positive, otherwise nil.
    def positive_limit(limit)
      limit if limit.present? && limit.positive?
    end

    # Lower-cased String form of an operator; nil becomes '' and is rejected later.
    def normalize_operator(operator_type)
      operator_type.to_s.downcase
    end

    # Adds the sort-key condition described by +range_options+ to +query_params+.
    def apply_range_condition(query_params, range_options)
      query_params[:expression_attribute_names]['#range_key'] = range_options[:range_key_name]
      values = query_params[:expression_attribute_values]
      operator = normalize_operator(range_options[:operator_type])
      if operator == 'between'
        values[':range_val1'] = range_options[:range_val1]
        values[':range_val2'] = range_options[:range_val2]
        query_params[:key_condition_expression] = '#id = :id AND #range_key BETWEEN :range_val1 AND :range_val2'
      elsif COMPARISON_OPERATORS.include?(operator)
        values[':range_val'] = range_options[:range_val]
        # Only operators from the whitelist above are ever interpolated.
        query_params[:key_condition_expression] = "#id = :id AND #range_key #{operator} :range_val"
      else
        raise ArgumentError, 'Invalid Query Operator'
      end
    end

    # Builds the OR-joined filter expression for +filters+ and stores it in +query_params+.
    def apply_filters(query_params, filters)
      expressions = filters.each_with_index.map do |filter, index|
        build_filter_expression(query_params, filter, index)
      end
      query_params[:filter_expression] = expressions.join(' OR ')
    end

    # Registers the placeholders for one filter and returns its expression fragment.
    def build_filter_expression(query_params, filter, index)
      key_placeholder = "#filter_key_#{index}"
      query_params[:expression_attribute_names][key_placeholder] = filter[:key_name]
      values = query_params[:expression_attribute_values]
      operator = normalize_operator(filter[:operator_type])
      if operator == 'between'
        values[":filter_val1_#{index}"] = filter[:value1]
        values[":filter_val2_#{index}"] = filter[:value2]
        "#{key_placeholder} BETWEEN :filter_val1_#{index} AND :filter_val2_#{index}"
      elsif COMPARISON_OPERATORS.include?(operator)
        values[":filter_val_#{index}"] = filter[:value]
        "#{key_placeholder} #{operator} :filter_val_#{index}"
      else
        raise ArgumentError, 'Invalid Query Operator'
      end
    end

    # Runs +operation_type+ ('query' or 'scan') repeatedly, feeding each page's
    # last evaluated key into the next request, until the service reports no
    # further key or at least +max_records+ items were collected.
    #
    # @param query_params [Hash] request parameters; mutated (+:limit+,
    #   +:exclusive_start_key+)
    # @param max_records [Integer, nil] stop threshold; the result may hold
    #   more items than this because whole pages are kept
    # @param batch_size [Integer, nil] per-request +limit+, overrides +max_records+
    # @return [Array] <tt>[items, key_after_limit]</tt>; on an
    #   Aws::DynamoDB::Errors::ServiceError the error is logged, items already
    #   fetched are dropped and a bare +[]+ is returned (kept for compatibility)
    # @raise [ArgumentError] when +operation_type+ is not in OPERATION_TYPES
    def query_paginated(query_params, max_records, batch_size, operation_type = 'query')
      unless OPERATION_TYPES.include?(operation_type)
        raise ArgumentError, "Invalid Operation Type, #{operation_type}"
      end

      query_params[:limit] = max_records if max_records.present?
      query_params[:limit] = batch_size if batch_size.present?
      items = []
      key_after_limit = nil
      loop do
        result = ddb_client.public_send(operation_type, query_params)
        # Array() guards against a page without an items list.
        items.concat(Array(result.items))
        last_key = result.last_evaluated_key
        if max_records.present? && items.length >= max_records
          # limit applied: hand the continuation key back to the caller
          key_after_limit = last_key
          break
        end
        break unless last_key.present?

        query_params[:exclusive_start_key] = last_key
      end
      [items, key_after_limit]
    rescue Aws::DynamoDB::Errors::ServiceError => e
      TaggedLogger.log("DDB #{operation_type} Error: #{e.message}", 'error', name)
      []
    end

    # Single query request without page following.
    #
    # @param query_params [Hash] request parameters; +:limit+ is set when +limit+ is positive
    # @param limit [Integer]
    # @return [Array] the items of the first page only
    def query_without_pagination(query_params, limit = 0)
      query_params[:limit] = limit if limit.positive?
      ddb_client.query(query_params).items
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment