Last active
February 11, 2016 08:56
-
-
Save fversnel/4fa73ad78c47f08ac587 to your computer and use it in GitHub Desktop.
Creates an Observable from a cron expression using the Quartz scheduler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.frankversnel.rxquartz | |
import java.time.Instant | |
import java.util.UUID | |
import scala.concurrent.{ExecutionContext, Promise, Future} | |
import com.typesafe.scalalogging.Logger | |
import org.quartz._ | |
import rx.lang.scala.{Observer, Subscription, Observable} | |
import scala.concurrent.ExecutionContext | |
object Quartz { | |
class QuartzScheduler(val scheduler: Scheduler)(implicit ec: ExecutionContext) extends AutoCloseable { | |
val cancellable = new AsyncCancellable | |
def start() = scheduler.start() | |
override def close(): Unit = { | |
cancellable.close() | |
scheduler.shutdown(true) | |
} | |
} | |
private class CronJob extends Job { | |
override def execute(context: JobExecutionContext): Unit = { | |
val jobId = context.getMergedJobDataMap.get("jobId").asInstanceOf[String] | |
val obs = context.getScheduler.getContext.get(jobId).asInstanceOf[Observer[Instant]] | |
obs.onNext(Instant.now()) | |
} | |
} | |
implicit class SchedulerExtensions(quartzScheduler: QuartzScheduler) { | |
/** | |
* Creates an Observable that triggers based on a Cron expression | |
* | |
* @param cronExpression a valid cron expression | |
* @see http://www.quartz-scheduler.org/documentation/quartz-1.x/tutorials/crontrigger | |
* @return | |
*/ | |
def createCronJob(cronExpression: String)(implicit logger: Logger): Observable[Instant] = { | |
import JobBuilder._ | |
import TriggerBuilder._ | |
import CronScheduleBuilder._ | |
Observable.create(obs => { | |
val jobId = UUID.randomUUID().toString | |
val job = newJob(classOf[CronJob]) | |
.usingJobData("jobId", jobId) | |
.build() | |
val trigger = newTrigger() | |
.withSchedule(cronSchedule(cronExpression)) | |
.build() | |
val scheduler = quartzScheduler.scheduler | |
scheduler.getContext.put(jobId, obs) | |
scheduler.scheduleJob(job, trigger) | |
val subscription = Subscription { | |
obs.onCompleted() | |
try { | |
scheduler.deleteJob(job.getKey) | |
scheduler.getContext.remove(jobId) | |
} catch { | |
case e: Exception => | |
logger.debug(s"Something went wrong when deleting quartz jobs, due to: ${e.getMessage}") | |
} | |
} | |
quartzScheduler.cancellable.whenClosed(subscription.unsubscribe) | |
subscription | |
}) | |
} | |
} | |
class CancelledException extends Exception | |
class AsyncCancellable(implicit ec: ExecutionContext) extends AutoCloseable { | |
private val p = Promise[Nothing] | |
def onCancel: Future[Nothing] = p.future | |
def whenClosed(work: () => Unit): Unit = { | |
onCancel.onComplete(_ => work()) | |
} | |
override def close(): Unit = p.failure(new CancelledException) | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment