Skip to content

Instantly share code, notes, and snippets.

@yssk22
Created August 22, 2010 07:49
Show Gist options
  • Save yssk22/543500 to your computer and use it in GitHub Desktop.
Save yssk22/543500 to your computer and use it in GitHub Desktop.
var sys = require('sys'),
url = require('url'),
querystring = require('querystring'),
http = require('http');
var logger = require('ext').logger;
/**
 * Consume a newline-delimited JSON stream delivered through an HTTP request.
 *
 * Registers an 'error' listener on `request.connection` and a 'response'
 * listener on `request`. When the response status is 200 the body is read as
 * UTF-8 text, split into lines on "\n" or "\r\n", and every non-empty line is
 * parsed with JSON.parse and handed to `callback`. Any other status is logged
 * together with the response body and nothing is passed to `callback`.
 *
 * @param {Object} request - object exposing `connection.on(...)` and
 *     `on('response', ...)`.
 * @param {Function} [callback] - invoked as `callback(err, obj)` once per
 *     non-empty line: `(null, parsed)` on success, `(parseError, null)` when
 *     the line is not valid JSON. Exceptions thrown by the callback are caught
 *     and logged so that one bad message does not stop the stream.
 */
function handleStream(request, callback){
  // Parse a single complete line and deliver the outcome to the callback.
  // `obj` and `err` are local to each call so a failure on one line can never
  // leak into the result reported for the next one.
  function dispatch(line){
    var obj = null, err = null;
    try{
      obj = JSON.parse(line);
    }catch(e){
      logger.error(e);
      logger.error(line);
      err = e;
    }
    try {
      callback && callback(err, obj);
    }catch(e){
      logger.error('[Twitter] callback failed: ' + e);
    }
  }

  request.connection.on('error', function(err){
    logger.critical('[Twitter] Streaming failed.');
    logger.critical(err);
  });

  request.on('response', function(response){
    if( response.statusCode != 200 ){
      logger.critical('[Twitter] Streaming failed with {status}'.format({status: response.statusCode }));
      // Collect the error body so it can be logged in one piece.
      var buff = '';
      response.on('data', function(chunk){
        buff += chunk;
      });
      response.on('end', function(){
        logger.critical('[Twitter] {buff}'.format({buff: buff }));
      });
      return;
    }

    // Holds the trailing, not yet newline-terminated part of the stream.
    var buf = "";
    logger.info('[Twitter] Streaming started.');
    response.setEncoding('utf8');
    response.on('data', function(chunk){
      // Prepend the pending partial line, then split. The last element is
      // whatever follows the final newline (possibly ""), i.e. the new
      // partial line; everything before it is a complete line.
      var lines = (buf + chunk).split(/\r?\n/);
      buf = lines.pop();
      for(var i = 0; i < lines.length; i++){
        // Blank lines are skipped rather than treated as end of input, so
        // messages that follow them in the same chunk are still delivered.
        if( lines[i].length > 0 ){
          logger.debug('[Twitter] message received');
          dispatch(lines[i]);
        }
      }
    });
  });
}
exports.streamStatus = function(user, password, type, params, callback){
var auth = new Buffer(user + ':' + password).toString('base64');
var method = 'GET';
var headers = {
Host : 'stream.twitter.com',
Authorization: auth
};
var body = querystring.stringify(params, '&', '=', false);
if( type == 'filter' ){
method = 'POST';
headers['Content-Length'] = body.length;
headers['Content-Type'] = "application/x-www-form-urlencoded";
}
logger.debug('[Twitter] ' + method);
logger.debug('[Twitter] ' + sys.inspect(headers));
logger.debug('[Twitter] ' + body);
var client = http.createClient(80, "stream.twitter.com");
var request = client.request(
method, '/1/statuses/' + type + '.json',
headers
);
request.write(body);
handleStream(request, callback);
request.end();
};
exports.chirpstream = function(user, password, callback){
var auth = new Buffer(user + ':' + password).toString('base64');
var client = http.createClient(80, "chirpstream.twitter.com");
var request = client.request(
'GET', '/2b/user.json',
{
Host : 'chirpstream.twitter.com',
Authorization: auth
}
);
handleStream(request, callback);
request.end();
};
exports.saveChirpStreamOnCouchHandler = function(urlStr){
var obj = url.parse(urlStr);
var client = http.createClient(obj.port, obj.hostname);
var db = obj.pathname;
return function(err, msg){
if( err ){
return;
}
if( msg.user && msg.user.protected ){
return;
}
var doc = {
"type" : "twitter",
"source" : msg
};
var headers = {'Content-Type': 'application/json'};
var body = JSON.stringify(doc);
headers['Content-Length'] = body.length;
var req = client.request("POST", db, headers);
req.write(body);
req.connection.on('error', function(e){
logger.error("[Twitter] failed to save tweet event: " + e);
logger.error("[Twitter] " + body);
});
req.on('response', function(response){
var c = '';
response.on('data', function(chunk){
c += chunk;
});
response.on('end', function(){
logger.debug("[Twitter] " + c);
});
});
req.end();
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment