Skip to content

Instantly share code, notes, and snippets.

@karthik20522
Last active May 25, 2019 03:23
Show Gist options
  • Save karthik20522/0724d8d912b4bb0db74eaaf5f3efc674 to your computer and use it in GitHub Desktop.
Save karthik20522/0724d8d912b4bb0db74eaaf5f3efc674 to your computer and use it in GitHub Desktop.
ReIndexing Elasticsearch in Scala
import org.json4s._
import org.json4s.JsonDSL._
import spray.client.pipelining._
import spray.http._
import spray.httpx._
import spray.httpx.encoding._
import scala.concurrent._
import scala.concurrent.duration._
import akka.actor._
import scala.collection.mutable.ListBuffer
import scala.annotation.tailrec
/**
 * Actor that copies documents from `inputIndex` on `esInputHost` into `outputIndex` on
 * `esOutputHost`. Each document's `_source` is passed through `processData` and written with
 * the bulk API.
 *
 * Message protocol (messages are mostly sent to self):
 *  - `"init"`: opens a scan/scroll search over documents whose `submitData` is >= the current
 *    checkpoint (`lastUpdateDateTime`) and sends the returned scroll id to self.
 *  - any other `String`: treated as a scroll id. Fetches the next page, bulk-indexes it and
 *    sends the next scroll id to self.
 *  - `ReceiveTimeout`: fired one minute after a pass finished; starts the next pass.
 *
 * A pass ends when a scroll page has no hits or carries no `_scroll_id`. The checkpoint then
 * advances to the time the pass was opened, and the actor waits one minute.
 *
 * NOTE(review): HTTP calls block the actor thread via `Await.result`. A non-blocking version
 * would pipe the futures back to self instead.
 *
 * @param esInputHost  host[:port] of the source cluster
 * @param esOutputHost host[:port] of the destination cluster
 * @param inputIndex   index to read from
 * @param outputIndex  index to write to
 * @param indexType    document type used for both reading and writing
 * @param processData  transformation applied to every `_source` before it is indexed
 */
class ElasticsearchReIndexerActor(esInputHost: String,
                                  esOutputHost: String,
                                  inputIndex: String,
                                  outputIndex: String,
                                  indexType: String,
                                  processData: (JValue) => JValue) extends Actor {
  import context.dispatcher

  val ouputIndexClient = new ESClient(s"http://$esOutputHost", outputIndex, indexType, context)
  val pipeline = addHeader("Accept-Encoding", "gzip") ~> sendReceive ~> decode(Gzip) ~> unmarshal[HttpResponse]

  /** Inclusive lower bound on `submitData` for the next pass; starts before any real data. */
  var lastUpdateDateTime: String = "1900-01-01"

  /** When the current pass was opened; becomes the checkpoint once the pass completes. */
  private var currentPassStartedAt: String = lastUpdateDateTime

  def receive: Receive = {
    case "init" =>
      // Record the start time BEFORE opening the scan. Documents written while the pass is
      // running are not in the scan snapshot, so the next pass must start from here (not from
      // the completion time) or those documents would be skipped.
      currentPassStartedAt = DateTime.now.toString
      val scanResponse = Await.result(getScanId(lastUpdateDateTime), 60.seconds)
      val scanId: String = (scanResponse \ "_scroll_id").extract[String]
      self ! scanId
    case scanId: String => iterateData(scanId)
    case ReceiveTimeout =>
      // A receive timeout keeps firing while it is set, and may be enqueued right behind
      // another message. Switch it off so only one pass is started.
      context.setReceiveTimeout(Duration.Undefined)
      self ! "init"
  }

  /**
   * Opens a scan/scroll search over documents with `submitData >= startDate`.
   *
   * @param startDate inclusive lower bound on `submitData`
   * @return future of the parsed initial response, which carries the `_scroll_id`
   */
  def getScanId(startDate: String): Future[JValue] = {
    println("Query data with date gte: " + startDate)
    val esQuery = "{\"query\":{\"bool\":{\"must\":[{\"range\":{\"submitData\":{\"gte\":\"" + startDate + "\"}}}]}}}"
    val esURI = s"http://$esInputHost/$inputIndex/$indexType/_search?search_type=scan&scroll=5m&size=50"
    pipeline(Post(esURI, esQuery)).map(response => parse(response.entity.asString))
  }

  /**
   * Fetches the next scroll page for `scanId` and bulk-indexes its documents into
   * `outputIndex`. Then either sends the next scroll id to self or, if the scroll is
   * exhausted, ends the pass.
   *
   * @throws Exception if the scroll response has no `hits.hits` section (e.g. an error
   *                   response) or it has an unexpected shape
   */
  def iterateData(scanId: String): Unit = {
    val esURI = Uri(s"http://$esInputHost/_search/scroll?scroll=5m")
    // The raw scroll id is sent as the request body.
    val esResponse: HttpResponse = Await.result(pipeline(Post(esURI, scanId)), 60.seconds)
    val responseData: JValue = ModelJsonHelper.toJValue(esResponse.entity.asString)

    // Read the hits array itself rather than `hits \ "_source"`. json4s collapses a
    // single-element result to a bare object and an empty one to JNothing.
    val hits: List[JValue] = (responseData \ "hits" \ "hits") match {
      case JArray(hitList) => hitList
      case JNothing | JNull => throw new Exception("Scroll response contains no hits: " + responseData)
      case x => throw new Exception("UNKNWON TYPE: " + x)
    }

    val bulkList: List[JValue] = hits.flatMap(toBulkActions)
    // An empty bulk body is invalid, so only call bulk when there is something to write.
    if (bulkList.nonEmpty) Await.result(ouputIndexClient.bulk(bulkList), 60.seconds)

    // ES keeps returning a `_scroll_id` on the final, empty page, so an empty page also ends
    // the pass. Use `\` (direct child): `\\` yields JObject(Nil), not JNothing, when the field is absent.
    (responseData \ "_scroll_id") match {
      case JString(nextScrollId) if hits.nonEmpty => self ! nextScrollId
      case _ => pauseUntilNextPass()
    }
  }

  /** Builds the bulk `index` action line followed by the processed document for one hit. */
  private def toBulkActions(hit: JValue): List[JValue] = (hit \ "_source") match {
    case JNothing | JNull => Nil // hit returned without a source: nothing to copy
    case source =>
      val id = (source \ "id").extract[String]
      val action: JValue = "index" -> (("_index" -> outputIndex) ~
        ("_type" -> indexType) ~ ("_id" -> id))
      List(action, processData(source))
  }

  /** Ends the current pass: advances the checkpoint and schedules the next pass via ReceiveTimeout. */
  private def pauseUntilNextPass(): Unit = {
    lastUpdateDateTime = currentPassStartedAt
    context.setReceiveTimeout(1.minute)
    println("Paused at: " + DateTime.now.toString + ", next pass from: " + lastUpdateDateTime)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment