Last active
December 4, 2017 13:39
-
-
Save epishkin/ce69acdee6d18afa5997 to your computer and use it in GitHub Desktop.
Write all custom counters of a Scalding job to HDFS.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.PrintWriter | |
import cascading.stats.CascadingStats | |
import com.twitter.scalding._ | |
/**
 * Mixin for a Scalding [[Job]] that writes all custom counters into a tsv file
 * args("counters-file") if this property is set.
 *
 * The file is written only when the job finished successfully and at least one
 * custom counter exists.
 *
 * Output format (one counter per line, tab-separated, UTF-8):
 * counter_name value
 */
trait SaveCountersToHdfs extends Job {

  /** Destination of the counters file; `None` when `--counters-file` was not passed. */
  private lazy val countersFile: Option[String] = args.optional("counters-file")

  /**
   * Runs the default stats handling and then, for a successful job, dumps the
   * custom counters to the configured file (if any).
   *
   * @param statsData statistics of the finished flow
   */
  override protected def handleStats(statsData: CascadingStats): Unit = {
    super.handleStats(statsData)
    if (statsData.isSuccessful) {
      // foreach instead of nonEmpty + get: nothing happens when the argument is absent.
      countersFile.foreach(saveCounters(statsData, _))
    }
  }

  /**
   * Writes every custom counter found in `statsData` to `outputPath`.
   * No file is created when there are no custom counters.
   *
   * @param statsData  statistics to read the counters from
   * @param outputPath location of the tsv file to create
   */
  private def saveCounters(statsData: CascadingStats, outputPath: String): Unit = {
    // Stats.getAllCustomCounters takes the stats object as an implicit parameter.
    implicit val statProvider: CascadingStats = statsData
    val jobStats = Stats.getAllCustomCounters
    if (jobStats.nonEmpty) {
      writeToFile(outputPath) { writer =>
        jobStats.foreach { case (counter, value) =>
          writer.println("%s\t%s".format(counter, value))
        }
      }
    }
  }

  /**
   * Creates the file at `location`, hands a writer to `fn` and always closes it.
   *
   * @param location path of the file to create; may carry its own scheme/authority
   * @param fn       callback that produces the file content
   * @throws java.io.IOException if the file cannot be created or the content could
   *                             not be written completely
   */
  private def writeToFile(location: String)(fn: PrintWriter => Unit): Unit = {
    import java.io.{IOException, OutputStreamWriter}
    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val path = new Path(location)
    // Resolve the file system from the path itself so that a location on a
    // non-default file system does not fail with a "Wrong FS" error; a path
    // without a scheme still resolves to the default file system.
    // NOTE(review): a fresh Configuration only sees the classpath defaults, not
    // job-specific overrides - confirm that this is sufficient for your cluster.
    val fs = path.getFileSystem(new Configuration())
    // Explicit charset: the output must not depend on the JVM default encoding.
    val writer = new PrintWriter(new OutputStreamWriter(fs.create(path), StandardCharsets.UTF_8))
    try {
      fn(writer)
    } finally {
      // close() flushes the writer, no separate flush() is required.
      writer.close()
    }
    // PrintWriter never throws on write errors, it only records them.
    if (writer.checkError()) {
      throw new IOException("Failed to write counters to " + location)
    }
  }
}
/**
 * Example job that mixes in [[SaveCountersToHdfs]].
 *
 * Declares two custom counters; the actual pipeline is left to the reader.
 */
class SampleJob(args: Args) extends Job(args) with SaveCountersToHdfs {

  // Custom counters registered under the names "sample.foo" and "sample.bar".
  val counterFoo: Stat = Stat("sample.foo")
  val counterBar: Stat = Stat("sample.bar")

  // Implement your job here.
  // When the job finishes, SaveCountersToHdfs should write all custom counters
  // to the file given by args("counters-file").
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The same can be done using flow listeners:
https://itellity.wordpress.com/2014/10/29/counters-using-cascading-flow-listeners-in-scalding/