Skip to content

Instantly share code, notes, and snippets.

@abner
Last active June 13, 2019 15:27
Show Gist options
  • Save abner/921dfaa4b2e719f020701e3ebbe4d896 to your computer and use it in GitHub Desktop.
Save abner/921dfaa4b2e719f020701e3ebbe4d896 to your computer and use it in GitHub Desktop.
ExemplosVertx
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.ObjectIdGenerators.UUIDGenerator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.redis.RedisTransaction;
import io.vertx.rxjava.core.Vertx;
import rx.Single;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;
/***
* Exemplo Redis com Transaction
* [email protected]:abner/app-vertx-redis.git
*
*/
public class RedisVerticle extends AbstractVerticle {
public final static String DOCUMENTS_HASHSET_NAME = "DOCUMENTOS";
private final static Logger LOGGER = LoggerFactory.getLogger(RedisVerticle.class);
private UUIDGenerator uuidGenerator = new UUIDGenerator();
private RedisClient redisClient;
private io.vertx.rxjava.redis.RedisClient redisClient$;
private RedisOptions redisOptions;
private Router router;
private Vertx vertx$;
public void start(Future<Void> startup) {
redisOptions = new RedisOptions().setHost("0.0.0.0").setPort(6379);
redisClient = RedisClient.create(vertx, redisOptions);
vertx$ = Vertx.newInstance(vertx);
redisClient$ = io.vertx.rxjava.redis.RedisClient.create(vertx$, redisOptions);
setupHttpServer(startup);
}
private void setupHttpServer(Future<Void> startup) {
router = Router.router(vertx);
vertx.createHttpServer().requestHandler(router::accept).listen(8080, result -> {
if (result.succeeded()) {
router.route().handler(BodyHandler.create());
router.get("/documents").handler(this::handleGetDocuments);
router.get("/documents/:id").handler(this::handleGetDocument);
router.put("/documents/:id").handler(this::handlePutDocument);
router.post("/documents").handler(this::handlePostDocument);
startup.complete();
LOGGER.info("RedisVerticle iniciou com sucesso!");
} else {
LOGGER.error("Falhou ao iniciar o servidor HTTP", result.cause());
startup.fail(result.cause());
}
});
}
public void handlePutDocument(RoutingContext c) {
JsonObject obj = c.getBodyAsJson();
RedisTransaction transaction = this.redisClient.transaction();
// // inicia a transação e obtem o documento
// transaction.multi(evt -> transaction.hget(DOCUMENTS_HASHSET_NAME, obj.getString("id"), getDocEvent -> {
// if (getDocEvent.succeeded() && "QUEUED".equals(getDocEvent.result())) {
// updateDocument(c, transaction, obj);
// }
// }));
updateDocument$(obj).subscribe(r -> {
c.response().end();
}, e -> {
LOGGER.error(e);
c.fail(e);
});
}
private Single<Void> updateDocument$(JsonObject object) {
String watchKey = "UPDATE_DOCUMENT_" + object.getString("id");
io.vertx.rxjava.redis.RedisTransaction transaction$ = io.vertx.rxjava.redis.RedisTransaction.newInstance(redisClient.transaction());
// faz lock na chage UPDATE_DOCUMENT_{idDocumento}
return transaction$.rxWatch(watchKey)
.flatMap(rs ->
transaction$
.rxMulti() // inicia a transação
.flatMap(r -> transaction$.rxSet(watchKey, "1")) // escreve na chave de atualização do documento
// salva o documento
.flatMap(r -> transaction$.rxHset(DOCUMENTS_HASHSET_NAME, object.getString("id"), object.encode()).delay(20, TimeUnit.SECONDS)
// finaliza a transação. Se outra operação escrever na chave de atualização, então cancela a transação
.flatMap(rf -> transaction$.rxExec())
.map(r2 -> null)
)
);
}
private void updateDocument(RoutingContext c, RedisTransaction transaction, JsonObject obj) {
String watchKey = "UPDATE_DOCUMENT_" + obj.getString("id");
transaction.watch(watchKey, h -> {
if (!h.succeeded()) {
LOGGER.error("Falha ao atualizar documento.", h.cause());
c.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
return;
}
transaction.incr(watchKey, hw -> {
transaction.hset(DOCUMENTS_HASHSET_NAME, obj.getString("id"), obj.encode(), result -> {
vertx.setTimer(20_000, hTimer -> {
transaction.exec(r -> {
if (r.succeeded()) {
c.response().headers().add("Content-Type", "application/json");
c.response().setStatusCode(HttpResponseStatus.NO_CONTENT.code()).end();
} else {
LOGGER.error("Falha ao atualizar documento.", r.cause());
c.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
}
});
});
});
});
});
}
public void handlePostDocument(RoutingContext c) {
JsonObject obj = c.getBodyAsJson();
this.redisClient.hset(DOCUMENTS_HASHSET_NAME, obj.getString("id"), obj.encode(), result -> {
if (result.succeeded()) {
c.response().setStatusCode(HttpResponseStatus.CREATED.code()).end("");
} else {
LOGGER.error(result.cause());
c.response().setStatusCode(500).end();
}
});
}
public void handleGetDocuments(RoutingContext c) {
redisClient.hvals(DOCUMENTS_HASHSET_NAME, handerResult -> {
if (handerResult.succeeded()) {
c.response().headers().add("Content-Type", "application/json");
List<JsonObject> results = handerResult.result().stream().map(value -> value.toString())
.map(value -> new JsonObject(value)).collect(Collectors.toList());
c.response().end(new JsonArray(results).encode());
}
});
}
public void handleGetDocument(RoutingContext c) {
String id = c.pathParam("id");
LOGGER.info(String.format("Trying to GET %s", id));
redisClient.hget(DOCUMENTS_HASHSET_NAME, id, resultHandler -> {
if (resultHandler.succeeded()) {
if (resultHandler.result() != null) {
c.response().headers().add("Content-Type", "application/json");
c.response().end(resultHandler.result());
} else {
c.response().setStatusCode(HttpResponseStatus.NOT_FOUND.code()).end();
}
} else {
LOGGER.error(resultHandler.cause());
c.response().setStatusCode(500);
}
});
}
}
/* build.gradle
*
* This file was generated by the Gradle 'init' task.
*
* This is a general purpose Gradle build.
* Learn how to create Gradle builds at https://guides.gradle.org/creating-new-gradle-builds/
*
// Build for the JavaScript verticle example: runs it through the Vert.x Launcher
// and packages it as a fat jar with the Shadow plugin.
plugins {
    id 'java'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '2.0.2'
}

sourceCompatibility = '1.8'
mainClassName = 'io.vertx.core.Launcher'

// The project version is also used as the Vert.x version of the dependencies below.
version = '3.5.1'

// Single place for the verticle file name. It was previously declared with a
// different casing ('MyJavascriptVerticle.js'), never used, and repeated as a literal.
def mainVerticleName = 'MyJavaScriptVerticle.js'

repositories {
    mavenLocal()
    // Replaces the plain-HTTP Maven Central mirrors (central.maven.org, repo.maven.apache.org),
    // which are no longer served over HTTP.
    mavenCentral()
    maven { url "https://maven.repository.redhat.com/techpreview/all" }
    maven { url "https://dist.wso2.org/maven2/" }
}

dependencies {
    compile "io.vertx:vertx-core:$version"
    compile "io.vertx:vertx-web:$version"
    compile "io.vertx:vertx-lang-js:$version"
}

// Ship the JavaScript sources as resources so the verticle is on the classpath of the jar.
processResources {
    from 'src/main/js'
}

shadowJar {
    classifier = 'fat'
    manifest {
        attributes 'Main-Verticle': mainVerticleName
    }
    mergeServiceFiles {
        include 'META-INF/services/io.vertx.core.spi.VerticleFactory'
    }
}

// `gradle run` starts the verticle from the source tree and redeploys it when a .js file changes.
run {
    args = [
        'run',
        "./src/main/js/${mainVerticleName}",
        '--redeploy=src/main/js/*.js',
        '--launcher-class=io.vertx.core.Launcher'
    ]
}

task wrapper(type: Wrapper) {
    gradleVersion = '4.0'
}
*/
// Minimal Vert.x-Web application: a single catch-all route that answers
// every request on port 8080 with a fixed plain-text greeting.
var Router = require("vertx-web-js/router");

/**
 * Handler invoked for every request: sets the content type and
 * ends the response with the greeting text.
 */
function sendGreeting(ctx) {
    var reply = ctx.response();
    reply.putHeader("content-type", "text/plain");
    reply.end("Hello World from Vert.x-Web with Javascript! It is good!!!");
}

var server = vertx.createHttpServer();
var router = Router.router(vertx);
router.route().handler(sendGreeting);

server.requestHandler(router.accept).listen(8080);
package io.abner.vertxkotlin
import io.vertx.core.http.HttpServer
import io.vertx.ext.web.Route
import io.vertx.ext.web.Router
import io.vertx.ext.web.RoutingContext
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.awaitResult
import io.vertx.kotlin.core.json.*
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.experimental.launch
import io.reactiverse.reactivex.pgclient.PgConnection
import io.reactiverse.reactivex.pgclient.PgClient
import io.reactiverse.reactivex.pgclient.PgPool
import io.reactiverse.reactivex.pgclient.PgRowSet
import io.reactiverse.reactivex.pgclient.Tuple
import io.reactiverse.pgclient.PgPoolOptions
/* build.gradle
// Build for the Kotlin coroutine verticle (io.abner.vertxkotlin.MainVerticle).
buildscript {
// Versions shared by the buildscript classpath and the dependency declarations further down.
ext {
kotlin_version = '1.2.71'
vertx_version = '3.5.4'
slf4j_version = '1.7.21'
hsqldb_version = '2.3.4'
reactivepgclient_version = '0.10.6'
}
repositories {
jcenter()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}
plugins {
id 'java'
id 'application'
id 'com.github.johnrengelman.shadow' version '1.2.4'
}
apply plugin: 'kotlin'
sourceCompatibility = 1.8
targetCompatibility = 1.8
// The application is started through the Vert.x Launcher; the verticle to deploy is
// passed to it as an argument (run task) or via the jar manifest (shadowJar task).
mainClassName = 'io.vertx.core.Launcher'
def mainVerticleName = 'io.abner.vertxkotlin.MainVerticle'
// Compile Kotlin to Java 8 bytecode, matching sourceCompatibility/targetCompatibility above.
tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all {
kotlinOptions {
jvmTarget = "1.8"
}
}
repositories {
jcenter()
}
dependencies {
compile "io.vertx:vertx-core:$vertx_version"
compile "io.vertx:vertx-web:$vertx_version"
compile "io.vertx:vertx-web-client:$vertx_version"
compile "io.vertx:vertx-lang-kotlin:$vertx_version"
compile "io.vertx:vertx-lang-kotlin-coroutines:$vertx_version"
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.slf4j:slf4j-jdk14:$slf4j_version"
// NOTE(review): MainVerticle imports the reactive PostgreSQL client below; confirm HSQLDB is still needed.
compile "org.hsqldb:hsqldb:$hsqldb_version"
compile "io.reactiverse:reactive-pg-client:$reactivepgclient_version"
}
// Redeploy watcher: `gradle run` starts the main verticle through the launcher, watches
// everything under src/ and runs `./gradlew classes` before each redeploy.
run {
args = ['run', mainVerticleName,
"--launcher-class=$mainClassName",
"--redeploy=src/**/*.*",
"--on-redeploy=./gradlew classes"
]
}
// Naming and packaging settings for the "shadow jar": the archive is built with base name
// 'app' and classifier 'shadow', and its manifest names the verticle the launcher deploys.
shadowJar {
baseName = 'app'
classifier = 'shadow'
manifest {
attributes 'Main-Verticle': mainVerticleName
}
mergeServiceFiles {
include 'META-INF/services/io.vertx.core.spi.VerticleFactory'
}
}
task wrapper(type: Wrapper) {
gradleVersion = '3.3'
}
// Heroku relies on the 'stage' task to deploy; here it only has to build the shadow jar.
task stage {
dependsOn shadowJar
}
*/
// REPO: git@github.com:abner/vertx-kotlin-coroutines-gradle-template.git
/**
 * Coroutine-based verticle exposing a small movie-rating HTTP API backed by PostgreSQL.
 *
 * Routes:
 * - `GET  /movie/:id`      - title of a movie
 * - `POST /rateMovie/:id`  - add a rating (query parameter `getRating`)
 * - `GET  /getRating/:id`  - average rating of a movie
 *
 * The HTTP port is read from the `http.port` configuration key (default 8080).
 */
class MainVerticle : CoroutineVerticle() {

    private lateinit var client: PgPool

    // TODO: Implement calls to webclient through a event bus
    // TODO: Implement service layer using coroutines with transaction handling
    // TODO: Implement config loading to use as base for Verticles

    /**
     * Creates the connection pool, seeds the database, registers the routes and
     * suspends until the HTTP server is listening.
     */
    override suspend fun start() {
        // Pool options
        val options = PgPoolOptions()
            .setPort(5432)
            .setHost("the-host")
            .setDatabase("the-db")
            .setUser("user")
            .setPassword("secret")
            .setMaxSize(5)
        // NOTE(review): PgClient/PgPool are imported from the io.reactiverse.reactivex wrappers while
        // `vertx` is the core instance - confirm this call resolves, or import io.reactiverse.pgclient instead.
        client = PgClient.pool(vertx, options)

        populateDatabase()

        // Build Vert.x Web router
        val router = Router.router(vertx)
        router.get("/movie/:id").coroutineHandler { ctx -> getMovie(ctx) }
        router.post("/rateMovie/:id").coroutineHandler { ctx -> rateMovie(ctx) }
        router.get("/getRating/:id").coroutineHandler { ctx -> getRating(ctx) }

        // Start the server
        awaitResult<HttpServer> {
            vertx.createHttpServer()
                .requestHandler(router::accept)
                .listen(config.getInteger("http.port", 8080), it)
        }
    }

    /**
     * Creates the MOVIE and RATING tables and inserts the sample rows in a single transaction.
     *
     * The statements use PostgreSQL syntax (parenthesised VALUES lists, SERIAL key).
     * The tables are created unconditionally, so this expects a database where
     * they do not exist yet. The pooled connection is always released.
     */
    private suspend fun populateDatabase() {
        val statements = listOf(
            "CREATE TABLE MOVIE (ID VARCHAR(16) PRIMARY KEY, TITLE VARCHAR(256) NOT NULL)",
            "CREATE TABLE RATING (ID SERIAL PRIMARY KEY, VALUE INTEGER, MOVIE_ID VARCHAR(16))",
            "INSERT INTO MOVIE (ID, TITLE) VALUES ('starwars', 'Star Wars')",
            "INSERT INTO MOVIE (ID, TITLE) VALUES ('indianajones', 'Indiana Jones')",
            "INSERT INTO MOVIE (ID, TITLE) VALUES ('meufilme', 'Meu filme')",
            "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (1, 'starwars')",
            "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (5, 'starwars')",
            "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (9, 'starwars')",
            "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (10, 'starwars')",
            "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (4, 'indianajones')",
            "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (7, 'indianajones')",
            "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (3, 'indianajones')",
            "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (9, 'indianajones')"
        )
        val connection = awaitResult<PgConnection> { client.getConnection(it) }
        try {
            val transaction = connection.begin()
            for (statement in statements) {
                awaitResult<PgRowSet> { connection.query(statement, it) }
            }
            awaitResult<Void?> { transaction.commit(it) }
        } finally {
            // hand the connection back to the pool (max size 5) whether or not seeding succeeded
            connection.close()
        }
    }

    /**
     * Sends `{"id": ..., "title": ...}` for the movie named by the `id` path
     * parameter, or 404 when no such movie exists.
     */
    suspend fun getMovie(ctx: RoutingContext) {
        val id = ctx.pathParam("id")
        val result = awaitResult<PgRowSet> {
            client.preparedQuery("SELECT TITLE FROM MOVIE WHERE ID=\$1", Tuple.of(id), it)
        }
        val rows = result.iterator()
        if (rows.hasNext()) {
            // read by position: PostgreSQL folds the unquoted column name to lower case
            val title = rows.next().getString(0)
            ctx.response().end(json {
                obj("id" to id, "title" to title).encode()
            })
        } else {
            ctx.response().setStatusCode(404).end()
        }
    }

    /**
     * Stores a rating for the movie named by the `id` path parameter.
     *
     * The rating is taken from the `getRating` query parameter. Answers 400 when
     * it is missing or not an integer, 404 when the movie does not exist and
     * 200 once the rating has been inserted.
     */
    suspend fun rateMovie(ctx: RoutingContext) {
        val movie = ctx.pathParam("id")
        val rating = ctx.queryParam("getRating").firstOrNull()?.toIntOrNull()
        if (rating == null) {
            ctx.response().setStatusCode(400).end()
            return
        }
        val found = awaitResult<PgRowSet> {
            client.preparedQuery("SELECT TITLE FROM MOVIE WHERE ID=\$1", Tuple.of(movie), it)
        }
        if (found.iterator().hasNext()) {
            awaitResult<PgRowSet> {
                client.preparedQuery(
                    "INSERT INTO RATING (VALUE, MOVIE_ID) VALUES (\$1, \$2)",
                    Tuple.of(rating, movie),
                    it
                )
            }
            ctx.response().setStatusCode(200).end()
        } else {
            ctx.response().setStatusCode(404).end()
        }
    }

    /**
     * Sends `{"id": ..., "getRating": ...}` with the average rating of the movie
     * named by the `id` path parameter; the value is null when it has no ratings.
     */
    suspend fun getRating(ctx: RoutingContext) {
        val id = ctx.pathParam("id")
        val result = awaitResult<PgRowSet> {
            client.preparedQuery(
                // cast so the average comes back as a double rather than a NUMERIC
                "SELECT CAST(AVG(VALUE) AS DOUBLE PRECISION) AS VALUE FROM RATING WHERE MOVIE_ID=\$1",
                Tuple.of(id),
                it
            )
        }
        val average = result.iterator().next().getDouble(0)
        ctx.response().end(json {
            obj("id" to id, "getRating" to average).encode()
        })
    }
}
/**
 * Installs [fn] as the handler of this route, running it inside a coroutine
 * launched on the dispatcher obtained from the routing context's Vert.x instance.
 * Any [Exception] thrown by [fn] is passed to [RoutingContext.fail].
 */
fun Route.coroutineHandler(fn : suspend (RoutingContext) -> Unit) {
    handler { routingContext ->
        val dispatcher = routingContext.vertx().dispatcher()
        launch(dispatcher) {
            try {
                fn(routingContext)
            } catch (failure: Exception) {
                routingContext.fail(failure)
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment