@kpumuk
Created February 17, 2010 13:43
# == Schema Information
# Schema version: 20090420101231
#
# Table name: loop_locks
#
# entity_id :integer(4) not null
# loop :string(255) not null
# created_at :datetime not null
# timeout_at :datetime not null
#
# Represents a lock object for a specific item.
#
# Usually you only need the {lock} and {unlock} methods to lock or
# unlock an entity. The {locked?} method can be used in tests to verify
# the lock status.
#
# @example Create a lock on an entity
#   LoopLock.lock(:loop => 'upload', :entity_id => 15)
#
# @example Create a lock on an entity with timeout 20 minutes
#   LoopLock.lock(:loop => 'upload', :entity_id => 15, :timeout => 20.minutes)
#
# @example Remove a lock from an entity
#   LoopLock.unlock(:loop => 'upload', :entity_id => 15)
#
# @example Verify entity locked
#   LoopLock.locked?(:loop => 'upload', :entity_id => 15)
#
class LoopLock < ActiveRecord::Base
db_magic :connection => :loop_locks
set_primary_keys :loop, :entity_id
unlock_all_attributes
# Locks an entity in a specified namespace (loop).
#
# @param [Hash] params a hash of options.
# @option params [String, Symbol] :loop
#   a loop to lock an entity in. Required.
# @option params [Integer] :entity_id
#   ID of the entity to lock. Required.
# @option params [Integer] :timeout (1.year)
#   a timeout in seconds after which the lock expires.
# @return [Boolean]
#   +true+ when locked successfully,
#   +false+ when the entity is already locked.
#
# @raise ArgumentError when :entity_id or :loop parameter is missing.
#
# @example
#   LoopLock.lock(:loop => 'upload', :entity_id => 15, :timeout => 20.minutes)
#
def self.lock(params)
  raise ArgumentError, 'Not enough params for a lock' unless params[:entity_id] && params[:loop]
  # Remove all stale locks for this record
  delete_all([ '`loop` = :loop AND `entity_id` = :entity_id AND `timeout_at` < NOW()', params ])
  # Create new lock
  attributes = params.dup
  timeout = attributes.delete(:timeout)
  timeout ||= 1.year
  attributes[:timeout_at] = Time.now + timeout
  lock = new(attributes)
  lock.save rescue false
end
# Unlocks an entity in a specified namespace (loop).
#
# @param [Hash] params a hash of options.
# @option params [String, Symbol] :loop
#   a loop to unlock an entity in. Required.
# @option params [Integer] :entity_id
#   ID of the entity to unlock. Required.
# @return [Boolean]
#   +true+ when unlocked successfully,
#   +false+ when the entity was not locked before.
#
# @raise ArgumentError when :entity_id or :loop parameter is missing.
#
# @example
#   LoopLock.unlock(:loop => 'upload', :entity_id => 15)
#
def self.unlock(params)
  raise ArgumentError, 'Not enough params for a lock' unless params[:entity_id] && params[:loop]
  delete_all(params) > 0
end
# Checks the state of an entity lock.
#
# @param [Hash] params a hash of options.
# @option params [String, Symbol] :loop
#   a loop in which to check the entity lock. Required.
# @option params [Integer] :entity_id
#   ID of the entity to check. Required.
# @return [Boolean]
#   +true+ when the entity is locked,
#   +false+ when the entity is not locked.
#
# @raise ArgumentError when :entity_id or :loop parameter is missing.
#
# @example
#   LoopLock.locked?(:loop => 'upload', :entity_id => 15)
#
def self.locked?(params)
  raise ArgumentError, 'Not enough params for a lock' unless params[:entity_id] && params[:loop]
  exists?([ params[:loop], params[:entity_id] ])
end
end
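# A minimal, illustrative sketch (not part of the original gist): an
# in-memory stand-in for the LoopLock semantics documented above, handy
# for reasoning about the behaviour without a database. The class name
# InMemoryLoopLock and all of its internals are hypothetical. It assumes
# the (loop, entity_id) pair is unique, which is what makes the +save+
# in {LoopLock.lock} fail while a live lock exists.
class InMemoryLoopLock
  DEFAULT_TIMEOUT = 365 * 24 * 60 * 60 # roughly one year, in seconds

  def initialize
    @locks = {} # [loop, entity_id] => expiry Time
  end

  # Purges a stale lock for the key, then takes the lock if it is free.
  def lock(params, now = Time.now)
    key = key_for(params)
    @locks.delete(key) if @locks[key] && @locks[key] < now
    return false if @locks.key?(key)
    @locks[key] = now + (params[:timeout] || DEFAULT_TIMEOUT)
    true
  end

  # Removes the lock; returns false when there was nothing to remove.
  def unlock(params)
    !@locks.delete(key_for(params)).nil?
  end

  # As the original appears to do, this ignores the expiry time: an
  # expired lock counts as held until the next lock attempt purges it.
  def locked?(params)
    @locks.key?(key_for(params))
  end

  private

  # Builds the composite key and validates the required params.
  def key_for(params)
    raise ArgumentError, 'Not enough params for a lock' unless params[:entity_id] && params[:loop]
    [params[:loop].to_s, params[:entity_id]]
  end
end
# Usage of the sketch above:
#   locks = InMemoryLoopLock.new
#   locks.lock(:loop => 'upload', :entity_id => 15)  # first caller takes the lock
#   locks.lock(:loop => 'upload', :entity_id => 15)  # second caller is refused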
class UploadLoop < Loops::Base
# The entry point of the loop.
#
# The primary purpose of this method is to configure the loop, select
# an {UploadQueueItem} to process, and start uploading. It also
# handles all unexpected exceptions so the loop will never (ok, it
# should never) exit unexpectedly.
#
# There is a tricky part: handling parallel uploads, synchronization,
# locks and deadlocks. So here is how each upload is processed:
#
# 1. Find <b>10</b> instances of {UploadQueueItem} which should be processed.
# 2. Shuffle them so each separate instance of the upload loop will start
#    processing a different {UploadQueueItem} (or at least it would try).
# 3. Among all found upload queue items, find one which is not locked and
#    lock it (this job is performed by the {Loops::Base#with_lock}
#    method). Please note: all other queue items will be left untouched.
# 4. Check that this item still exists (it can disappear right before
#    locking, so we should double-check this case).
# 5. Start uploading by calling the {#upload} method.
# 6. In case of any unhandled error, mark the upload queue item as an error
#    (if it has been retrieved already), report the exception to Jumpfrog,
#    and write it to the log.
#
# @see Loops::Base#with_lock
# @see #upload
#
def run
  # Initialize loop options
  initialize_from_config
  # Run the loop a limited number of times (100) and then quit so that
  # the process releases the memory it has accumulated
  @loops_count.times do
    # Forget the previous item so the rescue below never blames it
    # for an error raised before a new item is selected
    @item = nil
    # Find 10 items and sort them randomly
    items = find_upload_queue_items.sort_by { rand }
    unless items.any?
      debug("No docs for upload. Sleeping for #{@sleep_time} seconds")
      sleep(@sleep_time) if @sleep_time > 0
      next
    end
    # Get item IDs and pass an array to the {#with_lock} method,
    # which will choose one of the items that is not locked.
    item_ids = items.map { |item| item.id }
    with_lock(item_ids, :upload, 20.minutes, 'upload queue item') do |item_id|
      # Find item with given ID
      @item = items.find { |item| item.id == item_id }
      # Double-checked locking
      begin
        @item.reload
      rescue ActiveRecord::RecordNotFound
        next
      end
      info("Started processing word_upload##{@item.word_upload_id}")
      if upload(@item)
        info("Done processing word_upload##{@item.word_upload_id}")
      else
        info("Error processing word_upload##{@item.word_upload_id}")
      end
    end
  end
rescue => e
  # Report exception to Jumpfrog
  report_exception(e, :action => :run, :upload_queue_item => @item)
  # Log error
  explanation = " while processing word_upload##{@item.word_upload_id}" if @item
  error("Exception in uploading code#{explanation}!")
  error("Exception was: #{e} at #{e.backtrace.join("\n")}")
  # Mark item as error
  @item.try(:error!, "Exception occurred: #{e}")
end
end
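# A hedged sketch (not from the original gist, and not the actual
# Loops::Base#with_lock implementation, which is not shown here): one way
# a with_lock-style helper could pick the first unlocked ID, yield it,
# and always release the lock afterwards, as step 3 of the {UploadLoop#run}
# documentation describes. The method name with_first_free_lock and the
# +locker+ argument are hypothetical; +locker+ is assumed to respond to
# lock and unlock the way LoopLock does.
def with_first_free_lock(entity_ids, loop_name, timeout, locker)
  entity_ids.each do |entity_id|
    params = { :loop => loop_name.to_s, :entity_id => entity_id }
    # Skip items that another worker already holds
    next unless locker.lock(params.merge(:timeout => timeout))
    begin
      # Process only the first item we managed to lock
      return yield(entity_id)
    ensure
      # Release the lock even if the block raises
      locker.unlock(params)
    end
  end
  nil # every candidate was already locked
end
# Usage of the sketch above (LoopLock is the class defined earlier):
#   with_first_free_lock(item_ids, :upload, 20.minutes, LoopLock) do |item_id|
#     # process the item with this ID
#   end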