Last active
May 25, 2024 10:19
-
-
Save dacr/25779c454eceed7b7746e33b8039f30a to your computer and use it in GitHub Desktop.
elasticsearch / opensearch basic read, update, insert, delete operations / published by https://github.com/dacr/code-examples-manager #ae29e73c-709c-4338-bca5-c522b72ed983/7297f239a855fb010bd850fec0f2dedf0a210d27
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : elasticsearch / opensearch basic read, update, insert, delete operations
// keywords : scala, cem, examples, zio, elasticsearch, opensearch, crud
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : ae29e73c-709c-4338-bca5-c522b72ed983
// created-on : 2023-05-07T08:50:35+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.4.2" | |
//> using dep "dev.zio::zio:2.0.13" | |
//> using dep "dev.zio::zio-streams:2.0.13" | |
//> using dep "dev.zio::zio-test:2.0.13" | |
//> using dep "dev.zio::zio-json:0.5.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-effect-zio:8.7.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-client-esjava:8.7.0" | |
//> using dep "com.sksamuel.elastic4s::elastic4s-json-zio:8.7.0" | |
//> using dep "org.slf4j:slf4j-api:2.0.7" | |
//> using dep "org.slf4j:slf4j-simple:2.0.7" | |
// --------------------- | |
import zio.* | |
import zio.json.* | |
import zio.stream.* | |
import zio.stream.ZPipeline.{splitLines, utf8Decode} | |
import zio.test.* | |
import zio.test.Assertion.* | |
import zio.test.TestAspect.* | |
import java.util.UUID | |
import scala.util.{Try, Success, Failure} | |
import scala.util.matching.Regex | |
import scala.util.{Either, Left, Properties, Right} | |
import java.time.format.DateTimeFormatter.ISO_DATE_TIME | |
import java.time.temporal.ChronoField | |
import java.time.{Instant, ZoneId, OffsetDateTime} | |
import java.util.concurrent.TimeUnit | |
// ===================================================================================================================== | |
/** Minimal CRUD helpers for elasticsearch / opensearch built on elastic4s and ZIO.
  *
  * Connection settings are read once from the environment when the object is initialised:
  *  - `CEM_ELASTIC_URL` (default `http://127.0.0.1:9200`)
  *  - `CEM_ELASTIC_URL_TRUST_SSL` (`true` to accept self-signed certificates)
  *  - `CEM_ELASTIC_USERNAME` / `CEM_ELASTIC_PASSWORD` (basic auth, only used when both are set)
  */
object ElasticOps {
  import com.sksamuel.elastic4s.zio.instances.*
  import com.sksamuel.elastic4s.ziojson.*
  import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
  import com.sksamuel.elastic4s.ElasticDsl.*
  import com.sksamuel.elastic4s.Index
  import com.sksamuel.elastic4s.http.JavaClient
  import com.sksamuel.elastic4s.requests.mappings.*
  import com.sksamuel.elastic4s.Response
  import com.sksamuel.elastic4s.requests.bulk.BulkResponse
  import com.sksamuel.elastic4s.requests.searches.SearchResponse
  import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, RequestConfigCallback}
  import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
  import org.apache.http.client.config.RequestConfig
  import org.apache.http.impl.client.BasicCredentialsProvider
  import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
  import scala.concurrent.duration.FiniteDuration
  import java.time.temporal.ChronoField
  import java.util.concurrent.TimeUnit
  import scala.util.Properties.{envOrNone, envOrElse}

  val elasticUrl: String              = envOrElse("CEM_ELASTIC_URL", "http://127.0.0.1:9200")
  val elasticUrlTrust: String         = envOrNone("CEM_ELASTIC_URL_TRUST_SSL").getOrElse("false").trim.toLowerCase
  val elasticUsername: Option[String] = envOrNone("CEM_ELASTIC_USERNAME")
  val elasticPassword: Option[String] = envOrNone("CEM_ELASTIC_PASSWORD")

  // Shared client, built eagerly from the settings above.
  private val client = { // TODO rewrite to be fully effect based
    import org.apache.http.ssl.SSLContexts
    import org.apache.http.conn.ssl.TrustSelfSignedStrategy

    // Basic-auth credentials are only configured when BOTH username and password are provided.
    val credentialsProvider = for {
      username <- elasticUsername
      password <- elasticPassword
    } yield {
      val basicProvider = new BasicCredentialsProvider
      basicProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))
      basicProvider
    }
    val trustSelfSigned = elasticUrlTrust == "true"

    if (credentialsProvider.isEmpty && !trustSelfSigned) ElasticClient(JavaClient(ElasticProperties(elasticUrl)))
    else {
      // The trust flag is honoured on its own too, so an unauthenticated cluster
      // behind a self-signed certificate can be reached.
      val sslContext =
        if (trustSelfSigned) SSLContexts.custom().loadTrustMaterial(TrustSelfSignedStrategy()).build()
        else SSLContexts.createDefault()
      ElasticClient(
        JavaClient(
          ElasticProperties(elasticUrl),
          (requestConfigBuilder: RequestConfig.Builder) => requestConfigBuilder,
          (httpClientBuilder: HttpAsyncClientBuilder) => {
            credentialsProvider.foreach(provider => httpClientBuilder.setDefaultCredentialsProvider(provider))
            httpClientBuilder.setSSLContext(sslContext)
          }
        )
      )
    }
  }

  private val scrollKeepAlive = FiniteDuration(30, "seconds")
  private val timeout         = 60.seconds

  // Exponential backoff (500ms base, factor 2, jittered), limited to 5 recurrences; each decision is logged.
  private val retrySchedule = (Schedule.exponential(500.millis, 2).jittered && Schedule.recurs(5)).onDecision((_, _, decision) =>
    decision match {
      case Schedule.Decision.Done               => ZIO.logInfo("No more retry attempt !")
      case Schedule.Decision.Continue(interval) => ZIO.logInfo(s"Will retry at ${interval.start}")
    }
  )

  val upsertGrouping: Int = 50
  val searchPageSize: Int = 500

  // ------------------------------------------------------
  /** Builds the index name `<prefix>-<year>-<month>` for a timestamp (the month is not zero padded). */
  private def indexNameFromTimestamp(indexPrefix: String, timestamp: OffsetDateTime): String = {
    val year  = timestamp.get(ChronoField.YEAR)
    val month = timestamp.get(ChronoField.MONTH_OF_YEAR)
    s"$indexPrefix-$year-$month"
  }

  // ------------------------------------------------------
  /** Unwraps a response as a typed failure instead of letting `result` throw.
    *
    * elastic4s throws from `Response.result` when the request failed; inside a ZIO `map`/`flatMap`
    * that becomes a fiber defect which bypasses `mapError` and `retry`.
    */
  private def resultOrFail[U](response: Response[U]): ZIO[Any, Throwable, U] =
    if (response.isError) ZIO.fail(Exception(response.error.toString))
    else ZIO.succeed(response.result)

  /** Raw JSON source of every hit in a search page. */
  private def hitSources(page: SearchResponse): Chunk[String] =
    Chunk.fromArray(page.hits.hits.map(_.sourceAsString)) // TODO find a better way than just sourceAsString !

  /** Failure descriptions for a bulk response: the request error when the whole call failed,
    * otherwise one entry per failed item (empty when everything succeeded).
    */
  private def bulkFailures(response: Response[BulkResponse]): Seq[String] =
    if (response.isError) Seq(response.error.toString)
    else response.result.failures.map(item => item.error.fold(item.toString)(_.toString))

  // ------------------------------------------------------
  /** Streams the remaining pages of a scroll, stopping at the first empty page or when no
    * further scroll id is returned.
    */
  private def streamFromScroll(scrollId: String) = {
    ZStream.paginateChunkZIO(scrollId) { currentScrollId =>
      for {
        response <- client.execute(searchScroll(currentScrollId).keepAlive(scrollKeepAlive))
        page     <- resultOrFail(response)
        results   = hitSources(page)
        _        <- ZIO.log(s"Got ${results.size} more documents")
      } yield results -> (if (results.size > 0) page.scrollId else None)
    }
  }

  /** Streams every document matching `indexPattern`, decoded as `T`.
    *
    * Runs a scrolled search with pages of `searchPageSize` documents. The stream fails with an
    * `Exception` when the request fails, when the first response carries no scroll id, or when
    * a document cannot be decoded.
    */
  def fetchAll[T](indexPattern: String)(using JsonDecoder[T]) = {
    val result = for {
      response    <- client.execute(search(Index(indexPattern)).size(searchPageSize).scroll(scrollKeepAlive))
      firstPage   <- resultOrFail(response)
      scrollId    <- ZIO.fromOption(firstPage.scrollId)
      firstResults = hitSources(firstPage)
      _           <- ZIO.log(s"Got ${firstResults.size} first documents")
      nextResultsStream = streamFromScroll(scrollId)
    } yield ZStream.fromChunk(firstResults) ++ nextResultsStream
    ZStream.unwrap(result).map(_.fromJson[T]).absolve.mapError(err => Exception(err.toString))
  }

  // ------------------------------------------------------
  /** Fetches the first document whose `_id` matches `id` in `indexPattern`, decoded as `T`.
    *
    * Fails with an `Exception` when the request fails, when no document is found, or when the
    * document cannot be decoded.
    */
  def fetch[T](indexPattern: String, id: String)(using JsonDecoder[T]) = {
    client
      .execute {
        search(indexPattern).matchQuery("_id", id)
      }
      .flatMap(response => resultOrFail(response))
      .map(_.hits.hits.headOption.map(_.sourceAsString)) // TODO find a better way than just sourceAsString !
      .some
      .map(_.fromJson[T])
      .absolve
      .mapError(err => Exception(err.toString))
  }

  // ------------------------------------------------------
  /** Indexes (or overwrites) `documents` with one bulk request.
    *
    * Each document goes to the index `<indexPrefix>-<year>-<month>` derived from its timestamp
    * and is stored under the id returned by `idExtractor`.
    *
    * @param indexPrefix        prefix of the target index names
    * @param documents          documents to index
    * @param refreshImmediately when true, asks for an immediate refresh of the affected shards
    * @param timestampExtractor gives the timestamp used to select the target index
    * @param idExtractor        gives the document id
    * @return an effect failing with the collected failure messages when the request or any item failed.
    *         NOTE(review): `timeout` maps an elapsed timeout to a successful `None`, so a timed-out
    *         attempt is neither retried nor reported as a failure; callers must inspect the `Option`.
    */
  def upsert[T](
    indexPrefix: String,
    documents: Chunk[T],
    refreshImmediately: Boolean = false
  )(
    timestampExtractor: T => OffsetDateTime,
    idExtractor: T => String
  )(using
    JsonEncoder[T]
  ) = {
    val operations = documents.map { document =>
      val indexName = indexNameFromTimestamp(indexPrefix, timestampExtractor(document))
      indexInto(indexName).id(idExtractor(document)).doc(document)
    }
    // The refresh policy is set on the bulk request itself, which is where the bulk API reads it.
    val request = if (refreshImmediately) bulk(operations).refreshImmediately else bulk(operations)
    val upsertEffect = for {
      response <- client.execute(request)
      failures  = bulkFailures(response)
      outcome   = if (failures.isEmpty) "Upserted" else "Failed to upsert"
      _        <- ZIO.log(s"$outcome ${documents.size} into elasticsearch")
      _        <- ZIO.cond(failures.isEmpty, (), failures.mkString("\n"))
    } yield ()
    upsertEffect.timeout(timeout).retry(retrySchedule)
  }

  // ------------------------------------------------------
  /** Deletes the document `id` and returns the raw elastic4s response, which the caller must check.
    *
    * NOTE(review): single-document deletes normally need a concrete index name; confirm that a
    * wildcard pattern passed as `indexPattern` really removes the document.
    */
  def delete(indexPattern: String, id: String) = {
    client.execute {
      deleteById(indexPattern, id)
    }
  }
}
// ===================================================================================================================== | |
/** Round-trip check against a running elasticsearch/opensearch instance: index one document,
  * read it back by id and through a full scan, then delete it.
  */
object SearchOperationsSpec extends ZIOSpecDefault {
  case class Something(when: OffsetDateTime, uuid: UUID, name: String, thatBoolean: Boolean, thatNumber: Double, mayBe: Option[String]) derives JsonCodec

  override def spec = suite("Basic elasticsearch/opensearch operations specs")(
    test("upsert, fetch, fetchAll, delete") {
      val indexPrefix  = "test-elasticsearch-operations"
      val indexPattern = s"$indexPrefix-*"
      for {
        now        <- Clock.currentDateTime
        generated  <- Random.nextUUID
        sample      = Something(when = now, uuid = generated, name = "Joe", thatBoolean = true, thatNumber = 42.0d, mayBe = Some("nothing"))
        documentId  = sample.uuid.toString
        _          <- ElasticOps.upsert(indexPrefix, Chunk(sample), refreshImmediately = true)(_.when, _.uuid.toString)
        // elasticsearch/opensearch is a search engine, not an ACID database:
        // leave it a moment so the freshly indexed document is searchable before reading it back
        _          <- Clock.sleep(1.second)
        fetched    <- ElasticOps.fetch[Something](indexPattern, documentId)
        everything <- ElasticOps.fetchAll[Something](indexPattern).runCollect
        _          <- ElasticOps.delete(indexPattern, documentId)
      } yield assertTrue(
        everything.size == 1,
        everything.head == sample,
        fetched == sample
      )
    }
  ) @@ withLiveClock
}
// Script entry point: run the spec with no command-line arguments.
SearchOperationsSpec.main(Array.empty[String])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment