pip install elasticsearch certifi
Read https://elasticsearch-py.readthedocs.io/en/master/
Read example_es.py. :)
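A minimal sketch of the client in use before diving into example_es.py below (assumes a reachable cluster and the older, pre-5.x filtered-query era client that example_es.py targets; the URL is a placeholder):

import elasticsearch

es = elasticsearch.Elasticsearch(hosts=['http://localhost:9200'])
result = es.search(
    index='echelon-read',
    body={'query': {'match_all': {}}},
    size=1
)
print(result['hits']['total'])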
# stdlib
import json
import os
from functools import partial

# 3rd party
import certifi
import elasticsearch

# TODO: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html
# use bool queries too (maybe instead of "and" and "or")
def And(*clauses):
    return {
        "query": {
            "filtered": {
                "filter": {
                    "and": clauses
                }
            }
        }
    }


def Or(*clauses):
    return {
        "query": {
            "filtered": {
                "filter": {
                    "or": clauses
                }
            }
        }
    }


def Exists(field):
    return {"exists": {"field": field}}


def Match(field, value):
    return {"match": {field: value}}


def Script(*statements):
    return {
        "inline": '; '.join(statements)
    }
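
# Illustrative example: And(Match('call_id', 'abc'), Exists('post_features'))
# builds a dict of the form below ('abc' is a placeholder value; this is the
# old filtered/and filter syntax this file targets, not the bool query from
# the TODO above):
# {
#   "query": {
#     "filtered": {
#       "filter": {
#         "and": [
#           {"match": {"call_id": "abc"}},
#           {"exists": {"field": "post_features"}}
#         ]
#       }
#     }
#   }
# }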

def debug_elasticsearch(log='elastic.log', trace='elastic.trace.log'):
    """
    Call this one time to turn on a trace to log files. This will log the
    HTTP calls, which you can then execute using curl.
    """
    import logging
    import logging.handlers
    es_logger = logging.getLogger('elasticsearch')
    es_logger.propagate = False
    es_logger.setLevel(logging.INFO)
    es_logger_handler = logging.handlers.RotatingFileHandler(
        log,
        maxBytes=0.5 * 10**9,
        backupCount=3
    )
    es_logger.addHandler(es_logger_handler)
    es_tracer = logging.getLogger('elasticsearch.trace')
    es_tracer.propagate = False
    es_tracer.setLevel(logging.DEBUG)
    es_tracer_handler = logging.handlers.RotatingFileHandler(
        trace,
        maxBytes=0.5 * 10**9,
        backupCount=3
    )
    es_tracer.addHandler(es_tracer_handler)

def get_elastic(url):
    """
    Return an Elasticsearch instance
    """
    elastic = elasticsearch.Elasticsearch(
        hosts=[url],
        verify_certs=True,
        use_ssl=True,
        ca_certs=certifi.where()
    )
    return elastic


def get_operation(elastic, method, **kwargs):
    """
    Return an es operation bound to **kwargs.
    """
    return partial(getattr(elastic, method), **kwargs)


def get_document_op(elastic, index, doc_type):
    """
    Get an operation bound to an :index and :doc_type, so that just the
    :method remains to be specified
    """
    return partial(get_operation, elastic, doc_type=doc_type, index=index)
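
# Illustrative usage: bind an index and doc_type once, then pick the client
# method by name (index/doc_type values come from main() below; the call_id
# is a placeholder):
#   get_read_op = get_document_op(elastic, 'echelon-read', 'transcripts')
#   search = get_read_op('search')
#   result = search(body=And(Match('call_id', 'abc')), size=1)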

def enumerate_call(search, call_id, link, call_side, show=None):
    """
    List the fields on a call document, showing any in :show
    """
    show = show or []
    doc_id = '_'.join([call_id, link, call_side])
    print('Fetching {}...'.format(doc_id))
    query = And(
        Match('call_id', call_id),
        Match('call_side', call_side),
        Match('link', link)
    )
    result = search(
        body=query,
        size=2,
        sort=['created:desc', 'call_id']
    )
    documents = result['hits']['hits']
    for doc in documents:
        print(json.dumps(doc, indent=2, sort_keys=True))
        print(json.dumps(
            sorted(doc['_source'].keys()),
            indent=2,
            sort_keys=True
        ))
        for field in show:
            print('{}:\n'.format(field))
            print(json.dumps(
                doc['_source'].get(field),
                indent=2,
                sort_keys=True
            ))

def doc_by_id(search, doc_id):
    query = And(Match('doc_id', doc_id))
    result = search(
        body=query,
        size=1
    )
    documents = result['hits']['hits']
    print(documents)
    for doc in documents:
        print('doc id: {}'.format(doc['_id']))
        print(json.dumps(
            sorted(doc['_source'].keys()),
            indent=2,
            sort_keys=True
        ))

def docs_with_fields(search, fields, size=1):
    """
    Get documents that have any field in :fields.
    """
    query = Or(*[Exists(field) for field in fields])
    result = search(
        body=query,
        size=size,
        sort=['created:desc', 'call_id']
    )
    return result

def count_docs_with_fields(search, fields):
    """
    Get a count of documents that have any field in :fields.
    """
    result = docs_with_fields(search, fields)
    return result['hits']['total']

def list_docs_with_fields(search, fields, size=1):
    """
    List the ids of documents that have any field in :fields.
    """
    result = docs_with_fields(search, fields, size=size)
    return [d['_id'] for d in result['hits']['hits']]

def last_N_processed(search, fields, N=1):
    doc_ids = list_docs_with_fields(search, fields, size=N * 2)
    return [d for d in doc_ids if 'caller' in d]


def get_operations(elastic, doc_type, read_idx, write_idx, read_ops,
                   write_ops):
    """
    Binds indexes and doc_type to es operations
    """
    ops = []
    get_read_op = get_document_op(elastic, read_idx, doc_type)
    get_write_op = get_document_op(elastic, write_idx, doc_type)
    for op in read_ops:
        ops.append(get_read_op(op))
    for op in write_ops:
        ops.append(get_write_op(op))
    return ops

def get_ops(url, doc_type, read_index, write_index):
    # instance
    elastic = get_elastic(url)
    # bound operations
    search, index, update, update_by_query = get_operations(
        elastic,
        doc_type,
        read_index,
        write_index,
        ['search'],
        ['index', 'update', 'update_by_query']
    )
    return search, index, update, update_by_query


def transcript_context():
    url = os.environ["ELASTICSEARCH_WRITE_URL"]
    doc_type = 'transcripts'
    read_index = 'echelon-read'
    write_index = 'echelon-write'
    return (url, doc_type, read_index, write_index)

def main():
    # context
    url, doc_type, read_index, write_index = transcript_context()
    # create the operations (these are functions)
    search, index, update, update_by_query = get_ops(
        url,
        doc_type,
        read_index,
        write_index
    )
    # uncomment this to turn on tracing/logging
    # debug_elasticsearch()
    call = '00ad34d8-d09e-1235-06b0-0cc47a392728_0_caller'
    call_id, link, call_side = call.split('_')
    enumerate_call(search, call_id, link, call_side, show=[
        'post_features',  # 'model_events' # 'post_events'
    ])
    # doc_ids = last_N_processed(search, ['post_features'], N=50)
    # for id in doc_ids:
    #     print(id)
    # doc_id = 'jdobson-test-12345'
    # doc_by_id(search, doc_id)


if __name__ == "__main__":
    main()
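
# To run (illustrative; the script reads the cluster URL from the env var
# above, and the credentials/host here are placeholders):
#   ELASTICSEARCH_WRITE_URL=https://user:pass@your-cluster:9243 python example_es.py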