@palawer
Created April 22, 2020 12:27
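The script below streams rows from a CSV file, parses them concurrently with a small thread pool, and indexes the resulting documents into Elasticsearch in fixed-size batches via the bulk helper.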
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import zip_longest
import csv

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

MAX_WORKERS = 4   # threads used to parse CSV rows concurrently
BULK_SIZE = 50    # documents sent per bulk request
CSV_FILE = 'file.csv'
INDEX_NAME = 'index_2020-04-22'
INDEX_TYPE = 'index_type'  # mapping types are deprecated as of Elasticsearch 7.x

es = Elasticsearch()  # connects to http://localhost:9200 by default

def grouper(iterable, n=1000, fillvalue=None):
    """Collect rows into fixed-size tuples, padding the last one with fillvalue.

    e.g. grouper('ABCDE', n=2) -> ('A', 'B'), ('C', 'D'), ('E', None)
    """
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def chunks(lst, n):
    """Yield successive n-sized slices from the list lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def parse_csv(row):
    """Turn one CSV row into an action dict for the bulk helper."""
    _source = dict(row)  # assumption: index the row as-is; adapt to your mapping
    # Placeholder id from the original gist: derive a unique id per row here,
    # otherwise every 'create' after the first fails with a version conflict.
    _id = 'hola'
    return {
        '_id': _id,
        '_type': INDEX_TYPE,
        '_index': INDEX_NAME,
        '_source': _source,
        '_op_type': 'create',  # use 'index' to overwrite existing documents
    }

if __name__ == "__main__":
    with open(CSV_FILE, mode='r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        records = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
            for group in grouper(csv_reader):
                # grouper pads the last group with None, so drop the padding
                # before handing rows to the worker threads
                futures = [pool.submit(parse_csv, x) for x in group if x]
                for future in as_completed(futures):
                    data = future.result()
                    if data:
                        records.append(data)
                if len(records) >= BULK_SIZE:
                    for chunk in chunks(records, BULK_SIZE):
                        success, _ = bulk(es, chunk)
                    records = []
        if records:  # flush the remaining records
            success, _ = bulk(es, records)
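
A minimal smoke test of the batching helpers, assuming a hypothetical two-column CSV (the id and name columns are made up for illustration). Paste it below the definitions above and run it without an Elasticsearch instance:

import io

sample = io.StringIO("id,name\n1,foo\n2,bar\n3,baz\n")
rows = [r for group in grouper(csv.DictReader(sample), n=2)
        for r in group if r]                 # padding filtered out
actions = [parse_csv(r) for r in rows]
print([a['_source'] for a in actions])       # the three documents
print([len(c) for c in chunks(actions, 2)])  # [2, 1]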