Skip to content

Instantly share code, notes, and snippets.

@conatus
Last active September 10, 2018 15:19
Show Gist options
  • Save conatus/3f5aa2c929d92350b54fe30cdfa2a71d to your computer and use it in GitHub Desktop.
Save conatus/3f5aa2c929d92350b54fe30cdfa2a71d to your computer and use it in GitHub Desktop.
class DeleteJob extends Queueable {
static queue = 'delete-job';
static defaultPriority = 1;
constructor(post: Post) {
super();
this.postToDelete = post;
}
// Or perform, or dispatch or performLater
run(rehydratedObject) {
Posts.deletePost(this.post);
// do something heere
}
handleError() {
// Cleanup logic in here basically run the above in a try catch then do this.
Posts.blah(this.post);
}
};
DeletePost(post).at(new Date('2018-12-17T03:24:00')).enqueue();
DeletePost(post).enqueue();
DeletePost(post).delayFor(10000, 'minutes').enqueue();
DeletePost(post).now();
// Feels like these aren't associated with the queue.
DeletePost.getLength();
DeletePost.isEmpty();
// StompIt is promisified as it has the standard Node.js API.
// I want to say implements here but JS you know.
class ActiveMQBroker extends Broker {
static client = null;
async constructor() {
ActiveMQBroker.client = await stompit.connect(config.get('activeMq.connectParams');
}
send(message = {}, extraHeaders = {}) {
const headers = {
'destination': config.get('activeMq.queue.destination'),
'content-type': 'application/json',
...extraHeaders
};
const frame = client.send(headers);
frame.end(JSON.stringify({
client: 1,
time: new Date().toISOString(),
...message
}));
}
}
class ImmediateBroker extends Broker {
send(message = {}, extraHeaders = {}) {
// Execute immediately somehow...
}
}
// Then there is a queue singleton...
export default Queue.start(new ActiveMQBroker());
// And Queue just calls through...
// I looked at how Laravel, Rails and the various Python packages do things but for balance here are some JS APIs
// These are all Redis based and all have quite callbacky APIs
// but I like the fluency of how to do things whih I have attempted to mimic.
// Bee-Queue
// Promises API
Queue.createJob
.timeout(3000)
.retries(2)
.save()
.then((job) => {
// job enqueued, job.id populated
});
// Hence
const readyJob = await Queue.createJob
.timeout(3000)
.retries(2)
.save();
// convoy
// Quite like the way jobs are fed into que
const opts = {
concurrentWorkers: 10,
jobTimeout: 2000
};
const q = Convoy.createQueue('monsterTrucks', opts);
const job = new Convoy.Job(1);
q.addJob(job);
// Kue from Automattic
const job = queue.create('email', {
title: 'welcome email for tj'
, to: '[email protected]'
, template: 'welcome-email'
})
.priority('high')
.attempts(5)
.delay(1000) // Delay in miliseconds
.save( function(err){
if( !err ) console.log( job.id );
});
// Has a nice test feature - this seems actually a better approach
// than having a "immediate" processor.
before(function() {
queue.testMode.enter();
});
afterEach(function() {
queue.testMode.clear();
});
it('does something cool', function() {
queue.createJob('myJob', { foo: 'bar' }).save();
queue.createJob('anotherJob', { baz: 'bip' }).save();
expect(queue.testMode.jobs.length).to.equal(2);
});
// Which basically makes the queue into just an array, which is nice.
// Bull
const Queue = require('bull');
imageQueue.process(function(job, done){
// Transcode image asynchronously and report progress
job.progress(42);
// Call done when finished
done();
// Or give a error if error
done(new Error('error transcoding'));
// Or pass it a result
done(null, { width: 1280, height: 720 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});
const imageQueue = new Queue('image transcoding');
imageQueue.add({image: 'http://example.com/image1.tiff'});
// rsmq
// Very very simple
rsmq.createQueue({qname:"myqueue"}, function (err, resp) {
if (resp===1) {
console.log("queue created")
}
});
rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) {
if (resp) {
console.log("Message sent. ID:", resp);
}
});
// Using rsmq-promise to add promises
// Which just literally uses Bluebird to promisify everything
// https://github.com/msfidelis/rsmq-promise/blob/master/index.js
const rsmq = new RSMQPromise({
host: "127.0.0.1",
port: 6379
});
async rsmq.createQueue({qname: 'myqueue'})
const result = async rsmq.sendMessage({ qname: 'myqueue', message: 'my message!' });
// bokeh
// This is a ZeroMQ version
// Very Scala like
class Reverse
run: (data, callback) -> callback null, data.split("").reverse().join("")
bokeh = require "bokeh"
handle = bokeh.getClient().submitTask "Reverse", "hello world"
handle.on "complete", (data) -> console.log "Task %s completed: %s", handle.id, data
handle.on "error", (error) -> console.error "Task %s failed: %s", handle.id, error
// busmq
// Focus on messages
var Bus = require('busmq');
var bus = Bus.create({redis: ['redis://127.0.0.1:6379']});
bus.on('online', function() {
var q = bus.queue('foo');
q.on('attached', function() {
console.log('attached to queue');
});
q.attach();
q.push({hello: 'world'});
q.push('my name if foo');
});
bus.connect();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment