Last active
September 10, 2018 15:19
-
-
Save conatus/3f5aa2c929d92350b54fe30cdfa2a71d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Queueable job that deletes a single post.
 *
 * The post is captured at construction time and used both by `run` (the
 * actual work) and by `handleError` (the cleanup path).
 */
class DeleteJob extends Queueable {
  // NOTE(review): presumably the name of the queue this job is placed on — confirm against Queueable.
  static queue = 'delete-job';
  // NOTE(review): priority semantics (higher vs. lower first) are defined by Queueable — confirm.
  static defaultPriority = 1;

  /** The post this job operates on. */
  postToDelete: Post;

  /**
   * @param post The post to delete when the job runs.
   */
  constructor(post: Post) {
    super();
    this.postToDelete = post;
  }

  /**
   * Performs the deletion.
   *
   * Naming is still open: this could also be `perform`, `dispatch` or `performLater`.
   *
   * @param rehydratedObject Currently unused; kept so the signature matches the sketch.
   */
  run(rehydratedObject?: unknown) {
    // Read the same property the constructor assigned; `this.post` was never set.
    Posts.deletePost(this.postToDelete);
    // do something here
  }

  /**
   * Cleanup hook. The intent is that the runner executes `run` inside a
   * try/catch and calls this method when it throws.
   */
  handleError() {
    Posts.blah(this.postToDelete);
  }
}
// Fluent scheduling examples. The class is named DeleteJob and, being a class,
// has to be instantiated with `new` before the chain can start.
new DeleteJob(post).at(new Date('2018-12-17T03:24:00')).enqueue();
new DeleteJob(post).enqueue();
new DeleteJob(post).delayFor(10000, 'minutes').enqueue();
new DeleteJob(post).now();
// Feels like these aren't associated with the queue.
DeleteJob.getLength();
DeleteJob.isEmpty();
// StompIt is promisified as it has the standard Node.js API. | |
// I want to say implements here but JS you know. | |
/** Connected client type, derived from whatever the promisified `stompit.connect` resolves to. */
type StompClient = Awaited<ReturnType<typeof stompit.connect>>;

/**
 * Broker that publishes JSON messages to ActiveMQ over STOMP.
 *
 * Constructors cannot be `async`, so the connection is established lazily:
 * `new ActiveMQBroker()` stays synchronous and the first `send` (or an explicit
 * `ActiveMQBroker.connect()`) opens the shared connection.
 */
class ActiveMQBroker extends Broker {
  /** Shared connection, or `null` until the first successful connect. */
  static client: StompClient | null = null;

  /** In-flight connection attempt, so concurrent callers share a single connect. */
  private static connecting: Promise<StompClient> | null = null;

  /**
   * Returns the shared client, connecting first if necessary.
   *
   * @returns The connected client.
   * @throws Whatever `stompit.connect` rejects with; a later call retries.
   */
  static async connect(): Promise<StompClient> {
    if (ActiveMQBroker.client !== null) {
      return ActiveMQBroker.client;
    }
    if (ActiveMQBroker.connecting === null) {
      ActiveMQBroker.connecting = stompit.connect(config.get('activeMq.connectParams'));
    }
    try {
      const connected = await ActiveMQBroker.connecting;
      ActiveMQBroker.client = connected;
      return connected;
    } finally {
      // Clear the pending attempt on success and failure so a failed connect can be retried.
      ActiveMQBroker.connecting = null;
    }
  }

  /**
   * Sends one JSON frame to the configured destination.
   *
   * @param message Fields merged into the JSON body; they override the default `client` and `time` keys.
   * @param extraHeaders Headers merged over the default `destination` and `content-type`.
   * @returns A promise that settles once the frame has been handed to the client.
   */
  async send(message = {}, extraHeaders = {}): Promise<void> {
    // Use the class-level connection; a bare `client` identifier is not in scope here.
    const client = await ActiveMQBroker.connect();
    const headers = {
      destination: config.get('activeMq.queue.destination'),
      'content-type': 'application/json',
      ...extraHeaders,
    };
    const frame = client.send(headers);
    frame.end(
      JSON.stringify({
        client: 1,
        time: new Date().toISOString(),
        ...message,
      }),
    );
  }
}
/**
 * Placeholder broker meant to execute work immediately instead of queueing it.
 * `send` is not implemented yet and currently does nothing.
 */
class ImmediateBroker extends Broker {
  /**
   * @param message Message payload; ignored for now.
   * @param extraHeaders Extra headers; ignored for now.
   */
  send(message = {}, extraHeaders = {}) {
    // Execute immediately somehow...
  }
}
// Then there is a queue singleton: Queue.start receives an ActiveMQ-backed
// broker and whatever it returns becomes this module's default export.
const activeMqBroker = new ActiveMQBroker();
export default Queue.start(activeMqBroker);
// And Queue just calls through... | |
// I looked at how Laravel, Rails and the various Python packages do things but for balance here are some JS APIs | |
// These are all Redis based and all have quite callbacky APIs | |
// but I like the fluency of how to do things, which I have attempted to mimic.
// Bee-Queue | |
// Promises API | |
// createJob is a method: it has to be called with the job payload before the
// fluent setters can be chained onto the job it returns.
Queue.createJob({ x: 2, y: 3 })
  .timeout(3000)
  .retries(2)
  .save()
  .then((job) => {
    // job enqueued, job.id populated
  });
// Hence | |
// Same chain with await; createJob must be called with the payload first.
const readyJob = await Queue.createJob({ x: 2, y: 3 })
  .timeout(3000)
  .retries(2)
  .save();
// convoy | |
// Quite like the way jobs are fed into the queue
// Options object passed to Convoy.createQueue below.
const opts = { concurrentWorkers: 10, jobTimeout: 2000 };

// Create the named queue, build a job, then hand the job to the queue.
const q = Convoy.createQueue('monsterTrucks', opts);
const job = new Convoy.Job(1);
q.addJob(job);
// Kue from Automattic | |
// Named `emailJob` because a `const job` already exists earlier in this file;
// a second `const job` in the same scope is a redeclaration error.
const emailJob = queue
  .create('email', {
    title: 'welcome email for tj',
    to: '[email protected]',
    template: 'welcome-email',
  })
  .priority('high')
  .attempts(5)
  .delay(1000) // Delay in milliseconds
  .save(function (err) {
    // The callback runs after the chain has been assigned, so emailJob is set here.
    if (!err) console.log(emailJob.id);
  });
// Has a nice test feature - this seems actually a better approach | |
// than having an "immediate" processor.
// Switch the queue into test mode once, before any test runs.
before(() => {
  queue.testMode.enter();
});

// Reset the recorded jobs after every test.
afterEach(() => {
  queue.testMode.clear();
});

it('does something cool', () => {
  // Both jobs are saved on the queue and then counted via testMode.jobs.
  queue.createJob('myJob', { foo: 'bar' }).save();
  queue.createJob('anotherJob', { baz: 'bip' }).save();

  expect(queue.testMode.jobs.length).to.equal(2);
});
// Which basically makes the queue into just an array, which is nice. | |
// Bull | |
const Queue = require('bull');

// Declare the queue before registering a processor on it: touching a `const`
// above its declaration throws a ReferenceError (temporal dead zone).
const imageQueue = new Queue('image transcoding');

// The processor body lists the alternative ways a job can finish, back to back,
// purely for illustration.
imageQueue.process(function (job, done) {
  // Transcode image asynchronously and report progress
  job.progress(42);

  // Call done when finished
  done();

  // Or give an error on failure
  done(new Error('error transcoding'));

  // Or pass it a result
  done(null, { width: 1280, height: 720 /* etc... */ });

  // If the job throws an unhandled exception it is also handled correctly
  throw new Error('some unexpected error');
});

imageQueue.add({ image: 'http://example.com/image1.tiff' });
// rsmq | |
// Very very simple | |
// Create the queue; report a failure instead of silently dropping `err`.
rsmq.createQueue({ qname: 'myqueue' }, function (err, resp) {
  if (err) {
    console.error(err);
    return;
  }
  if (resp === 1) {
    console.log('queue created');
  }
});

// Send one message; `resp` is logged as the message ID on success.
rsmq.sendMessage({ qname: 'myqueue', message: 'Hello World' }, function (err, resp) {
  if (err) {
    console.error(err);
    return;
  }
  if (resp) {
    console.log('Message sent. ID:', resp);
  }
});
// Using rsmq-promise to add promises | |
// Which just literally uses Bluebird to promisify everything | |
// https://github.com/msfidelis/rsmq-promise/blob/master/index.js | |
const rsmq = new RSMQPromise({
  host: '127.0.0.1',
  port: 6379,
});

// `async` only marks function declarations; waiting on a promise needs `await`.
await rsmq.createQueue({ qname: 'myqueue' });
const result = await rsmq.sendMessage({ qname: 'myqueue', message: 'my message!' });
// bokeh | |
// This is a ZeroMQ version | |
// Very Scala like | |
# Task class: run reverses the incoming string and passes it to the callback
# as the result, with null in the error position.
class Reverse
  run: (data, callback) ->
    reversed = data.split("").reverse().join("")
    callback(null, reversed)

bokeh = require("bokeh")

# Submit a "Reverse" task and log its outcome through the returned handle.
handle = bokeh.getClient().submitTask("Reverse", "hello world")
handle.on "complete", (data) ->
  console.log("Task %s completed: %s", handle.id, data)
handle.on "error", (error) ->
  console.error("Task %s failed: %s", handle.id, error)
// busmq | |
// Focus on messages | |
var Bus = require('busmq');
var bus = Bus.create({ redis: ['redis://127.0.0.1:6379'] });

// Once the bus reports 'online', attach to queue 'foo' and push two messages.
bus.on('online', () => {
  const fooQueue = bus.queue('foo');

  fooQueue.on('attached', () => {
    console.log('attached to queue');
  });

  fooQueue.attach();
  fooQueue.push({ hello: 'world' });
  fooQueue.push('my name if foo');
});

bus.connect();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment