Skip to content

Instantly share code, notes, and snippets.

@cb372
Last active June 29, 2018 18:40
Show Gist options
  • Save cb372/1d7b1abbbf0c643f2903 to your computer and use it in GitHub Desktop.
Save cb372/1d7b1abbbf0c643f2903 to your computer and use it in GitHub Desktop.
Using Elasticsearch as a Spark data source

Install the essentials.

$ brew update && brew install elasticsearch && brew install apache-spark

Start ES.

$ elasticsearch

Add some test data.

$ curl -X PUT localhost:9200/megacorp/employees/1 -d '
{
    "first_name" : "John",
    "last_name" :  "Smith",
    "age" :        25,
    "about" :      "I love to go rock climbing",
    "interests": [ "sports", "music" ]
}
'

$ curl -X PUT localhost:9200/megacorp/employees/2 -d '
{
    "first_name" :  "Jane",
    "last_name" :   "Smith",
    "age" :         32,
    "about" :       "I like to collect rock albums",
    "interests":  [ "music" ]
}
'

$ curl -X PUT localhost:9200/megacorp/employees/3 -d '
{
    "first_name" :  "Douglas",
    "last_name" :   "Fir",
    "age" :         35,
    "about":        "I like to build cabinets",
    "interests":  [ "forestry" ]
}
'

Start the Spark shell with ES integration.

$ spark-shell --packages org.elasticsearch:elasticsearch-spark_2.10:2.1.0.Beta4

Play around in the shell.

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val rdd = sc.esRDD("megacorp/employees", "")
rdd: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[1] at RDD at AbstractEsRDD.scala:17

scala> rdd.count
...
...
res1: Long = 3

RDD entries are (ID, key->value map) tuples.

scala> rdd.first
res2: (String, scala.collection.Map[String,AnyRef]) = (1,Map(first_name -> John, last_name -> Smith, age -> 25, about -> I love to go rock climbing, interests -> Buffer(sports, music)))

scala> rdd.filter(_._2("age").asInstanceOf[Long] > 30).map(_._2("first_name")).take(2)
..
..
res3: Array[AnyRef] = Array(Jane, Douglas)
@ebuildy
Copy link

ebuildy commented Oct 10, 2016

@epugh what happens under the hood is this elasticsearch Hadoop driver uses classic elasticsearch HTTP API to get the data. So a simple count will result on fetching all data from elasticsearch, that is very long on large dataset.

@ebuildy
Copy link

ebuildy commented Oct 10, 2016

Ho @epugh, maybe you have a network issue, by default, all elasticsearch Hadoop drivers will try to use ES "local IP". But if you run ES on Docker for instance, this can not work. Thats why elastic team has added a configuration called "es.nodes.wan.only":

Running SPARK_LOCAL_IP="127.0.0.1" ./bin/spark-shell --master spark://localhost:7077 --conf spark.es.nodes.wan.only=true should help you.

@yuta-imai
Copy link

I was rushing into network issue and spark.es.nodes.wan.only=true resolved the issue. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment