Created
July 7, 2022 03:21
-
-
Save kakarukeys/07e50177c4919d1b24ab16b623b6aa14 to your computer and use it in GitHub Desktop.
robust_parallel_bulk.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from enum import IntEnum | |
from datetime import datetime | |
from typing import List, Iterable, Dict, Any, Tuple, Literal, NamedTuple, Type | |
import tenacity as tn | |
from elasticsearch.helpers import parallel_bulk | |
from app.elasticsearch.client import es_client | |
from app.db.models import BaseModel | |
from app.settings import settings | |
def robust_parallel_bulk(es_ops: List[EsOp], index: str) -> None:
    """Perform the es operations in <es_ops> on <index>, retrying on connection errors.

    The bulk request is attempted up to 3 times with a fixed 90 second wait
    between attempts. Each retry resubmits the whole <es_ops> list from the
    start. After the final failed attempt the last exception is re-raised
    to the caller (``reraise=True``).

    Args:
        es_ops: bulk actions to hand to ``elasticsearch.helpers.parallel_bulk``.
        index: default index the actions are applied to.

    Raises:
        ConnectionError: (builtin or Elasticsearch client flavour) when every
            attempt failed to reach the cluster.
        Exception: any other error raised by ``parallel_bulk`` propagates
            immediately without being retried.
    """
    # Imported locally so this function stays self-contained. The client's
    # ConnectionError does NOT derive from the builtin ConnectionError, so
    # both must be listed for the retry predicate to match real outages.
    from elasticsearch.exceptions import ConnectionError as EsConnectionError

    for attempt in tn.Retrying(
        reraise=True,
        stop=tn.stop_after_attempt(3),
        wait=tn.wait_fixed(90),
        retry=tn.retry_if_exception_type((ConnectionError, EsConnectionError)),
    ):
        with attempt:
            results = parallel_bulk(
                es_client.es_client,
                es_ops,
                thread_count=settings.bulk_indexing_thread_count,
                queue_size=settings.bulk_indexing_thread_count,
                chunk_size=500,
                max_chunk_bytes=104857600,  # 100 MiB
                index=index,
            )
            # parallel_bulk is a lazy generator: nothing is sent to the
            # cluster until it is iterated. Drain it inside the attempt so
            # the work actually runs and connection errors surface here,
            # where the retry logic can see them.
            for _ok, _info in results:
                pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
example of tests: