Last active
May 30, 2016 19:31
-
-
Save cliftonc/7447f260100dca40e66c92bcc2679f13 to your computer and use it in GitHub Desktop.
Cassandra version
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example query against the loaded data:
//   select * from downloads.downloads where user = 3626249 and resource IN (6109933,123,6326144);
const _ = require('lodash');
const async = require('async');
const fs = require('fs');
const path = require('path');
const stream = require('stream');
const util = require('util');
const cassandra = require('cassandra-driver');

// Running count of rows handed to Cassandra; shared by the stream and the progress logger.
let written = 0;

// Single shared driver client, pointed at a node on localhost.
const client = new cassandra.Client({ contactPoints: ['127.0.0.1'] });
/**
 * Writable stream that turns incoming rows into batched Cassandra INSERTs.
 *
 * Every chunk must be the JSON text of one row, `[user, resource, when]`;
 * `when` is converted with `new Date(...)`. Rows are collected in
 * `this.buffer` and sent with `client.batch()` once BATCH_SIZE rows have
 * accumulated; whatever is left over is sent when the stream is ended.
 *
 * Back-pressure ('drain', the return value of write()) is handled by
 * stream.Writable itself because only `_write`/`_final` are implemented.
 */
function CassandraStream() {
  stream.Writable.call(this);
  // Rows parsed so far that have not yet been sent to Cassandra.
  this.buffer = [];
}
util.inherits(CassandraStream, stream.Writable);

// Number of INSERT statements sent per client.batch() call.
CassandraStream.BATCH_SIZE = 1000;
// The process exits with status 1 once more than this many rows were submitted.
CassandraStream.MAX_WRITTEN = 2000000;

/**
 * Sends all buffered rows as one batch and empties the buffer.
 *
 * @param {function} callback - called with no arguments once the batch has
 *   completed (or immediately when there is nothing to send).
 */
CassandraStream.prototype.flushBatch = function (callback) {
  const batch = this.buffer;
  this.buffer = [];
  if (batch.length === 0) {
    callback();
    return;
  }
  written += batch.length;
  client.batch(batch, { prepare: true }, function (err) {
    // Best effort: a failed batch is reported but does not stop the load.
    if (err) { console.dir(err); }
    if (written > CassandraStream.MAX_WRITTEN) { process.exit(1); }
    callback();
  });
};

/**
 * stream.Writable hook: buffers one row and flushes when the batch is full.
 *
 * @param {Buffer|string} chunk - JSON text of `[user, resource, when]`.
 * @param {string} encoding - ignored.
 * @param {function} callback - signals that the next chunk may be delivered;
 *   receives the parse error when `chunk` is not valid JSON.
 */
CassandraStream.prototype._write = function (chunk, encoding, callback) {
  let data;
  try {
    data = JSON.parse(String(chunk));
  } catch (err) {
    callback(err);
    return;
  }
  // Push first, then check the size, so the row that fills the batch is
  // part of it instead of being discarded.
  this.buffer.push({
    query: 'INSERT INTO downloads.downloads (user, resource, when) VALUES (?, ?, ?)',
    params: [data[0], data[1], new Date(data[2])]
  });
  if (this.buffer.length >= CassandraStream.BATCH_SIZE) {
    this.flushBatch(callback);
  } else {
    callback();
  }
};

/**
 * stream.Writable hook run after end(): sends the final partial batch so
 * 'finish' is only emitted once every row has been submitted.
 *
 * @param {function} callback - called when the last batch has completed.
 */
CassandraStream.prototype._final = function (callback) {
  this.flushBatch(callback);
};
/**
 * Creates the `downloads` keyspace and the `downloads.downloads` table if
 * they do not exist yet. The two statements run one after the other because
 * the table lives inside the keyspace.
 *
 * @param {function} next - passed to async.series as its final callback, so
 *   it receives the first error (if any) followed by the collected results.
 */
function create(next) {
  const keyspace = "CREATE KEYSPACE IF NOT EXISTS downloads WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 1}";
  const table = 'CREATE TABLE IF NOT EXISTS downloads.downloads (user int, resource int, when timestamp, PRIMARY KEY (user, resource))';
  // bind() keeps `this` pointing at the client when async invokes execute.
  const execute = client.execute.bind(client);
  async.series([
    async.apply(execute, keyspace),
    async.apply(execute, table)
  ], next);
}
function load() { | |
console.log('Loading ...'); | |
read = 0; | |
written = 0; | |
var logger = setInterval(function() { | |
console.log('Loaded ' + written + ' ...'); | |
// client.hmset('user', user, function() {}); | |
// client.hmset('resource', resource, function() {}); | |
}, 5000); | |
var cassandraStream = new CassandraStream(); | |
var headers = ['user_id','resource_id','download_date'] | |
var filePath = path.resolve('rd.csv'); | |
var csv = require('fast-csv'); | |
csv | |
.fromPath(filePath, {objectMode: false}) | |
.pipe(cassandraStream) | |
.on('end', function() { | |
clearInterval(logger); | |
client.quit(); | |
}) | |
} | |
// Entry point: make sure the schema exists, then start the import.
create(function (err) {
  if (err) {
    // Without the keyspace/table every INSERT would fail, so stop here
    // instead of starting the load.
    console.dir(err);
    process.exitCode = 1;
    client.shutdown(function (shutdownErr) {
      if (shutdownErr) { console.dir(shutdownErr); }
    });
    return;
  }
  load();
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment