Created
June 28, 2024 23:14
-
-
Save dacr/24c9fa53a451121051230a1aae81774f to your computer and use it in GitHub Desktop.
neo4j cypher - stream inserted records using zio / published by https://github.com/dacr/code-examples-manager #42059935-7404-4166-8e74-5ecb74844aa7/18c9aa07d4c6103405495898000d87fd395b4e2a
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : neo4j cypher - stream inserted records using zio | |
// keywords : scala, zio, scalatest, neo4j, neotypes, cypher, dsl, @testable | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : 42059935-7404-4166-8e74-5ecb74844aa7 | |
// created-on : 2024-06-26T15:39:57+02:00 | |
// managed-by : https://github.com/dacr/code-examples-manager | |
// run-with : scala-cli $file | |
// --------------------- | |
//> using scala "3.4.2" | |
//> using dep io.github.neotypes::neotypes-core:1.1.0 | |
//> using dep io.github.neotypes::neotypes-zio:1.1.0 | |
//> using dep io.github.neotypes::neotypes-zio-stream:1.1.0 | |
//> using dep dev.zio::zio:2.1.4 | |
//> using dep dev.zio::zio-streams:2.1.4 | |
//> using dep dev.zio::zio-interop-reactivestreams:2.0.2 | |
//> using dep org.neo4j.test:neo4j-harness:5.20.0 | |
////> using options -Ykind-projector:underscores | |
////> using objectWrapper | |
// --------------------- | |
import zio.* | |
import zio.stream.ZStream | |
import org.neo4j.driver.AuthTokens | |
import neotypes.{AsyncDriver, GraphDatabase} | |
import neotypes.mappers.ResultMapper | |
import neotypes.syntax.all.* | |
import neotypes.zio.implicits.* | |
import neotypes.zio.stream.ZioStream | |
import neotypes.zio.stream.implicits.* | |
object NeoTypesApp extends ZIOAppDefault { | |
case class Record(name: String, age: Int) | |
val insertsQueries = for { | |
num <- LazyList.from(1).take(100_000) | |
name = s"joe$num" | |
age = scala.util.Random.between(1, 130) | |
qry = c"CREATE (:Person {name: $name, age: $age})" | |
} yield qry | |
val buildStreamDriver = | |
ZIO.acquireRelease( | |
ZIO | |
.attempt(org.neo4j.harness.Neo4jBuilders.newInProcessBuilder().build()) | |
.flatMap(embedded => GraphDatabase.streamDriver[ZioStream](embedded.boltURI(), AuthTokens.none())) | |
)(neo4jDriver => neo4jDriver.close.ignoreLogged) | |
val app = for { | |
_ <- Console.printLine("stream demo using ZIO streams") | |
streamDriver <- buildStreamDriver | |
batches = insertsQueries.grouped(100).map(_.reduce(_ + _)) | |
_ <- ZStream | |
.fromIterator(batches) | |
.mapZIO(batch => batch.execute.void(streamDriver)) | |
.runDrain | |
_ <- Console.printLine("collecting data") | |
results = c"MATCH (p: Person) return p" | |
.query(ResultMapper.fromFunction(Record.apply)) | |
.stream(streamDriver) | |
count <- results | |
// .mapZIO(result => Console.printLine(result.name)) | |
.runCount | |
_ <- Console.printLine(s"found $count results") | |
} yield () | |
override def run = ZIO.scoped(app) | |
} | |
NeoTypesApp.main(Array.empty) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment