Skip to content

Instantly share code, notes, and snippets.

@X-Bird
Last active July 15, 2017 07:01
Show Gist options
  • Save X-Bird/d698463b25a845a4c23ee087ee9ba9ee to your computer and use it in GitHub Desktop.
Save X-Bird/d698463b25a845a4c23ee087ee9ba9ee to your computer and use it in GitHub Desktop.
my js disque class
const Disqueue = require('disqueue-node');
const Bluebird = require('bluebird');
/**
 * Promise-based wrapper around a `disqueue-node` client that is bound to a
 * single queue name.
 *
 * Every method adapts the client's Node-style `(err, result)` callback into a
 * Bluebird promise.
 */
class Disq {
  /**
   * @param {string} qname Name of the queue every method of this instance targets.
   * @param {object} [options] Passed as-is to the `Disqueue` constructor. When
   *   falsy, `{ port: 7711, retryMax: 5, retryDelay: 2000 }` is used instead.
   */
  constructor(qname, options) {
    this.q = new Disqueue(options || {
      'port': 7711,
      'retryMax': 5,
      'retryDelay': 2000
    });
    this.qname = qname;
  }

  /**
   * Get the length of the queue.
   * @returns {Promise} Resolves with the value reported by the client's
   *   `qlen` call; rejects with the client's error.
   */
  len() {
    return new Bluebird((resolve, reject) => {
      this.q.qlen(this.qname, (err, len) => {
        if (err) return reject(err);
        return resolve(len);
      });
    });
  }

  /**
   * Add a job to the queue.
   * @param {any} t Job body; it is serialized with `JSON.stringify` before sending.
   * @param {number} retry Job retry time, in seconds.
   * @param {number} ttl Job time-to-live, in seconds.
   * @returns {Promise} Resolves with whatever the client's `addJob` reports;
   *   rejects with the client's error.
   */
  addJob(t, retry, ttl) {
    return new Bluebird((resolve, reject) => {
      this.q.addJob({
        'queue': this.qname,
        'job': JSON.stringify(t),
        // NOTE: in Disque a retry of 0 gives at-most-once delivery; any other
        // value lets the job be re-queued after that many seconds.
        'retry': retry,
        'ttl': ttl
      }, (err, data) => {
        if (err) return reject(err);
        return resolve(data);
      });
    });
  }

  /**
   * Fetch jobs from the queue.
   *
   * Resolves with `null` when the client reports no data. When `count === 1`
   * the first element of the client's result is returned on its own;
   * otherwise the client's result is returned unchanged.
   *
   * @param {number} count How many jobs to fetch.
   * @param {boolean} withcounters Also return the failure count (nacks) and
   *   the additional-deliveries count.
   * @param {boolean} [nohang=false] Whether to skip blocking when no job is available.
   * @returns {Promise} Rejects with an Error whose message is
   *   'UNKOWN_ERROR_ON_GETTING_JOBS_FROM_QUEUE'; the client's original error
   *   is attached to it as `cause`.
   */
  getJob(count, withcounters, nohang = false) {
    return new Bluebird((resolve, reject) => {
      this.q.getJob({
        'count': count,
        'queue': this.qname,
        'withcounters': withcounters,
        'nohang': nohang
      }, (err, data) => {
        if (err) {
          // Message kept as-is (callers may match on it); the real failure
          // stays reachable through `cause` instead of being discarded.
          const wrapped = new Error('UNKOWN_ERROR_ON_GETTING_JOBS_FROM_QUEUE');
          wrapped.cause = err;
          return reject(wrapped);
        }
        // No data: settle with null so callers never wait on a promise that
        // would otherwise stay pending forever.
        if (!data) return resolve(null);
        return resolve(count === 1 ? data[0] : data);
      });
    });
  }

  /**
   * NACK jobs: ask Disque to put them back in the queue as soon as possible,
   * incrementing their nack counter. Meant for jobs a worker failed to process.
   * @param {string|array} jobIds A single job id or a list of ids.
   * @returns {Promise} Resolves with the client's `nack` result; rejects with its error.
   */
  nack(jobIds) {
    return new Bluebird((resolve, reject) => {
      this.q.nack(jobIds, (err, ack) => {
        if (err) return reject(err);
        return resolve(ack);
      });
    });
  }

  /**
   * Mark jobs as done so they are removed from the queue. Uses the client's
   * `fastAck` call.
   * @param {string|array} jobIds A single job id or a list of ids.
   * @returns {Promise} Resolves with the client's `fastAck` result; rejects with its error.
   */
  ack(jobIds) {
    return new Bluebird((resolve, reject) => {
      this.q.fastAck(jobIds, (err, ack) => {
        if (err) return reject(err);
        return resolve(ack);
      });
    });
  }

  /**
   * Remove jobs from the queue via the client's `dequeue` call.
   * @param {string|array} jobIds A single job id or a list of ids.
   * @returns {Promise} Resolves with the client's `dequeue` result; rejects with its error.
   */
  deQ(jobIds) {
    return new Bluebird((resolve, reject) => {
      this.q.dequeue(jobIds, (err, result) => {
        if (err) return reject(err);
        return resolve(result);
      });
    });
  }
}
// Export the class itself; consumers create instances with `new Disq(qname, options)`.
module.exports = Disq;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment