Created
          January 10, 2013 14:50 
        
      - 
      
- 
        Save milosh012/4502583 to your computer and use it in GitHub Desktop. 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | _ = require 'underscore' | |
| moment = require 'moment' | |
| util = require 'util' | |
| async = require 'async' | |
| Twitter = require process.cwd() + '/lib/node/provider/twitter' | |
| Facebook = require process.cwd() + '/lib/node/provider/facebook' | |
# Base class for a per-user job that keeps its bookkeeping in Redis.
#
# A job runs through `process`, which chains
# cleanup -> pre_execute -> execute -> post_execute and then closes the
# Redis client opened by the constructor.
#
# NOTE(review): `key`, `service` and `execute` look like they are meant to be
# overridden by subclasses; no subclass is visible in this file — confirm.
class Job
  # util.format template for the Redis key owned by this job. It is formatted
  # with the user key and, when the template ends in `%s`, a YYYY:MM:DD date.
  key: ''
  user_key: ''
  user_type: 'active'
  provider: undefined
  # Service code ('twitter', 'facebook', '*', ...). '*' short-circuits
  # cleanup, pre_execute and post_execute.
  service: undefined
  # TTL passed to Redis EXPIRE in post_execute: 691200 s = 8 days.
  expire: 691200
  redis: undefined
  schedule_id: undefined
  increment_by: 1
  # NOTE(review): this array lives on the prototype, so it is shared by every
  # instance that does not assign its own. Only its length is read here.
  postponed_jobs_path: []
  delayed: false

  # Builds the job from an optional attributes object.
  #
  # attributes - Object with optional user_key, user_type, priority,
  #              schedule_id, user_service, delayed and job_key.
  #
  # Opens a Redis client via `<cwd>/config/redis` and, for the 'twitter' and
  # 'facebook' services, instantiates the matching provider.
  constructor: (attributes) ->
    _.bindAll @, 'process', 'execute'

    # Fill the defaults into a fresh object so the caller's attributes object
    # is never mutated.
    defaults =
      user_key: ''
      user_type: 'active'
      priority: 'normal'
      schedule_id: undefined
    attributes = _.defaults {}, (attributes or {}), defaults

    @redis = (require process.cwd() + '/config/redis').createClient()

    @user_type = attributes.user_type
    @user_key = attributes.user_key
    @priority = attributes.priority
    @user_service = attributes.user_service
    @schedule_id = attributes.schedule_id
    @date_key = moment.utc().format 'YYYY:MM:DD'
    @delayed = attributes.delayed if attributes.delayed?

    # 'new' users get a 7 day look-back window, everybody else 1 day.
    # NOTE(review): the (unit, amount) argument order is deprecated from
    # moment 2.8 on; it is kept because the installed version is unknown.
    days = if @user_type == 'new' then 7 else 1
    @start_date_obj = moment.utc().subtract('days', days).startOf('day')
    @start_date = @start_date_obj.unix()

    # Both providers take the same options; any other service gets none.
    Provider = switch @service
      when 'twitter' then Twitter
      when 'facebook' then Facebook
    if Provider?
      @provider = new Provider
        user_key: @user_key
        job_key: attributes.job_key
        redis: @redis

  # Yields the key template formatted with the user key and today's UTC date.
  #
  # callback - function(err, key); err is always null.
  getKey: (callback) ->
    callback null, util.format(@key, @user_key, @date_key)

  # Default implementation: calls back with null.
  hasStats: (callback) ->
    callback null

  # Runs the whole job life cycle and closes the Redis client afterwards.
  #
  # callback - function(err). A cleanup error and the post_execute result are
  #            reported; a pre_execute error stops the job WITHOUT being
  #            reported (behavior kept from the original implementation).
  process: (callback) ->
    # Single exit point so the client is ended on every path.
    finish = (err) =>
      @redis.end()
      callback err

    @cleanup (err) =>
      return finish err if err?
      @pre_execute (err) =>
        return finish() if err?
        @execute (err) =>
          @post_execute err, finish

  # Deletes the job's previous keys before a new run.
  #
  # When the key template ends in `%s` one key per day is deleted (8 days for
  # 'new' users, otherwise 2); otherwise the single undated key is deleted.
  #
  # callback - function(err) with the MULTI/EXEC error, if any.
  cleanup: (callback) ->
    return callback() if @service == '*'
    # An empty template has no key to delete; post_execute applies the same
    # guard before touching keys.
    return callback() if @key == ''

    multi = @redis.multi()
    if /%s$/.test @key
      days = if @user_type == 'new' then 8 else 2
      for days_ago in [0...days]
        day = moment.utc().subtract('days', days_ago).format 'YYYY:MM:DD'
        multi.del util.format(@key, @user_key, day)
    else
      multi.del util.format(@key, @user_key)

    # Both branches now report the exec error (the undated branch used to
    # drop it).
    multi.exec (err) ->
      callback err

  # Validates the user's service record before executing.
  #
  # callback - function(err); err is 'invalid-service' when is_valid is '0',
  #            'invalid-schedule-id' when the stored schedule_id differs from
  #            this job's, or the Redis error. A missing record is not an error.
  pre_execute: (callback) ->
    return callback() if @service == '*'
    return callback() unless @service?

    service_key = util.format '%s:service:%s', @user_key, @service
    @redis.hgetall service_key, (err, user_service) =>
      return callback err if err?
      return callback() unless user_service?
      return callback 'invalid-service' if user_service.is_valid == '0'
      # `?` also covers null, on which the previous undefined-only check
      # would have thrown in toString().
      if @schedule_id? and user_service.schedule_id != @schedule_id.toString()
        return callback 'invalid-schedule-id'
      callback()

  # Default implementation: does nothing and calls back.
  execute: (callback) ->
    callback()

  # Adds the user to, or removes the user from, the sorted set
  # `users_in_progress:<service>` depending on whether all of the service's
  # jobs have been processed.
  #
  # callback - function(err).
  user_in_progress: (callback) ->
    @getServiceKey @service, @user_key, (service_key) =>
      @redis.hgetall service_key, (err, user_service) =>
        return callback err if err?
        return callback() unless user_service?
        @get_service_data @service, (err, service) =>
          return callback err if err?
          return callback() unless service?

          # Redis returns strings; parse explicitly, as
          # incrementProcessedJobsCount does.
          processed = if user_service.processed_jobs_count?
            parseInt user_service.processed_jobs_count, 10
          else
            0

          set_name = 'users_in_progress:' + @service
          if processed < service.jobs_count
            # add user to stuck users
            @redis.zadd set_name, moment.utc().unix(), @user_key, (err) ->
              callback err
          else
            # remove user from stuck users
            @redis.zrem set_name, @user_key, (err) ->
              callback err

  # Bookkeeping after `execute`.
  #
  # err      - result of `execute`.
  # callback - function(err).
  #
  # Increments the processed-jobs counter, refreshes the in-progress set, then
  # either marks the service invalid (err == 'expired-access-token') or
  # refreshes the TTL of the job's dated keys.
  post_execute: (err, callback) ->
    return callback 'Job service no set' unless @service?
    return callback() if @service == '*'

    # A failed job also accounts for the jobs that were waiting on it.
    @increment_by = @postponed_jobs_path.length + 1 if err?

    # Errors of the two bookkeeping steps are ignored, as before.
    @incrementProcessedJobsCount @service, =>
      @user_in_progress =>
        if err == 'expired-access-token'
          # NOTE(review): a failure of this write is not reported; the
          # original stored it in a variable that was never read.
          @redis.hset [@user_key + ':service:' + @service, 'is_valid', 0], ->
            callback()
        else if @key != ''
          # we need to set expire key
          # NOTE(review): the date is always appended here, even for templates
          # that cleanup treats as undated — confirm this is intended.
          multi = @redis.multi()
          days = if @user_type == 'active' then [0, 1] else _.range(8)
          for day in days
            date = moment.utc().subtract('days', day).format 'YYYY:MM:DD'
            multi.expire util.format(@key, @user_key, date), @expire
          multi.exec ->
            callback err
        else
          callback err

  # Stores a fresh random schedule id on the user's service record.
  #
  # callback - function(schedule_id). NOT error-first; a Redis write error is
  #            not reported.
  set_shedule_id: (user_key, user_service, callback) ->
    key = util.format '%s:service:%s', user_key, user_service
    schedule_id = _.random 99, 9999999
    @redis.hset key, 'schedule_id', schedule_id, ->
      callback schedule_id

  # Yields the Redis key of a user's service record.
  #
  # callback - function(service_key). NOT error-first.
  getServiceKey: (service, user_key, callback) ->
    callback util.format('%s:service:%s', user_key, service)

  # Resets `processed_jobs_count` to 0 and touches `updated_at` for every
  # listed service of the user, in one MULTI/EXEC.
  #
  # callback - function(err) with the MULTI/EXEC error, if any.
  resetProcessedJobsCount: (services, user_key, callback) ->
    multi = @redis.multi()
    now = moment.utc().unix()

    # forEach, not every: since async 2.0 `every` expects an error-first
    # iterator callback, so the former `callback true` would read as an error.
    async.forEach services, (service, next) =>
      @getServiceKey service, user_key, (service_key) ->
        multi.hset service_key, 'updated_at', now
        multi.hset service_key, 'processed_jobs_count', 0
        next()
    , ->
      multi.exec (err) ->
        callback err

  # Loads the hash `service:<service_code>`.
  #
  # callback - function(err, service); service.jobs_count is parsed to an
  #            integer (0 when absent). Called without arguments when the
  #            hash does not exist.
  get_service_data: (service_code, callback) ->
    @redis.hgetall 'service:' + service_code, (err, service) ->
      return callback err if err?
      return callback() unless service?
      service.jobs_count = if service.jobs_count?
        parseInt service.jobs_count, 10
      else
        0
      callback null, service

  # Adds `@increment_by` to the user's processed-jobs counter, capped at the
  # service's `jobs_count`.
  #
  # Nothing is written when either record is missing, when the counter is
  # already complete, or when the stored schedule_id differs from this job's.
  #
  # callback - function(err).
  incrementProcessedJobsCount: (service_code, callback) ->
    @getServiceKey service_code, @user_key, (service_key) =>
      @redis.hgetall service_key, (err, user_service) =>
        return callback err if err?
        return callback() unless user_service?
        @get_service_data service_code, (err, service) =>
          return callback err if err?
          return callback() unless service?

          processed = if user_service.processed_jobs_count?
            parseInt user_service.processed_jobs_count, 10
          else
            0
          return callback() if processed >= service.jobs_count

          if @schedule_id? and user_service.schedule_id != @schedule_id.toString()
            return callback()

          # Never count past the total number of jobs.
          if (processed + @increment_by) > service.jobs_count
            @increment_by = service.jobs_count - processed

          # NOTE(review): read-then-increment is not atomic across clients.
          multi = @redis.multi()
          multi.hset service_key, 'updated_at', moment.utc().unix()
          multi.hincrby service_key, 'processed_jobs_count', @increment_by
          multi.exec (err) ->
            callback err

  # Enqueues the given job modules one after another, last entry first.
  #
  # jobs        - Array of module paths; it is consumed (popped) in place.
  # user_key    - forwarded to each job's `enqueue`.
  # schedule_id - forwarded to each job's `enqueue`.
  # callback    - function() called once the list is empty.
  #
  # Jobs whose `auto_start` is falsy are skipped.
  # NOTE(review): each instantiated job may open its own Redis client in its
  # constructor and nothing here closes it — confirm `enqueue` handles that.
  sync_que: (jobs, user_key, schedule_id, callback) ->
    return callback() if jobs.length == 0

    job = new (require jobs.pop())
    next = => @sync_que jobs, user_key, schedule_id, callback
    return next() unless job.auto_start

    job.enqueue
      user_key: user_key
      user_type: @user_type
      schedule_id: schedule_id
    , 0, next

module.exports = Job
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment