|
const csv = require('csv-parser') |
|
const fs = require('fs') |
|
const n = require('needle') |
|
|
|
// Customer.io Integration / Authentication settings |
|
let options = { |
|
username: process.env.SITE_KEY || 'INSERT_CUSTOMERIO_SITE_KEY_HERE', |
|
password: process.env.API_KEY || 'INSERT_CUSTOMERIO_API_KEY_HERE' |
|
} |
|
|
|
// File to be parsed and iterated over |
|
const CSV_FILE = process.env.CSV_FILE || 'customers.csv' |
|
|
|
// Number of items to process per queue iteration |
|
const ITEMS_PER_RUN = process.env.ITEMS_PER_RUN || 100 |
|
|
|
/***************************************************************************************************************************************** |
|
* CSV Item Iterator |
|
***************************************************************************************************************************************** |
|
* Each entry in the CSV is passed to this method as an Object named `item` each field is accessible like so: |
|
* |
|
* item.<field_name> |
|
* |
|
* The `method` to be ran when the queue item is processed is passed as the second variable, the default is `removeUser` |
|
*****************************************************************************************************************************************/ |
|
|
|
function onItem (item) { |
|
let method = removeUser |
|
let data = null |
|
|
|
data = item.customer_id ? item.customer_id : item.id |
|
|
|
addToQueue(data, method) |
|
} |
|
|
|
/***************************************************************************************************************************************** |
|
* Customer.io Queue Methods |
|
***************************************************************************************************************************************** |
|
* Add functions here to be processed for each item added to the queue. |
|
* |
|
* To modifiy the queue entry data, you must modify the `onItem` method above |
|
*****************************************************************************************************************************************/ |
|
|
|
function removeUser (id) { |
|
return new Promise((resolve, reject) => { |
|
n.delete(`https://track.customer.io/api/v1/customers/${id}`, null, options, (err, res) => resolve(err || res.statusCode)) |
|
}) |
|
} |
|
|
|
/***************************************************************************************************************************************** |
|
* Queue Handler / Processing |
|
***************************************************************************************************************************************** |
|
* The two functions here handle processing of queue data, modify at your own risk |
|
*****************************************************************************************************************************************/ |
|
|
|
let queue = [] |
|
let running = false |
|
|
|
function addToQueue (id, method) { |
|
queue.push([id, method]) |
|
processQueue() |
|
} |
|
|
|
function processQueue () { |
|
if (running) return |
|
if (!queue.length) return console.log('empty queue') |
|
|
|
let ids = queue.splice(0, queue.length > ITEMS_PER_RUN ? ITEMS_PER_RUN : queue.length) |
|
running = true |
|
|
|
Promise.all(ids.map(d => d[1](d[0]))).then(results => { |
|
running = false |
|
console.log(results) |
|
processQueue() |
|
}) |
|
} |
|
|
|
/***************************************************************************************************************************************** |
|
* CSV Processing |
|
***************************************************************************************************************************************** |
|
* Start processing CSV file |
|
*****************************************************************************************************************************************/ |
|
|
|
fs.createReadStream(CSV_FILE).pipe(csv()).on('data', onItem) |