Skip to content

Instantly share code, notes, and snippets.

@haynesgt
Created September 2, 2021 22:45
Show Gist options
  • Save haynesgt/f98868ea53df401cc981e038eec62e6a to your computer and use it in GitHub Desktop.
Save haynesgt/f98868ea53df401cc981e038eec62e6a to your computer and use it in GitHub Desktop.
App Search CLI for super-large newline-delimited json
#!/usr/bin/env node
/**
* Install:
*
* Download this file and give it executable permissions
*
* Usage:
*
* Uploading documents
* ---------
*
* app-search-cli upload ~/Documents/pokemon.json http://localhost:3002 test-engine private-kayik65qbd516q1brk724aaa
*
* Concurrency and batch size can be adjusted manually in code below
*
*
* Adapted from https://gist.github.com/JasonStoltz/41b29e71310743e94275fc02fac33095 by JasonStoltz
*/
const fs = require("fs");
const readline = require("readline");
// Positional CLI arguments: <command> <pathToJson> <apiPath> <engineName> <apiKey>
const [command, pathToJson, apiPath, engineName, apiKey] = process.argv.slice(
2
);
// Echo each argument back so a typo is visible before any request is made.
for (const [label, value] of Object.entries({
  command,
  pathToJson,
  apiPath,
  engineName,
  apiKey,
})) {
  console.log(`${label}: `, value);
}
// All document operations go through the v1 documents endpoint of the engine.
const apiBase = `${apiPath}/api/as/v1`;
const documentsUrl = `${apiBase}/engines/${engineName}/documents`;
// Adapted from: https://www.tomas-dvorak.cz/posts/nodejs-request-without-dependencies/
/**
 * Perform an HTTP(S) request and return the response body as a string.
 *
 * Adapted from: https://www.tomas-dvorak.cz/posts/nodejs-request-without-dependencies/
 *
 * @param {string} url - Absolute URL; the https module is used when it starts with "https".
 * @param {object} [options] - Options forwarded to http(s).request (method, headers, ...).
 * @param {string} [requestBody] - Raw body written to the request.
 * @returns {Promise<string>} Resolves with the concatenated response body;
 *   rejects on connection errors or non-2xx status codes.
 */
const request = function(url, options = {}, requestBody = "") {
  // return new pending promise
  return new Promise((resolve, reject) => {
    // select http or https module, depending on requested url
    const lib = url.startsWith("https") ? require("https") : require("http");
    // `req` (not `request`) to avoid shadowing this function's own name
    const req = lib.request(url, options, response => {
      // temporary data holder
      const body = [];
      // on every content chunk, push it to the data array
      response.on("data", chunk => body.push(chunk));
      // we are done, resolve promise with those joined chunks
      response.on("end", () => {
        // handle http errors; `return` so we do not fall through and also
        // call resolve() after rejecting. Join with "" (not " ") — chunk
        // boundaries are arbitrary, so a separator would corrupt the text.
        if (response.statusCode < 200 || response.statusCode > 299) {
          return reject(
            new Error(`API Error: ${response.statusCode} ${body.join("")}`)
          );
        }
        resolve(body.join(""));
      });
    });
    // handle connection errors of the request
    req.on("error", err => reject(err));
    req.write(requestBody);
    req.end();
  });
};
/*
Documents can only be indexed 100 at a time, so we index
our data set in batches. The following is a very simple batching function
with a configurable `concurrency` variable, which allows for faster
indexing.
*/
/**
 * Read newline-delimited JSON documents and index them in batches of
 * `size`, keeping at most `concurrency` requests in flight.
 *
 * @param {AsyncIterable<string>} documentLines - One JSON document per line.
 * @returns {Promise<void>} Resolves once every batch request has settled.
 */
async function indexDocumentsInBatches(documentLines) {
  // NOTE: removed a leftover `debugger;` statement that would pause
  // execution when run under an inspector.
  const concurrency = 20;
  // App Search accepts at most 100 documents per indexing request.
  const size = 100;
  let recordsIndexed = 0;
  let requests = [];
  let batch = [];
  // Send the accumulated batch as one POST, throttling once `concurrency`
  // requests are outstanding.
  async function processBatch() {
    if (batch.length === 0) return;
    if (requests.length >= concurrency) {
      await Promise.all(requests);
      // we'll get some 502 errors if we don't let ES breathe
      await new Promise((resolve) => setTimeout(resolve, 500));
      requests = [];
    }
    const batchString = JSON.stringify(batch);
    const newRequest = request(
      documentsUrl,
      {
        method: "POST",
        headers: {
          Authorization: `Bearer ${apiKey}`,
          "Content-Type": "application/json",
          "Content-Length": Buffer.byteLength(batchString)
        }
      },
      batchString
    ).catch((e) => console.error("Bad request", e)); // best-effort: log and continue
    requests.push(newRequest);
    recordsIndexed += batch.length;
    console.log(`Indexing ${recordsIndexed} documents`);
    batch = [];
  }
  for await (const line of documentLines) {
    const document = JSON.parse(line);
    batch.push(document);
    if (batch.length >= size) await processBatch();
  }
  // Flush the final partial batch, then wait for all in-flight requests.
  await processBatch();
  await Promise.all(requests);
}
// Entry point: stream the newline-delimited JSON file and index it.
try {
  const documentStream = fs.createReadStream(pathToJson, "utf8");
  const documentLines = readline.createInterface({ input: documentStream });
  console.log(`About to process documents`);
  // indexDocumentsInBatches is async; the synchronous try/catch below cannot
  // see its rejections, so attach an explicit handler instead of leaving the
  // promise floating (which previously produced an unhandled rejection).
  indexDocumentsInBatches(documentLines).catch((e) => {
    console.error(e);
    process.exit(1);
  });
} catch (e) {
  console.error(e);
  process.exit(1);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment