// summary : Feed an elasticsearch cluster with almost 20 years of Chicago crimes.
// keywords : scala, elasticsearch, feed, chicago, crimes, bigdata
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : a4fba58a-12b4-4af0-a551-79a50ec66003
// created-on : 2018-10-11T15:27:20Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follows: 'amm scriptname.sc'
import $ivy.`com.sksamuel.elastic4s::elastic4s-core:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-client-esjava:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-json-json4s:7.3.1`
import $ivy.`org.json4s::json4s-native:3.6.7`
import $ivy.`org.json4s::json4s-ext:3.6.7`
import $ivy.`ch.qos.logback:logback-classic:1.2.3`
import $ivy.`fr.janalyse::split:0.3.12`

import fr.janalyse.split.CsvSplit.split
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.json4s.ElasticJson4s.Implicits._
import com.sksamuel.elastic4s.requests.mappings._
import com.sksamuel.elastic4s.requests.mappings.FieldType._
import org.json4s.{DefaultFormats, native}
import org.json4s.ext.JavaTimeSerializers
import java.time.{Instant, OffsetDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.slf4j.{Logger, LoggerFactory}
import scala.concurrent._
import scala.concurrent.duration._
import annotation.tailrec
/*
  ammonite good practices:
    - use a class/object namespace when dealing with futures in scripts (in the REPL it's OK)
    - optimize JVM arguments:
      + export JAVA_OPTS="-Xms2g -Xmx6g"
  ---
  Fill elasticsearch with ~19 years of Chicago crimes data:
  `curl -L https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD -o crimes.csv`
    - from ~2001-01-01 / ~19 years of criminal events / ~6,997,433 crimes
    - 1.8Gb of data
    - 584 seconds required for the insertion (using await and 50 parallel futures)
      + The main bottleneck is IO (write operations), so it depends on disk performance
    - crimes index size = 3.3Gb
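
  A quick sanity check on the downloaded file (illustrative; given the figures above, roughly
  seven million data rows plus one header line are expected):
  wc -l crimes.csv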
  export baseURL="http://127.0.0.1:9201"
  curl "$baseURL/crimes?pretty"
  curl "$baseURL/crimes/_count"
  curl "$baseURL/crimes/_search"
  curl "$baseURL/crimes/_refresh"
  curl "$baseURL/crimes/_flush"
  curl -XPOST "$baseURL/crimes/_open"
  curl -XPOST "$baseURL/crimes/_close"
  curl -XPOST "$baseURL/crimes/_freeze"   // X-PACK
  curl -XPOST "$baseURL/crimes/_unfreeze" // X-PACK
  curl "$baseURL/crimes/_settings?pretty"
  curl "$baseURL/_search?q=robbery&pretty&size=5"
  curl "$baseURL/_cluster/allocation/explain?pretty" -d '{ "index":"crimes","shard":0,"primary":true}' -H 'Content-Type: application/json'
  curl -XPUT "$baseURL/crimes/_settings" -d '{"index.number_of_replicas" : 0 }' -H 'Content-Type: application/json'
  curl -XDELETE "$baseURL/crimes"
  curl "$baseURL"
  curl "$baseURL/_cat/indices"
  curl "$baseURL/_all/_settings?pretty"
  curl "$baseURL/_cluster/health?pretty"
  curl "$baseURL/_cluster/health?pretty&wait_for_status=yellow&timeout=50s"
  curl "$baseURL/_cluster/state"
  curl "$baseURL/_cluster/state?human&pretty"
  curl "$baseURL/_cluster/pending_tasks"
  curl "$baseURL/_nodes"
  curl "$baseURL/_nodes?pretty"
  curl "$baseURL/_nodes/stats"
  curl "$baseURL/_nodes/stats?pretty&human" | jq
  curl "$baseURL/_search" -d '{"query":{"type":{"value":"_doc"}}}' -H 'Content-Type: application/json'
*/
LoggerFactory
  .getLogger(Logger.ROOT_LOGGER_NAME)
  .asInstanceOf[ch.qos.logback.classic.Logger]
  .setLevel(ch.qos.logback.classic.Level.ERROR)

LoggerFactory
  .getLogger("esfill")
  .asInstanceOf[ch.qos.logback.classic.Logger]
  .setLevel(ch.qos.logback.classic.Level.INFO)
object Feed {
  val logger = org.slf4j.LoggerFactory.getLogger("esfill")

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit val serialization = native.Serialization
  implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all

  val indexName = "crimes"
  val mappingName = "testmapping"

  // Customize the default configuration; we're going to insert a huge amount of data in an unclean but fast way
  val client = ElasticClient(JavaClient(ElasticProperties("http://127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203,127.0.0.1:9204")))
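  // Note: RequestConfig and HttpAsyncClientBuilder are imported above but not used below.
  // Hedged sketch (assuming the callback-accepting JavaClient.apply overload of elastic4s 7.3.x)
  // of how the underlying HTTP client timeouts could be raised for long bulk requests:
  //   val tunedClient = ElasticClient(JavaClient(
  //     ElasticProperties("http://127.0.0.1:9201"),
  //     (rc: RequestConfig.Builder) => rc.setConnectTimeout(5000).setSocketTimeout(120000),
  //     (hc: HttpAsyncClientBuilder) => hc
  //   ))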
  client.execute {
    clusterState()
  }.map {
    _.result.clusterName
  }

  def now() = System.currentTimeMillis()

  def doCreateTestIndex(name: String) = client.execute {
    logger.info(s"doCreateTestIndex($name)")
    createIndex(name)
      .mappings {
        mapping() as Seq(
          geopointField("location")
        )
      }
  }
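  // doCreateTestIndex declares a single explicit field mapping so that "location" is indexed as a
  // geo_point (roughly equivalent to creating the index with
  // {"mappings":{"properties":{"location":{"type":"geo_point"}}}}); every other field is left to dynamic mapping.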
  def doUpdateIndexRefreshInterval(name: String, value: String) = {
    logger.info(s"doUpdateIndexRefreshInterval($name, $value)")
    client.execute {
      updateIndexLevelSettings(name)
        .refreshInterval(value)
    }
  }

  def doLoadingOptimizationsStart(name: String) = {
    logger.info(s"doLoadingOptimizationsStart($name)")
    client.execute {
      updateIndexLevelSettings(name)
        .numberOfReplicas(0)
        .refreshInterval("-1")
    }
  }
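  // doLoadingOptimizationsStart is roughly equivalent to PUT /crimes/_settings with
  // {"index":{"number_of_replicas":0,"refresh_interval":"-1"}} : no replica copies to write
  // and no periodic refresh while the bulk load is running.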
  def doLoadingOptimizationEnd(name: String) = {
    logger.info(s"doLoadingOptimizationEnd($name)")
    client.execute {
      updateIndexLevelSettings(name)
        .numberOfReplicas(1)
        .refreshInterval("10s")
    }
  }
  def doRefreshIndex(name: String) = {
    logger.info(s"doRefreshIndex($name)")
    client.execute { refreshIndex(name) }
  }

  def doClean(name: String) = {
    logger.info(s"doClean($name)")
    client.execute { deleteIndex(name) }
  }

  def doCount(name: String) = {
    logger.info(s"doCount($name)")
    client.execute { count(name) }
  }

  def insertBulk(name: String, entries: Seq[Map[String, String]]) = client.execute {
    bulk {
      for { entry <- entries } yield {
        indexInto(name).doc(entry)
      }
    }
  }
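  // A bulk response carries per-item outcomes; a hedged usage sketch (assuming the hasFailures/failures
  // accessors of the elastic4s 7.x BulkResponse, and `docs` standing for any prepared Seq[Map[String, String]])
  // to log rejected documents instead of silently dropping them:
  //   insertBulk(indexName, docs).map { r => if (r.result.hasFailures) logger.warn(r.result.failures.mkString(", ")) }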
  val dateFormat = DateTimeFormatter.ofPattern("MM/d/yyyy hh:mm:ss a").withZone(ZoneId.of("America/Chicago"))

  def normalizeDate(date: String): String = {
    Instant.from(dateFormat.parse(date)).toString
  }
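  // Example: normalizeDate("05/03/2019 11:40:00 PM") == "2019-05-04T04:40:00Z"
  // (11:40 PM Chicago local time, CDT/UTC-5 on that date, rendered as a UTC instant)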
  def normalizeHeaders(headers: List[String]): List[String] = {
    headers.map(_.replaceAll("""\s+""", ""))
  }
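  // Example: normalizeHeaders(List("Case Number", "Primary Type")) == List("CaseNumber", "PrimaryType")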
  def lineToCell(line: String, limit: Int): Array[String] = {
    line.split("""\s*,\s*""", limit)
  }
  def lineToDocument(headers: List[String])(line: String): Map[String, String] = {
    // Join headers and cells into a map
    val cells = headers.zip(split(line)).toMap
    // Convert the date format and normalize the timezone
    val foundTimestamp =
      cells
        .get("Date")
        .map(normalizeDate)
    // Remove parentheses and spaces from the geopoint, and filter out missing or malformed locations
    val foundLocation =
      cells
        .get("Location")
        .map(_.replaceAll("""[^-,.0-9]""", ""))
        .filterNot(_.trim.size == 0)
        .filter(_.matches("""-?\d+[.]\d+,-?\d+[.]\d+"""))
    // Build the final document map
    (cells -- Set("Date", "Location")) ++
      foundTimestamp.map(timestamp => "timestamp" -> timestamp) ++
      foundLocation.map(location => "location" -> location)
  }
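  // Illustrative example (abridged): a CSV line whose Date cell is "05/03/2019 11:40:00 PM" and whose
  // Location cell is "(41.88, -87.63)" yields Map("timestamp" -> "2019-05-04T04:40:00Z",
  // "location" -> "41.88,-87.63", ...), with all remaining columns kept verbatim under their normalized header names.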
  def fill(): Unit = {
    val linesIterator = scala.io.Source.fromFile("crimes.csv").getLines
    val headers = normalizeHeaders(linesIterator.next.split("""\s*,\s*""").toList)
    // consume input data
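    // Batching strategy: the CSV iterator is consumed in groups of 1000 lines; up to parallelismLevel
    // bulk requests are in flight at a time, and the loop blocks on Future.sequence before reading the
    // next batch, which keeps memory usage bounded and avoids flooding the cluster's bulk queue.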
    def writeData(indexName: String) = Future {
      logger.info(s"writeData($indexName)")
      val it = linesIterator.grouped(1000).toIterator // .take(500) // for test purposes, when you want to limit the input
      val parallelismLevel = 20
      while (it.hasNext) {
        print("*" * parallelismLevel)
        val groups = it.take(parallelismLevel).map { group =>
          insertBulk(indexName, group.map(lineToDocument(headers)))
        }
        //blocking {
        Future.sequence(groups).await // .await is slower but won't generate any timeouts with the default config
        //}
      }
    }

    val startedAt = now()
    val futureResponse = for {
      cleaned <- doClean(indexName)                                     // delete any existing indexName
      created <- doCreateTestIndex(indexName)                           // create the indexName, required for the geopoint type mapping
      refreshDisabledResponse <- doLoadingOptimizationsStart(indexName) // to accelerate insertion
      responses <- writeData(indexName)                                 // bulk insert all events
      refreshEnabledResponse <- doLoadingOptimizationEnd(indexName)     // revert back to normal behavior
      refreshed <- doRefreshIndex(indexName)                            // to wait for every document to be available for search
      count <- doCount(indexName)
    } yield {
      count
    }
    // Block until completion, because we don't want to exit the script before the future has completed
    val countResponse = Await.result(futureResponse, 30.minutes)
    val duration = (now() - startedAt) / 1000
    println(s"${countResponse.result.count} documents inserted in $duration seconds")
  }
}

Feed.fill()