Last active
July 13, 2016 05:00
-
-
Save pmashchak/b077a64e1760f4b52bf8a71a4cb7f831 to your computer and use it in GitHub Desktop.
HTTP request wrapper with retry logic and Manticore client integration + DynamoDB data class
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# HTTPRequest class is an http client for API calls services | |
# see usages bellow | |
class HTTPRequest | |
attr_accessor :client, :options, :exceptions, :api_context, :async | |
module ::Manticore | |
class ResponseCodeException < ManticoreException; end | |
end | |
class AsyncProxy | |
extend Forwardable | |
def_delegators :subject, :options, :set_default_headers, :background | |
attr_accessor :subject | |
def initialize(subject) | |
self.subject = subject | |
end | |
def get(options={}) | |
request(:get, options) | |
end | |
def post(options={}) | |
request(:post, options) | |
end | |
def request(method, options) | |
url = options.delete(:url) | |
oauth_url = options.delete(:oauth_url) || url | |
self.options.merge!(options) | |
set_default_headers(method, oauth_url) | |
background.send(method, url, self.options) | |
end | |
end | |
# API Options :api_context => Instance of API class | |
# Method mappings: | |
# [ endpoint ] => api_context | |
# [ consumer_key, consumer_secret, timeout ] => api_context.api_config | |
extend Forwardable | |
def_delegator :client, :background | |
def_delegator :api_context, :options, :api_options | |
def_delegator :api_options, :[], :api_option | |
def_delegator :api_config, :[], :api_config_get | |
def_delegators :api_context, :endpoint, :api_config | |
DEFAULT_RETRIES = 2 | |
DEFAULT_TIMEOUT = 5 | |
DEFAULT_EXCEPTIONS = [Manticore::Timeout, Manticore::SocketException, Manticore::ResponseCodeException, | |
Manticore::ClientProtocolException, Manticore::ResolutionFailure] | |
# options [ :fails_on_code, :timeout, :retries, :headers, :exceptions ] | |
# api_context options: [ :consumer_key, :consumer_secret, :timeout ] | |
def initialize(options={}) | |
self.api_context = options.delete(:api_context) | |
raise 'API not found' if self.api_context.blank? | |
self.options = options | |
self.options.merge!(timeout_options) | |
self.options[:headers] = options.fetch(:headers, {}) | |
self.exceptions = Array(options.delete(:exceptions)) + DEFAULT_EXCEPTIONS | |
self.client = Manticore::Client.new(options) | |
self.async = AsyncProxy.new(self) | |
end | |
def get(options={}) | |
request(:get, options) | |
end | |
def post(options={}) | |
request(:post, options) | |
end | |
private | |
def set_default_headers(method, url) | |
options[:headers]['Accept'] = 'application/json' | |
options[:headers]['Content-Type'] = 'application/json' | |
if oauth_options? | |
authorization = SimpleOAuth::Header.new(method, url, {}, oauth_options).to_s | |
options[:headers]['Authorization'] = authorization | |
end | |
end | |
def oauth_options? | |
api_config_get(:codebig_key).present? && | |
api_config_get(:codebig_secret).present? | |
end | |
def oauth_options | |
{ consumer_key: api_config_get(:codebig_key), | |
consumer_secret: api_config_get(:codebig_secret) } | |
end | |
def fails_on_code?(code) | |
if code != 404 && code >= 400 && options.fetch(:fails_on_code, true) | |
raise Manticore::ResponseCodeException, code | |
end | |
end | |
def request(method, options) | |
url = options.delete(:url) || endpoint | |
retries = options.delete(:retries) || options.fetch(:retries, DEFAULT_RETRIES) | |
self.options.merge!(options) | |
set_default_headers(method, url) | |
retried = 0 | |
begin | |
response = client.send(method, url, self.options) | |
fails_on_code?(response.code) | |
JSON.parse(response.body) rescue {} | |
rescue exception_matcher => e | |
if retried < retries | |
retried += 1 | |
if api_option(:logger).present? | |
params = { retries: retried, trace: e.inspect, action: method } | |
api_option(:logger).call(params) | |
end | |
sleep(rand(retried * 100) * 0.01) | |
retry | |
end | |
raise | |
end | |
end | |
def timeout_options | |
t = self.options.delete(:timeout) || api_config_get(:timeout) || DEFAULT_TIMEOUT | |
{ request_timeout: t, connect_timeout: t, socket_timeout: t } | |
end | |
def exception_matcher | |
exc = self.exceptions | |
matcher = Module.new | |
(class << matcher; self; end).class_eval do | |
define_method(:===) do |error| | |
exc.any? { |e| error.is_a?(e) } | |
end | |
end | |
matcher | |
end | |
end | |
# Usage for HTTPRequest | |
class XapiRequest | |
# Type of responses | |
# {"code":101, "message": "partner token expired"} | |
# {"code"=>0, "message"=>"OK"} alert, launch | |
# {"code"=>400, "message"=>"request parameter 'psid' is missing"} | |
# {"psid"=>"yo5MKBnUK7m8vWrNa%2BgTGo7JEpDRxGYkYdYh04mNcQ4%3D", "expires"=>"1452733646131"} | |
# Changes: | |
# Game launch => process_deeplink GET https://xapi-prod.codebig2.net/processdeeplink/@device_id.js | |
# Send alert => send_alert GET https://xapi-prod.codebig2.net/sendalert/@device_id.js | |
# XAPI login => login POST https://xapi-prod.codebig2.net/partnerlogin.js BODY { partnername: password: } | |
# New Method # Move screen => advance_screen same as #Game launch => process_deeplink | |
attr_accessor :client, :options, :params | |
def initialize(params={}) | |
self.params = params | |
self.options = { logger: self.params.delete(:logger), | |
codebig_key: Rails.configuration.xapi.codebig.prod_key, | |
codebig_secret: Rails.configuration.xapi.codebig.prod_secret } | |
self.client = HTTPRequest.new(api_context: self) | |
end | |
def login | |
url = xapi_base_url + Rails.configuration.xapi.codebig.prod_login_endpoint | |
params = { partnername: Rails.configuration.xapi.partner_name, | |
password: Rails.configuration.xapi.partner_password } | |
client.post(url: url, params: params, retries: 1) | |
end | |
def alert(title, message) | |
xapi_url = [ xapi_base_url, Rails.configuration.xapi.codebig.prod_send_alert_endpoint, "#{params[:device_id]}.js"].join('') | |
url = xapi_url + '?' + send_alert_params(title, message) | |
Rails.logger.info("[event=xapi_alert] [event_status=request] [url=#{url}]") | |
client.async.get(url: url, retries: 2) | |
end | |
end | |
# Pulsar data model used to connect to DynamoDB | |
class Pulsar | |
cattr_accessor :client | |
self.client = Aws::DynamoDB::Client.new(region: 'No', | |
credentials: Rails.configuration.api.pulsar.creds, | |
endpoint: Rails.configuration.api.pulsar.base, | |
ssl_verify_peer: false) | |
end | |
# Pair class example for retrieving data from Pulsar DB | |
# Usage of Pair class | |
logger = ->(params) { log_state(:error, event: 'ip_pairing_request', event_status: 'error_trace', | |
error_message: "Pulsar retry #{params[:retried].ordinalize} #{params[:trace]}", | |
error_code: 'XG-245',) } | |
@pair = Pair.where(ip_address: ip_address, logger: logger) | |
@pair = Pair.find_by_pairing_code(params[:pairing_code], logger: logger) | |
class Pair < Pulsar | |
include ActiveModel::Validations | |
class Collection | |
class NotValidItem < Exception; end | |
extend Forwardable | |
delegate %w(map each size detect blank? present?) => :collection | |
delegate %w(id app_id device_id comcast_session_id account_number) => :item | |
delegate %w(xbo_account_id zip3 ip_lease_expires auth_guid) => :item | |
delegate %w(stb_friendly_name game_id whitelisted hsi timestamp) => :item | |
def initialize(items) | |
self.collection = items | |
end | |
def collection? | |
size > 1 | |
end | |
def valid? | |
size > 0 | |
end | |
def can_pair_device? | |
size == 1 | |
end | |
def item | |
raise NotValidItem unless valid? | |
collection.first | |
end | |
def detect_device(id) | |
detect { |device| device.device_id == id } | |
end | |
def as_json(options={}) | |
super(options)['collection'] | |
end | |
private | |
attr_accessor :collection | |
end | |
TABLE_NAME = 'GamesAutoPair' | |
TIMEOUT_TIME = 2 | |
DEFAULT_RETRIES = 1 | |
EXCEPTIONS_LIST = [Errno::ECONNREFUSED, Timeout::Error] | |
attr_accessor :id, :app_id, :device_id, :comcast_session_id, :account_number | |
attr_accessor :xbo_account_id, :zip3, :ip_lease_expires, :auth_guid | |
attr_accessor :stb_friendly_name, :game_id, :whitelisted, :hsi, :timestamp | |
cattr_accessor :conditions, instance_writer: false | |
alias_method :can_pair_device?, :valid? | |
self.conditions = {} | |
validates :id, :app_id, :device_id, :comcast_session_id, :account_number, presence: true | |
validates :xbo_account_id, :zip3, :ip_lease_expires, :auth_guid, :stb_friendly_name, :timestamp, presence: true | |
validate :expiration_time | |
def initialize(attributes = {}) | |
self.id = attributes['id'] | |
self.device_id = attributes['deviceId'] | |
self.app_id = attributes['appId'] | |
self.comcast_session_id = attributes['comcastSessionId'] | |
self.account_number = attributes['billingAccountId'] | |
self.auth_guid = attributes['cstAuthGuid'] | |
self.xbo_account_id = attributes['xboAccountId'] | |
self.zip3 = attributes['zip3'] | |
self.stb_friendly_name = attributes['stbFriendlyName'] || 'X1 Device' | |
self.ip_lease_expires = attributes['ipLeaseExpires'] | |
self.game_id = attributes['gameId'] | |
self.whitelisted = attributes['whitelisted'] | |
self.hsi = attributes['hsi'] | |
self.timestamp = attributes['timestamp'] | |
end | |
def delete | |
client.delete_item(table_name: TABLE_NAME, key: { 'id' => id }) | |
end | |
def expired? | |
return true if Time.at(timestamp.to_i / 1000) < 5.minutes.ago | |
return true if conditions.key?(:ip_address) && Time.at(ip_lease_expires.to_i / 1000) < Time.now | |
end | |
def collection? | |
false | |
end | |
def as_json(options={}) | |
super(options.merge(only: %w(device_id stb_friendly_name), include_root_in_json: false)) | |
end | |
private | |
def expiration_time | |
errors.add(:base, 'record is expired') if expired? | |
end | |
# Class methods | |
class << self | |
def where(conditions={}) | |
self.conditions = conditions | |
items = find(conditions[:ip_address]) | |
if items.present? | |
id = items.delete('id') | |
items = items.values.map do |attributes| | |
new(attributes.merge('id' => id)) | |
end.select(&:valid?) | |
end | |
Collection.new(Array(items)) | |
end | |
def find_by_pairing_code(code, conditions={}) | |
self.conditions = conditions.merge(pairing_code: code) | |
item = find(code) | |
if item.present? | |
attributes = item.values.first.merge(item.slice('id')) | |
new(attributes) | |
end | |
end | |
def all(eager=false) | |
records = [] | |
result = client.scan(table_name: TABLE_NAME) | |
items = result.items | |
if eager # by default Pulsar returns 16 items | |
while result.last_evaluated_key.present? do | |
result = client.scan(table_name: TABLE_NAME, exclusive_start_key: { id: result.last_evaluated_key['id'] }) | |
items += result.items | |
end | |
end | |
if items.present? | |
items.map do |item| | |
id = item['id'] | |
data = JSON.parse(item['data']) | |
data.values.map do |attrs| | |
records << new(attrs.merge('id' => id)) | |
end | |
end | |
end | |
records | |
end | |
private | |
def find(id) | |
retried = 0 | |
item = begin | |
Timeout::timeout(TIMEOUT_TIME) do | |
client.get_item(table_name: TABLE_NAME, key: { 'id' => id }).item | |
end | |
rescue exception_matcher => e | |
if retried < DEFAULT_RETRIES | |
retried += 1 | |
if logger = self.conditions[:logger] | |
logger.call(retried: retried, trace: e.inspect) | |
end | |
sleep(rand(retried * 100) * 0.01) | |
retry | |
end | |
raise | |
end | |
if item.present? | |
begin | |
JSON.parse(item['data']).merge(item.slice('id')) | |
rescue JSON::ParserError => e | |
Rails.logger.error("[event=pulsar_data_parse] [event_status=error] [error=#{e.inspect}] [attributes=#{item}]") | |
{} | |
end | |
end | |
end | |
def exception_matcher | |
matcher = Module.new | |
(class << matcher; self; end).class_eval do | |
define_method(:===) do |error| | |
EXCEPTIONS_LIST.any? { |e| error.is_a?(e) } | |
end | |
end | |
matcher | |
end | |
end | |
end | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment