Skip to content

Instantly share code, notes, and snippets.

@ashkrit
Created January 27, 2019 04:35
Show Gist options
  • Save ashkrit/1aeff6894539cfd8907e92f9faaacef3 to your computer and use it in GitHub Desktop.
Save ashkrit/1aeff6894539cfd8907e92f9faaacef3 to your computer and use it in GitHub Desktop.
/**
 * Periodically logs how many records have been counted so far and the
 * average throughput since this method was entered.
 *
 * The loop has no exit condition: the method only terminates if
 * `Thread.sleep` throws (e.g. `InterruptedException`), which then propagates
 * to the caller. It is therefore presumably meant to run on its own
 * (daemon) thread -- confirm against the call site.
 *
 * @param pointsRecordCounter accumulator whose current `sum` is reported as
 *                            the number of records processed
 */
private def monitorRecordsProcessed(pointsRecordCounter: LongAccumulator): Unit = {
  val startTime = System.currentTimeMillis()
  log.info("Records monitoring started")
  while (true) {
    val timeInSec = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)
    val recordsCount = pointsRecordCounter.sum
    // Until a full second has elapsed timeInSec is 0; dividing by it would
    // log "NaN" or "Infinity" as the throughput, so report 0 instead.
    val tp = if (timeInSec > 0) recordsCount.toFloat / timeInSec else 0f
    log.info(s"Records processed ${recordsCount} in ${timeInSec} sec , throughput ${tp} / sec")
    Thread.sleep(TimeUnit.SECONDS.toMillis(1))
  }
}
/**
 * Reads the text file(s) at `path`, treats the text before the first comma
 * of every line as a key, counts the occurrences of each key and returns
 * the ten keys with the highest counts, in descending count order.
 *
 * Every line read also increments `pointsRecordCounter` by one. The
 * increment happens inside an RDD transformation, so the count may be
 * applied more than once for a line if Spark re-executes a task; treat it
 * as a progress indicator rather than an exact figure.
 *
 * @param path                location passed to `SparkContext.textFile`
 * @param pointsRecordCounter accumulator incremented once per line read
 * @param sparkSession        session whose `sparkContext` is used to read the input
 * @return up to ten `(key, count)` pairs, highest count first
 */
private def processData(path: String, pointsRecordCounter: LongAccumulator, sparkSession: SparkSession): Array[(String, Int)] = {
  val keyCounts = sparkSession.sparkContext.textFile(path)
    .map { line =>
      pointsRecordCounter.add(1)
      // Take the first field without split(","): split drops trailing empty
      // strings, so a line made only of commas yields an empty array and
      // indexing element 0 would throw ArrayIndexOutOfBoundsException.
      val separatorIdx = line.indexOf(',')
      val key = if (separatorIdx >= 0) line.substring(0, separatorIdx) else line
      (key, 1)
    }
    .reduceByKey(_ + _)

  keyCounts
    .sortBy(_._2, ascending = false)
    .take(10)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment