Last active
February 16, 2024 18:06
-
-
Save ferrerojosh/109f8931223c92ecc24f5a751759e793 to your computer and use it in GitHub Desktop.
Simple R2DBC client wrapper using Kotlin coroutines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.r2dbc.spi.*
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.withContext
import kotlin.reflect.KClass
/**
 * An R2DBC client that uses Kotlin coroutines.
 *
 * Every call to [open] or [transaction] obtains a fresh [Connection] from the
 * supplied [ConnectionFactory] and closes it again once the callback has finished,
 * whether it finished normally, with an exception, or through cancellation.
 */
class R2DBCIO(private val connectionFactory: ConnectionFactory) {
    /**
     * Opens a connection from the factory.
     */
    private suspend fun factoryOpen(): Connection = connectionFactory.create().awaitFirst()

    /**
     * Closes [connection] and waits for the close publisher to complete.
     *
     * Runs under [NonCancellable] so the close is still awaited when the calling
     * coroutine has already been cancelled.
     */
    private suspend fun release(connection: Connection) {
        withContext(NonCancellable) {
            connection.close().awaitFirstOrNull()
        }
    }

    /**
     * Open a connection and pass on a handler.
     *
     * The connection is closed after [invoke] returns or throws, so the [Handle]
     * (and anything obtained from it) must not be used once this function has returned.
     *
     * @param invoke callback that receives a [Handle] wrapping the open connection
     */
    suspend fun open(invoke: suspend (Handle) -> Unit) {
        val connection = factoryOpen()
        try {
            invoke(Handle(connection))
        } finally {
            release(connection)
        }
    }

    /**
     * Open the connection within a transaction.
     *
     * The transaction is committed when [invoke] completes normally. If [invoke] or the
     * commit throws, the transaction is rolled back and the original exception is
     * rethrown to the caller; a failure of the rollback itself is attached to it as a
     * suppressed exception. The connection is always closed afterwards.
     *
     * @param invoke callback that receives a [Handle] bound to the transactional connection
     */
    suspend fun transaction(invoke: suspend (Handle) -> Unit) {
        val connection = factoryOpen()
        try {
            connection.beginTransaction().awaitFirstOrNull()
            try {
                invoke(Handle(connection))
                connection.commitTransaction().awaitFirstOrNull()
            } catch (ex: Exception) {
                // Roll back even when the coroutine was cancelled; a plain suspending call
                // would fail immediately in a cancelled context.
                withContext(NonCancellable) {
                    try {
                        connection.rollbackTransaction().awaitFirstOrNull()
                    } catch (rollbackFailure: Exception) {
                        ex.addSuppressed(rollbackFailure)
                    }
                }
                // Propagate the failure (including CancellationException) instead of hiding it.
                throw ex
            }
        } finally {
            release(connection)
        }
    }
}
/**
 * Function that converts one result [Row], together with its [RowMetadata], into a value of type [T].
 */
typealias RowMapper<T> = (row: Row, rowMetadata: RowMetadata) -> T
/**
 * Wrapper for [Connection].
 *
 * Offers shortcuts that create a statement, bind positional parameters to it and
 * execute it on the wrapped [connection].
 */
class Handle(val connection: Connection) {
    /**
     * Change transaction isolation level.
     *
     * @param level isolation level to apply to the connection
     * @throws IllegalArgumentException if the connection is not in auto-commit mode
     */
    fun transactionIsolationLevel(level: IsolationLevel) {
        require(connection.isAutoCommit) {
            "Transaction isolation level can only be changed while the connection is in auto-commit mode"
        }
        // NOTE(review): in the R2DBC SPI this setter returns a Publisher that nothing here
        // subscribes to, so the change may never reach the database. Fixing that requires
        // making this function suspend and awaiting the result — confirm and follow up.
        connection.transactionIsolationLevel = level
    }

    /**
     * Executes an update.
     *
     * Parameters are bound by position: the first vararg goes to index 0, the second to
     * index 1, and so on.
     *
     * @param sql sql statement
     * @param parameters binding parameters
     * @return number of affected rows
     */
    suspend fun execute(sql: String, vararg parameters: Any): Int {
        val update = Update(connection.createStatement(sql))
        parameters.forEachIndexed { position, value ->
            update.bind(position, value)
        }
        return update.execute()
    }

    /**
     * Creates a select query.
     *
     * Parameters are bound by position, starting at index 0. The statement is executed
     * before this function returns; the rows themselves are delivered when the returned
     * flow is collected.
     *
     * NOTE(review): the R2DBC SPI describes a [Row] as valid only inside the mapping
     * callback, and the rows emitted here escape it. Prefer the overload that takes a
     * [RowMapper] — confirm against the driver in use.
     *
     * @param sql sql statement
     * @param parameters binding parameters
     * @return query result
     */
    suspend fun select(sql: String, vararg parameters: Any): Flow<Row> {
        val query = Query(connection.createStatement(sql))
        parameters.forEachIndexed { position, value ->
            query.bind(position, value)
        }
        val result = query.execute()
        return result.map { row, _ -> row }.asFlow()
    }

    /**
     * Creates a select query that returns [T] given the [RowMapper] function.
     *
     * Parameters are bound by position, starting at index 0. Each row is converted by
     * [rowMapper] and the converted values are emitted by the returned flow.
     *
     * @param sql sql statement
     * @param parameters binding parameters
     * @param rowMapper row mapping function
     * @return query result
     */
    suspend inline fun <reified T : Any> select(
        sql: String,
        noinline rowMapper: RowMapper<T>,
        vararg parameters: Any
    ): Flow<T> {
        val query = Query(this.connection.createStatement(sql))
        parameters.forEachIndexed { position, value ->
            query.bind(position, value)
        }
        val result = query.execute()
        return result.map(rowMapper).asFlow()
    }
}
/**
 * Wrapper for [Statement] for running queries such as `INSERT`, `UPDATE`, or `DELETE`.
 */
class Update(private val statement: Statement) {
    /**
     * Executes the update.
     *
     * Calls `add()` on the wrapped statement, executes it, takes the first emitted
     * result and awaits the first value of its updated-row count.
     *
     * @return number of affected rows
     */
    suspend fun execute(): Int {
        val result = statement.add().execute().awaitFirst()
        return result.rowsUpdated.awaitFirst()
    }

    /** Binds [value] to the parameter at position [index]; returns the underlying statement. */
    fun bind(index: Int, value: Any) = statement.bind(index, value)

    /** Binds [value] to the parameter called [name]; returns the underlying statement. */
    fun bind(name: String, value: Any) = statement.bind(name, value)

    /**
     * Binds a `null` of the given [type] to the parameter at position [index].
     *
     * Accepts any class token (for example `String::class`); `KClass` is invariant, so a
     * `KClass<Any>` parameter would only admit `Any::class`.
     */
    fun bindNull(index: Int, type: KClass<*>) = statement.bindNull(index, type.java)

    /**
     * Binds a `null` of the given [type] to the parameter called [name].
     *
     * Accepts any class token (for example `String::class`).
     */
    fun bindNull(name: String, type: KClass<*>) = statement.bindNull(name, type.java)
}
/**
 * Wrapper for [Statement] for running `SELECT` queries.
 */
class Query(private val statement: Statement) {
    /**
     * Executes the query.
     *
     * Calls `add()` on the wrapped statement, executes it and returns the first
     * emitted [Result].
     *
     * @return query result
     */
    suspend fun execute(): Result = statement.add().execute().awaitFirst()

    /** Binds [value] to the parameter at position [index]; returns the underlying statement. */
    fun bind(index: Int, value: Any) = statement.bind(index, value)

    /** Binds [value] to the parameter called [name]; returns the underlying statement. */
    fun bind(name: String, value: Any) = statement.bind(name, value)

    /**
     * Binds a `null` of the given [type] to the parameter at position [index].
     *
     * Accepts any class token (for example `String::class`); `KClass` is invariant, so a
     * `KClass<Any>` parameter would only admit `Any::class`.
     */
    fun bindNull(index: Int, type: KClass<*>) = statement.bindNull(index, type.java)

    /**
     * Binds a `null` of the given [type] to the parameter called [name].
     *
     * Accepts any class token (for example `String::class`).
     */
    fun bindNull(name: String, type: KClass<*>) = statement.bindNull(name, type.java)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment