#!/usr/bin/env python

import json
import time
import logging

import click

from elasticsearch import Elasticsearch, helpers

# Global Elasticsearch client, set up by the `cli` group callback below.
client = None


@click.group()
@click.option('--host', '-h', multiple=True, metavar='<host:port>',
              help='hostname:port of a running elasticsearch node, can be specified multiple times.')
@click.option('--sniff', is_flag=True,
              help='Inspect the cluster to get a list of currently active nodes for load balancing.')
@click.option('--timeout', type=int, default=10, metavar='<timeout>',
              help='Connection timeout in seconds, default 10.')
@click.option('--log-level', type=click.Choice(('DEBUG', 'INFO', 'WARNING', 'ERROR')),
              help='Turn on logging to stderr at the specified log level. Use INFO to see all http requests.')
def cli(host, timeout, sniff, log_level):
    """
    Set of commands to work with data in elasticsearch. To reindex "my-index"
    from localhost to otherhost, just run:

    \b
    escli --host localhost:9200 scan --index my-index --include-meta - | escli --host otherhost:9200 bulk -

    To reindex data into a different index:

    \b
    escli scan --index my-index --include-meta --meta-keys _type,_id - | escli bulk --index new-index -
    """
    global client
    if log_level:
        logging.basicConfig(level=getattr(logging, log_level))

    client = Elasticsearch(host or None, timeout=timeout, sniff_on_start=sniff)


@cli.command('bulk', short_help='load documents into elasticsearch')
@click.option('--ignore-errors', is_flag=True,
              help='Ignore errors, just report their number.')
@click.option('--no-meta', is_flag=True,
              help=(
                  'Do not try to parse the file as json, just pass it on as '
                  'strings. This means that the json doc cannot be used to extract the ID or '
                  'other metadata.'
              ))
@click.option('--threads', type=int, default=4, metavar='<thread_count>',
              help='Degree of parallelization, default 4.')
@click.option('--index', metavar='<index>',
              help='Default index, if not overridden by the _index key in the json docs.')
@click.option('--doc-type', metavar='<doc_type>',
              help='Default type, if not overridden by the _type key in the json docs.')
@click.option('--chunk-size', type=int, default=500, metavar='<chunk_size>',
              help='Maximum number of documents to be sent to es in one bulk request.')
@click.option('--max-chunk-bytes', type=int, default=100 * 1024 * 1024, metavar='<max_chunk_bytes>',
              help='Maximum size of the http request in bytes.')
@click.option('--notify-every', type=int, default=10000, metavar='<N>',
              help='Print a summary every <N> documents, default 10000.')
@click.argument('input', type=click.File('r'))
def bulk(input, threads, notify_every, no_meta, ignore_errors, **kwargs):
    """
    Load data into elasticsearch using the python client's bulk helpers.

    The file should contain one json document per line, in any format accepted
    by the python helpers:

    \b
    http://elasticsearch-py.readthedocs.org/en/master/helpers.html#bulk-helpers

    For maximum performance you can disable the parsing of the documents in
    python by specifying the --no-meta option and providing <index> and
    <doc_type> values. All documents will then be indexed with random IDs
    allocated by elasticsearch:

    \b
    for x in `seq 100000`
    do
        echo '{"title": "Document '$x'", "number": '$x'}'
    done | escli bulk --index my-index --doc-type my-doc --no-meta -
    """
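    # Illustrative example (values are made up) of an input line when metadata
    # is included; _index, _type, _id and _source are standard action keys
    # understood by the elasticsearch-py bulk helpers and emitted by
    # `escli scan --include-meta`:
    #   {"_index": "my-index", "_type": "my-doc", "_id": "1", "_source": {"title": "Document 1"}}
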
    bulk_helper = helpers.streaming_bulk
    bulk_kwargs = {}

    if threads > 1:
        bulk_helper = helpers.parallel_bulk
        bulk_kwargs['thread_count'] = threads

    if not no_meta:
        actions = map(json.loads, input)
    elif kwargs['doc_type'] is None or kwargs['index'] is None:
        # --no-meta means the json is not parsed, so index and doc_type defaults are required
        raise click.BadParameter(
            'We need --index and --doc-type defaults if we don\'t parse the json to extract the information.',
            param_hint='--no-meta')
    else:
        # no meta, just pass the raw strings through for more performance
        actions = input

    if ignore_errors:
        bulk_kwargs.update({
            'raise_on_exception': False,
            'raise_on_error': False,
        })

    cnt, err_cnt = 0, 0
    bulk_kwargs.update(kwargs)
    start = time.time()
    for ok, _ in bulk_helper(client, actions, **bulk_kwargs):
        cnt += 1
        if not ok:
            err_cnt += 1

        if notify_every and cnt % notify_every == 0:
            duration = time.time() - start
            click.echo('%d documents (%d failed) written in %f seconds (%f docs/sec)' % (
                cnt, err_cnt, duration, cnt / duration), err=True)

    duration = time.time() - start
    click.echo('DONE')
    click.echo('%d documents (%d failed) written in %f seconds (%f docs/sec)' % (
        cnt, err_cnt, duration, cnt / duration), err=True)


@cli.command('scan', short_help='dump data from elasticsearch')
@click.option('--index', metavar='<index>',
              help='Index to dump, use comma separated values to specify multiple indices.')
@click.option('--doc-type', metavar='<doc_type>',
              help='Doc type to dump, use comma separated values to specify multiple types.')
@click.option('--query', metavar='<query>',
              help='Query to run, as a json string.')
@click.option('--scroll', default='5m',
              help='Timeout for which to keep the scroll context alive, default 5m.')
@click.option('--ignore-errors', is_flag=True,
              help='Ignore errors.')
@click.option('--size', type=int, default=100,
              help='Number of documents (per shard) to retrieve at once.')
@click.option('--include-meta', is_flag=True,
              help='Include metadata (_index, _type, _id, ...) in the output.')
@click.option('--meta-keys',
              help=(
                  'When including metadata, limit the keys to be included.'
                  ' _source is always included. Example (only include _type and _id):'
                  ' --meta-keys _type,_id'
              ))
@click.option('--notify-every', type=int, default=10000, metavar='<N>',
              help='Print a summary every <N> documents, default 10000.')
@click.argument('output', type=click.File('w'))
def scan(output, query, scroll, ignore_errors, include_meta, meta_keys, notify_every, **kwargs):
    """
    Dump all documents from <index>/<doc_type> that match <query> into OUTPUT.
    """
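    # Illustrative usage (values are made up), dumping only matching documents
    # by passing a full query body as json:
    #   escli scan --index my-index --query '{"query": {"match": {"title": "document"}}}' -
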
    if meta_keys and not include_meta:
        raise click.BadParameter('You cannot specify meta keys if you are not including metadata.',
                                 param_hint='--meta-keys')

    # request all relevant metadata fields
    fields = ('_source', '_parent', '_routing', '_timestamp')
    if meta_keys:
        meta_keys = set(k.strip() for k in meta_keys.split(','))
        meta_keys.add('_source')
        fields = tuple(f for f in fields if f in meta_keys)

    cnt = 0
    start = time.time()
    for doc in helpers.scan(client, scroll=scroll, raise_on_error=not ignore_errors,
                            query=query, fields=fields, **kwargs):
        cnt += 1
        if not include_meta:
            doc = doc['_source']
        elif meta_keys:
            doc = dict((k, v) for (k, v) in doc.items() if k in meta_keys)
        output.write(json.dumps(doc))
        output.write('\n')

        if notify_every and cnt % notify_every == 0:
            duration = time.time() - start
            click.echo('%d documents read in %f seconds (%f docs/sec)' % (
                cnt, duration, cnt / duration), err=True)

    duration = time.time() - start
    click.echo('%d documents read in %f seconds (%f docs/sec)' % (
        cnt, duration, cnt / duration), err=True)


if __name__ == '__main__':
    cli()