Skip to content

Instantly share code, notes, and snippets.

@ferrerojosh
Last active February 16, 2024 18:06
Show Gist options
  • Save ferrerojosh/109f8931223c92ecc24f5a751759e793 to your computer and use it in GitHub Desktop.
Save ferrerojosh/109f8931223c92ecc24f5a751759e793 to your computer and use it in GitHub Desktop.
Simple r2dbc client wrapper using kotlin coroutines
import io.r2dbc.spi.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlin.reflect.KClass
/**
* An R2DBC client that uses Kotlin coroutines.
*/
class R2DBCIO(private val connectionFactory: ConnectionFactory) {
/**
* Opens a connection from the factory.
*/
private suspend fun factoryOpen(): Connection {
return this.connectionFactory.create().awaitFirst()
}
/**
* Open a connection and pass on a handler.
*/
suspend fun open(invoke: suspend (Handle) -> Unit) {
val connection = factoryOpen()
val handle = Handle(connection)
invoke(handle)
}
/**
* Open the connection within a transaction.
*/
suspend fun transaction(invoke: suspend (Handle) -> Unit) {
val connection = factoryOpen()
val handle = Handle(connection)
connection.beginTransaction().awaitFirstOrNull()
try {
invoke(handle)
connection.commitTransaction().awaitFirstOrNull()
} catch (ex: Exception) {
connection.rollbackTransaction().awaitFirstOrNull()
}
}
}
typealias RowMapper<T> = (row: Row, rowMetadata: RowMetadata) -> T
/**
* Wrapper for [Connection].
*/
class Handle(val connection: Connection) {
/**
* Change transaction isolation level.
*/
fun transactionIsolationLevel(level: IsolationLevel) {
require(connection.isAutoCommit)
connection.transactionIsolationLevel = level
}
/**
* Executes an update.
* @param sql sql statement
* @param parameters binding parameters
* @return number of affected rows
*/
suspend fun execute(sql: String, vararg parameters: Any): Int {
val update = Update(connection.createStatement(sql))
parameters.forEachIndexed { index, param ->
update.bind(index, param)
}
return update.execute()
}
/**
* Creates a select query.
* @param sql sql statement
* @param parameters binding parameters
* @return query result
*/
suspend fun select(sql: String, vararg parameters: Any): Flow<Row> {
val query = Query(this.connection.createStatement(sql))
parameters.forEachIndexed { index, param ->
query.bind(index, param)
}
return query.execute()
.map { t, _ -> t }
.asFlow()
}
/**
* Creates a select query that returns [T] given the [RowMapper] function.
* @param sql sql statement
* @param parameters binding parameters
* @param rowMapper row mapping function
* @return query result
*/
suspend inline fun <reified T : Any> select(
sql: String,
noinline rowMapper: RowMapper<T>,
vararg parameters: Any
): Flow<T> {
val query = Query(this.connection.createStatement(sql))
parameters.forEachIndexed { index, param ->
query.bind(index, param)
}
return query.execute()
.map(rowMapper)
.asFlow()
}
}
/**
* Wrapper for [Statement] for running queries such as `INSERT`, `UPDATE`, or `DELETE`.
*/
class Update(private val statement: Statement) {
/**
* Executes the update.
* @return number of affected rows
*/
suspend fun execute(): Int {
val result = statement.add().execute().awaitFirst()
return result.rowsUpdated.awaitFirst()
}
fun bind(index: Int, value: Any) = statement.bind(index, value)
fun bind(name: String, value: Any) = statement.bind(name, value)
fun bindNull(index: Int, type: KClass<Any>) = statement.bindNull(index, type.java)
fun bindNull(name: String, type: KClass<Any>) = statement.bindNull(name, type.java)
}
/**
* Wrapper for [Statement] for running `SELECT` queries.
*/
class Query(private val statement: Statement) {
/**
* Executes the query.
* @return query result
*/
suspend fun execute(): Result {
return statement.add().execute().awaitFirst()
}
fun bind(index: Int, value: Any) = statement.bind(index, value)
fun bind(name: String, value: Any) = statement.bind(name, value)
fun bindNull(index: Int, type: KClass<Any>) = statement.bindNull(index, type.java)
fun bindNull(name: String, type: KClass<Any>) = statement.bindNull(name, type.java)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment