Skip to content

Instantly share code, notes, and snippets.

import cats.effect.{ConcurrentEffect, ExitCode, IO, IOApp}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
object ConsumerApplication extends IOApp {
// Execution context made implicitly available inside this object: Scala's global pool.
// NOTE(review): cats-effect 2's IOApp already exposes an implicit ContextShift[IO]; these
// hand-built instances may be redundant — confirm against the rest of the object before removing.
implicit private val ec: ExecutionContextExecutor = ExecutionContext.global
// ContextShift[IO] built on top of `ec`; used on the next line to construct the ConcurrentEffect.
private val cs = IO.contextShift(ec)
// ConcurrentEffect[IO] instance derived from `cs`, exposed implicitly to code in this object.
implicit private val concurrentEffect: ConcurrentEffect[IO] = IO.ioConcurrentEffect(cs)
// Consumer settings built from three positional string arguments.
// NOTE(review): the values look like a broker address, a topic name and a consumer group id,
// in that order — confirm against the ConsumerConfig definition (not visible here).
val config = ConsumerConfig("localhost:9092", "test-topic", "consumer-group-1")
import java.util.concurrent.{ExecutorService, Executors}
import cats.effect.{ContextShift, IO, Resource}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
class KafkaContext(cs: ContextShift[IO]) {
// A fixed-size thread pool with exactly 1 thread, so tasks submitted to it run one at a time.
// NOTE(review): no shutdown of this pool is visible in this part of the file — confirm the
// executor is released somewhere, otherwise its thread may outlive this context.
private val threadPool = Executors.newFixedThreadPool(1)
// ExecutionContext view of `threadPool`; protected, so subclasses can schedule work on it too.
protected val synchronousExecutionContext = ExecutionContext.fromExecutor(threadPool)
/** Runs a block of code on the dedicated single-thread execution context.
  *
  * The by-name argument is not evaluated when this method is called; it is
  * suspended in an `IO` and only evaluated when that `IO` is run, on
  * `synchronousExecutionContext`, via `cs.evalOn`.
  *
  * @tparam A the result type of the block
  * @param f  the block to evaluate; re-evaluated on every run of the returned `IO`
  * @return   an `IO` that yields the block's result (or fails with whatever it throws)
  */
def execute[A](f: => A): IO[A] = {
  // Suspend first so nothing runs on the caller's thread.
  val suspended: IO[A] = IO.delay(f)
  cs.evalOn(synchronousExecutionContext)(suspended)
}