Skip to content

Instantly share code, notes, and snippets.

@deusaquilus
Created March 18, 2021 18:11
Show Gist options
  • Save deusaquilus/f6f210f714b13c4ea539326c17979756 to your computer and use it in GitHub Desktop.
Save deusaquilus/f6f210f714b13c4ea539326c17979756 to your computer and use it in GitHub Desktop.
Abstracting Jasync Context Effect - I can do it but really don't want to...
package io.getquill.context.jasync
import com.github.jasync.sql.db.pool.ConnectionPool
import com.github.jasync.sql.db.{ ConcreteConnection, Connection, QueryResult, RowData }
import io.getquill.context.sql.SqlContext
import io.getquill.context.sql.idiom.SqlIdiom
import io.getquill.context.{ Context, TranslateContext }
import io.getquill.monad.ScalaFutureIOMonad
import io.getquill.util.ContextLogger
import io.getquill.{ NamingStrategy, ReturnAction }
import kotlin.jvm.functions.Function1
import scala.language.higherKinds
import java.util.concurrent.CompletableFuture
import scala.compat.java8.FutureConverters
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
import scala.util.Try
/**
 * Minimal effect abstraction used to run jasync operations in an arbitrary
 * container `F` instead of a hard-coded `scala.concurrent.Future`.
 *
 * The members fall into two groups: the monadic plumbing (`unit`, `map`,
 * `flatMap`, `seq`, `fromCompleteableFuture`) and the connection-level
 * operations (`withConnection`, `transact`, `probe`, `close`).
 *
 * @tparam F the effect type results are wrapped in
 */
trait ConnectionIO[F[_]] {

  // ---- effect plumbing ----

  /** Wraps an already available `value` in `F`. */
  def unit[T](value: T): F[T]

  /** Applies `f` to the result carried by `effect`. */
  def map[T, R](effect: F[T])(f: T => R): F[R]

  /** Chains `effect` with a follow-up computation `f` that itself yields an `F`. */
  def flatMap[T, R](effect: F[T])(f: T => F[R]): F[R]

  /** Turns a sequence of effects into a single effect carrying a sequence of results. */
  def seq[T](values: Seq[F[T]]): F[Seq[T]]

  /** Adapts a Java `CompletableFuture` into `F`. */
  def fromCompleteableFuture[T](cf: CompletableFuture[T]): F[T]

  // ---- connection-level operations ----

  /**
   * Hands a `Connection` to `f` and returns its effect. Which connection is
   * supplied is up to the implementation.
   */
  def withConnection[T](f: Connection => F[T]): F[T]

  /** Runs `inside`, supplying it with a `TransactionalExecutionContext`. */
  def transact[T](inside: TransactionalExecutionContext => F[T]): F[T]

  /**
   * Probes the given statement text and reports the outcome as a `Try`.
   * The type parameter `T` is not used by this signature.
   */
  def probe[T](f: String): Try[_]

  /** Releases whatever the implementation holds; returns nothing. */
  def close(): Unit
}
/**
 * Provides the `scala.concurrent.Future` implementation of [[ConnectionIO]],
 * backed by a jasync `ConcreteConnection` (or a pool, which is one too).
 */
object FutureConnectionIO {
  import ConnectionIO._ // TODO some way to use this?

  /**
   * Builds a `ConnectionIO[Future]` that issues every operation against `pool`.
   *
   * NOTE(review): because `pool` is an ordinary (non-implicit) parameter, the
   * compiler treats this `implicit def` as an implicit conversion
   * `C => ConnectionIO[Future]`, not as an implicit instance that can be
   * summoned on its own.
   *
   * The result type is spelled out so that the helper methods defined inside
   * the anonymous class do not leak into an inferred structural type.
   *
   * @param pool connection (or pool) every statement is sent to
   * @param ec   execution context captured once, at construction time; it is
   *             used for `map`/`flatMap`/`seq`, to build the
   *             `TransactionalExecutionContext` in `transact`, and is the value
   *             inspected by `withConnection`
   */
  implicit def futureConnectionIO[C <: ConcreteConnection](pool: C)(implicit ec: ExecutionContext): ConnectionIO[Future] =
    new ConnectionIO[Future] {
      // inside of here?

      /**
       * Converts a Scala `Future` to a `CompletableFuture`.
       * Uses `CompletionStage.toCompletableFuture()` rather than a downcast, so a
       * stage that is not itself a `CompletableFuture` is still handled.
       */
      def toCompletableFuture[T](f: Future[T]): CompletableFuture[T] =
        FutureConverters.toJava(f).toCompletableFuture()

      /** Lets a Scala function be passed where jasync expects a Kotlin `Function1`. */
      implicit def toKotlinFunction[T, R](f: T => R): Function1[T, R] = new Function1[T, R] {
        override def invoke(t: T): R = f(t)
      }

      override def unit[T](value: T): Future[T] = Future.successful(value)

      /** Disconnects `pool` and blocks, without a timeout, until that completes. */
      override def close(): Unit = {
        Await.result(fromCompleteableFuture(pool.disconnect()), Duration.Inf)
        ()
      }

      // Uses toKotlinFunction implicit
      /** Runs `f` inside `pool.inTransaction`, handing it the transaction's connection. */
      override def transact[T](f: TransactionalExecutionContext => Future[T]): Future[T] =
        fromCompleteableFuture {
          pool.inTransaction({ c: Connection =>
            toCompletableFuture(f(TransactionalExecutionContext(ec, c)))
          })
        }

      /**
       * Runs `f` on the transaction's connection when the captured `ec` is a
       * `TransactionalExecutionContext`, otherwise on `pool`.
       *
       * NOTE(review): the `ec` examined here is the one this instance was built
       * with, not one supplied per call, so an instance built with a plain
       * execution context always takes the `pool` branch.
       */
      override def withConnection[T](f: Connection => Future[T]): Future[T] =
        ec match {
          case TransactionalExecutionContext(_, conn) => f(conn)
          case _                                      => f(pool)
        }

      /** Sends `sql` to `pool` and blocks, without a timeout, capturing the outcome in a `Try`. */
      override def probe[T](sql: String): Try[_] =
        Try {
          Await.result(fromCompleteableFuture(pool.sendQuery(sql)), Duration.Inf)
        }

      override def fromCompleteableFuture[T](cf: CompletableFuture[T]): Future[T] =
        FutureConverters.toScala(cf)

      override def map[T, R](effect: Future[T])(f: T => R): Future[R] = effect.map(f)

      override def flatMap[T, R](effect: Future[T])(f: T => Future[R]): Future[R] = effect.flatMap(f)

      override def seq[T](values: Seq[Future[T]]): Future[Seq[T]] = Future.sequence(values)
    }
}
/**
 * Syntax helpers and forwarders for [[ConnectionIO]].
 *
 * Every method here simply delegates to the implicit `ConnectionIO[F]` in
 * scope, so callers can write `ConnectionIO.unit(x)` or `effect.map(f)`
 * without naming the instance.
 */
object ConnectionIO {

  /** Adds `map` / `flatMap` to any `F[T]` for which a `ConnectionIO[F]` is in scope. */
  implicit class Ext[F[_], T](effect: F[T])(implicit connIO: ConnectionIO[F]) {
    def map[R](f: T => R): F[R] = connIO.map[T, R](effect)(f)
    def flatMap[R](f: T => F[R]): F[R] = connIO.flatMap[T, R](effect)(f)
  }

  /** Forwards to `connIO.seq`. */
  def seq[F[_], T](values: Seq[F[T]])(implicit connIO: ConnectionIO[F]): F[Seq[T]] = connIO.seq(values)

  /** Forwards to `connIO.unit`. The result type is explicit, matching the other forwarders. */
  def unit[F[_], T](value: T)(implicit connIO: ConnectionIO[F]): F[T] = connIO.unit(value)

  /** Forwards to `connIO.close()`. */
  def close[F[_]]()(implicit connIO: ConnectionIO[F]): Unit = connIO.close()

  /** Forwards to `connIO.transact`. */
  def transact[F[_], T](inside: TransactionalExecutionContext => F[T])(implicit connIO: ConnectionIO[F]): F[T] = connIO.transact(inside)

  /** Forwards to `connIO.withConnection`. */
  def withConnection[F[_], T](f: Connection => F[T])(implicit connIO: ConnectionIO[F]): F[T] = connIO.withConnection(f)

  /** Forwards to `connIO.probe`. The type parameter `T` is not used. */
  def probe[F[_], T](sql: String)(implicit connIO: ConnectionIO[F]): Try[_] = connIO.probe(sql)

  /** Forwards to `connIO.fromCompleteableFuture`. */
  def fromCompleteableFuture[F[_], T](cf: CompletableFuture[T])(implicit connIO: ConnectionIO[F]): F[T] = connIO.fromCompleteableFuture(cf)

  /** Adds `.toF` to a `CompletableFuture`, converting it through the implicit `ConnectionIO[F]`. */
  implicit class CompleteableFutureExt[F[_], T](cf: CompletableFuture[T])(implicit connIO: ConnectionIO[F]) {
    // Call the instance directly instead of going back through the forwarder above.
    def toF: F[T] = connIO.fromCompleteableFuture(cf)
  }
}
/**
 * jasync-backed Quill SQL context whose execution methods return their results
 * in an abstract effect `F`, with all connection handling delegated to the
 * implicit [[ConnectionIO]].
 *
 * @param idiom  SQL idiom handed to the Quill context
 * @param naming naming strategy handed to the Quill context
 * @param connIO effect/connection implementation every operation is routed through
 * @tparam C     connection type; not referenced in this class body
 *               (NOTE(review): presumably kept for subclasses - confirm)
 * @tparam F     effect type returned by the execute methods
 */
abstract class JAsyncContextF[D <: SqlIdiom, N <: NamingStrategy, C <: ConcreteConnection, F[_]](val idiom: D, val naming: N)(implicit connIO: ConnectionIO[F])
  extends Context[D, N]
  with TranslateContext
  with SqlContext[D, N] //with Decoders
  //with Encoders
  //with ScalaFutureIOMonad
  {
  import ConnectionIO._

  private val logger = ContextLogger(classOf[JAsyncContextF[_, _, _, F]])

  override type PrepareRow = Seq[Any]
  override type ResultRow = RowData
  // Every execute method below returns F[...], so the context's result type is F
  // rather than a hard-coded Future.
  override type Result[T] = F[T]
  override type RunQueryResult[T] = Seq[T]
  override type RunQuerySingleResult[T] = T
  override type RunActionResult = Long
  override type RunActionReturningResult[T] = T
  override type RunBatchActionResult = Seq[Long]
  override type RunBatchActionReturningResult[T] = Seq[T]

  /** Implicit view from a Java `CompletableFuture` to a Scala `Future`. */
  implicit def toFuture[T](cf: CompletableFuture[T]): Future[T] = FutureConverters.toScala(cf)

  /**
   * Implicit view from a Scala `Future` to a Java `CompletableFuture`.
   * Goes through `CompletionStage.toCompletableFuture()` instead of a downcast.
   */
  implicit def toCompletableFuture[T](f: Future[T]): CompletableFuture[T] =
    FutureConverters.toJava(f).toCompletableFuture()

  /** Lets a Scala function be passed where jasync expects a Kotlin `Function1`. */
  implicit def toKotlinFunction[T, R](f: T => R): Function1[T, R] = new Function1[T, R] {
    override def invoke(t: T): R = f(t)
  }

  /** Closes the underlying `ConnectionIO`. */
  override def close: Unit =
    connIO.close()

  /** Turns the raw `QueryResult` of a returning action into the value handed back to the caller. */
  protected def extractActionResult[O](returningAction: ReturnAction, extractor: Extractor[O])(result: QueryResult): O

  /** Hook for rewriting the SQL of a returning action; this default returns `sql` unchanged. */
  protected def expandAction(sql: String, returningAction: ReturnAction): String = sql

  /** Probes `sql` through the `ConnectionIO` and reports the outcome as a `Try`. */
  def probe(sql: String): Try[_] = connIO.probe(sql)

  /** Runs `f` through `ConnectionIO.transact`. */
  def transaction[T](f: TransactionalExecutionContext => F[T]): F[T] =
    connIO.transact(f)

  /**
   * Sends `sql` as a prepared statement and maps every returned row through `extractor`.
   *
   * @param prepare   produces the logged parameter descriptions and the bound values
   * @param extractor row decoder applied to each result row
   */
  def executeQuery[T](sql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor): F[List[T]] = {
    val (params, values) = prepare(Nil)
    logger.logQuery(sql, params)
    ConnectionIO.withConnection(conn => connIO.fromCompleteableFuture(conn.sendPreparedStatement(sql, values.asJava)))
      .map(_.getRows.asScala.iterator.map(extractor).toList)
  }

  /** Like `executeQuery`, but passes the row list through `handleSingleResult`. */
  def executeQuerySingle[T](sql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T] = identityExtractor): F[T] =
    executeQuery(sql, prepare, extractor).map(handleSingleResult)

  /**
   * Sends `sql` as a prepared statement and yields the affected-row count.
   * The implicit `ec` is part of the signature but is not used by this body.
   */
  def executeAction[T](sql: String, prepare: Prepare = identityPrepare)(implicit ec: ExecutionContext): F[Long] = {
    val (params, values) = prepare(Nil)
    logger.logQuery(sql, params)
    ConnectionIO.withConnection(_.sendPreparedStatement(sql, values.asJava).toF).map(_.getRowsAffected)
  }

  /**
   * Sends the statement produced by `expandAction` and decodes its result with
   * `extractActionResult`. The statement that is logged is the original `sql`,
   * not the expanded one.
   */
  def executeActionReturning[T](sql: String, prepare: Prepare = identityPrepare, extractor: Extractor[T], returningAction: ReturnAction): F[T] = {
    val expanded = expandAction(sql, returningAction)
    val (params, values) = prepare(Nil)
    logger.logQuery(sql, params)
    withConnection(_.sendPreparedStatement(expanded, values.asJava).toF)
      .map(extractActionResult(returningAction, extractor))
  }

  /**
   * Executes every statement of every batch group and returns the affected-row
   * counts as one flat list. Within a group the statements are chained with
   * `flatMap`; the per-group effects are then combined with `ConnectionIO.seq`.
   */
  def executeBatchAction(groups: List[BatchGroup])(implicit ec: ExecutionContext): F[List[Long]] =
    ConnectionIO.seq {
      groups.map {
        case BatchGroup(sql, prepares) =>
          prepares.foldLeft(ConnectionIO.unit(List.newBuilder[Long])) {
            case (acc, prepare) =>
              acc.flatMap { list =>
                executeAction(sql, prepare).map(list += _)
              }
          }.map(_.result())
      }
    }.map(_.flatten.toList)

  /**
   * Returning-action counterpart of `executeBatchAction`: executes every
   * statement of every group via `executeActionReturning` and returns the
   * extracted values as one flat list.
   */
  def executeBatchActionReturning[T](groups: List[BatchGroupReturning], extractor: Extractor[T])(implicit ec: ExecutionContext): F[List[T]] =
    ConnectionIO.seq {
      groups.map {
        case BatchGroupReturning(sql, column, prepares) =>
          prepares.foldLeft(ConnectionIO.unit(List.newBuilder[T])) {
            case (acc, prepare) =>
              acc.flatMap { list =>
                executeActionReturning(sql, prepare, extractor, column).map(list += _)
              }
          }.map(_.result())
      }
    }.map(_.flatten.toList)

  /** Renders the values bound by `prepare` through `prepareParam`. */
  override private[getquill] def prepareParams(statement: String, prepare: Prepare): Seq[String] =
    prepare(Nil)._2.map(prepareParam)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment