Created
April 22, 2020 12:27
-
-
Save palawer/4527732fe93c5c803c9d8b4e3f547dd6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from elasticsearch import Elasticsearch | |
from elasticsearch.helpers import bulk | |
from itertools import zip_longest | |
import csv | |
# Tuning knobs for the parallel CSV -> Elasticsearch bulk loader.
MAX_WORKERS = 4  # threads used to parse CSV rows concurrently
BULK_SIZE = 50  # number of documents sent per bulk request
CSV_FILE = 'file.csv'  # input CSV; first line must be a header (read via DictReader)
INDEX_NAME = 'index_2020-04-22'  # target Elasticsearch index
INDEX_TYPE = 'index_type'  # mapping type (NOTE: types are removed in ES 7+)
es = Elasticsearch()  # client with default settings, i.e. localhost:9200
def grouper(iterable, n=1000, fillvalue=None):
    """Batch *iterable* into tuples of length *n*.

    The final tuple is padded with *fillvalue* when the input length is
    not an exact multiple of *n*.
    """
    # zip_longest over n references to ONE iterator advances it n steps
    # per output tuple, producing consecutive fixed-size groups.
    it = iter(iterable)
    return zip_longest(*([it] * n), fillvalue=fillvalue)
def chunks(l, n):
    """Yield successive slices of *l*, each of length at most *n*.

    The last slice is shorter when len(l) is not a multiple of *n*.
    """
    start = 0
    while start < len(l):
        yield l[start:start + n]
        start += n
def parse_csv(row):
    """Convert one csv.DictReader row into an elasticsearch.helpers.bulk action.

    Args:
        row: mapping of CSV column name -> string value for one line.

    Returns:
        An action dict in the format expected by elasticsearch.helpers.bulk.
    """
    # Index the whole CSV row as the document body. (The original template
    # discarded `row` entirely and indexed empty documents.)
    _source = dict(row)
    # TODO: derive a real unique id from the row; a constant id combined
    # with _op_type 'create' makes every document after the first fail
    # with a version-conflict error.
    _id = 'hola'
    return {
        "_id": _id,
        "_type": INDEX_TYPE,
        "_index": INDEX_NAME,
        "_source": _source,
        '_op_type': 'create',  # fail if id exists; 'index' would upsert instead
        #'_op_type': 'index',
    }
if __name__ == "__main__":
    # Stream the CSV, parse rows in a thread pool, and flush parsed
    # actions to Elasticsearch in batches of BULK_SIZE.
    with open(CSV_FILE, mode='r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        records = []  # parsed bulk actions waiting to be flushed
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
            # grouper() batches the reader so only one group of rows is
            # submitted to the pool at a time (bounds memory use).
            for group in grouper(csv_reader):
                # `if x` skips the fillvalue (None) padding in the last group.
                futures = [pool.submit(parse_csv, x) for x in group if x]
                for future in as_completed(futures):
                    data = future.result()
                    if data:
                        records.append(data)
                    if len(records) >= BULK_SIZE:
                        for chunk in chunks(records, BULK_SIZE):
                            # bulk() call is intentionally disabled in this
                            # template; uncomment to actually index.
                            pass#success, _ = bulk(es, chunk)
                        records = []
        if records:#remaining records
            # Final partial batch (fewer than BULK_SIZE actions).
            print(records)
            pass#success, _ = bulk(es, records)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment