Last active
April 26, 2017 07:15
-
-
Save ChristianRich/a7e086c76ecd0db1c78ae5e15b11606d to your computer and use it in GitHub Desktop.
Reads CSV files and saves them as SQLite3 tables in an on-disk database
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import parse from 'csv-parse'; | |
import fs from 'fs'; | |
import sqlite3 from 'sqlite3'; | |
import async from 'async'; | |
import csvHeaders from 'csv-headers'; | |
import path from 'path'; | |
let count = 0; | |
/** | |
* Builds a Sqlite3 database table from a csv file | |
* It's important to note that with sqlite3 you can only build one table at a time | |
* @param {string} database - path to database | |
* @param {string} csvFile - path to csv file | |
* @param {int} maxRecords - Cap dataset | |
* @param {string=} delimiter | |
* @return {*} | |
*/ | |
module.exports = (database, csvFile, maxRecords = 250, delimiter = ',') => { | |
++count; | |
let dbfn = database, | |
csvfn = csvFile, | |
tblnm; | |
// Extract the table name from the filename | |
try{ | |
tblnm = path.parse(csvfn).name; | |
} catch(e){ | |
} | |
return new Promise((resolve, reject) => { | |
// console.log('#1 Loading file ' + csvfn); | |
if(!tblnm){ | |
return reject('Unable to determine table name'); | |
} | |
const opts = { | |
file: csvfn, | |
delimiter: delimiter | |
}; | |
return csvHeaders(opts, (err, headers) => { | |
if(err){ | |
return reject(err); | |
} | |
resolve({headers}); | |
}); | |
}) | |
.then((context) => { | |
return new Promise((resolve, reject) => { | |
// console.log('#2 Create new or load existing database ' + dbfn); | |
const db = new sqlite3.Database(dbfn); | |
db.on('error', err =>{ | |
reject(err); | |
}); | |
db.on('open', () => { | |
context.db = db; | |
resolve(context); | |
}); | |
}); | |
}) | |
.then((context) => { | |
return new Promise((resolve, reject) => { | |
// console.log(`#3 DROP TABLE IF EXISTS ${tblnm}`); | |
context.db.run(`DROP TABLE IF EXISTS ${tblnm}`, [], (err) => { | |
if(err){ | |
console.log(err); | |
return reject(err); | |
} | |
resolve(context); | |
}); | |
}); | |
}) | |
.then((context) => { | |
return new Promise((resolve, reject) => { | |
// console.log('#4 Reading fields'); | |
let fields = '', | |
fieldnms = '', | |
qs = ''; | |
context.headers.forEach((hdr) => { | |
hdr = hdr.replace(' ', '_'); | |
if(fields !== '') fields += ','; | |
if(fieldnms !== '') fieldnms += ','; | |
if(qs !== '') qs += ','; | |
fields += ` ${hdr} TEXT`; | |
fieldnms += ` ${hdr}`; | |
qs += ' ?'; | |
}); | |
context.qs = qs; | |
context.fieldnms = fieldnms; | |
context.db.run(`CREATE TABLE IF NOT EXISTS ${tblnm} ( ${fields} )`, [], (err) => { | |
if(err){ | |
console.log(err); | |
return reject(err); | |
} | |
resolve(context); | |
}); | |
}); | |
}) | |
.then((context) => { | |
return new Promise((resolve, reject) => { | |
// console.log('#5 Load and parse CSV'); | |
const opts = { | |
delimiter: delimiter, | |
columns: true, | |
relax_column_count: true | |
}; | |
fs | |
.createReadStream(csvfn) | |
.pipe(parse(opts, (err, data) => { | |
if(err){ | |
console.log(err + ' in file ' + csvfn); | |
return resolve(); | |
} | |
if(data.length > maxRecords){ | |
console.log(`Capping records to ${maxRecords} for table '${tblnm}'`); | |
data = data.splice(0, maxRecords); | |
} | |
console.log(`[${count}] Inserting ${data.length} records into table '${tblnm}'`); | |
if(data.length === 0){ | |
return resolve(context); | |
} | |
async.eachSeries(data, (datum, next) => { | |
let d = []; | |
context.headers.forEach((hdr) =>{ | |
d.push(datum[hdr]); | |
}); | |
// console.log(`INSERT INTO ${tblnm} ( ${context.fieldnms} ) VALUES ( ${context.qs} ) ${d}`) | |
context.db.run(`INSERT INTO ${tblnm} ( ${context.fieldnms} ) VALUES ( ${context.qs} )`, d, (err) => { | |
if(err){ | |
console.error(err); | |
return next(err); | |
} | |
next(); | |
}); | |
}, | |
(err) => { | |
if(err){ | |
return reject(err); | |
} | |
resolve(context); | |
}); | |
})); | |
}); | |
}) | |
.then((context) => { | |
return new Promise((resolve) => { | |
// console.log('#6 close db'); | |
if(context && context.db){ | |
context.db.close(); | |
} | |
resolve(); | |
}) | |
}) | |
.catch((err) => { | |
throw err; | |
}); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment