# Index a sample of a Kusto table into Elasticsearch.
#
# Originally published as GitHub gist danield137/47e56c7cfd49c8fc8d3c3ae80fcced36
# (created June 28, 2022 09:32).
#
# GitHub flagged this file as possibly containing hidden or bidirectional Unicode
# text that may be interpreted or compiled differently than it appears; review it
# in an editor that reveals hidden Unicode characters.
import json
import time
from datetime import datetime, timedelta
from typing import List, Optional

import requests
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
    """Copy a sample of KUSTO_TABLE into an Elasticsearch index, one second of data at a time.

    Starting 12 hours in the past, each iteration queries up to ``batch_size`` rows whose
    ``Timestamp`` falls in a one-second window. It posts every row to Elasticsearch as its
    own document and then advances the window by one second. Consecutive queries are
    spaced at least ``courtesy_backoff_time_in_seconds`` apart.

    Relies on module-level settings that this script does not define and the user must
    provide: KUSTO_URI, KUSTO_DB, KUSTO_TABLE, ES_URI, ES_AUTH.

    Returns:
        The rows (the ``pack_all()`` column, presumably dicts) returned by the last query.

    Raises:
        requests.exceptions.HTTPError: if an Elasticsearch request fails.
    """

    def read_kusto_data(q):
        """Run query *q* against KUSTO_DB and return the first column of each primary-result row."""
        results = kc.execute_query(KUSTO_DB, q)
        return [row[0] for row in results.primary_results[0]]

    def push_objects_to_es(collection_name: str, objects: List[dict]) -> Optional[requests.Response]:
        """Index each dict in *objects* as a separate document in *collection_name*.

        Returns the response for the last document, or None when *objects* is empty.
        Raises HTTPError on the first failed request.
        """
        result = None
        # Iterate the argument itself; the original looped over the enclosing ``data``
        # variable and crashed with UnboundLocalError on an empty batch.
        for doc in objects:
            result = session.post(f'{ES_URI}/{collection_name}/_doc', json=doc, headers={'Content-Type': 'application/json'})
            result.raise_for_status()
        return result

    # connect to es; the context manager closes the pooled connections on exit
    with requests.Session() as session:
        session.auth = ES_AUTH
        # NOTE(review): TLS certificate verification is disabled, so the ES endpoint is not
        # authenticated. This is only acceptable against a trusted development cluster.
        session.verify = False

        # because elastic only knows how to handle lowercase index names
        es_index_name = KUSTO_TABLE.lower()

        # Ensure the index exists. HEAD /<index> answers 200 when it exists and 404 when it
        # does not. The original scanned the Response object itself (which never matched) for
        # the un-lowercased table name, so it always tried to recreate the index.
        index_check = session.head(f'{ES_URI}/{es_index_name}')
        if index_check.status_code == 404:
            print(f'Creating index : {es_index_name}')
            result = session.put(f'{ES_URI}/{es_index_name}')
            print(f'Creating index : {es_index_name}, result: {result.json()}')
        else:
            # Surface auth/connectivity failures instead of claiming the index exists.
            index_check.raise_for_status()
            print(f'Index {es_index_name} exists')

        # connect to kusto
        kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_URI)
        kc = KustoClient(kcsb)

        # read sample data from kusto
        # NOTE(review): datetime.now() is naive local time. The query literal below is presumably
        # interpreted as UTC by Kusto, so on non-UTC hosts the sampled window is shifted. Confirm this.
        d = datetime.now() - timedelta(hours=12)
        batch_size = 100
        last_query_time = 0
        courtesy_backoff_time_in_seconds = 15
        data = []
        for iteration in range(60 * 60):
            # Throttle: keep at least courtesy_backoff_time_in_seconds between queries.
            time_since_last_query = time.time() - last_query_time
            if time_since_last_query < courtesy_backoff_time_in_seconds:
                sleep_time = int(min(courtesy_backoff_time_in_seconds - time_since_last_query, courtesy_backoff_time_in_seconds))
                print(f'#{iteration}) Sleeping for {sleep_time} seconds')
                time.sleep(sleep_time)
            # One-second window starting at d.
            time_range = f"(datetime({d.isoformat()}) .. 1s)"
            query = f'{KUSTO_TABLE} | where Timestamp between {time_range} | take {batch_size} | extend PackedRow = pack_all() | project PackedRow'
            data = read_kusto_data(query)
            last_query_time = time.time()
            # Report the number of rows actually fetched, not the requested batch size.
            print(f"#{iteration}) submitting {len(data)} records in range {time_range} from {KUSTO_TABLE} to {es_index_name} ...")
            result = push_objects_to_es(es_index_name, data)
            status = result.status_code if result is not None else 'nothing to index'
            print(f"#{iteration}) Done. result: {status}")
            d += timedelta(seconds=1)
        return data
if __name__ == "__main__":
    # Run the copy job only when executed as a script, not when imported as a module.
    main()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.