Gracefully stopping and restarting a Spark Streaming job by watching a marker file in HDFS.
Created November 1, 2018 09:47
val restarter = new Restarter(ssc, CommandLineArgs.values.env)
restarter.setup()
restarter.await()
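For context, here is a minimal sketch of the driver around that snippet. The SparkConf, app name, and batch interval are illustrative assumptions, not part of the gist; CommandLineArgs comes from the snippet above and is not defined here.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main {
  def main(args: Array[String]): Unit = {
    // Illustrative Spark setup; the real job defines its own conf and streams.
    val conf = new SparkConf().setAppName("datalake-service")
    val ssc = new StreamingContext(conf, Seconds(10))

    // ... build input DStreams and transformations here ...

    ssc.start()

    // From the snippet above: create the marker file, then block until
    // the marker disappears or the context terminates on its own.
    val restarter = new Restarter(ssc, CommandLineArgs.values.env)
    restarter.setup()
    restarter.await()
  }
}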
// Imports assumed by this snippet; DatalakeLogger is project-specific,
// and the SLF4J/Joda-Time bindings are a guess based on the usage below.
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext
import org.joda.time.DateTime
import org.slf4j.Logger

class Restarter(ssc: StreamingContext, env: String) extends Serializable {
  val log: Logger = DatalakeLogger.log

  // Basic idea: rely on a marker file to know when we should restart.
  // The marker:
  //   - should be stored in /opt/datalake-service/
  //   - should have a unique id at the end of its file name, so that a
  //     newly started process always relies on its own marker file.
  def id(): String = java.util.UUID.randomUUID.toString

  // Let's assume we always use the same location that we documented
  // in the Ansible docs.
  val MARKER_FILE: String = s"/tmp/datalake.marker.${id()}"

  val checkIntervalMillis = 10000

  // Blocks until the streaming context stops. If the marker file
  // disappears from HDFS while the app is still running, trigger a
  // graceful shutdown of the Spark job.
  def await(): Unit = {
    var isStopped = false
    while (!isStopped) {
      isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
      if (isStopped) {
        log.info("[Restarter] Confirmed! The streaming context is stopped. Exiting application...")
      } else {
        log.debug("[Restarter] Streaming app is still running. Timeout...")
      }
      // If the marker file no longer exists in HDFS while the app is
      // running, shut the Spark job down gracefully.
      if (!isStopped && !alive()) {
        log.info("[Restarter] begin shutdown of streaming app...")
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        log.info("[Restarter] done shutdown of streaming app...")
      }
    }
  }

  // Creates the marker file; its later removal signals that the
  // streaming process should stop.
  def setup(): Unit = {
    val file = FileSystem
      .get(ssc.sparkContext.hadoopConfiguration)
      .create(new Path(s"hdfs://$MARKER_FILE"))
    try {
      file.writeBytes(DateTime.now.toString)
    } finally {
      file.close()
    }
    if (!alive()) {
      throw new Exception("can't create marker file in hdfs")
    }
  }

  // Checks that the marker file still exists in the filesystem, which
  // means the streaming app should keep running.
  def alive(): Boolean = {
    try {
      FileSystem
        .get(ssc.sparkContext.hadoopConfiguration)
        .exists(new Path(s"hdfs://$MARKER_FILE"))
    } catch {
      case e: IOException =>
        e.printStackTrace()
        false
    }
  }
}
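To trigger a graceful shutdown from the outside, delete the marker file: await() notices within checkIntervalMillis and stops the context. A hypothetical operator-side helper is sketched below; the object name and argument handling are illustrative, not from the gist.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StopStreamingApp {
  def main(args: Array[String]): Unit = {
    // args(0) is the full marker path, e.g. hdfs:///tmp/datalake.marker.<uuid>
    val marker = new Path(args(0))
    val fs = FileSystem.get(new Configuration())
    if (fs.delete(marker, /* recursive = */ false)) {
      println(s"Deleted $marker; the streaming app will begin a graceful shutdown.")
    } else {
      println(s"No marker at $marker; nothing to do.")
    }
  }
}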