Created
September 2, 2021 22:45
-
-
Save haynesgt/f98868ea53df401cc981e038eec62e6a to your computer and use it in GitHub Desktop.
App Search CLI for super-large newline-delimited json
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node
/**
 * Install:
 *
 * Download this file and give it executable permissions
 *
 * Usage:
 *
 * Uploading documents
 * ---------
 *
 * app-search-cli upload ~/Documents/pokemon.json http://localhost:3002 test-engine private-kayik65qbd516q1brk724aaa
 *
 * Concurrency and batch size can be adjusted manually in code below
 *
 *
 * Adapted from https://gist.github.com/JasonStoltz/41b29e71310743e94275fc02fac33095 by JasonStoltz
 */
// Node built-ins for streaming the input file one line at a time.
const fs = require("fs");
const readline = require("readline");

// CLI shape: app-search-cli <command> <pathToJson> <apiPath> <engineName> <apiKey>
const [command, pathToJson, apiPath, engineName, apiKey] = process.argv.slice(2);

// Echo every parsed argument up front so a mistyped invocation is obvious
// before any network traffic starts.
for (const [label, value] of Object.entries({
  command,
  pathToJson,
  apiPath,
  engineName,
  apiKey
})) {
  console.log(`${label}: `, value);
}

// App Search v1 REST endpoint for bulk document indexing on the chosen engine.
const apiBase = `${apiPath}/api/as/v1`;
const documentsUrl = `${apiBase}/engines/${engineName}/documents`;
// Adapted from: https://www.tomas-dvorak.cz/posts/nodejs-request-without-dependencies/ | |
const request = function(url, options = {}, requestBody = "") { | |
// return new pending promise | |
return new Promise((resolve, reject) => { | |
// select http or https module, depending on reqested url | |
const lib = url.startsWith("https") ? require("https") : require("http"); | |
const request = lib.request(url, options, response => { | |
// temporary data holder | |
const body = []; | |
// on every content chunk, push it to the data array | |
response.on("data", chunk => body.push(chunk)); | |
// we are done, resolve promise with those joined chunks | |
response.on("end", () => { | |
// handle http errors | |
if (response.statusCode < 200 || response.statusCode > 299) { | |
reject( | |
new Error(`API Error: ${response.statusCode} ${body.join(" ")}`) | |
); | |
} | |
resolve(body.join("")); | |
}); | |
}); | |
// handle connection errors of the request | |
request.on("error", err => reject(err)); | |
request.write(requestBody); | |
request.end(); | |
}); | |
}; | |
/* | |
Documents can only be indexed 100 at a time, so we index | |
our data set in batches. The following is a very simple batching function, which | |
allows for has a configurable `concurrency` variable, which allows for faster | |
indexing. | |
*/ | |
/*
  Documents can only be indexed 100 at a time, so we index
  our data set in batches. The following is a very simple batching function
  with a configurable `concurrency` variable, which allows for faster
  indexing.
*/
/**
 * Stream newline-delimited JSON documents and POST them to the App Search
 * documents endpoint in batches.
 *
 * @param {AsyncIterable<string>} documentLines - One JSON document per line,
 *   e.g. a readline interface over the input file.
 * @returns {Promise<void>} Resolves once every batch request has settled.
 */
async function indexDocumentsInBatches(documentLines) {
  // BUG FIX: removed a stray `debugger;` statement left over from development.
  const concurrency = 20; // max in-flight requests before we pause to drain
  const size = 100;       // App Search rejects batches larger than 100 docs
  let recordsIndexed = 0;
  let requests = [];
  let batch = [];
  // POST the current batch, throttling when too many requests are in flight.
  async function processBatch() {
    if (batch.length === 0) return;
    if (requests.length >= concurrency) {
      await Promise.all(requests);
      // we'll get some 502 errors if we don't let ES breathe
      await new Promise((resolve) => setTimeout(resolve, 500));
      requests = [];
    }
    const batchString = JSON.stringify(batch);
    const newRequest = request(
      documentsUrl,
      {
        method: "POST",
        headers: {
          Authorization: `Bearer ${apiKey}`,
          "Content-Type": "application/json",
          "Content-Length": Buffer.byteLength(batchString)
        }
      },
      batchString
      // Deliberately best-effort: log the failed batch and keep indexing the
      // rest of the file rather than aborting a super-large upload.
    ).catch((e) => console.error("Bad request", e));
    requests.push(newRequest);
    recordsIndexed += batch.length;
    console.log(`Indexing ${recordsIndexed} documents`);
    batch = [];
  }
  for await (const line of documentLines) {
    // ROBUSTNESS FIX: skip blank lines (NDJSON files commonly end with a
    // trailing newline) instead of crashing in JSON.parse.
    if (line.trim() === "") continue;
    const document = JSON.parse(line);
    batch.push(document);
    if (batch.length >= size) await processBatch();
  }
  await processBatch(); // flush the final partial batch
  await Promise.all(requests);
}
// Kick off the streaming upload. BUG FIX: indexDocumentsInBatches is async,
// so a plain try/catch cannot observe its rejections — the promise was
// previously floating and stream errors (e.g. ENOENT for a missing input
// file) crashed with an unhandled rejection. Attach a .catch instead.
try {
  const documentStream = fs.createReadStream(pathToJson, "utf8");
  const documentLines = readline.createInterface({ input: documentStream });
  console.log(`About to process documents`);
  indexDocumentsInBatches(documentLines).catch((e) => {
    console.error(e);
    process.exit(1);
  });
} catch (e) {
  // Synchronous setup failures (e.g. invalid path argument type) land here.
  console.error(e);
  process.exit(1);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment