Created
May 5, 2010 09:24
-
-
Save kurokikaze/390566 to your computer and use it in GitHub Desktop.
Simple work queue implementation for node.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var net = require('net'), | |
sys = require('sys'); | |
var messages = []; | |
var work_in_progress = []; | |
var host = '192.168.175.128'; | |
var port = 8999; | |
var server = net.createServer(function(client_stream) { | |
client_stream.setEncoding('utf8'); | |
var process_message = function(message_raw) { | |
var message = JSON.parse(message_raw); | |
if (message.action == 'put' && message.message) { | |
messages.push(message.message); | |
} | |
if (message.action == 'get') { | |
var work_message = messages.shift(); | |
if (work_message) { | |
work_in_progress.push({'client':client_stream, 'message':work_message}); | |
client_stream.write(JSON.stringify({'action':'work','contents': work_message.toString()})); | |
} else { | |
// No more work | |
client_stream.write(JSON.stringify({'action':'work','contents': ''})); | |
} | |
} | |
// Work done, remove item from 'work in progress' queue | |
if (message.action == 'notify' && message.message) { | |
for (var wip_id in work_in_progress) { | |
if (message.message == work_in_progress[wip_id].message && client_stream === work_in_progress[wip_id].client) { | |
work_in_progress.splice(wip_id, 1); | |
} | |
} | |
} | |
}; | |
client_stream.addListener('data', function(data) { | |
//sys.puts('Data received: ' + data); | |
var messages = []; | |
while (data.indexOf('}{') != -1) { | |
messages.push(data.substr(0,data.indexOf('}{') + 1)); | |
data = data.substr(data.indexOf('}{') + 1); | |
} | |
messages.push(data); | |
for (message_id in messages) { | |
process_message(messages[message_id]); | |
} | |
}); | |
// Return all unfinished work by client to queue | |
client_stream.addListener('end', function() { | |
for (var wip_id in work_in_progress) { | |
if (work_in_progress[wip_id].client === client_stream) { | |
// Если клиент отключился, вернуть выданные ему задания в очередь | |
messages.push(work_in_progress[wip_id].message); | |
work_in_progress.splice(wip_id, 1); | |
} | |
} | |
}); | |
}); | |
server.listen(port, host); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var queues = {}; | |
(function(){ | |
var sys = require('sys'), | |
net = require('net'), | |
events = require('events'); | |
queues.connect = function(port, host, callback) { | |
var queue = new events.EventEmitter; | |
var server_conn = net.createConnection(port, host); | |
server_conn.setEncoding('utf-8'); | |
server_conn.addListener('connect', function() { | |
queue.emit('connect'); | |
server_conn.addListener('close', function(code) { | |
queue.emit('close'); | |
}); | |
queue.add_job = function(message) { | |
server_conn.write(JSON.stringify({'action':'put', 'message':message})); | |
}; | |
queue.get_job = function(callback) { | |
server_conn.write(JSON.stringify({'action':'get'})); | |
server_conn.addListener('data', function(data) { | |
sys.puts('Data: ' + data); | |
var message = JSON.parse(data); | |
server_conn.removeAllListeners('data'); | |
if (message.contents) { | |
callback(false, message.contents); | |
} else { | |
callback(true); | |
} | |
}) | |
}; | |
queue.notify = function(message) { | |
server_conn.write(JSON.stringify({'action':'notify', 'message':message})); | |
}; | |
callback(false, queue); | |
}); | |
}; | |
})(); | |
process.mixin(exports, queues); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment