Skip to content

Instantly share code, notes, and snippets.

@Antwnis
Created January 29, 2015 11:25
Show Gist options
  • Save Antwnis/0dea8383788549302eb3 to your computer and use it in GitHub Desktop.
Save Antwnis/0dea8383788549302eb3 to your computer and use it in GitHub Desktop.
Capturing Deleted/Inserted records
package org.fannan.etl.examples
import cascading.pipe.joiner.{OuterJoin, LeftJoin}
import com.twitter.scalding._
/**
 * Example Scalding job that detects which customer IDs were inserted into or
 * deleted from a dataset between two snapshots.
 *
 * The two snapshots (`old_data` and `new_data`) are hard-coded in-memory
 * samples. Their `CustID` columns are full-outer-joined; every row in which
 * one side of the join is `null` identifies an ID that exists in only one of
 * the snapshots:
 *
 *  - `CustID_OLD` set, `CustID` null  => the record was deleted
 *  - `CustID_OLD` null, `CustID` set  => the record is new
 *
 * Records present in both snapshots are dropped, including ones whose other
 * columns changed (only `CustID` takes part in the comparison).
 *
 * @param args job arguments; the optional `--output` argument selects the TSV
 *             output location and defaults to `peiler`.
 */
class IUDJob(args: Args) extends Job(args) {

  // Field names shared by both snapshots.
  val schema = List('CustID, 'AccountID, 'LastUpdateDate)

  // Previous snapshot.
  val old_data = List(
    ("000001", "343324332", "2015-01-11"),
    ("000002", "334544332", "2015-01-09"),
    ("000003", "334544123", "2015-01-20"), // RECORD DELETED
    ("000004", "343567652", "2015-01-08"),
    ("000005", "343345342", "2015-01-07"))

  // Current snapshot.
  val new_data = List(
    ("000001", "343324332", "2015-01-11"),
    ("000002", "---------", "2015-01-20"), // CHANGED (not reported: CustID exists in both snapshots)
    ("000004", "343567652", "2015-01-08"),
    ("000005", "343345342", "2015-01-07"),
    ("000006", "363365342", "2015-01-20")) // NEW RECORD INTRODUCED

  val OLD =
    IterableSource[(String, String, String)](old_data, schema)

  val NEW =
    IterableSource[(String, String, String)](new_data, schema)

  // identify NEW & DELETED RECORDS
  val NEW_AND_DELETED = OLD
    .project('CustID)
    // The old-side key is renamed so the two join keys stay distinct in the joined output.
    .rename('CustID -> 'CustID_OLD)
    .joinWithLarger('CustID_OLD -> 'CustID, NEW.project('CustID), joiner = new OuterJoin)
    // An outer join leaves null on the side that had no match; keep only those rows.
    .filter('CustID_OLD, 'CustID) { ids: (String, String) => ids._1 == null || ids._2 == null }
    // Output location is configurable via --output; "peiler" is the historical default.
    .write(Tsv(args.getOrElse("output", "peiler")))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment