Last active
July 10, 2018 14:47
Facebook API rate limit throttler using Sidekiq. To maintain some semblance of performance, it does not execute the job inside the lock; it just marks the call in a counter, which other jobs from the same queue using the same token also update, and which is what gets throttled. Jobs from other queues will not be able to bust the lock until the timer run…
# Mixin for (i.e. include in) any worker class that makes FB API calls and should be throttled.
module FacebookThrottle
  def perform_throttled(*args, &block)
    # extract_options! pops the trailing hash off args, so it has to be re-appended on requeue.
    options = args.extract_options!
    # Sidekiq round-trips job arguments through JSON, so the key may arrive as a string.
    fb_uid = options[:fb_uid] || options['fb_uid']
    user = User.find_by_fb_uid(fb_uid)
    if user
      if !user.valid_facebook_token? # A bitwise flag managed by flag_shih_tzu gem
        puts "Skipping #{self.class} #{user.fb_uid}: Invalid Oauth Token for #{user}"
        return false
      end
      source = queue_source
      if !user.get_oauth_lock!(source)
        puts "Requeue #{self.class} #{user.fb_uid}: Oauth Token Locked for #{user} by #{user.oauth_locked_by} until #{user.oauth_locked_until} with #{user.oauth_lock_counter}"
        requeue_myself(*args, options)
        return false
      end
      begin
        yield user
      rescue Koala::Facebook::ClientError => e
        # e.g. 'Koala::Facebook::ClientError: type: OAuthException, code: 4, message: (#4) Application request limit reached [HTTP 403]'
        user.oauth_lock_out!
        puts "Requeue #{self.class} #{user.fb_uid}: #{e.class} #{e.message} for #{user} by #{user.oauth_locked_by} until #{user.oauth_locked_until} with #{user.oauth_lock_counter}"
        requeue_myself(*args, options)
      end
    end
  end

  def requeue_myself(*args)
    self.class.perform_in(15.minutes, *args)
  end

  def queue_source
    self.class.get_sidekiq_options['queue']
  end
end
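One thing worth checking in a mixin like this is what the job arguments look like once Sidekiq has stored and reloaded them, and what is left in `args` after the options hash has been extracted. The following is a minimal plain-Ruby sketch, not part of the gist: `extract_options!` here is a hypothetical stand-in for ActiveSupport's `Array#extract_options!`, and `round_trip` only imitates the JSON serialization that Sidekiq applies to job arguments.

```ruby
require 'json'

# Hypothetical stand-in for ActiveSupport's Array#extract_options!:
# removes and returns a trailing Hash, or returns {} when there is none.
def extract_options!(args)
  args.last.is_a?(Hash) ? args.pop : {}
end

# Imitates what happens to job arguments between enqueue and perform:
# they are serialized to JSON and parsed back.
def round_trip(args)
  JSON.parse(JSON.generate(args))
end

args    = round_trip([{ fb_uid: '12345' }]) # what perform would receive
options = extract_options!(args)

symbol_lookup = options[:fb_uid]   # symbol keys do not survive JSON
string_lookup = options['fb_uid']  # string keys do
remaining     = args.dup           # the hash has been popped off args
requeue_args  = args + [options]   # re-append it to rebuild the payload
```

In this sketch the symbol lookup comes back `nil`, the string lookup finds the id, and `args` is empty after extraction, so a requeue that passes only `*args` would schedule a job without its `fb_uid`.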
class AddOauthLockToUsers < ActiveRecord::Migration
  def change
    ## These do not need to be indexed
    add_column :users, :oauth_locked_at, :datetime
    add_column :users, :oauth_locked_by, :string
    add_column :users, :oauth_lock_counter, :integer, default: 0, null: false
  end
end
# Example of one such worker class which makes FB API calls and needs to be throttled.
# Throttling is per queue / block of time / token.
# When the internal limit is reached *before Facebook shuts you down*, the jobs past the limit are requeued to the future.
class ImportFacebookProfilePhotos
  include Sidekiq::Worker
  sidekiq_options queue: :photo
  include FacebookThrottle

  def perform(*args)
    # Pass only the literal block; a call cannot take both &block and do...end.
    perform_throttled(*args) do |user|
      # The row lock has already been released here; only the counter was updated inside it.
      # Only executed if perform_throttled hits a yield, which will only happen if it can get a lock.
      # If it fails to get a lock, the job will be requeued for 15 minutes later.
      # Assumption:
      #   Only makes one API call!
      #   If it makes more, then insert an 'api_count' into the args and
      #   handle it in the throttle code in the User class.
      user.background_photo_import
    end
  end
end
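The `api_count` idea mentioned in the worker's comments is not implemented in the gist. As a rough illustration only, the sketch below shows one way a counter could reserve several calls at once and refuse a reservation that would overshoot the limit. `TokenBudget` and `reserve` are hypothetical names, the state is held in memory rather than on the `users` row, and the boundary check differs slightly from the gist's own `>=` test.

```ruby
# Hypothetical in-memory budget; none of these names come from the gist.
class TokenBudget
  attr_reader :counter

  def initialize(limit)
    @limit   = limit
    @counter = 0
  end

  # Reserve api_count calls. Returns false, and reserves nothing,
  # when the reservation would push the counter past the limit.
  def reserve(api_count = 1)
    return false if @counter + api_count > @limit

    @counter += api_count
    true
  end
end

budget = TokenBudget.new(250)
first  = budget.reserve(200) # fits: counter becomes 200
second = budget.reserve(60)  # refused: 200 + 60 exceeds 250
third  = budget.reserve(50)  # fits exactly: counter becomes 250
```

Refusing the whole reservation, rather than counting part of it, keeps a multi-call job from starting work it cannot finish inside the current window.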
# The meat of the throttler.
class User
  # ... More class ... snip
  LOCK_FOR_AT_LEAST = 700.seconds
  INTERNAL_RATE_LIMIT = 250 # requests, Facebook will shut you down at 600 reqs / 600 secs / token
  LOCKOUT_FOR = 10.minutes

  def get_oauth_lock!(source)
    source = source.to_s
    self.with_lock do
      # This block is called within a transaction,
      # user record is now locked, preventing updates from other parallel processes.
      # 1. Has the token never been locked?
      #    Then we set the lock.
      if self.oauth_locked_at.nil?
        puts "#{self} Token Never Locked - Set Lock for #{source}!"
        self.oauth_set_lock!(source)
        return true
      end
      # 2. Is the token unlocked?
      #    Then we set the lock.
      if !self.oauth_lock_current?
        puts "#{self} Token last used by #{self.oauth_locked_by} is unlocked - Set Lock for #{source}!"
        self.oauth_set_lock!(source)
        return true
      else
        # 3. Is the token locked by same source right now?
        if self.oauth_locked_by == source
          if self.oauth_internal_limit_reached?
            # 3a. If we have reached our internal limit, then we must stop
            puts "#{self} Token Locked by #{source} - Internal Limit Reached..."
            return false
          else
            # 3b. If we have *NOT* reached our internal limit, then we can proceed
            puts "#{self} Token Locked by #{source} - Update Counter!"
            self.oauth_update_lock!
            return true
          end
        else
          # 4. Is the token locked by another source right now?
          #    Then we can do nothing.
          puts "#{self} Token Locked by #{self.oauth_locked_by} prevents #{source} work."
          return false
        end
      end
    end
  end

  def oauth_lock_current?
    self.oauth_locked_until > Time.zone.now
  end

  def oauth_locked_until
    self.oauth_locked_at + LOCK_FOR_AT_LEAST
  end

  def oauth_set_lock!(source)
    self.oauth_locked_at = Time.zone.now
    self.oauth_locked_by = source
    self.oauth_lock_counter = 0
    self.save!
  end

  def oauth_internal_limit_reached?
    self.oauth_lock_counter >= INTERNAL_RATE_LIMIT
  end

  def oauth_update_lock!
    self.oauth_lock_counter += 1 # If a job makes more than one call - then this needs to be more complex.
    self.save!
  end

  def oauth_lock_out!
    self.oauth_lock_counter = INTERNAL_RATE_LIMIT + 1
    self.oauth_locked_at = Time.zone.now + LOCKOUT_FOR
    self.save!
  end
end
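To make the decision order in `get_oauth_lock!` easier to follow, here is a minimal in-memory sketch of the same rules with the clock passed in as a plain number of seconds. It is an illustration under stated assumptions, not the gist's implementation: `TokenLock`, `acquire`, and `lock_out!` are hypothetical names, there is no database and no row lock, and the constants are copied from the `User` class above as plain integers.

```ruby
# In-memory sketch of the lock decision; no ActiveRecord, no row lock.
class TokenLock
  LOCK_FOR_AT_LEAST   = 700 # seconds
  INTERNAL_RATE_LIMIT = 250 # requests per lock window
  LOCKOUT_FOR         = 600 # seconds (10 minutes)

  attr_reader :locked_by, :counter

  def initialize
    @locked_at = nil
    @locked_by = nil
    @counter   = 0
  end

  # now is a numeric timestamp in seconds, injected so runs are deterministic.
  def acquire(source, now)
    source = source.to_s
    # Never locked, or the window has expired: take the lock, reset the counter.
    if @locked_at.nil? || locked_until <= now
      @locked_at = now
      @locked_by = source
      @counter   = 0
      return true
    end
    # Held by another queue: nothing to do until the window expires.
    return false unless @locked_by == source
    # Held by this queue, but the internal limit has been reached.
    return false if @counter >= INTERNAL_RATE_LIMIT

    @counter += 1
    true
  end

  def locked_until
    @locked_at + LOCK_FOR_AT_LEAST
  end

  # Mirrors oauth_lock_out!: push the lock start into the future.
  def lock_out!(now)
    @counter   = INTERNAL_RATE_LIMIT + 1
    @locked_at = now + LOCKOUT_FOR
  end
end

lock = TokenLock.new
sequence = [
  lock.acquire(:photo, 0),     # never locked: photo takes the lock
  lock.acquire(:photo, 10),    # same queue: counter goes to 1
  lock.acquire(:friends, 20),  # other queue: refused
  lock.acquire(:friends, 701)  # window expired: friends takes the lock
]

lock.lock_out!(1000)                        # rate-limit error at t = 1000
after_requeue = lock.acquire(:friends, 1900) # 15 minutes later
after_expiry  = lock.acquire(:friends, 2300) # 600 + 700 seconds later

fresh    = TokenLock.new
admitted = (1..300).count { fresh.acquire(:photo, 0) }
```

Two details fall out of the constants in this sketch. A lockout holds for `LOCKOUT_FOR + LOCK_FOR_AT_LEAST`, which is 1300 seconds, so a job requeued 15 minutes (900 seconds) after a lockout still finds the token locked and is requeued again. A fresh window also admits 251 acquisitions rather than 250, because the acquisition that sets the lock starts the counter at zero.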
@rajagopals, in my use case it made sense. I have my jobs divided up so that jobs of a specific type (ones that are high priority and fast) finish first, while other jobs which may monopolize the lock are less important and slower. I need to provide the results of these jobs to the front-end user experience in as few seconds as possible, so I had to cut up the workload and keep the user's OAuth token available to the highest-priority jobs only, preventing the lower-priority ones from killing my "time to usability" on the front end.
This may not apply to scenarios where a live user isn't waiting on the results.
Thanks for sharing this. Very useful. Have a question though. Why do you use oauth_locked_by to restrict jobs from other queues to hit facebook? If all the jobs did the same number of queries to facebook, how would it matter which job executed the query? It seems like the rate limit is maintained by the oauth_lock_counter which keeps track of how many queries have been made...