@oivoodoo · Created November 1, 2018 09:47
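A small restart hook for a Spark Streaming driver: setup() writes a marker file to HDFS, and await() polls for it, stopping the StreamingContext gracefully once the marker disappears. Wiring it into the driver looks like this: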
// In the driver, after the StreamingContext (ssc) is created:
val restarter = new Restarter(ssc, CommandLineArgs.values.env)
restarter.setup() // write the marker file to HDFS
restarter.await() // block until the context stops or the marker disappears
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext
import org.joda.time.DateTime // assuming joda-time for DateTime.now
import org.slf4j.Logger       // assuming DatalakeLogger exposes an slf4j Logger

// DatalakeLogger and CommandLineArgs are project-specific helpers.
class Restarter(ssc: StreamingContext, env: String) extends Serializable {
  val log: Logger = DatalakeLogger.log
  // Basic idea: rely on a marker file to decide when we should restart.
  // The marker:
  //   - should be stored in /opt/datalake-service/
  //   - should have a unique id at the end of the file name,
  //     so that a new process always relies on its own marker file.
  def id(): String = java.util.UUID.randomUUID.toString
  // Let's assume we always write to the same location that is
  // described in the Ansible documentation.
  val MARKER_FILE: String = s"/tmp/datalake.marker.${id()}"

  // Poll every 10 seconds, checking for termination or marker removal.
  val checkIntervalMillis = 10000
  def await(): Unit = {
    var isStopped = false
    while (!isStopped) {
      isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
      if (isStopped) {
        log.info("[Restarter] Confirmed! The streaming context is stopped. Exiting application...")
      } else {
        log.debug("[Restarter] Streaming app is still running. Timeout...")
      }
      // If the app is still running but the marker file no longer
      // exists in HDFS, shut the Spark job down gracefully.
      if (!isStopped && !alive()) {
        log.info("[Restarter] begin shutdown of streaming app...")
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        log.info("[Restarter] done shutting down streaming app...")
      }
    }
  }
  // Creates the marker file whose presence tells the streaming
  // process to keep running. "hdfs://" + MARKER_FILE resolves
  // against the cluster's default filesystem.
  def setup(): Unit = {
    val file = FileSystem
      .get(ssc.sparkContext.hadoopConfiguration)
      .create(new Path(s"hdfs://$MARKER_FILE"))
    try {
      file.writeBytes(DateTime.now.toString)
    } finally {
      file.close()
    }
    if (!alive()) {
      throw new Exception("can't create marker file in hdfs")
    }
  }
  // Checks that the marker file still exists in HDFS; as long as it
  // does, the streaming job should keep running.
  def alive(): Boolean = {
    try {
      FileSystem
        .get(ssc.sparkContext.hadoopConfiguration)
        .exists(new Path(s"hdfs://$MARKER_FILE"))
    } catch {
      case e: IOException =>
        log.error("[Restarter] failed to check marker file", e)
        false
    }
  }
}
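To trigger a graceful restart from the outside, delete the marker file: on the next poll alive() returns false, await() stops the context, and whatever supervises the job can relaunch it, at which point the new process writes a fresh marker with a new UUID. A minimal sketch of such a trigger, assuming the full marker path is passed as the first argument (StopStreamingJob is a hypothetical helper, not part of the gist):

// Hypothetical helper: remove the marker file so the running job
// shuts itself down gracefully on its next poll.
object StopStreamingJob {
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  def main(args: Array[String]): Unit = {
    val markerPath = args(0) // e.g. hdfs:///tmp/datalake.marker.<uuid>
    val fs = FileSystem.get(new java.net.URI(markerPath), new Configuration())
    // delete(path, recursive = false) returns true if the file existed
    val deleted = fs.delete(new Path(markerPath), false)
    println(s"[StopStreamingJob] deleted $markerPath: $deleted")
  }
}

The same effect can be had from the shell with hdfs dfs -rm on the marker path.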