Install the essentials.
$ brew update && brew install elasticsearch && brew install apache-spark
Start ES.
$ elasticsearch
Add some test data.
$ curl -X PUT localhost:9200/megacorp/employees/1 -d '
{
"first_name" : "John",
"last_name" : "Smith",
"age" : 25,
"about" : "I love to go rock climbing",
"interests": [ "sports", "music" ]
}
'
$ curl -X PUT localhost:9200/megacorp/employees/2 -d '
{
"first_name" : "Jane",
"last_name" : "Smith",
"age" : 32,
"about" : "I like to collect rock albums",
"interests": [ "music" ]
}
'
$ curl -X PUT localhost:9200/megacorp/employees/3 -d '
{
"first_name" : "Douglas",
"last_name" : "Fir",
"age" : 35,
"about": "I like to build cabinets",
"interests": [ "forestry" ]
}
'
Start the Spark shell with ES integration.
$ spark-shell --packages org.elasticsearch:elasticsearch-spark_2.10:2.1.0.Beta4
Play around in the shell.
scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._
scala> val rdd = sc.esRDD("megacorp/employees", "")
rdd: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[1] at RDD at AbstractEsRDD.scala:17
scala> rdd.count
...
...
res1: Long = 3
RDD entries are (ID, key->value map) tuples.
scala> rdd.first
res2: (String, scala.collection.Map[String,AnyRef]) = (1,Map(first_name -> John, last_name -> Smith, age -> 25, about -> I love to go rock climbing, interests -> Buffer(sports, music)))
scala> rdd.filter(_._2("age").asInstanceOf[Long] > 30).map(_._2("first_name")).take(2)
..
..
res3: Array[AnyRef] = Array(Jane, Douglas)
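The filter above relies on asInstanceOf[Long], which throws a ClassCastException if a field comes back as a different boxed numeric type. The following is a minimal sketch of a more defensive version, not part of the original session. It runs in any Scala REPL without Spark or ES: docs is a local stand-in for the RDD contents, and longField is a hypothetical helper introduced here for illustration.

```scala
// Local stand-in for the RDD contents: (ID, field map) tuples, as shown above.
// Values are boxed (AnyRef), which is why the original filter needs a cast.
val docs: Seq[(String, scala.collection.Map[String, AnyRef])] = Seq(
  ("1", Map[String, AnyRef]("first_name" -> "John",    "age" -> java.lang.Long.valueOf(25L))),
  ("2", Map[String, AnyRef]("first_name" -> "Jane",    "age" -> java.lang.Long.valueOf(32L))),
  ("3", Map[String, AnyRef]("first_name" -> "Douglas", "age" -> java.lang.Long.valueOf(35L)))
)

// Hypothetical helper: read a numeric field without assuming its boxed type.
// Returns None when the key is missing or the value is not a number.
def longField(doc: scala.collection.Map[String, AnyRef], key: String): Option[Long] =
  doc.get(key).collect { case n: java.lang.Number => n.longValue }

// Same query as above: first names of everyone older than 30.
val olderThan30 = docs.collect {
  case (_, doc) if longField(doc, "age").exists(_ > 30) => doc("first_name")
}
```

In the Spark shell, the same helper should work against the real RDD, for example rdd.filter { case (_, doc) => longField(doc, "age").exists(_ > 30) }.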
I was running into a network issue, and setting
spark.es.nodes.wan.only=true
resolved it. Thank you!
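One possible way to apply that setting from inside the shell is to pass it per read, as a configuration map to esRDD. This is a sketch under assumptions rather than what the commenter necessarily did: it assumes a running spark-shell started with the connector as above (so sc already exists), and a connector version that recognizes es.nodes.wan.only, which may be newer than the 2.1.0.Beta4 used in this walkthrough. The names cfg and wanRdd are illustrative only.

```scala
import org.elasticsearch.spark._

// Assumption: the connector version in use supports es.nodes.wan.only.
// When passed directly to the connector, the key has no "spark." prefix.
val cfg = Map("es.nodes.wan.only" -> "true")

// Read the same index/type as above, with the extra setting applied to this read only.
val wanRdd = sc.esRDD("megacorp/employees", cfg)
```

The spark.-prefixed form quoted in the comment is the variant used when the setting goes through Spark's own configuration instead of being handed to the connector directly.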