Created
October 22, 2015 19:29
-
-
Save scotthaleen/169ad7c675c019f6d3f0 to your computer and use it in GitHub Desktop.
Bulk-index JSON documents into Elasticsearch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark import SparkContext, SparkConf | |
import json | |
import argparse | |
def fn_to_doc(line):
    """Wrap one line of JSON text as an Elasticsearch document payload.

    Parses *line* as JSON, nests the parsed value under a top-level
    ``"data"`` key, and returns the result re-serialized as a one-element
    list of JSON strings (a convenient shape for ``rdd.flatMap``).

    Returns an empty list when *line* is not valid JSON (or not a
    str/bytes at all), so bad records are silently dropped from the
    stream instead of failing the job.
    """
    # Narrowed from a bare `except:` — only catch what json.loads raises
    # (ValueError covers json.JSONDecodeError; TypeError covers non-string
    # input), so real errors such as KeyboardInterrupt still propagate.
    try:
        data = json.loads(line)
    except (ValueError, TypeError):
        return []
    return [json.dumps({'data': data})]
if __name__ == "__main__": | |
desc='elastic search ingest' | |
parser = argparse.ArgumentParser( | |
description=desc, | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
epilog=desc) | |
parser.add_argument("input_path", help="lines of json to ingest") | |
parser.add_argument("es_resource", help="index and doc_type (my-index/doc)") | |
parser.add_argument("--es_nodes", default="127.0.0.1", help="es.nodes") | |
parser.add_argument("--es_port", default="9200", help="es.port") | |
args = parser.parse_args() | |
conf = SparkConf().setAppName("Elastic Ingest") | |
sc = SparkContext(conf=conf) | |
es_write_conf = { | |
"es.nodes" : args.es_nodes, | |
"es.port" : args.es_port, | |
"es.resource" : args.es_resource, | |
#"es.nodes.client.only" : "true", | |
"es.input.json" : "yes" | |
} | |
hdfs_path = args.input_path | |
d = sc.textFile(hdfs_path).map(lambda x : ("key", x)) | |
d.saveAsNewAPIHadoopFile( | |
path='-', | |
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", | |
keyClass="org.apache.hadoop.io.NullWritable", | |
valueClass="org.apache.hadoop.io.Text", | |
#valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", | |
conf=es_write_conf) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Create the Elasticsearch index and doc-type mapping if needed, then run the
# Spark bulk-ingest job against it.
# NOTE(review): `set +x` *disables* command tracing (a no-op unless tracing was
# already on) — if tracing was intended, this should be `set -x`. Left as-is.
set +x
set -e

INDEX=my_index
DOC_TYPE=my_doc_type
MAPPING_FILE=my_mapping

# Fix: use --head instead of -XHEAD. With -XHEAD curl only changes the method
# string but still waits for a response body, which can hang; --head performs a
# proper HEAD request. The redundant -i (headers to stdout) is dropped since
# output is discarded anyway.
response=$(curl --head --write-out '%{http_code}' --silent --output /dev/null "localhost:9200/${INDEX}")
if [[ "$response" -eq 404 ]]; then
    # Index is missing: create it, allowing type-wrapped documents
    # ({"data": {...}}) as produced by the Spark job.
    printf 'create index %s\n' "${INDEX}"
    curl -s -XPOST "http://localhost:9200/${INDEX}" -d '{ "settings": { "index": { "mapping.allow_type_wrapper": true } } }'
fi

response=$(curl --head --write-out '%{http_code}' --silent --output /dev/null "localhost:9200/${INDEX}/${DOC_TYPE}")
if [[ "$response" -eq 200 ]]; then
    # Doc type already exists: delete it so the mapping below starts clean.
    printf 'delete doc_type\n'
    curl -XDELETE "localhost:9200/${INDEX}/${DOC_TYPE}"
fi

printf 'create doc_type\n'
curl -s -XPUT "http://localhost:9200/${INDEX}/${DOC_TYPE}/_mapping" --data-binary "@${MAPPING_FILE}"

printf 'ingest documents\n'
spark-submit --master local[*] --driver-memory 8g --jars lib/elasticsearch-hadoop-2.1.1.jar --conf spark.storage.memoryFraction=.8 spark/elastic_bulk_ingest.py "my/path/input/part-*" "${INDEX}/${DOC_TYPE}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment