Last active
April 2, 2023 10:10
-
-
Save dacr/026c9207894c00871c19c872fd78d821 to your computer and use it in GitHub Desktop.
feed elasticsearch with code examples content, generates ready to insert json files with feeding scripts / published by https://github.com/dacr/code-examples-manager #b82d3ba4-aa7e-436b-a480-c46a4e011a30/6e8dc2c2385fa345f2413e0601cc2fc417d31730
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : feed elasticsearch with code examples content, generates ready to insert json files with feeding scripts | |
// keywords : scala, elasticsearch, cem, code-examples-manager, search, generator, elastic4s, json4s, basic-auth, credential | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : b82d3ba4-aa7e-436b-a480-c46a4e011a30 | |
// created-on : 2021-04-29T16:52:54Z | |
// managed-by : https://github.com/dacr/code-examples-manager | |
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc' | |
// run-with : cs launch com.lihaoyi:::ammonite:2.3.8 -M ammonite.Main -- $file | |
import $ivy.`com.github.pathikrit::better-files:3.9.1` | |
import $ivy.`com.sksamuel.elastic4s::elastic4s-core:7.12.0` | |
import $ivy.`com.sksamuel.elastic4s::elastic4s-client-esjava:7.12.0` | |
import $ivy.`com.sksamuel.elastic4s::elastic4s-json-json4s:7.12.0` | |
import $ivy.`org.json4s::json4s-jackson:3.6.11` | |
import $ivy.`org.json4s::json4s-ext:3.6.11` | |
import $ivy.`org.slf4j:slf4j-nop:1.7.30` | |
import better.files._ | |
import java.nio.file.attribute.PosixFilePermission | |
import java.time.format.DateTimeFormatter.ISO_DATE_TIME | |
import java.time.{Instant, ZoneId, ZonedDateTime} | |
import java.time.temporal.ChronoField | |
import scala.concurrent.duration.DurationInt | |
import scala.util.{Properties, Try, Success, Failure} | |
// ===================================================================================================================== | |
// One code example, built from its metadata header plus git information,
// ready to be serialized to JSON and indexed into elasticsearch.
case class Document(
  id: String,                  // unique example identifier ("id" header field) - important: used as the elasticsearch _id
  file: String,                // path of the source file as found under the search root
  filename: String,            // file name without its parent directories
  summary: String,             // one-line description ("summary" header field)
  category: String,            // parent directory path relative to the search root
  content: String,             // example content with the metadata header stripped (see getCleanedExampleContentForIndexing)
  license: String,             // license name ("license" header field)
  keywords: List[String],      // search keywords ("keywords" header field, comma/semicolon separated)
  publish: List[String],       // publication targets ("publish" header field) - documents with an empty list are not indexed
  execution: Option[String],   // how to execute the example, when provided
  run_with: Option[String],    // exact run command, when provided
  authors: List[String],       // author names ("authors" header field)
  created_on: ZonedDateTime,   // creation date: "created-on" header field, or first git commit date as fallback
  last_updated: ZonedDateTime, // file system last-modification date
  updated_count: Int,          // number of git commits touching the file
)
// ===================================================================================================================== | |
// Runtime configuration - every entry can be overridden through an environment variable.
val from = Properties.envOrElse("CEM_ELK_FEED_FROM_PATH", ".")   // root directory to scan for code examples
val glob = Properties.envOrElse("CEM_ELK_FEED_GLOB", "**/*.*")   // glob selecting candidate files under the root
// Regex of paths to ignore (bsp/scala build directories, png images).
// FIX: the previous default "([.png]$)" was a character class matching any file whose
// name ends with '.', 'p', 'n' or 'g' (so e.g. *.json files were wrongly ignored);
// "([.]png$)" matches the intended ".png" suffix only.
val ignoreRE = Properties.envOrElse("CEM_SEARCH_IGNORE_MASK", "(/[.]bsp)|(/[.]scala)|([.]png$)").r
val gitCreatedByCacheDir = Properties.envOrElse("CEM_ELK_FEED_GIT_CREATED_BY_CACHE_DIR", "/tmp/cem/git-created-by-cache") // cache dir for git command outputs
val insertedJsonDocumentsCacheDir = Properties.envOrElse("CEM_ELK_FEED_INSERTED_JSON_DOCUMENTS", "/tmp/cem/inserted-documents") // output dir for the curl-based generator
val elasticUrl = Properties.envOrElse("CEM_ELASTIC_URL", "http://127.0.0.1:9200") // CEM_ELASTIC_URL MUST CONTAIN the port explicitly !
val elasticUsername = Properties.envOrNone("CEM_ELASTIC_USERNAME") // optional basic-auth user
val elasticPassword = Properties.envOrNone("CEM_ELASTIC_PASSWORD") // optional basic-auth password
// ===================================================================================================================== | |
// Computes the target elasticsearch index name from the document creation date.
// Current strategy: one index per month ("cem-<year>-<month>", month NOT zero padded,
// so april 2021 gives "cem-2021-4"). The commented lines below are the daily,
// weekly and single-index alternatives.
def indexNameFromDocument(doc: Document): String = {
  val year = doc.created_on.get(ChronoField.YEAR)
  val month = doc.created_on.get(ChronoField.MONTH_OF_YEAR)
  val day = doc.created_on.get(ChronoField.DAY_OF_MONTH)          // only used by the daily alternative
  val week = doc.created_on.get(ChronoField.ALIGNED_WEEK_OF_YEAR) // only used by the weekly alternative
  //s"cem-$year-$month-$day"
  //s"cem-$year-$week"
  s"cem-$year-$month"
  //s"cem-default"
}
// ===================================================================================================================== | |
// A file is eligible for indexation when its first 20 lines (the metadata header)
// provide the three mandatory fields: a non-empty summary, a publish field
// (possibly empty-valued) and a lowercase-hex identifier.
def eligible(file: File): Boolean = {
  val headerLines = file.lineIterator.take(20).toList
  def headerMatches(pattern: String): Boolean = headerLines.exists(_.matches(pattern))
  headerMatches(".* summary : .+") &&
    headerMatches(".* publish :.*") &&
    headerMatches(".* id : [-a-f0-9]+$")
}
// Collects all candidate example files under the search root:
// glob selection, then directory/ignore-mask filtering, then the
// mandatory-metadata eligibility check.
val fromFile = File(from)
val files =
  fromFile
    .glob(glob, includePath = false)
    .filterNot(_.isDirectory)
    .filterNot(file => ignoreRE.findFirstIn(file.path.toString).isDefined) // skip build artifacts & images
    .filter(eligible)
    .toList
println(s"Found ${files.size} eligible files for indexation")
// Extracts the value of a metadata header field such as "// summary : ...".
// Recognized comment prefixes: "// ", "## ", "- ", "-- " (scala, markdown, shell, sql...).
// Matching is case-insensitive on the key; returns None when the key is absent
// or its value is blank.
// (Improved: the previous implementation applied the regex twice - findFirstIn
// followed by a collect re-match; findFirstMatchIn does a single pass.)
def extractValue(from: String, key: String): Option[String] = {
  val RE = ("""(?m)(?i)^(?:(?:// )|(?:## )|(?:- )|(?:-- )) *""" + key + """ *: *(.*)$""").r
  RE.findFirstMatchIn(from).map(_.group(1).trim).filter(_.nonEmpty)
}
// Extracts a comma or semicolon separated header field as a list of trimmed
// items; yields the empty list when the field is absent or blank.
def extractValueList(from: String, key: String): List[String] = {
  extractValue(from, key) match {
    case Some(rawValue) => rawValue.split("""\s*[,;]\s*""").toList
    case None           => Nil
  }
}
// Runs an external (git) command and caches its standard output on disk under
// gitCreatedByCacheDir/<cachedResultKey>. The cached result is reused when it
// exists, is non-empty, and - when checkUpdatedDate is given - is at least as
// recent as that instant; otherwise the command is re-executed and the cache
// file refreshed. Any failure (process exit != 0, IO error) is captured in the Try.
// NOTE(review): `command.!!` splits the command string on whitespace, so file
// paths containing spaces would break - assumes space-free paths, TODO confirm.
def getGitCommandResults(command:String, cachedResultKey:String, checkUpdatedDate:Option[Instant] = None):Try[String] = Try {
  val cacheDirFile = gitCreatedByCacheDir.toFile
  if (!cacheDirFile.exists()) cacheDirFile.createDirectoryIfNotExists(true)
  val cached = file"$cacheDirFile/$cachedResultKey"
  val result =
    if (
      cached.exists && cached.contentAsString.trim.size>0 && (
        checkUpdatedDate.isEmpty || checkUpdatedDate.get.toEpochMilli <= cached.lastModifiedTime.toEpochMilli
      )
    ) cached.contentAsString.trim // cache hit: reuse the previously captured output
    else {
      import scala.sys.process._
      val result = command.!! // throws on non-zero exit status, captured by the surrounding Try
      cached.writeText(result)
      result
    }
  result
}
// Retrieves the creation date of a file from git: author date (%aI, ISO-8601)
// of the first commit that added it. The result is cached under the example id.
def getCreatedOnFromGit(file: File, id: String): Try[ZonedDateTime] = {
  val command = s"git log --diff-filter=A --follow --format=%aI -1 -- ${file.pathAsString}"
  for {
    rawDate   <- getGitCommandResults(command, id)
    createdOn <- Try(ZonedDateTime.from(ISO_DATE_TIME.parse(rawDate.trim)))
  } yield createdOn
}
// Counts how many git commits touched the given file (one `--oneline` row per
// commit). The cached history is invalidated whenever the file on disk is
// newer than the cache entry.
def getUpdatedCountFromGit(file: File, id: String): Try[Int] = {
  val command = s"git log --oneline ${file.pathAsString}"
  getGitCommandResults(command, s"$id-history", Some(file.lastModifiedTime))
    .map(_.split("""\R""").length)
}
// Strips the leading metadata header from an example so only the real content
// gets indexed: an optional "<!--" opening line, the "key : value" comment
// lines, an optional "-->" closing line, then any blank lines. Everything
// after the first regular content line is kept verbatim.
def getCleanedExampleContentForIndexing(file: File) = {
  val headerFieldPattern = """(?m)(?i)^(?:(?:// )|(?:## )|(?:- )|(?:-- ))[-a-zA-Z]+ :.*"""
  val contentLines =
    file.lineIterator
      .dropWhile(_.trim.startsWith("<!--"))
      .dropWhile(_.matches(headerFieldPattern))
      .dropWhile(_.trim.startsWith("-->"))
      .dropWhile(_.trim.isEmpty)
  contentLines.mkString("\n")
}
// Parses an ISO-8601 date-time string (with offset or zone) into a ZonedDateTime.
def convertDateTime(that:String):ZonedDateTime =
  ZonedDateTime.parse(that, ISO_DATE_TIME)
// ---------------------------------------------------------------- | |
// prepare input documents | |
// Builds one Document per eligible file. A file missing a mandatory field
// (id, creation date, git history, summary, license) is silently skipped here
// and reported later by the "ignored files" sanity check.
val documents = for {
  file <- files
  _ = println(s"reading $file")
  contentHeader = file.lineIterator.take(20).mkString("\n") // metadata lives in the first 20 lines
  id <- extractValue(contentHeader, "id")
  // prefer the explicit created-on field, fall back to the first git commit date
  createdOn <- extractValue(contentHeader, "created-on").map(convertDateTime).orElse(getCreatedOnFromGit(file, id).toOption)
  lastUpdated = file.lastModifiedTime
  updatedCount <- getUpdatedCountFromGit(file, id).toOption
  summary <- extractValue(contentHeader, "summary")
  license <- extractValue(contentHeader, "license")
  keywords = extractValueList(contentHeader, "keywords")
  publish = extractValueList(contentHeader, "publish")
  execution = extractValue(contentHeader, "execution")
  // FIX: the header field is spelled "run-with" (see this file's own header),
  // so looking up "run_with" always yielded None
  run_with = extractValue(contentHeader, "run-with")
  authors = extractValueList(contentHeader, "authors")
} yield {
  Document(
    file = file.pathAsString,
    id = id,
    created_on = createdOn,
    last_updated = lastUpdated.atZone(ZoneId.systemDefault()),
    filename = file.name,
    // NOTE(review): replaceAll treats the root path as a regex; paths containing
    // regex meta-characters (e.g. parentheses) could mangle the category -
    // TODO confirm or switch to stripPrefix
    category = file.parent.pathAsString.replaceAll(fromFile.pathAsString + "/", ""),
    content = getCleanedExampleContentForIndexing(file),
    summary = summary,
    keywords = keywords,
    publish = publish,
    license = license,
    execution = execution,
    run_with = run_with,
    authors = authors,
    updated_count = updatedCount
  )
}
// ---------------------------------------------------------------- | |
// id issues - all distincts | |
// Sanity check: every document identifier must be unique, since ids become
// elasticsearch document _ids (duplicates would silently overwrite each other).
val idIssues = documents.groupBy(_.id).filter(_._2.size > 1)
assert(idIssues.isEmpty,
  "Found duplicates in documents identifiers :\n" +
    idIssues
      .map { case (id, docs) => " " + id + " : " + docs.map(_.filename).mkString(" ") }
      .mkString("\n")
)
// ---------------------------------------------------------------- | |
// summary issues - all distincts | |
// Sanity check: summaries must be unique too (compared trimmed and lowercased)
// so that search results stay distinguishable.
val summaryIssues =
  documents
    .groupBy(doc => doc.summary.trim.toLowerCase)
    .filter(_._2.size > 1)
assert(summaryIssues.isEmpty,
  "Found duplicates in documents summaries :\n" +
    summaryIssues
      .map { case (_, docs) => " " + docs.map(_.filename).mkString(" ") }
      .toList
      .sorted
      .mkString("\n")
)
// ---------------------------------------------------------------- | |
// check if some files have been ignored because of missing information
// (i.e. eligible files for which no Document could be built).
// Improved: membership is tested against a precomputed Set - the previous
// `documents.exists` per file was an accidental O(files * documents) scan.
val indexedFilePaths = documents.map(_.file).toSet
val ignoredFilesIssues = files.filterNot(file => indexedFilePaths.contains(file.pathAsString))
assert(ignoredFilesIssues.isEmpty,
  "Found ignored files :\n" +
  ignoredFilesIssues.map(f => " " + f.path.toString).mkString("\n")
)
// ----------------------------------------------------------------
// debugging helper: dump the indexable documents
//documents.sortBy(d => d.category -> d.filename).foreach(doc =>
//  println(doc.id, doc.category, doc.filename, doc.summary)
//)
println(s"Found ${documents.size} documents ready for indexation")
// ===================================================================================================================== | |
def now(): Long = System.currentTimeMillis() | |
// ===================================================================================================================== | |
// =====================================================================================================================
// Feeds elasticsearch directly through the elastic4s client, using grouped bulk
// insertions (the fast path; FeedUsingCurlGenerator below is the slow alternative).
object ElasticFeed {
  import org.json4s.DefaultFormats
  import org.json4s.jackson.Serialization
  import org.json4s.ext.JavaTimeSerializers
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.ElasticDsl._
  import com.sksamuel.elastic4s.json4s.ElasticJson4s.Implicits._
  import com.sksamuel.elastic4s.requests.mappings._
  import com.sksamuel.elastic4s.requests.mappings.FieldType._
  import com.sksamuel.elastic4s.Response
  import com.sksamuel.elastic4s.requests.bulk.BulkResponse
  import java.time.{Instant, OffsetDateTime, ZoneId}
  import java.time.format.DateTimeFormatter
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
  import org.apache.http.client.config.RequestConfig
  import org.apache.http.impl.client.BasicCredentialsProvider
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
  import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
  // json4s setup used by elastic4s to serialize Document instances (java.time support included)
  implicit val serialization = Serialization
  implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all
  // elastic4s client: anonymous when no credentials are configured, otherwise
  // basic-auth with the CEM_ELASTIC_USERNAME / CEM_ELASTIC_PASSWORD values.
  val client = {
    if (elasticPassword.isEmpty || elasticUsername.isEmpty) ElasticClient(JavaClient(ElasticProperties(elasticUrl)))
    else {
      val username = elasticUsername.get // safe: emptiness checked just above
      val password = elasticPassword.get
      lazy val provider = {
        val provider = new BasicCredentialsProvider
        val credentials = new UsernamePasswordCredentials(username, password)
        provider.setCredentials(AuthScope.ANY, credentials)
        provider
      }
      val client = ElasticClient(JavaClient(ElasticProperties(elasticUrl), new RequestConfigCallback {
        override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder) = {
          requestConfigBuilder // request configuration left unchanged
        }
      }, new HttpClientConfigCallback {
        override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder) = {
          httpClientBuilder.setDefaultCredentialsProvider(provider) // attach basic-auth credentials
        }
      }))
      client
    }
  }
  // IMPORTANT : GROUPED BULK INSERTION FOR BEST PERFORMANCE
  // One bulk request containing one index operation per document, each routed to
  // its per-month index and keyed by the example id (so re-feeding is idempotent).
  def insertBulk(documents: Iterable[Document]): Future[Response[BulkResponse]] = client.execute {
    bulk {
      for {document <- documents} yield {
        val indexName = indexNameFromDocument(document)
        indexInto(indexName).id(document.id).doc(document).timeout(2.minutes)
      }
    }
  }
  // IMPORTANT : SIZED YOUR GROUPS DEPENDING YOUR DOCUMENT SIZE
  // Splits documents into groups of 42 and chains one bulk request per group.
  // NOTE(review): the returned future reflects only the LAST group's outcome
  // (flatMap(_ => fb) discards earlier results); earlier failures are only logged.
  // NOTE(review): `results.reduce` throws on an empty iterator - assumes at least
  // one document is given, TODO confirm callers guarantee this.
  def sendToElasticsearch(documents: Iterable[Document]): Future[Response[BulkResponse]] = {
    val results: Iterator[Future[Response[BulkResponse]]] = for {
      group <- documents.grouped(42).to(Iterator)
    } yield {
      insertBulk(group).andThen{
        case Success(value) => println("processed those documents "+group.map(_.filename).mkString(","))
        // NOTE(review): matches Exception subclasses only; other Throwables are not logged here
        case Failure(exception: Exception) => println("Couldn't process one of this document "+group.map(_.filename).mkString(",")+" "+exception.getMessage())
      }
    }
    val folded = results.reduce( (fa,fb) => fa.flatMap(_ => fb))
    folded
  }
  // Sends all documents and blocks until completion (at most 300 seconds),
  // then reports the elapsed time. Blocking is acceptable at the edge of a script.
  // NOTE(review): `.await` presumably comes from elastic4s' rich-future helpers - confirm.
  def fill(documents: Iterable[Document]): Unit = {
    val started = now()
    val r = for {
      results <- sendToElasticsearch(documents)
    } yield results
    r.await(300.seconds)
    println(s"${documents.size} json documents sent to $elasticUrl in ${now - started}ms")
  }
}
// ===================================================================================================================== | |
// =====================================================================================================================
// Alternative feeder: dumps each document as a pretty-printed json file and
// generates a `feed.sh` script that POSTs them one by one with curl
// (slower than the direct bulk feed, but replayable by hand).
object FeedUsingCurlGenerator {
  import org.json4s.DefaultFormats
  import org.json4s.jackson.Serialization
  import org.json4s.ext.JavaTimeSerializers
  import org.json4s.jackson.JsonMethods.{pretty, render}
  import org.json4s.Extraction.decompose
  // json4s setup mirroring ElasticFeed's so both feeders produce identical json
  implicit val serialization = Serialization
  implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all
  // File name of the generated json for one document - the id prefix makes it unique.
  def localGeneratedJsonFilename(document:Document):String = {
    val id = document.id
    val filename = document.filename
    s"$id-$filename.json"
  }
  // Writes all json files plus the feed.sh script into insertedJsonDocumentsCacheDir.
  // The directory is wiped first so stale documents from a previous run do not survive.
  def fill(documents: Iterable[Document]): Unit = {
    val started = now()
    val destDir = insertedJsonDocumentsCacheDir.toFile
    if (!destDir.exists) destDir.createDirectoryIfNotExists(true)
    destDir.list.foreach(file => file.delete()) // start from a clean directory
    documents.foreach { document =>
      val generatedFileName = localGeneratedJsonFilename(document)
      val dest = file"$destDir/$generatedFileName"
      dest.writeText(pretty(decompose(document)))
    }
    // The script targets FEED_ELASTIC (defaults to the local elasticsearch) and
    // resolves json files relative to its own location.
    // NOTE(review): no shebang line is generated although the file is made
    // executable below - TODO confirm this is intended.
    val scriptFile = file"$destDir/feed.sh"
    scriptFile.appendLine("""TARGET=${FEED_ELASTIC:-http://127.0.0.1:9200}""")
    scriptFile.appendLine("""SRC=$(dirname "$0")""")
    scriptFile.appendText(
      documents.map { document =>
        val index = indexNameFromDocument(document)
        val id = document.id
        val generatedFileName = localGeneratedJsonFilename(document)
        s"""curl -d @$$SRC/$generatedFileName -H "Content-Type: application/json" -XPOST "$$TARGET/$index/_doc/$id?pretty" """
      }.mkString("\n")
    )
    import PosixFilePermission._
    // rwxr--r-- : owner can run the script, everyone can read it
    scriptFile.setPermissions(Set(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE, GROUP_READ, OTHERS_READ))
    println(s"${documents.size} json documents generated to $insertedJsonDocumentsCacheDir ${now - started}ms")
    println(s"run $scriptFile to feed again using a shell command (slower)")
  }
}
// ===================================================================================================================== | |
// Entry point: only examples with at least one publish target are indexed.
val selectedDocuments = documents.filter(_.publish.nonEmpty)
ElasticFeed.fill(selectedDocuments)
//FeedUsingCurlGenerator.fill(selectedDocuments) // alternative: generate json files + a curl-based feed script
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment