Created
July 11, 2024 14:36
-
-
Save otobrglez/1407b2fc62cf439d46cfbeef2a56cf80 to your computer and use it in GitHub Desktop.
Quartz Scheduler with Scala and Cats Effect / FS2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package si.ogrodje.oge.scheduler | |
import cats.effect.unsafe.implicits.global | |
import cats.effect.{IO, Resource} | |
import fs2.Stream | |
import fs2.concurrent.Topic | |
import org.quartz.* | |
import org.quartz.JobBuilder.* | |
import org.quartz.TriggerBuilder.* | |
import org.quartz.impl.StdSchedulerFactory | |
import org.typelevel.log4cats.LoggerFactory | |
import org.typelevel.log4cats.slf4j.Slf4jFactory | |
import si.ogrodje.oge.scheduler.QScheduler.EventKind | |
import java.time.Instant | |
import java.util | |
import java.util.{Random, TimeZone} | |
import scala.jdk.CollectionConverters.* | |
import scala.reflect.ClassTag | |
final class QScheduler private ( | |
events: Topic[IO, EventKind], | |
scheduler: Scheduler | |
): | |
private val CET = TimeZone.getTimeZone("CET").toZoneId | |
private given lf: LoggerFactory[IO] = Slf4jFactory.create[IO] | |
private val logger = lf.getLogger | |
private def start: IO[Unit] = IO(scheduler.start()) <* logger.info("Scheduler started") | |
private def close: IO[Unit] = IO(scheduler.shutdown()) | |
final private class TriggerEvent extends org.quartz.Job: | |
override def execute(context: JobExecutionContext): Unit = | |
val name = context.getMergedJobDataMap.getString("name") | |
context.getMergedJobDataMap.get("callback").asInstanceOf[EventKind => Unit](name) | |
private def trigger[T <: Trigger]( | |
name: String, | |
schedulerBuilder: ScheduleBuilder[T] | |
)(using ClassTag[T]): IO[Unit] = | |
val classOfT: Class[? <: org.quartz.Job] = new TriggerEvent().getClass | |
val jMap: java.util.Map[String, EventKind => Unit] = Map("callback" -> { (event: String) => | |
(logger.debug(s"Pushing event $event to the main topic") *> events.publish1(event).void).unsafeRunSync() | |
}).asJava | |
IO( | |
scheduler.scheduleJob( | |
newJob(classOfT).withIdentity(name).usingJobData("name", name).usingJobData(new JobDataMap(jMap)).build(), | |
newTrigger().withSchedule(schedulerBuilder).build() | |
) | |
).flatTap(date => logger.info(s"First run of $name is scheduled at ${date.toInstant.atZone(CET)}")).void | |
private def listenTo(eventKind: EventKind): Resource[IO, Stream[IO, EventKind]] = | |
events.subscribeAwaitUnbounded.map(_.filter(_ == eventKind)) | |
def at[T <: Trigger, A]( | |
schedule: ScheduleBuilder[T], | |
name: String = s"at-${Instant.now().toEpochMilli}-${new Random().nextInt(10_000)}" | |
)( | |
io: => IO[A] | |
)(using ClassTag[T]): Resource[IO, Unit] = | |
listenTo(name) | |
.evalTap(_ => trigger(name, schedule)) | |
.evalMap(_.evalMap(_ => io).compile.drain) | |
object QScheduler: | |
type EventKind = String | |
def resource: Resource[IO, QScheduler] = for | |
events <- Resource.eval(Topic[IO, EventKind]()) | |
scheduler <- Resource | |
.make(IO.pure(new QScheduler(events, StdSchedulerFactory.getDefaultScheduler)))(_.close) | |
.evalTap(_.start) | |
yield scheduler |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package si.ogrodje.oge.apps | |
import cats.effect.kernel.Ref | |
import cats.effect.{IO, Resource, ResourceApp} | |
import cats.syntax.parallel.* | |
import org.quartz.CronScheduleBuilder.cronSchedule | |
import org.quartz.SimpleScheduleBuilder.simpleSchedule as sch | |
import si.ogrodje.oge.scheduler.QScheduler | |
import java.util.TimeZone | |
object SchedulerDemoApp extends ResourceApp.Forever: | |
private val CET: TimeZone = TimeZone.getTimeZone("CET") | |
def run(args: List[String]): Resource[IO, Unit] = QScheduler.resource.flatMap: scheduler => | |
for | |
cnt <- Resource.eval(Ref.of[IO, Int](0)) | |
_ <- ( | |
// Repetition with intervals | |
scheduler.at(sch.withRepeatCount(5).withIntervalInSeconds(5))(IO.println("every 5 seconds (5 times)")), | |
scheduler.at(sch.withRepeatCount(15).withIntervalInSeconds(2))(IO.println("every 2 seconds (15 times)")), | |
scheduler.at(sch.withRepeatCount(10).withIntervalInSeconds(7).repeatForever())( | |
cnt.getAndUpdate(_+1).flatTap(n => IO.println(s"Number N is $n")) | |
), | |
// Cron syntax | |
scheduler.at(cronSchedule("0 54 15 ? * *").inTimeZone(CET))(IO.println("At a very specific time")), | |
scheduler.at(cronSchedule("0 */5 * * * ?").inTimeZone(CET), name="every-5")(IO.println("Every 5 minutes")) | |
).parTupled | |
yield () | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment