Created July 31, 2014 16:10
Save beiske/63b1605745daf7c0e0b0 to your computer and use it in GitHub Desktop.
Code for getting started with Elasticsearch and Lire
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.Files | |
import java.nio.file.Paths | |
import scala.Array.canBuildFrom | |
import scala.collection.JavaConverters.iterableAsScalaIterableConverter | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import org.elasticsearch.client.transport.TransportClient | |
import org.elasticsearch.common.settings.ImmutableSettings | |
import org.elasticsearch.common.transport.InetSocketTransportAddress | |
import com.sksamuel.elastic4s.ElasticClient | |
import dispatch.Future | |
import dispatch.Future | |
import dispatch.enrichFuture | |
/**
 * Runs a similarity search for every `.jpg` image in a directory and reports the image whose
 * top three hits have the highest average score.
 *
 * Args: </path/to/jpg/images> <Number of images to consider>
 */
object BestMatchSearcher extends App {
  val bulkSize = 50
  val path = Paths.get(args(0))
  val max = Integer.parseInt(args(1))

  /** Lazily loads every `.jpg` file directly under `path` as an [[ImageDocument]]. */
  def getData(): Stream[ImageDocument] = {
    for {
      childPath <- Files.newDirectoryStream(path).asScala.toStream
      if childPath.toFile().getName().endsWith(".jpg")
    } yield new ImageDocument(childPath)
  }

  // Connection settings for the Found transport module; "api-key" and "cluster_id" appear
  // to be placeholders — NOTE(review): substitute real values.
  val settings = ImmutableSettings.settingsBuilder()
    .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
    .put("transport.found.api-key", "api-key")
    .put("cluster.name", "cluster_id")
    .put("client.transport.ignore_cluster_name", false)
    .build()

  /** Client connected to the cluster; initialization blocks until the cluster is at least yellow. */
  val client = {
    val address = new InetSocketTransportAddress("cluster_id-eu-west-1.foundcluster.com", 9343)
    val c = ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
    try {
      println("Waiting for clusterstatus yellow.")
      val clusterStatus = c.client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet().getStatus().name()
      println(s"Got clusterstatus $clusterStatus")
    } catch {
      case scala.util.control.NonFatal(e) =>
        // Release the client's resources before giving up on the health check.
        c.close()
        throw e
    }
    c
  }

  /**
   * Asynchronously queries the index with `image` and scores it by the average of its top three
   * hits. The sum is always divided by 3, so images with fewer than three hits score lower.
   *
   * @return future of (image name, average score)
   */
  def search(image: ImageDocument) = {
    // NOTE(review): the Indexer in this gist writes to "images5", but this queries "images3" —
    // confirm which index is intended.
    val response = Future { client.client.prepareSearch("images3").setQuery(image.toQuery).addField("name").get() }
    val topAverageScore = response.map(_.getHits().getHits().take(3).map(_.getScore()).toSeq.sum / 3)
    topAverageScore.map(image.name -> _)
  }

  /**
   * Blocks until both scored searches complete and returns the one with the higher score
   * (ties keep `a`), announcing a change of leader when `b` wins.
   */
  def observeWinner(a: Future[Tuple2[String, Float]], b: Future[Tuple2[String, Float]]) = {
    if (a()._2 < b()._2) {
      println(s"New leader: ${b()._1}, score: ${b()._2}")
      b
    } else {
      a
    }
  }

  /** The best-scoring search; failed if there were no images or any search failed. */
  val winner: Future[(String, Float)] =
    try {
      val searches = getData().take(max).map(search(_))
      // Force the lazy stream in the background so every search is launched while the
      // reduction below is still waiting on earlier results. The work must live in run();
      // code in the anonymous class body would execute on the calling thread instead.
      val prefetch = new Thread {
        override def run(): Unit = searches.size
      }
      prefetch.setDaemon(true)
      prefetch.start()
      searches.reduceOption(observeWinner).getOrElse(
        Future.failed(new NoSuchElementException(s"No .jpg images found in $path")))
    } catch {
      case scala.util.control.NonFatal(e) => Future.failed(e)
    }

  // Report synchronously so the result cannot be lost to JVM shutdown, and always close the client.
  try {
    val (image, score) = winner()
    println(s"Winner is: $image with score $score")
  } finally {
    client.close()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Settings are separated by blank lines, which sbt versions before 0.13.7 require in .sbt files.
name := "image-indexer"

version := "1.0-SNAPSHOT"

scalaVersion := "2.10.2"

libraryDependencies += "no.found.elasticsearch" % "elasticsearch-transport-module" % "0.8.7-1.0.0"

// %% appends the Scala binary version, so this resolves to elastic4s_2.10 for scalaVersion 2.10.x.
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s" % "1.0.3.0"

// NOTE(review): the sources import dispatch.Future and dispatch.enrichFuture, but no dispatch
// artifact is declared here — confirm it arrives transitively or declare it explicitly.

EclipseKeys.withSource := true

fork in run := true
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.Files | |
import java.nio.file.Path | |
import sun.misc.BASE64Encoder | |
/**
 * An image file held in memory, ready to be indexed or used as a similarity query.
 *
 * @param name file name, written to the document's "name" field
 * @param data raw image bytes, sent Base64-encoded in the "image" field
 */
case class ImageDocument(name: String, data: Array[Byte]) {
  /** Reads the file at `path`, using its file name as the document name. */
  def this(path: Path) = this(path.toFile().getName(), Files.readAllBytes(path))

  /** JSON source for indexing: the JSON-escaped name plus the Base64-encoded image bytes. */
  def toJson: String = {
    s"""
       |{
       |"name" : "${jsonEscape(name)}",
       |"image" :"${encodedData}"
       |}""".stripMargin
  }

  /** Image query JSON (CEDD feature, BIT_SAMPLING hash, limit 10) for images similar to this one. */
  def toQuery: String = {
    s"""
       |{
       |  "image": {
       |    "image": {
       |      "feature": "CEDD",
       |      "image": "${encodedData}",
       |      "hash": "BIT_SAMPLING",
       |      "limit": 10
       |    }
       |  }
       |}""".stripMargin
  }

  // Standard Base64 (with padding, no line breaks). java.util.Base64 replaces the JDK-internal
  // sun.misc.BASE64Encoder, which is unsupported and unavailable on Java 9+.
  private def encodedData: String = java.util.Base64.getEncoder.encodeToString(data)

  // Escapes `s` for use inside a JSON string literal, so file names containing quotes,
  // backslashes or control characters still yield valid JSON.
  private def jsonEscape(s: String): String = {
    val out = new StringBuilder
    s.foreach {
      case '"' => out.append("\\\"")
      case '\\' => out.append("\\\\")
      case '\n' => out.append("\\n")
      case '\r' => out.append("\\r")
      case '\t' => out.append("\\t")
      case c if c < ' ' => out.append('\\').append('u').append("%04x".format(c.toInt))
      case c => out.append(c)
    }
    out.toString
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.Files | |
import java.nio.file.Paths | |
import scala.collection.JavaConverters.iterableAsScalaIterableConverter | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import org.elasticsearch.client.transport.TransportClient | |
import org.elasticsearch.common.settings.ImmutableSettings | |
import org.elasticsearch.common.transport.InetSocketTransportAddress | |
import com.sksamuel.elastic4s.ElasticClient | |
import com.sksamuel.elastic4s.ElasticDsl._ | |
import Reindexer.StringSource | |
import dispatch.Future | |
import dispatch.enrichFuture | |
import scala.concurrent.ExecutionContext | |
/**
 * Indexes `.jpg` images from a directory into Elasticsearch using bulk requests.
 *
 * Args: </path/to/jpg/images> <Number of images to index>
 */
object Indexer extends App {
  val bulkSize = 3
  val path = Paths.get(args(0))
  val max = Integer.parseInt(args(1))

  /** Lazily loads every `.jpg` file directly under `path` as an [[ImageDocument]]. */
  def getData(): Stream[ImageDocument] = {
    for {
      childPath <- Files.newDirectoryStream(path).asScala.toStream
      if childPath.toFile().getName().endsWith(".jpg")
    } yield new ImageDocument(childPath)
  }

  /**
   * Sends `images` to the "images5/image" index in bulks of `bulkSize` and reports each bulk's
   * outcome as it completes.
   *
   * Every bulk is allowed to settle before the client is closed, and the client is closed even
   * when the health check or a request fails.
   *
   * @return number of documents that failed to index
   */
  def push(images: Stream[ImageDocument]) = {
    // "api-key" and "cluster_id" appear to be placeholders — NOTE(review): substitute real values.
    val settings = ImmutableSettings.settingsBuilder()
      .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
      .put("transport.found.api-key", "api-key")
      .put("cluster.name", "cluster_id")
      .put("client.transport.ignore_cluster_name", false)
      // .put("client.transport.nodes_sampler_interval", "30s")
      // .put("client.transport.ping_timeout", "30s")
      .build()
    val client = {
      val address = new InetSocketTransportAddress("cluster_id-eu-west-1.foundcluster.com", 9343)
      ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
    }
    try {
      println("Waiting for clusterstatus yellow.")
      val clusterStatus = client.client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet().getStatus().name()
      println(s"Got clusterstatus $clusterStatus")

      val bulkLoads = images.grouped(bulkSize).map { batch =>
        client.bulk(
          batch.map(image => index into "images5/image" source new StringSource(image.toJson)): _*)
      }.toStream

      // Traversing the stream here launches every bulk request.
      for (bulk <- bulkLoads) {
        bulk.onSuccess {
          case bulkResult =>
            if (bulkResult.hasFailures()) {
              for (result <- bulkResult.getItems() if result.isFailed()) {
                println(s"Failed response: [${result.getFailure()}][${result.getFailureMessage()}]")
              }
            } else {
              println("Bulkresult successful!")
            }
        }
        bulk.onFailure {
          case e => println(s"Bulk request failed: [$e]")
        }
      }

      val failureCount = (Future sequence bulkLoads.map(_.map(_.asScala.count(_.isFailed())))).map(_.sum)
      val count =
        try failureCount()
        finally {
          // Future.sequence fails fast; wait for the remaining bulks so closing the client
          // does not cut off requests that are still in flight.
          bulkLoads.foreach(bulk => scala.concurrent.Await.ready(bulk, scala.concurrent.duration.Duration.Inf))
        }
      // Printed synchronously so the summary cannot be lost to JVM shutdown.
      println(s"Found ${images.size} images, $count failed to index")
      count
    } finally {
      println("Closing client")
      client.close()
    }
  }

  push(getData().take(max))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.Files | |
import java.nio.file.Paths | |
import scala.collection.JavaConverters.iterableAsScalaIterableConverter | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import org.elasticsearch.client.transport.TransportClient | |
import org.elasticsearch.common.settings.ImmutableSettings | |
import org.elasticsearch.common.transport.InetSocketTransportAddress | |
import com.sksamuel.elastic4s.ElasticClient | |
import com.sksamuel.elastic4s.ElasticDsl._ | |
import Reindexer.StringSource | |
import dispatch.Future | |
import dispatch.enrichFuture | |
import scala.concurrent.ExecutionContext | |
import scala.util.Random | |
/**
 * Picks a random image among the first N `.jpg` files in a directory and prints the indexed
 * images most similar to it.
 *
 * Args: </path/to/jpg/images> <Number of images to consider>
 */
object Searcher extends App {
  val bulkSize = 50
  val path = Paths.get(args(0))
  val max = Integer.parseInt(args(1))
  require(max > 0, s"Number of images to consider must be positive, got $max")

  /** Lazily loads every `.jpg` file directly under `path` as an [[ImageDocument]]. */
  def getData(): Stream[ImageDocument] = {
    for {
      childPath <- Files.newDirectoryStream(path).asScala.toStream
      if childPath.toFile().getName().endsWith(".jpg")
    } yield new ImageDocument(childPath)
  }

  /**
   * Connects to the cluster, prints the hits returned for a similarity query on `image`, and
   * closes the client afterwards, including when the health check or the search fails.
   */
  def search(image: ImageDocument) = {
    // "api-key" and "cluster_id" appear to be placeholders — NOTE(review): substitute real values.
    val settings = ImmutableSettings.settingsBuilder()
      .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
      .put("transport.found.api-key", "api-key")
      .put("cluster.name", "cluster_id")
      .put("client.transport.ignore_cluster_name", false)
      .build()
    val client = {
      val address = new InetSocketTransportAddress("cluster_id-eu-west-1.foundcluster.com", 9343)
      ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
    }
    try {
      println("Waiting for clusterstatus yellow.")
      val clusterStatus = client.client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet().getStatus().name()
      println(s"Got clusterstatus $clusterStatus")
      println(s"Searching for images similar to: ${image.name}")
      // NOTE(review): the Indexer in this gist writes to "images5" — confirm "images3" is intended.
      val response = client.client.prepareSearch("images3").setQuery(image.toQuery).addField("name").get()
      for (hit <- response.getHits().getHits()) {
        // get("name") returns null when a hit has no stored "name" field.
        val name = Option(hit.getFields().get("name")).map(_.getValue[AnyRef]()).getOrElse("<unnamed>")
        println(s"Found: [$name] with score: [${hit.getScore()}]")
      }
    } finally {
      client.close()
    }
  }

  val index = Random.nextInt(max)
  // lift only forces the stream up to `index`, and yields None instead of throwing when the
  // directory holds fewer images than requested.
  val image = getData().lift(index).getOrElse {
    println(s"Fewer than ${index + 1} .jpg images found in $path")
    sys.exit(1)
  }
  search(image)
  System.exit(0)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.