Created
September 27, 2019 01:00
-
-
Save krishnabhargav/836ab745eab7d6448b10e6fcfbce0937 to your computer and use it in GitHub Desktop.
Example of using Eventstore.JVM client from a kotlin program without fully subscribing to Akka actors.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import eventstore.core.* | |
import eventstore.j.EsConnectionFactory | |
import eventstore.j.SettingsBuilder | |
import kotlinx.coroutines.future.await | |
import kotlinx.coroutines.runBlocking | |
import scala.compat.java8.FutureConverters | |
import scala.concurrent.Future | |
import java.net.InetSocketAddress | |
import java.util.* | |
private suspend fun <T> Future<T>.await() = FutureConverters.toJava(this).await()!! | |
data class Employee(val name: String, val age: Int) | |
fun main() { | |
val settings = SettingsBuilder() | |
.address(InetSocketAddress("127.0.0.1", 1113)) | |
.defaultCredentials("admin", "changeit") | |
.heartbeatInterval(10) | |
.build()!! | |
val actorSystem = ActorSystem.create() | |
val connection = EsConnectionFactory.create(actorSystem, settings) | |
//we don't need to fully adopt the Akka model to work with the eventstore client. | |
//internally it seems to be relying on actors to manage the communication with Es. | |
//the connection object here has methods that returns Future<> objects which can be awaited on. | |
val jsonPayload = Json.toJson(Employee("Krishna", 34)) | |
//for Json.toJson -> see https://gist.github.com/krishnabhargav/7b1832eeb86aa213ba5bb239153977ea | |
val newEvent = EventData( | |
"NewEmployeeCreated", | |
UUID.randomUUID(), | |
Content.apply(jsonPayload), | |
Content.Empty() | |
) | |
runBlocking { | |
val writeEvents = | |
connection.writeEvents( | |
"Employees", | |
ExpectedVersion.`Any$`(), | |
listOf(newEvent), | |
settings.defaultCredentials().get() | |
).await() | |
println("Write result => Position=${writeEvents.nextExpectedVersion()}") | |
val eventsRead = connection.readStreamEventsForward( | |
"Employees", | |
EventNumber.First(), | |
4096, | |
false, | |
settings.defaultCredentials().get() | |
).await() | |
eventsRead.events().foreach { | |
val eventData = it.data() | |
println (eventData.data().value().utf8String()) | |
} | |
//kill the actor system when you are done so your process can exist. | |
//otherwise background threads will prevent your process from closing. | |
actorSystem.terminate().await() | |
} | |
} | |
// using the HTTP API | |
//val httpClient = HttpClient.newHttpClient() | |
// | |
//sealed class ExpectedVersion(val value: Int) { | |
// object NoStream : ExpectedVersion(-1) | |
// object EmptyStream : ExpectedVersion(0) | |
// object Ignore : ExpectedVersion(-2) | |
// data class Number(val x: Int) : ExpectedVersion(x) | |
//} | |
// | |
//fun <T> makeWriteEventRequest( | |
// streamName: String, | |
// eventType: String, | |
// expectedVersion: ExpectedVersion, | |
// payload: T | |
//): HttpRequest? { | |
// return HttpRequest.newBuilder() | |
// .header("Content-Type", "application/json") | |
// .header("ES-EventType", eventType) | |
// .header("ES-EventId", UUID.randomUUID().toString()) | |
// .header("ES-ExpectedVersion", expectedVersion.value.toString()) | |
// .uri(URI.create("http://localhost:2113/streams/$streamName?embed=rich")) | |
// .POST(HttpRequest.BodyPublishers.ofString(Json.toJson(payload))) | |
// .build() | |
// | |
// //What do we need? | |
// //1. Append event to a stream. With support for version | |
// //2. Read events back from a stream. | |
// //3. Read all streams. | |
//// val writeEvent = makeWriteEventRequest( | |
//// "Employees", "NewEmployeeCreated", | |
//// ExpectedVersion.Number(4), Employee("Archana", 30) | |
//// ) | |
//// val result = | |
//// httpClient.sendAsync(writeEvent, HttpResponse.BodyHandlers.ofString())?.await()!! | |
//// println(result.headers().map()["ES-CurrentVersion"]) | |
//// println("Result: ${result.statusCode()}") | |
//} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment