Skip to content

Instantly share code, notes, and snippets.

@francescofrontera
Created May 7, 2020 14:49
Show Gist options
  • Save francescofrontera/bffbfef01c70b658fbf2701b35330152 to your computer and use it in GitHub Desktop.
Save francescofrontera/bffbfef01c70b658fbf2701b35330152 to your computer and use it in GitHub Desktop.
Simple InfluxDB Sink
package ai.igenius.sink
import java.util.concurrent.TimeUnit
import cats.Functor
import cats.effect._
import org.influxdb._
import org.influxdb.dto.Point
import pureconfig._
import pureconfig.generic.auto._
/** A serializable sink that hands out a managed handle of type `ST` inside the effect `F`.
  *
  * Implementations supply how the handle is acquired and disposed of; [[usableSink]] ties
  * the two together as a `Resource`. The `Functor` constraint on `F` is what
  * `Resource.make` needs to build that resource.
  *
  * @tparam F  effect type in which acquisition, release and writes are expressed
  * @tparam ST type of the underlying handle
  */
sealed abstract class Sink[F[_]: Functor, ST] extends Serializable {

  /** Acquires the underlying handle. */
  protected def connect: F[ST]

  /** Disposes of a handle previously obtained from [[connect]].
    *
    * @param in the handle to dispose of
    */
  protected def release(in: ST): F[Unit]

  /** Sends a batch of records to the sink.
    *
    * @param bulk      the records to send
    * @param converter turns a single record into a [[Point]]
    * @tparam D        type of the incoming records
    */
  def writeBulk[D](bulk: Iterator[D])(converter: D => Point): F[Unit]

  /** The handle as a `Resource`: acquired via [[connect]], finalized via [[release]]. */
  def usableSink: Resource[F, ST] =
    Resource.make(connect)(handle => release(handle))
}
/** Companion of [[Sink]]: configuration loading, implicit summoner and the InfluxDB-backed sink. */
object Sink {

  /** Result of reading an [[InfluxConfiguration]] from the default pureconfig source.
    *
    * Being a `lazy val`, the configuration is read at most once, on first access.
    */
  lazy val Configuration: ConfigReader.Result[InfluxConfiguration] =
    ConfigSource.default.load[InfluxConfiguration]

  /** Summons the implicit [[Sink]] instance for `F` and `ST`, keeping its singleton type. */
  @inline def apply[F[_], ST](implicit ev: Sink[F, ST]): ev.type = ev

  /** Builds a [[Sink]] that writes `Point`s to InfluxDB using cats-effect `IO`.
    *
    * Every call returns a fresh instance. The connection parameters come from
    * [[Configuration]]; a configuration failure surfaces as a failed `IO` when the sink
    * is used, not as an exception thrown from this method.
    */
  def influxDBSink: Sink[IO, InfluxDB] = new Sink[IO, InfluxDB] {

    // Kept as an eagerly-built `val` on purpose: `Configuration` is resolved when the sink
    // is constructed and the outcome is stored as an already-computed `IO` value.
    private[this] val conf: IO[InfluxConfiguration] =
      Configuration.fold(
        err => IO.raiseError(new RuntimeException(s"Error: $err")),
        cfg => IO.pure(cfg) // value is already computed, nothing to suspend
      )

    /** Opens a connection and configures database, retention policy and batching. */
    protected def connect: IO[InfluxDB] =
      conf.flatMap { cfg =>
        // The connection setup is side-effecting, so it is suspended explicitly.
        IO {
          InfluxDBFactory
            .connect(cfg.influxUrl, cfg.influxUser, cfg.influxPassword)
            .setDatabase(cfg.influxDb)
            .setRetentionPolicy(cfg.influxRetentionPolicy)
            .enableBatch(cfg.influxActions, cfg.influxFlushDuration, TimeUnit.MILLISECONDS)
        }
      }

    /** Closes the given InfluxDB connection. */
    protected def release(in: InfluxDB): IO[Unit] = IO(in.close())

    /** Converts every element of `bulk` to a `Point` and writes it to InfluxDB.
      *
      * A connection is acquired for the duration of this call through `usableSink` and
      * released afterwards. The writes are suspended in `IO`, so nothing touches the
      * database until the returned effect is run.
      *
      * @param bulk      records to write; the iterator is consumed when the effect runs
      * @param converter maps a record to the `Point` that is sent to InfluxDB
      */
    def writeBulk[D](bulk: Iterator[D])(converter: D => Point): IO[Unit] =
      usableSink.use { db =>
        IO(bulk.foreach(record => db.write(converter(record))))
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment