Last active
December 20, 2015 21:08
TopAByBJob is a daily job that pulls .pailfile data from S3, where paths are partitioned by date/time and type.
Data must be grouped by A and B, sorted by B, then only written for the top n (keep) for each A, based on B (count), descending.
I wrote all of the base jobs such as DailyJobWithKeep (for use by daily jobs needing a 'keepN') extends DailyJob extends …
The job generates a part-file of data in the format:
2034 cid1 a
1025 cid1 g
2034 cid3 g
1025 cid3 a
2034 cid6 f
1025 cid6 b
class TopAByBJob(args: Args) extends DailyJobWithKeep(args, classOf[ProtobufTypeForS3PathPartition]) with TypeAFilters {
  PailSource.source[FooProtobuf](rootpath, structure, directories).read
    .mapTo('pailItem -> ('b, 'a)) { e: FooProtobuf ⇒ e.b -> calculateA(e) }
    .filter('a) { n: String ⇒ n.nonEmpty }
    .groupBy(('b, 'a)) { _.size('count) }
    .groupBy('b) { _.sortedReverseTake[(Long, String, String)](('count, 'b, 'a) -> 'tcount, keep) }
    .flatMapTo('tcount -> ('count, 'b, 'a)) { t: (List[(Long, String, String)]) ⇒ t }
    .write(Tsv(outputdir))
}
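To make the grouping logic in the job above easier to follow without a Scalding cluster, here is a minimal sketch of the same count-then-top-n idea using only plain Scala collections. The object name TopNPerGroup, the method topAByB, and the in-memory Seq input are assumptions made for illustration; they are not part of the original job, and the sketch does not reproduce Pail, S3, or TSV handling.

```scala
object TopNPerGroup {
  // Hypothetical stand-in for the Scalding pipeline: takes (b, a) pairs,
  // counts each distinct pair, and keeps the `keep` highest counts per b.
  def topAByB(pairs: Seq[(String, String)], keep: Int): Seq[(Long, String, String)] = {
    // Mirrors .filter('a) and .groupBy(('b, 'a)) { _.size('count) }
    val counted: Seq[(Long, String, String)] =
      pairs
        .filter { case (_, a) => a.nonEmpty }
        .groupBy(identity)
        .toSeq
        .map { case ((b, a), occurrences) => (occurrences.size.toLong, b, a) }

    // Mirrors .groupBy('b) { _.sortedReverseTake(..., keep) } followed by the flatMapTo:
    // within each b, sort the (count, b, a) tuples descending and keep the first `keep`.
    val descending = Ordering[(Long, String, String)].reverse
    counted
      .groupBy { case (_, b, _) => b }
      .toSeq
      .sortBy { case (b, _) => b } // stable output order for the example only
      .flatMap { case (_, rows) => rows.sorted(descending).take(keep) }
  }
}
```

Sorting on the whole (count, b, a) tuple means ties on count are broken by the remaining fields, which is why the count is placed first in the tuple.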
Thanks for the simplification Oscar :)