Last active
September 19, 2016 15:22
-
-
Save mavencode01/686398f1d5f22cc55e5563fcbb1813ec to your computer and use it in GitHub Desktop.
Spark running out of Memory without caching
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mavencode.clustering | |
import java.util.Properties | |
import com.typesafe.config.ConfigFactory | |
import org.apache.log4j.{Level, LogManager} | |
import org.apache.spark.SparkConf | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.sql.{SaveMode, SparkSession} | |
import org.apache.spark.storage.StorageLevel | |
/** | |
* Created by Philip K. Adetiloye on 2016-09-16. | |
*/ | |
object RecursivePostMerge { | |
case class Locator(id: Integer, security_1: String, content_Locator_sha1_20c: Option[String], security: String, raw_score: Double, relative_score: Double, size: Integer) | |
case class MergedLocator(super_security_1: String, super_security: String, super_cluster_size: Integer, id: Integer, security_1: String, content_Locator_sha1_20c: String, security: String, raw_score: Double, relative_score: Double, size: Integer) | |
val log = LogManager.getRootLogger | |
log.setLevel(Level.INFO) | |
case class WithAncestor(child: Locator, ancestor: Option[Locator]) { | |
def hasGrandparent: Boolean = { | |
child.security != null | |
} | |
} | |
object RecursiveParentLookup { | |
def findSimilarLocatores(rdd: RDD[Locator]): RDD[WithAncestor] = { | |
val persitRDD = rdd.coalesce(1000) | |
// all Locatores keyed by id | |
def byId = persitRDD.map(x => (x.security, x)) | |
// recursive function that "climbs" one generation at each iteration | |
def climbOneGeneration(LocatoresParent: Option[RDD[WithAncestor]], LocatoresGrandParent: RDD[WithAncestor]): RDD[WithAncestor] = { | |
//val cached = LocatoresGrandParent.cache() | |
val cached = LocatoresGrandParent | |
log.info("Locatores Grandparent partitions: " + LocatoresGrandParent.getNumPartitions) | |
// find which Locatores can climb further up family tree | |
val haveGrandparents = cached.filter(_.hasGrandparent) | |
if (LocatoresParent.isDefined) { | |
log.info("Locatores LocatoresParent partitions: " + LocatoresParent.get.getNumPartitions) | |
// Locatores climb further up family tree only once | |
val moreGrandParents = LocatoresParent.get.coalesce(1000).map(_.child).subtract(haveGrandparents.map(_.child)) | |
if (moreGrandParents.isEmpty()) { | |
cached // we're done, return result | |
} else { | |
val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is | |
val withGrandparents = haveGrandparents | |
.coalesce(1000) | |
.map(r => (r.ancestor.get.content_Locator_sha1_20c.get, r)) // grandparent id | |
.join(byId) | |
.values | |
.map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.child, Some(grandparent)) }) | |
done ++ cached ++ climbOneGeneration(Some(haveGrandparents), withGrandparents) | |
} | |
} else { | |
val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is | |
val withGrandparents = haveGrandparents | |
.coalesce(1000) | |
.map(r => (r.ancestor.get.content_Locator_sha1_20c.get, r)) // grandparent id | |
.join(byId) | |
.values | |
.map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.child, Some(grandparent)) }) | |
done ++ cached ++ climbOneGeneration(Some(haveGrandparents), withGrandparents) | |
} | |
} | |
// call recursive method - start by assuming each Locator is its own parent, if it has one: | |
climbOneGeneration(None, persitRDD.map(p => WithAncestor(p, Some(p)))) | |
} | |
} | |
def main(args: Array[String]) { | |
val properties = ConfigFactory.load() | |
//... | |
val CPU = properties.getInt("cpu") | |
val conf = new SparkConf() | |
//.setMaster("local[2]") | |
.setAppName("similarities-recursive-post-merge") | |
//.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // causes scratch space to grow out of disk space | |
.set("spark.default.parallelism", (CPU * 3).toString) | |
val sparkSession = SparkSession | |
.builder() | |
.config(conf) | |
.getOrCreate() | |
val opts = Map( | |
"url" -> s"jdbc:postgresql://$S_DB_HOST:$S_DB_PORT/$S_DATABASE", | |
"driver" -> "org.postgresql.Driver", | |
"dbtable" -> SIMILARITY_TABLE, | |
"user" -> S_DB_USER, | |
"password"-> S_DB_PASSWORD, | |
"partitionColumn" -> PARTITION_COL, | |
"lowerBound" -> LOWER_BOUND, | |
"upperBound" -> UPPER_BOUND, | |
"numPartitions" -> NUM_PARTITION | |
) | |
val sc = sparkSession.sparkContext | |
import sparkSession.implicits._ | |
val similarityDs = sparkSession.read.format("jdbc").options(opts).load | |
similarityDs.createOrReplaceTempView("trading_data") | |
val similarityDataset = sparkSession.sql("select * from trading_data where " + | |
"relative_score >= 0.9 and " + | |
"((security is not NULL or security <> '') " + | |
"or (security_1 is not NULL or security_1 <> ''))").as[Locator] | |
/* | |
Super cluster Test data | |
val entry1 = Locator(1, "11", Some("10"), "1",1.1, 2.1, 20) | |
val entry2 = Locator(2, "12", Some("1"), "8",1.1, 2.1, 20) | |
val entry3 = Locator(3, "13", Some("2"), "1",1.1, 2.1, 20) | |
val entry4 = Locator(4, "14", Some("2"), "4" ,1.1, 2.1, 20) | |
val entry5 = Locator(5, "15", Some("4"), "5", 1.1, 2.1, 20) | |
val entry6 = Locator(6, "16", Some("2"), "2",1.1, 2.1, 20) | |
val entry7 = Locator(7, "17", Some("4"), "4",1.1, 2.1, 20) | |
val input = sc.parallelize(Seq(entry1, entry2, entry3, entry4, entry5, entry6, entry7)) | |
*/ | |
val superClusterTmp = RecursiveParentLookup.findSimilarLocatores(similarityDataset.rdd).collect() | |
val superClusterTmp1 = superClusterTmp.map(r => (r.ancestor.get.security_1, (r.ancestor.get.security, r.ancestor.get.size, r.child))) | |
.groupBy(c => c._1).map{ case row => row._2.toSeq } | |
.flatten | |
val superClusters = superClusterTmp1.map( x => MergedLocator(x._1, x._2._1, x._2._2, x._2._3.id, x._2._3.security_1, x._2._3.content_Locator_sha1_20c.get, x._2._3.security, x._2._3.raw_score, x._2._3.relative_score, x._2._3.size )).toSeq.distinct | |
val prop = new Properties() | |
prop.setProperty("user", M_DB_USER) | |
prop.setProperty("password", M_DB_PASSWORD) | |
prop.setProperty("driver", "org.postgresql.Driver") | |
superClusters.toDS().coalesce(100).write | |
.mode(SaveMode.Append) | |
.jdbc(s"jdbc:postgresql://$M_DB_HOST:$M_DB_PORT/$M_DATABASE", MERGED_TABLE, prop) | |
sparkSession.stop() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment