Created
February 17, 2010 13:43
-
-
Save kpumuk/306614 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# == Schema Information | |
# Schema version: 20090420101231 | |
# | |
# Table name: loop_locks | |
# | |
# entity_id :integer(4) not null | |
# loop :string(255) not null | |
# created_at :datetime not null | |
# timeout_at :datetime not null | |
# | |
# Represents a lock object for a specific item. | |
# | |
# Usually you should use only the {lock} and {unlock} methods to lock or
# unlock an entity. The {locked?} method can be used in tests to verify
# the lock status.
# | |
# @example Create a lock on an entity | |
# LoopLock.lock(:loop => 'upload', :entity_id => 15) | |
# | |
# @example Create a lock on an entity with timeout 20 minutes | |
# LoopLock.lock(:loop => 'upload', :entity_id => 15, :timeout => 20.minutes) | |
# | |
# @example Remove a lock from an entity | |
# LoopLock.unlock(:loop => 'upload', :entity_id => 15) | |
# | |
# @example Verify entity locked | |
# LoopLock.locked?(:loop => 'upload', :entity_id => 15) | |
# | |
class LoopLock < ActiveRecord::Base
  db_magic :connection => :loop_locks
  set_primary_keys :loop, :entity_id
  unlock_all_attributes

  # Locks an entity in a specified namespace (loop).
  #
  # Any expired lock row for the same (loop, entity_id) pair is deleted first,
  # then a new row is inserted. A failure to save the new row (for example
  # because a live lock with the same key already exists) is reported as
  # +false+ rather than raised.
  #
  # @param [Hash] params a hash of options.
  # @option params [String, Symbol] :loop
  #   a loop to lock an entity in. Required.
  # @option params [Integer] :entity_id
  #   ID of the entity to lock. Required.
  # @option params [Integer] :timeout (1.year)
  #   a timeout in seconds after which the lock is considered expired.
  # @return [Boolean]
  #   +true+ when locked successfully,
  #   +false+ when the lock row could not be saved (entity already locked).
  #
  # @raise [ArgumentError] when :entity_id or :loop parameter is missing.
  #
  # @example
  #   LoopLock.lock(:loop => 'upload', :entity_id => 15, :timeout => 20.minutes)
  #
  def self.lock(params)
    key = lock_key(params)

    # Remove a stale lock for this record so it does not block the insert below.
    # NOTE(review): expiry is compared against the database clock (NOW()) while
    # timeout_at is computed from the application clock (Time.now) -- confirm
    # both use the same time zone.
    delete_all(['`loop` = :loop AND `entity_id` = :entity_id AND `timeout_at` < NOW()', key])

    # Build the new lock row; :timeout is not a column, so it is converted
    # into an absolute timeout_at value.
    attributes = params.dup
    timeout = attributes.delete(:timeout) || 1.year
    attributes[:timeout_at] = Time.now + timeout

    record = new(attributes)
    begin
      record.save
    rescue StandardError
      # Deliberate best effort: any error raised while saving means the lock
      # was not acquired.
      false
    end
  end

  # Unlocks an entity in a specified namespace (loop).
  #
  # Only :loop and :entity_id are used to find the lock row; any other keys
  # in +params+ (such as a :timeout left over from a {lock} call) are ignored.
  #
  # @param [Hash] params a hash of options.
  # @option params [String, Symbol] :loop
  #   a loop the entity is locked in. Required.
  # @option params [Integer] :entity_id
  #   ID of the entity to unlock. Required.
  # @return [Boolean]
  #   +true+ when unlocked successfully,
  #   +false+ when entity was not locked before.
  #
  # @raise [ArgumentError] when :entity_id or :loop parameter is missing.
  #
  # @example
  #   LoopLock.unlock(:loop => 'upload', :entity_id => 15)
  #
  def self.unlock(params)
    delete_all(lock_key(params)) > 0
  end

  # Checks the state of an entity lock.
  #
  # A lock whose timeout has already passed is not reported as locked, which
  # matches {lock}: it would discard such a row and acquire the lock.
  #
  # @param [Hash] params a hash of options.
  # @option params [String, Symbol] :loop
  #   a loop the entity is locked in. Required.
  # @option params [Integer] :entity_id
  #   ID of the entity to check. Required.
  # @return [Boolean]
  #   +true+ when an entity is locked,
  #   +false+ when an entity is not locked (or its lock has expired).
  #
  # @raise [ArgumentError] when :entity_id or :loop parameter is missing.
  #
  # @example
  #   LoopLock.locked?(:loop => 'upload', :entity_id => 15)
  #
  def self.locked?(params)
    exists?(['`loop` = :loop AND `entity_id` = :entity_id AND `timeout_at` >= NOW()', lock_key(params)])
  end

  # Validates +params+ and returns a hash holding only the lock key columns.
  #
  # @param [Hash] params a hash of options passed to a public method.
  # @return [Hash] a hash with exactly the :loop and :entity_id entries.
  #
  # @raise [ArgumentError] when :entity_id or :loop parameter is missing.
  #
  def self.lock_key(params)
    raise ArgumentError, 'Not enough params for a lock' unless params[:entity_id] && params[:loop]

    { :loop => params[:loop], :entity_id => params[:entity_id] }
  end
  private_class_method :lock_key
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class UploadLoop < Loops::Base
  # The entry point of the loop.
  #
  # The primary purpose of this method is to configure the loop, select
  # an {UploadQueueItem} to process and start uploading. It also
  # handles all unexpected exceptions so the loop will never (ok, it should
  # never) exit unexpectedly.
  #
  # There is a tricky part: handling parallel uploads, synchronization,
  # locks and deadlocks. So here is how each upload is processed:
  #
  # 1. Find a batch of {UploadQueueItem} instances which should be processed
  #    (<b>10</b> according to the original notes; the batch size is decided
  #    by +find_upload_queue_items+, which is defined elsewhere).
  # 2. Shuffle them so each separate instance of the upload loop will start
  #    processing a different {UploadQueueItem} (or at least it would try).
  # 3. Among all found upload queue items find one which is not locked and
  #    set a lock on it (this job is performed by the {Loops::Base#with_lock}
  #    method). Please note: all other queue items will be left untouched.
  # 4. Check that this item still exists (it can disappear right before
  #    locking, so we should double-check this case).
  # 5. Start uploading by calling the {#upload} method.
  # 6. In case of any unhandled error, mark the upload queue item as error
  #    (if it has been retrieved already in the current iteration), report
  #    the exception to Jumpfrog, and write it to the log.
  #
  # @see Loops::Base#with_lock
  # @see #upload
  #
  def run
    # Make sure an item left over from an earlier call cannot be blamed for a
    # failure that happens before this call selects its own item.
    @item = nil

    # Initialize loop options
    # (presumably this sets @loops_count and @sleep_time -- defined elsewhere)
    initialize_from_config

    # Run a bounded number of iterations (@loops_count) and then return, to
    # let the process release the memory we use.
    @loops_count.times do
      # Forget the item handled by the previous iteration: if this iteration
      # fails before picking a new item, the rescue below must not report the
      # failure against (or mark as error) an item that was already processed.
      @item = nil

      # Find candidate items and put them in random order
      items = find_upload_queue_items.sort_by { rand }
      if items.empty?
        debug("No docs for upload. Sleeping for #{@sleep_time} seconds")
        sleep(@sleep_time) if @sleep_time > 0
        next
      end

      # Get item IDs and pass an array to the {#with_lock} method,
      # which will choose one of the items that is not locked.
      item_ids = items.map { |item| item.id }
      with_lock(item_ids, :upload, 20.minutes, 'upload queue item') do |item_id|
        # Find item with given ID
        @item = items.find { |item| item.id == item_id }

        # The record may have been removed between the query and the lock,
        # so re-read it before doing any work.
        begin
          @item.reload
        rescue ActiveRecord::RecordNotFound
          # Nothing to process, and nothing to mark as error later on.
          @item = nil
          next
        end

        info("Started processing word_upload##{@item.word_upload_id}")
        if upload(@item)
          info("Done processing word_upload##{@item.word_upload_id}")
        else
          info("Error processing word_upload##{@item.word_upload_id}")
        end
      end
    end
  rescue => e
    # Report exception to Jumpfrog
    report_exception(e, :action => :run, :upload_queue_item => @item)

    # Log error
    explanation = " while processing word_upload##{@item.word_upload_id}" if @item
    error("Exception in uploading code#{explanation}!")
    error("Exception was: #{e} at #{e.backtrace.join("\n")}")

    # Mark item as error (only when an item was selected in the failing iteration)
    @item.try(:error!, "Exception occured: #{e}")
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment