Created April 27, 2019 13:02
-
-
Save seandavi/6bb53d4c445e5ae8688d81e19c48bde3 to your computer and use it in GitHub Desktop.
Elasticsearch parallel bulk indexing example in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Parallel bulk indexing example.

Indexes a newline-delimited JSON file into Elasticsearch using
``elasticsearch.helpers.parallel_bulk``.
"""
import json
import os
from collections import deque

import elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk as pb
# Shared client used by main(). The cluster URL (which embeds the
# credentials) is taken from the ES_URL environment variable so real
# secrets do not have to be committed alongside the script. The fallback
# is the original placeholder URL and must be overridden to reach a real
# cluster.
es = Elasticsearch(
    [
        os.environ.get('ES_URL', 'https://user:password@host:9243/'),
    ]
)
def yield_records(fname, idx='sra_bulk_test', doc_type='doc'):
    """Yield one bulk ``index`` action per JSON line of *fname*.

    Each non-blank line must be a standalone JSON object containing
    ``record['experiment']['accession']``. That value becomes the document
    ``_id``, and the whole object is sent as ``_source``.

    Args:
        fname: Path to a newline-delimited JSON file, read as UTF-8.
        idx: Name of the target index.
        doc_type: Value for the ``_type`` field. Mapping types are
            deprecated after Elasticsearch 6; pass ``None`` to omit
            ``_type`` from the action entirely.

    Yields:
        dict: An action dict for the ``elasticsearch.helpers`` bulk helpers.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record has no ``experiment.accession``.
    """
    with open(fname, 'r', encoding='utf-8') as f:
        for line in f:
            # A blank line is not JSON; skip it rather than abort the load.
            if not line.strip():
                continue
            record = json.loads(line)
            action = {
                "_op_type": "index",  # explicit, although "index" is the default
                "_index": idx,
                "_id": record['experiment']['accession'],  # _id taken from the record
                "_source": record,  # index the entire record as the document
            }
            if doc_type is not None:
                action["_type"] = doc_type
            yield action
def main(fname='/tmp/experiment_json_agg.json', thread_count=4):
    """Bulk-index every record in *fname* with ``parallel_bulk``.

    ``parallel_bulk`` (imported as ``pb``) returns a lazy generator of
    ``(ok, info)`` tuples, so nothing is indexed until that generator is
    consumed. It must be drained explicitly; see
    https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/2

    Args:
        fname: Path to the newline-delimited JSON input file.
        thread_count: Number of worker threads (4 is parallel_bulk's default).

    Returns:
        int: Number of actions reported as successful.
    """
    # sum() fully consumes the generator, which is what triggers the uploads.
    indexed = sum(
        1
        for ok, _info in pb(es, actions=yield_records(fname), thread_count=thread_count)
        if ok
    )
    print('indexed', indexed, 'documents')
    return indexed
if __name__ == "__main__":
    # Run the bulk load only when executed as a script, never on import.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.