Skip to content

Instantly share code, notes, and snippets.

@siffash
Last active September 5, 2020 19:44
Show Gist options
  • Save siffash/1067473b18bdbc9ac3df7b64d61b232b to your computer and use it in GitHub Desktop.
Save siffash/1067473b18bdbc9ac3df7b64d61b232b to your computer and use it in GitHub Desktop.
Javascript function for using Elasticsearch in AWS
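/**
The following environment variables have to be set (they are loaded from a .env file via dotenv):
AWS_ES_REGION            - region passed to the AWS HTTP request
AWS_ES_DOMAIN            - host name of the Elasticsearch domain endpoint (also sent as the Host header)
AWS_ES_INDEX             - index name that is prefixed to the path of every request
AWS_ES_ACCESS_KEY_ID     - credentials, read through AWS.EnvironmentCredentials('AWS_ES')
AWS_ES_SECRET_ACCESS_KEY
**/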
require('dotenv').config();
// define AWS
const AWS = require('aws-sdk');
const { AWS_ES_REGION, AWS_ES_DOMAIN, AWS_ES_INDEX } = process.env;
/**
 * HTTP methods accepted by `aws_es`.
 *
 * Declared as a const object plus a same-named literal-union type instead of a
 * string `enum`, so that both `HttpMethod.PUT` and the plain literal `'PUT'`
 * are valid arguments (a string enum rejects plain string literals).
 */
const HttpMethod = {
  OPTIONS: 'OPTIONS',
  HEAD: 'HEAD',
  GET: 'GET',
  POST: 'POST',
  PUT: 'PUT',
  DELETE: 'DELETE',
} as const;
type HttpMethod = (typeof HttpMethod)[keyof typeof HttpMethod];
/** Minimal shape of the response object handed to the `AWS.HttpClient` callback that this file relies on. */
interface EsHttpResponse {
  statusCode: number;
  statusMessage: string;
  on(event: string, listener: (chunk: Buffer | string) => void): unknown;
}

/**
 * Parses a response body as JSON, falling back to the raw text when the body
 * is empty or not valid JSON, so that parsing never throws inside a stream
 * event handler.
 */
function parseEsResponseBody(text: string): unknown {
  try {
    return JSON.parse(text);
  } catch {
    return text;
  }
}

/**
 * Sends a SigV4-signed request to `/${AWS_ES_INDEX}/${api}` on the
 * Elasticsearch domain configured through the AWS_ES_* environment variables.
 *
 * @param method   HTTP method of the request.
 * @param api      API path below the index (e.g. `_search`, `_bulk`, or `''` for the index itself).
 * @param jsonBody Request body. For the NDJSON APIs (`_bulk`, `_msearch`) it must be an array;
 *                 each element is serialized as one newline-terminated JSON line.
 * @returns A promise resolving with the parsed response body for any 2xx status
 *          (the raw text is returned when the body is not valid JSON).
 * @throws Rejects with a string when an NDJSON API is called with a non-array body,
 *         with an `Error` when a required environment variable is missing,
 *         with the parsed response body for any non-2xx status,
 *         and with the transport error when the request itself fails.
 */
async function aws_es(method: HttpMethod, api: string, jsonBody: Object | Array<Object>): Promise<unknown> {
  // _bulk and _msearch take newline-delimited JSON instead of a single JSON document
  const isNDJSON = api === '_bulk' || api === '_msearch';
  let body: string;
  if (isNDJSON) {
    if (!Array.isArray(jsonBody)) {
      return Promise.reject(`Expect the body to be an array because you want to use ${api} API`);
    }
    body = jsonBody.map(line => JSON.stringify(line) + '\n').join('');
  } else {
    body = JSON.stringify(jsonBody);
  }

  // fail early instead of silently sending the request to ".../undefined/..."
  if (!AWS_ES_REGION || !AWS_ES_DOMAIN || !AWS_ES_INDEX) {
    throw new Error('AWS_ES_REGION, AWS_ES_DOMAIN and AWS_ES_INDEX environment variables have to be set');
  }

  // construct the request
  const endpoint = new AWS.Endpoint(AWS_ES_DOMAIN);
  const request = new AWS.HttpRequest(endpoint, AWS_ES_REGION);
  request.method = method;
  request.path += `${AWS_ES_INDEX}/${api}`;
  request.body = body;
  request.headers['host'] = AWS_ES_DOMAIN;
  request.headers['Content-Type'] = isNDJSON ? 'application/x-ndjson' : 'application/json';
  request.headers['Content-Length'] = Buffer.byteLength(body);

  // sign the request
  const credentials = new AWS.EnvironmentCredentials('AWS_ES');
  const signer = new AWS.Signers.V4(request, 'es');
  signer.addAuthorization(credentials, new Date());

  // send the request
  return new Promise((resolve, reject) => {
    const client = new AWS.HttpClient();
    client.handleRequest(request, null, (response: EsHttpResponse) => {
      const { statusCode, statusMessage } = response;
      console.log(statusCode + ' ' + statusMessage);
      // collect raw chunks and decode once at the end, so a multi-byte UTF-8
      // character split across two chunks is not corrupted
      const chunks: Buffer[] = [];
      response.on('data', (chunk: Buffer | string) => {
        chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
      });
      response.on('end', () => {
        const parsedResponse = parseEsResponseBody(Buffer.concat(chunks).toString('utf8'));
        console.log(JSON.stringify(parsedResponse, null, 4));
        // every 2xx status is a success (e.g. 201 Created when a document is created)
        if (statusCode >= 200 && statusCode < 300) {
          resolve(parsedResponse);
        } else {
          reject(parsedResponse);
        }
      });
    }, (err: unknown) => {
      console.log('Error: ' + String(err));
      reject(err);
    });
  });
}
/**
EXAMPLE:
add mapping, create 2 documents, search them, set deactivated = true for all documents,
delete the documents where deactivated = true, and then delete the index (also deletes the mapping)
**/
/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

/**
 * Runs the example end to end, one request after the other:
 * add mapping, create 2 documents, search them, set deactivated = true for all documents,
 * delete the documents where deactivated = true, and then delete the index.
 * The first failing request aborts the remaining steps.
 */
async function runExample(): Promise<void> {
  // both example documents have the same content and only differ in their _id
  const sampleDocument = {
    "title": "Test",
    "description": "test",
    "category": "test",
    "affiliateLink": "test",
    "image": "test",
    "thumbnail": "test",
    "currency": "test",
    "country": "test",
    "language": "test",
    "importType": "test",
    "price": 22.33,
    "earning": "test",
    "networkId": "test",
    "metaInformation": "test",
    "creationDate": "test",
    "updatedOn": "test",
    "deactivated": false
  };

  // add mapping
  await aws_es(HttpMethod.PUT, '', {
    "mappings": {
      "properties": {
        "title": {
          "type": "text",
          "analyzer": "standard"
        },
        "description": {
          "type": "text",
          "analyzer": "standard",
          "fields": {
            "length": {
              "type": "token_count",
              "analyzer": "standard",
              "index": false
            }
          }
        },
        "category": { "type": "keyword" },
        "affiliateLink": { "enabled": false },
        "image": { "enabled": false },
        "thumbnail": { "enabled": false },
        "currency": { "type": "keyword" },
        "country": { "type": "keyword" },
        "language": { "type": "keyword" },
        "importType": { "enabled": false },
        "price": { "type": "double" },
        "earning": { "enabled": false },
        "networkId": { "enabled": false },
        "metaInformation": { "enabled": false },
        "creationDate": { "enabled": false },
        "updatedOn": { "enabled": false },
        "deactivated": { "type": "boolean" }
      }
    }
  });

  // create 2 documents
  await aws_es(HttpMethod.POST, '_bulk', [
    { "index": { "_id": "123" } },
    sampleDocument,
    { "index": { "_id": "789" } },
    sampleDocument
  ]);

  // wait 1 sec (presumably so the new documents are visible to the search below)
  await sleep(1000);

  // search 2 documents
  await aws_es(HttpMethod.POST, '_search', {
    "query": {
      "function_score": {
        "query": {
          "bool": {
            "must": [
              {
                "multi_match": {
                  "query": "test", // the searching query
                  "operator": "and",
                  "fields": [
                    "title^3",
                    "description"
                  ],
                  "type": "most_fields"
                }
              }
            ],
            "filter": {
              "term": {
                "country": "test"
              }
            }
          }
        },
        "functions": [
          {
            "field_value_factor": {
              "field": "price",
              "modifier": "log2p",
              "missing": 1
            }
          },
          {
            "field_value_factor": {
              "field": "description.length",
              "modifier": "log2p",
              "missing": 1
            }
          },
          {
            "script_score": {
              "script": {
                "source": "doc['category'].size() == 0 || doc['category'].value == '' ? 1 : 1.5"
              }
            }
          }
        ]
      }
    },
    "sort": [
      {
        "price": {
          "order": "desc"
        }
      }
    ],
    "size": 10,
    "from": 0,
    "aggs": {
      "categories": {
        "terms": {
          "field": "category",
          "missing": ""
        }
      },
      "max_price": {
        "max": {
          "field": "price"
        }
      }
    }
  });

  // set deactivated = true for all documents
  await aws_es(HttpMethod.POST, '_update_by_query', {
    "script": {
      "source": "ctx._source.deactivated=true",
      "lang": "painless"
    }
  });

  // wait 1 sec
  await sleep(1000);

  // delete the documents where deactivated = true
  await aws_es(HttpMethod.POST, '_update_by_query', {
    "query": {
      "term": {
        "deactivated": true
      }
    },
    "script": {
      "source": "ctx.op='delete'",
      "lang": "painless"
    }
  });

  // delete the index (also deletes the mapping)
  await aws_es(HttpMethod.DELETE, '', {});
}

// report a failed step instead of leaving the rejection unhandled
runExample().catch((err: unknown) => {
  console.error('Example run failed:', err);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment