Skip to content

Instantly share code, notes, and snippets.

@seandavi
Created April 27, 2019 13:02
Show Gist options
  • Save seandavi/6bb53d4c445e5ae8688d81e19c48bde3 to your computer and use it in GitHub Desktop.
Save seandavi/6bb53d4c445e5ae8688d81e19c48bde3 to your computer and use it in GitHub Desktop.
Elasticsearch parallel bulk indexing example in Python
"""parallel bulk indexing
Indexes a json file using parallel_bulk.
"""
import elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk as pb
from collections import deque
import json
# Client for the target Elasticsearch cluster.
# NOTE(review): credentials are embedded in the URL — replace with values
# from the environment or a config file before using real credentials.
es = Elasticsearch(['https://user:password@host:9243/'])
def yield_records(fname, idx='sra_bulk_test'):
    """Yield bulk-indexing actions from a JSON-lines file.

    Each non-empty line of *fname* is parsed as one JSON document and
    wrapped in the action-dict shape that
    ``elasticsearch.helpers.parallel_bulk`` expects.

    Parameters
    ----------
    fname : str
        Path to a file containing one JSON record per line.
    idx : str
        Name of the target index (default ``'sra_bulk_test'``).

    Yields
    ------
    dict
        A bulk action with ``_op_type``/``_index``/``_type``/``_id``/
        ``_source`` keys.
    """
    with open(fname, 'r') as f:
        for line in f:
            # Skip blank lines (e.g. a trailing newline at EOF) instead
            # of crashing the whole run with a JSONDecodeError.
            if not line.strip():
                continue
            res = json.loads(line)
            yield {
                "_op_type": "index",  # this is the default
                "_index": idx,
                "_type": 'doc',  # no more doc types after es_6
                "_id": res['experiment']['accession'],  # extract _id from record
                "_source": res  # use entire record as "source"
            }
def main(fname='/tmp/experiment_json_agg.json'):
    """Index every record in *fname* into Elasticsearch via parallel_bulk.

    ``parallel_bulk`` returns a lazy generator (4 worker threads by
    default), so nothing is actually sent until the generator is drained.
    ``deque(..., maxlen=0)`` consumes it at C speed while discarding the
    per-item results.
    See: https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/2

    Parameters
    ----------
    fname : str
        Path to the JSON-lines file to index (defaults to the original
        hard-coded location, so existing callers are unaffected).
    """
    deque(pb(es, actions=yield_records(fname)), maxlen=0)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment