// Gist: rolangom/3d791d6cf44d9e4d022ac67bf223a4e3 (created July 14, 2022 16:43)
// DGII query handler using GCP and the crocks FP library.
// Note: the hosting page flagged this file as containing hidden or bidirectional
// Unicode text that may be interpreted or compiled differently than it appears;
// open it in an editor that reveals hidden Unicode characters to review it.
// ---- Third-party dependencies ----
// const crocks = require('crocks');
// const fs = require('fs');
const axios = require('axios');
const unzip = require('unzip');
const rimraf = require('rimraf');
const BigQuery = require('@google-cloud/bigquery');

// ---- crocks functional helpers ----
const Async = require('crocks/Async');
const maybeToAsync = require('crocks/Async/maybeToAsync');
const { curry, prop, compose, safe } = require('crocks/helpers');
const isObject = require('crocks/predicates/isObject');
const { chain, run, map, head } = require('crocks/pointfree');
const { constant } = require('crocks/combinators');

// ---- Project-local modules ----
const TABLE_SCHEMA = require('./tableSchema');

// ---- BigQuery client ----
// Google Cloud Platform project ID used by the client.
const projectId = 'shop-f518d';
// const keyFilename = './shop-81534f0bf144.json';

// The client is created with the project ID only; the key file option above
// is intentionally left commented out.
const bigquery = new BigQuery({ projectId });
const dataset = bigquery.dataset('production');

// ---- Source archive and local paths ----
// Alternate sources kept for reference:
// const ZIPPED_FILE_URL = 'https://storage.googleapis.com/react-firebaseui-example.appspot.com/2010_Census_Populations_by_Zip_Code.zip';
// const ZIPPED_FILE_URL = 'https://storage.googleapis.com/react-firebaseui-example.appspot.com/DGII_RNC.zip';
const ZIPPED_FILE_URL = 'http://www.dgii.gov.do/app/WebApps/Consultas/rnc/DGII_RNC.zip';

// const LOCAL_ZIP_FILE = '/tmp/2010_Census_Populations_by_Zip_Code.csv.zip';
// Folder the archive is extracted into, and the data file expected inside it.
const LOCAL_UNZIPPED_FOLDER = '/tmp/unzipped';
const LOCAL_UNZIPPED_FILE = `${LOCAL_UNZIPPED_FOLDER}/TMP/DGII_RNC.TXT`;

// ---- Destination table ----
// const TABLE_NAME = 'population';
const TABLE_NAME = 'taxpayers';
// rmrfAsync :: String -> Promise Error a
// Promise adapter for the callback-based `rimraf`: rejects with the callback's
// error when one is reported, otherwise resolves with the callback's second
// argument.
const rmrfAsync = path =>
  new Promise((resolve, reject) => {
    rimraf(path, {}, (err, data) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(data);
    });
  });
// const delay = ms => new Promise((resolve) => setTimeout(resolve, ms)); | |
// const delayAsync = | |
// ms => x => Async( | |
// (reject, resolve) => delay(ms).then(_ => resolve(x), reject) | |
// ); | |
// log :: String -> a -> a
// Tap helper for pipelines: prints `label` to stdout and returns the value it
// was given unchanged, so it can be dropped into a `.map(...)` step.
// Only the label is printed; the value itself is never logged.
const log = label => x => {
  console.log(label);
  return x;
};
// clearTmpFolder :: String -> Async Error a
// Lifts `rmrfAsync(path)` into an Async: the Async resolves or rejects with
// whatever the underlying promise settles with. Nothing runs until forked.
const clearTmpFolder = path =>
  Async((reject, resolve) => {
    rmrfAsync(path).then(resolve, reject);
  });
// httpGetFileAxios :: String -> Async Error Stream
// Issues a GET for `url` through axios with `responseType: 'stream'` and
// resolves with the response's `data` (the body stream). Any rejection from
// the request chain is forwarded to the Async's reject branch.
const httpGetFileAxios = url =>
  Async((reject, resolve) => {
    const request = axios.get(url, { responseType: 'stream' });
    request
      .then(response => resolve(response.data))
      .catch(reject);
  });
// pipeStream :: Writable -> Readable -> Async Error ()
// Pipes `readable` into `writable` and settles when the transfer ends:
//   - resolves when `writable` emits 'finish';
//   - rejects when either stream emits 'error'.
// `pipe()` does not forward source errors to the destination, so the readable
// side needs its own listener; without it a failed download would leave the
// Async unsettled forever.
// NOTE(review): some writables keep working after 'finish' (e.g. extractors
// still flushing files to disk). Confirm whether 'close' should be awaited
// for the stream used by the caller.
const pipeStream = curry((writable, readable) => Async(
  (reject, resolve) => {
    writable.on('finish', resolve);
    writable.on('error', reject);
    readable.on('error', reject);
    readable.pipe(writable);
  }
));
// deleteTable :: Dataset -> String -> Async Error ()
// Deletes table `tableId` from `dataset`.
//
// `delete` takes a single node-style callback `(err, apiResponse)`. It must
// not be handed `resolve`/`reject` as two separate arguments: that makes
// `resolve` the callback, so failures would resolve with the error and
// `reject` would never run.
//
// A 404 (table does not exist) is treated as success so a reload still works
// when the table has not been created yet; every other error rejects.
const deleteTable = curry((dataset, tableId) => Async(
  (reject, resolve) => {
    dataset.table(tableId).delete(err => {
      if (err && err.code !== 404) {
        reject(err);
        return;
      }
      resolve();
    });
  }
));
// createTable :: Dataset -> String -> Schema -> Async Error Table
// Creates table `tableId` in `dataset` with the given `schema` and resolves
// with the first element of the client's promise result (the new table).
const createTable = curry((dataset, tableId, schema) =>
  Async((reject, resolve) => {
    dataset
      .createTable(tableId, { schema })
      .then(response => resolve(response[0]), reject);
  })
);
// loadTable :: String -> Table -> Async Error ApiResponse | |
// const loadTable = curry((path, table) => Async( | |
// (reject, resolve) => | |
// table.load(path, { fieldDelimiter: '|' }) | |
// .then(([apiResponse]) => resolve(apiResponse), reject) | |
// )); | |
// loadJobTable :: String -> Table -> Async Error Job
// Starts a load job that imports the file at `path` into `table` and resolves
// with the first element of the client's promise result (the job). The file
// is read as pipe-delimited, unquoted, ISO-8859-1 CSV; jagged rows, unknown
// values and up to 5,000,000 bad records are tolerated.
const loadJobTable = curry((path, table) =>
  Async((reject, resolve) => {
    const loadOptions = {
      format: 'CSV',
      fieldDelimiter: '|',
      allowJaggedRows: true,
      maxBadRecords: 5000000,
      ignoreUnknownValues: true,
      quote: '',
      encoding: 'ISO-8859-1',
    };

    table
      .createLoadJob(path, loadOptions)
      .then(result => resolve(result[0]), reject);
  })
);
// handleJob :: Job -> Async Error Metadata
// Waits for `job` to finish: resolves with the payload of its 'complete'
// event and rejects with the payload of its 'error' event.
const handleJob = job => {
  const subscribe = (reject, resolve) => {
    job.on('complete', resolve);
    job.on('error', reject);
  };
  return Async(subscribe);
};
// onErrResp :: Response -> Error -> a
// Failure branch for the HTTP handlers: logs the error to stderr and answers
// with status 400 and a JSON body `{ message }`. Returns whatever
// `res.status(400).json(...)` returns.
//
// The message is `err.message` when the rejection value has one; any other
// value (string, null, undefined, ...) is stringified, so an unexpected
// rejection still yields a response instead of a TypeError or an empty body.
const onErrResp = res => err => {
  console.error('Error', err);
  const hasMessage = err != null && err.message !== undefined;
  const message = hasMessage ? err.message : String(err);
  return res.status(400).json({ message });
};
// onSuccResp :: Response -> a -> b
// Success branch for the HTTP handlers: sends the given value as the JSON
// body of `res` and returns whatever `res.json` returns.
const onSuccResp = res => payload => {
  return res.json(payload);
};
// reloadNCFData :: (Request, Response) -> ()
// HTTP handler that rebuilds the taxpayers table from the published archive.
// Steps run strictly in sequence, each logging a label when it completes:
//   1. remove the local extraction folder
//   2. download the zip archive as a stream
//   3. extract it into the local folder
//   4. delete the existing table
//   5. create the table again from TABLE_SCHEMA
//   6. start a load job from the extracted file
//   7. wait for the job to finish
// On success the final value is sent as JSON; any failure goes to onErrResp.
// `req` is not read.
const reloadNCFData = (req, res) => {
  clearTmpFolder(LOCAL_UNZIPPED_FOLDER)
    .map(log('clearTmpFolder 1'))
    .chain(constant(httpGetFileAxios(ZIPPED_FILE_URL)))
    .map(log('httpGetFileAxios 2'))
    // Build the extractor here, inside the step, so it is only created after
    // the folder has been cleared and the download stream is available.
    // Constructing it while the chain is assembled would instantiate it
    // before step 1 runs, against the folder that step is about to remove.
    .chain(zipStream =>
      pipeStream(unzip.Extract({ path: LOCAL_UNZIPPED_FOLDER }), zipStream))
    .map(log('unzip.Extract 3'))
    .chain(constant(deleteTable(dataset, TABLE_NAME)))
    .map(log('deleteTable 4'))
    .chain(constant(createTable(dataset, TABLE_NAME, TABLE_SCHEMA)))
    .map(log('createTable 5'))
    .chain(loadJobTable(LOCAL_UNZIPPED_FILE))
    .map(log('loadJobTable 6'))
    .chain(handleJob)
    .map(log('handleJob 7'))
    .fork(
      onErrResp(res),
      onSuccResp(res)
    );
};
// getReqTaxId :: Request -> Maybe String
// Safely reads `req.query.id`: the result is a Nothing when `req` is not an
// object, or when either `query` or `id` is missing; otherwise a Just of the
// id.
const getReqTaxId = req =>
  safe(isObject, req)
    .chain(prop('query'))
    .chain(prop('id'));
// getQueryStr :: String -> String
// Builds the lookup query for a single taxpayer id.
//
// The id comes straight from the HTTP query string, so it is escaped before
// being embedded in the double-quoted SQL literal: backslashes and double
// quotes are backslash-escaped, and line breaks are written as \n / \r.
// Without this, an id containing `"` could terminate the literal and inject
// arbitrary SQL. Non-string ids are stringified first.
// NOTE(review): passing the id as a query parameter would be more robust
// still; that requires changing the caller that executes the query.
const getQueryStr = id => {
  const literal = String(id)
    .replace(/\\/g, '\\\\')
    .replace(/"/g, '\\"')
    .replace(/\n/g, '\\n')
    .replace(/\r/g, '\\r');
  return `select * from production.taxpayers where id = "${literal}" limit 1`;
};
// queryTable :: String -> Async Error [row]
// Runs the lookup query for `id` through the TABLE_NAME table handle and
// resolves with the first element of the client's promise result (the rows).
const queryTable = id =>
  Async((reject, resolve) => {
    const table = dataset.table(TABLE_NAME);
    table
      .query({ query: getQueryStr(id) })
      .then(response => resolve(response[0]), reject);
  });
// resultAsObject :: [row] -> Async Error row
// Takes the first row of a result set; an empty result becomes a rejected
// Async carrying Error('Not found').
// The converter (and its Error) is built once, when the module loads.
const firstRowOrNotFound = maybeToAsync(Error('Not found'));
const resultAsObject = rows => firstRowOrNotFound(head(rows));
// queryBy :: Request -> Async Error row
// Full lookup pipeline for a request:
//   1. extract the id from the request (logging a label when it is present)
//   2. reject with "Query 'id' param is required" when it is missing
//   3. query the table for that id
//   4. reduce the rows to the first one, or reject with 'Not found'
// The missing-id converter (and its Error) is built once, at module load.
const taxIdOrRequired = maybeToAsync(Error("Query 'id' param is required"));
const queryBy = req =>
  taxIdOrRequired(getReqTaxId(req).map(log('getReqTaxId')))
    .chain(queryTable)
    .chain(resultAsObject);
// queryDgiiBy :: (Request, Response) -> ()
// HTTP handler: looks up the taxpayer identified by the request's `id` query
// parameter and answers with the row as JSON, or with the error response
// produced by onErrResp.
const queryDgiiBy = (req, res) => {
  const lookup = queryBy(req);
  lookup.fork(onErrResp(res), onSuccResp(res));
};
// Public entry points of the module (both are HTTP-triggered handlers).
Object.assign(exports, { reloadNCFData, queryDgiiBy });
// to deploy: gcloud beta functions deploy reloadNCFData --trigger-http
// to deploy: gcloud beta functions deploy queryDgiiBy --trigger-http
// (Gist page footer: "Sign up for free to join this conversation on GitHub.
// Already have an account? Sign in to comment.")