Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.PrintWriter
- import cascading.stats.CascadingStats
- import com.twitter.scalding._
/**
 * Mixin that writes all custom counters of a finished job into a TSV file on
 * HDFS at the path given by args("counters-file"), if that argument is set.
 *
 * Output format (one counter per line):
 *   counter_name<TAB>value
 */
trait SaveCountersToHdfs extends Job {
  // None when the "counters-file" argument was not supplied; saving is skipped.
  private lazy val countersFile = args.optional("counters-file")

  /** Hook invoked by scalding when the flow completes; persists counters only
    * for successful runs, and only when a target path was configured. */
  override protected def handleStats(statsData: CascadingStats) = {
    super.handleStats(statsData)
    if (statsData.isSuccessful) {
      // foreach on the Option replaces the unsafe nonEmpty/get pair.
      countersFile.foreach(saveCounters(statsData, _))
    }
  }

  /** Writes every custom counter as a "name\tvalue" line to `outputPath`.
    * No file is created when the job produced no custom counters. */
  private def saveCounters(statsData: CascadingStats, outputPath: String) = {
    implicit val statProvider = statsData
    val jobStats = Stats.getAllCustomCounters
    if (jobStats.nonEmpty) {
      writeToFile(outputPath) { writer =>
        jobStats.foreach {
          case (counter, value) =>
            writer.println("%s\t%s".format(counter, value))
        }
      }
    }
  }

  /** Opens (creates/overwrites) `location` on the default Hadoop FileSystem,
    * hands a PrintWriter over it to `fn`, and always flushes and closes the
    * writer, even if `fn` throws.
    *
    * NOTE: FileSystem.get returns a cached, process-shared instance — it is
    * deliberately NOT closed here, as closing it would break other users.
    */
  private def writeToFile(location: String)(fn: (PrintWriter => Unit)) = {
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{Path, FileSystem}
    val fs = FileSystem.get(new Configuration())
    val path = new Path(location)
    val writer = new PrintWriter(fs.create(path))
    try {
      fn(writer)
    } finally {
      // PrintWriter swallows IOExceptions; flush before close so data is
      // pushed out while the stream is still known to be open.
      writer.flush()
      writer.close()
    }
  }
}
/** Example job demonstrating the SaveCountersToHdfs mixin. */
class SampleJob(args: Args) extends Job(args) with SaveCountersToHdfs {
  // Custom counters incremented by the pipeline; once the job finishes
  // successfully, SaveCountersToHdfs dumps each of them as a
  // "name<TAB>value" line into the file named by args("counters-file").
  val counterFoo = Stat("sample.foo")
  val counterBar = Stat("sample.bar")

  // Implement the actual pipeline here.
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement