import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
val conf = new SparkConf().setAppName("SparkTest").setMaster("local[1]")
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)
// import the implicits, but exclude getClass from the wildcard import
import sqlContext.implicits.{getClass => _, _}
val fileNameGoogle = "/google-places-data.json" // one JSON place object per line
val locationsGoogle = sqlContext.read.json(getClass.getResource(fileNameGoogle).getPath).
  toDF().
import spark.implicits._
val kvKDataset = spark.read
  .option("header", true)
  .option("sep", ";")
  .option("ignoreLeadingWhiteSpace", true)
  .option("ignoreTrailingWhiteSpace", true)
  .option("quote", "\"")
  .option("nullValue", "")
  .option("mode", "FAILFAST")
  .csv(path)
kvKDataset.select(
  'DOSSIERNR.as("dossierNummer"),
  'VG_NR.as("vgNummer"),
  'HANDELSN30.as("naamShort"),
  'HANDN1_30.as("naamShortP1"),
  'HANDN2_30.as("naamShortP2"),
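The reader options above combine a `;` separator, whitespace trimming and `nullValue = ""`, and the `select` renames the raw KvK column names. As a rough local illustration only, assuming unquoted fields, one line would be split and a header renamed like this; `CsvLineSketch`, `parseLine`, `renames` and `renameHeader` are hypothetical names, not Spark API.

```scala
import java.util.regex.Pattern

object CsvLineSketch {
  // Splits one delimited line roughly the way the reader options are set up:
  // sep = ";", leading/trailing whitespace trimmed, empty field -> missing value.
  // Assumption: no quoted fields; a ';' inside quotes is NOT handled here.
  def parseLine(line: String, sep: Char = ';'): List[Option[String]] =
    line
      .split(Pattern.quote(sep.toString), -1) // limit -1 keeps trailing empty fields
      .toList
      .map(_.trim)
      .map(f => if (f.isEmpty) None else Some(f))

  // Hypothetical rename table mirroring a few of the .as(...) aliases in the select.
  val renames: Map[String, String] = Map(
    "DOSSIERNR" -> "dossierNummer",
    "VG_NR" -> "vgNummer",
    "HANDELSN30" -> "naamShort"
  )

  // Header names without an entry in the table are kept unchanged.
  def renameHeader(header: List[String]): List[String] =
    header.map(name => renames.getOrElse(name, name))
}
```

Spark's CSV reader also handles quoting, schema and the `FAILFAST` mode, none of which this sketch attempts.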
import org.apache.spark.ml.linalg.{Vector, Vectors}
// self-join on equal adresV and postcodePlaatsV, skipping pairs that share a dossierNummer
val comparableDataset = kvKDataset.as("l")
  .joinWith(
    kvKDataset.as("r"),
    $"l.adresV" === $"r.adresV" && $"l.postcodePlaatsV" === $"r.postcodePlaatsV" && $"l.dossierNummer" =!= $"r.dossierNummer"
  ).map {
    case (left, right) => (left, right, Vectors.dense(left.distance(right).toArray))
  }
  .toDF("left", "right", "features")
  .as[(KvKRecord, KvKRecord, Vector)]
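The join condition acts as a blocking step: only records sharing `adresV` and `postcodePlaatsV` are compared, and a record is never paired with itself. A minimal local sketch of the same candidate generation in plain Scala collections, assuming a stripped-down hypothetical `Rec` in place of `KvKRecord`:

```scala
object BlockingSketch {
  // Hypothetical stand-in for KvKRecord with only the fields the join condition uses.
  final case class Rec(dossierNummer: String, adresV: String, postcodePlaatsV: String)

  // Local equivalent of the joinWith condition: same adresV and postcodePlaatsV,
  // different dossierNummer. Ordered pairs are kept, as a self-join yields both (a, b) and (b, a).
  // Unlike Spark's ===, grouping here would treat null addresses as equal.
  def candidatePairs(recs: Seq[Rec]): Seq[(Rec, Rec)] =
    recs
      .groupBy(r => (r.adresV, r.postcodePlaatsV))
      .values
      .toSeq
      .flatMap { block =>
        for {
          l <- block
          r <- block
          if l.dossierNummer != r.dossierNummer
        } yield (l, r)
      }
}
```

Grouping first keeps the number of comparisons proportional to the block sizes rather than to the square of the whole dataset, which is the point of joining on the address columns.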
import java.lang.Math._
import info.debatty.java.stringsimilarity.JaroWinkler
case class KvKRecord(
  dossierNummer: String, vgNummer: String, naamShort: String, naamShortP1: String, naamShortP2: String, naamLong: String,
  adresV: String, postcodePlaatsV: String, adresC: String, postcodePlaatsC: String, wptf: Int, sbi: Int) {
  def vectorValues: List[Any] = List(dossierNummer, vgNummer, naamShort, naamShortP1, naamShortP2, naamLong, adresV, postcodePlaatsV, adresC, postcodePlaatsC, wptf, sbi)
  def distance(other: KvKRecord): List[Double] = {
    val jw = new JaroWinkler
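The `distance` method builds its feature vector with the debatty `JaroWinkler` class. To show what such a string distance measures, here is a from-scratch sketch of the standard Jaro-Winkler similarity (prefix boost of 0.1 per shared leading character, at most 4, applied only above a 0.7 Jaro threshold). It is not the library's code and may differ from it in edge cases; `fieldDistances` is a hypothetical helper, not the author's `distance` body.

```scala
object JaroWinklerSketch {
  // Jaro similarity: characters match if equal and within a window; transpositions are penalized.
  def jaro(s1: String, s2: String): Double = {
    if (s1.isEmpty && s2.isEmpty) return 1.0
    if (s1.isEmpty || s2.isEmpty) return 0.0
    val window = math.max(0, math.max(s1.length, s2.length) / 2 - 1)
    val matched1 = Array.fill(s1.length)(false)
    val matched2 = Array.fill(s2.length)(false)
    var matches = 0
    for (i <- s1.indices) {
      val lo = math.max(0, i - window)
      val hi = math.min(s2.length - 1, i + window)
      var j = lo
      var found = false
      while (j <= hi && !found) {
        if (!matched2(j) && s1.charAt(i) == s2.charAt(j)) {
          matched1(i) = true
          matched2(j) = true
          matches += 1
          found = true
        }
        j += 1
      }
    }
    if (matches == 0) return 0.0
    // matched characters, in order of appearance in each string
    val c1 = s1.indices.filter(i => matched1(i)).map(i => s1.charAt(i))
    val c2 = s2.indices.filter(j => matched2(j)).map(j => s2.charAt(j))
    val transpositions = c1.zip(c2).count { case (x, y) => x != y } / 2.0
    val m = matches.toDouble
    (m / s1.length + m / s2.length + (m - transpositions) / m) / 3.0
  }

  // Winkler boost: reward a common prefix (max 4 chars) when Jaro exceeds the threshold.
  def jaroWinkler(s1: String, s2: String, scaling: Double = 0.1, threshold: Double = 0.7): Double = {
    val j = jaro(s1, s2)
    if (j <= threshold) j
    else {
      val maxPrefix = math.min(4, math.min(s1.length, s2.length))
      val prefix = (0 until maxPrefix).takeWhile(i => s1.charAt(i) == s2.charAt(i)).length
      j + prefix * scaling * (1.0 - j)
    }
  }

  // Distance as 1 - similarity, so identical strings have distance 0.
  def distance(s1: String, s2: String): Double = 1.0 - jaroWinkler(s1, s2)

  // Hypothetical per-field distance vector: one value per pair of string fields.
  def fieldDistances(a: Seq[String], b: Seq[String]): List[Double] =
    a.zip(b).map { case (x, y) => distance(x, y) }.toList
}
```

For the classic pair "MARTHA"/"MARHTA" the sketch gives a Jaro similarity of 17/18 and a Jaro-Winkler similarity of about 0.961.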
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._
// Tabulator: check out http://stackoverflow.com/questions/7539831/scala-draw-table-to-console
// equivalent to kvKRecord.productIterator.toList
def propertyList(kvKRecord: KvKRecord): List[Any] = KvKRecord.unapply(kvKRecord).map(_.productIterator.toList).getOrElse(Nil)
val labeledList: ArrayBuffer[LabeledVector] = ArrayBuffer()
breakable {
  comparableDataset
    .sample(withReplacement = false, Config.sampleFactor)
    .collect()
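The comment above points to a StackOverflow question about drawing a table on the console, which helps when labeling sampled pairs by hand. The following is an independent minimal sketch of such a formatter (not the code from that answer); `TableSketch.format` is a hypothetical name and assumes every row has the same number of cells.

```scala
object TableSketch {
  // Renders rows as an ASCII table; each column is padded to its widest cell.
  // Assumes all rows have the same number of cells (required by transpose).
  def format(rows: Seq[Seq[Any]]): String =
    if (rows.isEmpty) ""
    else {
      val cells = rows.map(_.map(c => if (c == null) "null" else c.toString))
      val widths = cells.transpose.map(_.map(_.length).max)
      val sep = widths.map("-" * _).mkString("+", "+", "+")
      val lines = cells.map(row =>
        row.zip(widths).map { case (c, w) => c.padTo(w, ' ') }.mkString("|", "|", "|"))
      // header row, separator, then the remaining rows, framed by separators
      (Seq(sep, lines.head, sep) ++ lines.tail ++ Seq(sep)).mkString("\n")
    }
}
```

In the labeling loop it could be called as `println(TableSketch.format(Seq(propertyList(left), propertyList(right))))`, so the two candidate records appear side by side column by column.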
import org.apache.spark.ml.classification.LogisticRegression
// load the labeled data
val labeledSet = spark.read.parquet(path).as[LabeledVector]
// split into training and test sets (80/20)
val Array(trainingData, testData) = labeledSet.randomSplit(Array(Config.trainSplit, 1 - Config.trainSplit))
// basic model: logistic regression with elastic-net regularization
val lr = new LogisticRegression().setMaxIter(200).setRegParam(0.01).setElasticNetParam(0.8)
// Train
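In Spark ML, `regParam` is the overall regularization strength λ and `elasticNetParam` is the mixing weight α between an L1 and an L2 penalty, documented as α·λ·‖w‖₁ + (1 − α)·λ/2·‖w‖₂². A small sketch of that formula, so the effect of `setRegParam(0.01).setElasticNetParam(0.8)` (a mostly L1 penalty) can be checked by hand; `ElasticNetSketch.penalty` is a hypothetical helper, not Spark API.

```scala
object ElasticNetSketch {
  // alpha * lambda * ||w||_1 + (1 - alpha) * lambda / 2 * ||w||_2^2
  // with lambda = regParam and alpha = elasticNetParam.
  def penalty(w: Seq[Double], regParam: Double, elasticNetParam: Double): Double = {
    val l1 = w.map(x => math.abs(x)).sum
    val l2Squared = w.map(x => x * x).sum
    elasticNetParam * regParam * l1 + (1 - elasticNetParam) * regParam / 2 * l2Squared
  }
}
```

With weights (1, −2) and the settings above, the L1 part contributes 0.8 · 0.01 · 3 = 0.024 and the L2 part 0.2 · 0.01 / 2 · 5 = 0.005.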
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf
// probability is a vector with two values, for label 0.0 and 1.0 respectively; we only need the one for 1.0
val getP1 = udf((v: Vector) => v(1))
val predictionsRaw = lrModel.transform(comparableDataset)
val predictedLinksDataset = predictionsRaw
  .select(
    $"left.dossierNummer".as("dossierNummer_left"),
    $"right.dossierNummer".as("dossierNummer_right"),
    $"features",
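For a binary logistic regression model, the probability column holds (1 − p, p) with p = sigmoid(w · x + b), which is why `getP1` reads index 1. A minimal sketch of that computation, assuming plain `Seq[Double]` weights and features; `ProbabilitySketch` is a hypothetical name.

```scala
object ProbabilitySketch {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Returns Array(P(label = 0.0), P(label = 1.0)), the same order as the probability column.
  def probability(weights: Seq[Double], intercept: Double, features: Seq[Double]): Array[Double] = {
    require(weights.length == features.length, "weights and features must have the same length")
    val margin = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
    val p1 = sigmoid(margin)
    Array(1.0 - p1, p1)
  }
}
```

A zero margin gives exactly 0.5 for both labels; a positive margin pushes the value at index 1 above 0.5.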
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
// each edge carries the predicted probability that the link is correct
val edges: RDD[Edge[Double]] = predictedLinksDataset.rdd.map(
  l => Edge(l.dossierNummer_left.toLong, l.dossierNummer_right.toLong, l.probability))
// vertices are (dossierNummer.toLong, record) tuples (VertexId is an alias for Long)
val vertices: RDD[(VertexId, KvKRecord)] = kvKDataset.rdd.map(
  k => (k.dossierNummer.toLong, k))
// the graph
val linkedGraph = Graph(vertices, edges)
import spark.sqlContext.implicits._
case class GroupedKvKRecord(groupId: Long, kvkRecords: Seq[KvKRecord])
// an RDD cannot be used inside another RDD's transformation, so broadcast the vertices as a local map
val bcVertices = spark.sparkContext.broadcast(vertices.collectAsMap)
// connectedComponents does the grouping: each vertex gets the lowest VertexId in its component
val groupedKvKRecords = linkedGraph
  .connectedComponents
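GraphX's `connectedComponents` labels every vertex with the lowest vertex id in its connected component; the edge attribute (here the probability) plays no part, so any filtering of weak links has to happen before the graph is built. The same labeling can be sketched locally with union-find; `ComponentsSketch` and its methods are hypothetical names, not GraphX API.

```scala
import scala.collection.mutable

object ComponentsSketch {
  // Union-find over Long vertex ids; each vertex ends up labeled with the
  // smallest id in its component, like GraphX's connectedComponents().
  def components(vertices: Seq[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    val parent = mutable.Map[Long, Long]()
    vertices.foreach(v => parent(v) = v)
    edges.foreach { case (a, b) => parent.getOrElseUpdate(a, a); parent.getOrElseUpdate(b, b) }

    def find(v: Long): Long = {
      val p = parent(v)
      if (p == v) v
      else { val root = find(p); parent(v) = root; root } // path compression
    }

    edges.foreach { case (a, b) =>
      val ra = find(a)
      val rb = find(b)
      // keep the smaller id as root, so every root is its component's minimum
      if (ra < rb) parent(rb) = ra else if (rb < ra) parent(ra) = rb
    }
    parent.keys.toList.map(v => v -> find(v)).toMap
  }

  // Group vertex ids by component label, analogous to building GroupedKvKRecord.
  def groups(labels: Map[Long, Long]): Map[Long, Set[Long]] =
    labels.groupBy(_._2).map { case (label, members) => label -> members.keys.toSet }
}
```

Because components are transitive, two records with no direct predicted link can still end up in one group if a chain of links connects them.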
val stringify = udf((vs: Seq[String]) => vs.mkString(";"))
groupedKvKRecords
  .map(
    r => r.kvkRecords
      .foldRight((0, 0, List.empty[String]))(
        (kvkrecord, comb) => (
          comb._1 + 1,
          kvkrecord.wptf + comb._2,
          kvkrecord.dossierNummer :: comb._3
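The fold summarizes each group in one pass as (record count, summed `wptf`, list of dossier numbers); using `foldRight` with `::` keeps the dossier numbers in their original order. A local sketch of the same fold, assuming a hypothetical `Rec` with only the two fields it reads:

```scala
object GroupSummarySketch {
  // Hypothetical stand-in for KvKRecord with only the fields the fold reads.
  final case class Rec(dossierNummer: String, wptf: Int)

  // (number of records, total wptf, dossierNummers in original order)
  def summarize(recs: Seq[Rec]): (Int, Int, List[String]) =
    recs.foldRight((0, 0, List.empty[String])) { (rec, acc) =>
      (acc._1 + 1, rec.wptf + acc._2, rec.dossierNummer :: acc._3)
    }
}
```

Joining the third element with `mkString(";")`, as `stringify` does, turns the list into a single column value.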