Skip to content

Instantly share code, notes, and snippets.

@milosh012
Created January 10, 2013 14:50
Show Gist options
  • Save milosh012/4502583 to your computer and use it in GitHub Desktop.
Save milosh012/4502583 to your computer and use it in GitHub Desktop.
_ = require 'underscore'
moment = require 'moment'
util = require 'util'
async = require 'async'
Twitter = require process.cwd() + '/lib/node/provider/twitter'
Facebook = require process.cwd() + '/lib/node/provider/facebook'
class Job
key : ''
user_key : ''
user_type : 'active'
provider : undefined
service : undefined
expire : 691200, # 8 days
redis : undefined
schedule_id : undefined
increment_by: 1
postponed_jobs_path : []
delayed: false
constructor: (attributes) ->
_.bindAll @, 'process', 'execute'
attributes = attributes || {}
_.defaults attributes,
user_key : ''
user_type : 'active'
priority : 'normal'
schedule_id : undefined
@redis = (require process.cwd() + '/config/redis').createClient()
@user_type = attributes.user_type
@user_key = attributes.user_key
@priority = attributes.priority
@user_service = attributes.user_service
@schedule_id = attributes.schedule_id
@date_key = moment.utc().format('YYYY:MM:DD')
@delayed = attributes.delayed if attributes.delayed?
days = if @user_type == 'new' then 7 else 1
@start_date_obj = moment.utc().subtract('days', days).startOf('day')
@start_date = @start_date_obj.unix()
if @service == 'twitter'
@provider = new Twitter
user_key: @user_key
job_key: attributes.job_key
redis: @redis
else if @service == 'facebook'
@provider = new Facebook
user_key: @user_key
job_key: attributes.job_key
redis: @redis
getKey: (callback) ->
callback(null, util.format(@key, @user_key, @date_key))
hasStats: (callback) ->
callback null
process: (callback) ->
@cleanup (err) =>
if err?
@redis.end()
return callback err
else
@pre_execute (err) =>
if err?
@redis.end()
return callback()
else
@execute (err) =>
@post_execute err, (err) =>
@redis.end()
return callback err
cleanup: (callback) ->
return callback() if @service == '*'
multi = @redis.multi()
days = if @user_type == 'new' then 8 else 2
count = @key.match /\%s$/g
if count?
async.forEach _.range(0, days), (days_ago, callback) =>
day = moment.utc().subtract('days', days_ago).format 'YYYY:MM:DD'
key = util.format(@key, @user_key, day)
multi.del key
callback()
, (err) ->
multi.exec (err) ->
callback err
else
key = util.format(@key, @user_key)
multi.del key
multi.exec (err) ->
callback()
pre_execute: (callback) ->
return callback() if @service == '*'
return callback() unless @service?
@redis.hgetall util.format('%s:service:%s', @user_key, @service), (err, user_service) =>
return callback err if err?
return callback() unless user_service?
if user_service.is_valid == '0'
return callback 'invalid-service'
else if !_.isUndefined(@schedule_id) && user_service.schedule_id != @schedule_id.toString()
return callback 'invalid-schedule-id'
else
return callback()
execute: (callback) ->
callback()
user_in_progress: (callback) ->
@getServiceKey @service, @user_key, (service_key) =>
@redis.hgetall service_key, (err, user_service) =>
return callback err if err?
return callback() unless user_service?
@get_service_data @service, (err, service) =>
return callback err if err?
return callback() unless service?
user_service.processed_jobs_count = if user_service.processed_jobs_count? then user_service.processed_jobs_count else 0
if user_service.processed_jobs_count < service.jobs_count
# add user to stuck users
@redis.zadd 'users_in_progress:'+@service, moment.utc().unix(), @user_key, (err)->
callback err
else
# remove user from stuck users
@redis.zrem 'users_in_progress:'+@service, @user_key, (err)->
callback err
post_execute: (err, callback) ->
return callback 'Job service no set' unless @service?
return callback() if @service == '*'
@increment_by = @postponed_jobs_path.length + 1 if err?
@incrementProcessedJobsCount @service, () =>
@user_in_progress ()=>
if err == 'expired-access-token'
@redis.hset [@user_key + ':service:' + @service, 'is_valid', 0], (error) ->
err = error if error?
return callback()
else if @key != ''
# we need to set expire key
multi = @redis.multi()
days = _.range(8)
days = [0, 1] if @user_type == 'active'
async.forEach days, (day, callback) =>
date = moment.utc().subtract('days', day).format 'YYYY:MM:DD'
key = util.format(@key, @user_key, date)
multi.expire key, @expire
callback()
, =>
multi.exec () =>
callback err
else
return callback err
set_shedule_id: (user_key, user_service, callback) ->
key = util.format '%s:service:%s', user_key, user_service
schedule_id = _.random 99, 9999999
@redis.hset key, 'schedule_id', schedule_id, (err, res) ->
callback schedule_id
getServiceKey: (service, user_key, callback) ->
callback(util.format '%s:service:%s', user_key, service)
resetProcessedJobsCount: (services, user_key, callback) ->
multi = @redis.multi()
async.every services, (service, callback) =>
@getServiceKey service, user_key, (service_key) =>
multi.hset service_key, 'updated_at', moment.utc().unix()
multi.hset service_key, 'processed_jobs_count', 0
callback true
, () =>
multi.exec (err, result) =>
callback()
get_service_data: (service_code, callback) ->
@redis.hgetall 'service:'+service_code, (err, service) ->
return callback err if err?
return callback() unless service?
service.jobs_count = if service.jobs_count? then parseInt(service.jobs_count, 10) else 0
callback null, service;
incrementProcessedJobsCount: (service_code, callback) ->
@getServiceKey service_code, @user_key, (service_key) =>
@redis.hgetall service_key, (err, user_service) =>
return callback err if err?
return callback() unless user_service?
@get_service_data service_code, (err, service) =>
return callback err if err?
return callback() unless service?
user_service.processed_jobs_count = if user_service.processed_jobs_count? then parseInt(user_service.processed_jobs_count) else 0
return callback() if user_service.processed_jobs_count >= service.jobs_count
return callback() if !_.isUndefined(@schedule_id) &&
user_service? &&
user_service.schedule_id != @schedule_id.toString()
if ((user_service.processed_jobs_count + @increment_by) > service.jobs_count)
@increment_by = service.jobs_count - user_service.processed_jobs_count
multi = @redis.multi()
multi.hset service_key, 'updated_at', moment.utc().unix()
multi.hincrby service_key, 'processed_jobs_count', @increment_by
multi.exec (err, result) =>
callback()
sync_que: (jobs, user_key, schedule_id, callback) ->
return callback() if jobs.length == 0
job = new (require jobs.pop())
if job.auto_start
job.enqueue
user_key : user_key
user_type : @user_type
schedule_id : schedule_id
, 0, =>
return @sync_que jobs, user_key, schedule_id, callback
else
return @sync_que jobs, user_key, schedule_id, callback
module.exports = Job
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment