Spark running out of Memory without caching
package com.mavencode.clustering
import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* Created by Philip K. Adetiloye on 2016-09-16.
*/
object RecursivePostMerge {

  case class Locator(id: Integer, security_1: String, content_Locator_sha1_20c: Option[String],
                     security: String, raw_score: Double, relative_score: Double, size: Integer)

  case class MergedLocator(super_security_1: String, super_security: String, super_cluster_size: Integer,
                           id: Integer, security_1: String, content_Locator_sha1_20c: String,
                           security: String, raw_score: Double, relative_score: Double, size: Integer)

  val log = LogManager.getRootLogger
  log.setLevel(Level.INFO)
  case class WithAncestor(child: Locator, ancestor: Option[Locator]) {
    // a row can keep climbing while its `security` key is non-null
    def hasGrandparent: Boolean = {
      child.security != null
    }
  }
  object RecursiveParentLookup {

    def findSimilarLocatores(rdd: RDD[Locator]): RDD[WithAncestor] = {

      val persistRDD = rdd.coalesce(1000)

      // all Locators keyed by their `security` value (the key the joins below match on)
      // NOTE: declared as `def`, so a fresh RDD (with its full lineage back to the
      // source) is built every time byId is joined against
      def byId = persistRDD.map(x => (x.security, x))
      // recursive function that "climbs" one generation at each iteration
      def climbOneGeneration(LocatoresParent: Option[RDD[WithAncestor]],
                             LocatoresGrandParent: RDD[WithAncestor]): RDD[WithAncestor] = {

        // val cached = LocatoresGrandParent.cache()  // caching disabled, as per the gist title
        val cached = LocatoresGrandParent
        log.info("Locatores grandparent partitions: " + LocatoresGrandParent.getNumPartitions)

        // find which Locators can climb further up the family tree
        val haveGrandparents = cached.filter(_.hasGrandparent)
        if (LocatoresParent.isDefined) {
          log.info("LocatoresParent partitions: " + LocatoresParent.get.getNumPartitions)

          // children that were climbing in the previous generation but are not climbing any more
          val moreGrandParents = LocatoresParent.get.coalesce(1000).map(_.child)
            .subtract(haveGrandparents.map(_.child))

          // if no Locator settled since the previous generation, stop here
          if (moreGrandParents.isEmpty()) {
            cached // we're done, return result
          } else {
            val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
            val withGrandparents = haveGrandparents
              .coalesce(1000)
              .map(r => (r.ancestor.get.content_Locator_sha1_20c.get, r)) // grandparent key
              .join(byId)
              .values
              .map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.child, Some(grandparent)) })
            // `done` rows are final; the still-climbing rows come back from the
            // recursive call, so `cached` must not be unioned in a second time
            done ++ climbOneGeneration(Some(haveGrandparents), withGrandparents)
          }
        } else {
          // first generation: there is no previous parent RDD to compare against
          val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
          val withGrandparents = haveGrandparents
            .coalesce(1000)
            .map(r => (r.ancestor.get.content_Locator_sha1_20c.get, r)) // grandparent key
            .join(byId)
            .values
            .map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.child, Some(grandparent)) })
          done ++ climbOneGeneration(Some(haveGrandparents), withGrandparents)
        }
      }
      // call the recursive method, starting from each Locator as its own ancestor
      climbOneGeneration(None, persistRDD.map(p => WithAncestor(p, Some(p))))
    }
  }
  def main(args: Array[String]) {

    val properties = ConfigFactory.load()
    // ... connection settings used below (S_DB_HOST, S_DB_PORT, S_DATABASE,
    // S_DB_USER, S_DB_PASSWORD, SIMILARITY_TABLE, PARTITION_COL, LOWER_BOUND,
    // UPPER_BOUND, NUM_PARTITION, the M_DB_* values and MERGED_TABLE) are read
    // from the loaded config here; elided in the original gist
    val CPU = properties.getInt("cpu")
    val conf = new SparkConf()
      //.setMaster("local[2]")
      .setAppName("similarities-recursive-post-merge")
      //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // causes scratch space to grow out of disk space
      .set("spark.default.parallelism", (CPU * 3).toString)
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val opts = Map(
      "url" -> s"jdbc:postgresql://$S_DB_HOST:$S_DB_PORT/$S_DATABASE",
      "driver" -> "org.postgresql.Driver",
      "dbtable" -> SIMILARITY_TABLE,
      "user" -> S_DB_USER,
      "password" -> S_DB_PASSWORD,
      "partitionColumn" -> PARTITION_COL,
      "lowerBound" -> LOWER_BOUND,
      "upperBound" -> UPPER_BOUND,
      "numPartitions" -> NUM_PARTITION
    )
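    // with partitionColumn/lowerBound/upperBound/numPartitions set, Spark reads
    // the table as NUM_PARTITION parallel range queries over PARTITION_COL
    // instead of a single scan through one connection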
    val sc = sparkSession.sparkContext
    import sparkSession.implicits._

    val similarityDs = sparkSession.read.format("jdbc").options(opts).load
    similarityDs.createOrReplaceTempView("trading_data")
    // keep rows scoring at least 0.9 where at least one security key is present and non-empty
    val similarityDataset = sparkSession.sql("select * from trading_data where " +
      "relative_score >= 0.9 and " +
      "((security is not NULL and security <> '') " +
      "or (security_1 is not NULL and security_1 <> ''))").as[Locator]
    /*
      Super-cluster test data:

      val entry1 = Locator(1, "11", Some("10"), "1", 1.1, 2.1, 20)
      val entry2 = Locator(2, "12", Some("1"), "8", 1.1, 2.1, 20)
      val entry3 = Locator(3, "13", Some("2"), "1", 1.1, 2.1, 20)
      val entry4 = Locator(4, "14", Some("2"), "4", 1.1, 2.1, 20)
      val entry5 = Locator(5, "15", Some("4"), "5", 1.1, 2.1, 20)
      val entry6 = Locator(6, "16", Some("2"), "2", 1.1, 2.1, 20)
      val entry7 = Locator(7, "17", Some("4"), "4", 1.1, 2.1, 20)
      val input = sc.parallelize(Seq(entry1, entry2, entry3, entry4, entry5, entry6, entry7))
    */
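    // NOTE: collect() below materializes every WithAncestor on the driver; for a
    // large similarity table the driver itself can run out of memory here,
    // independent of any executor-side caching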
    val superClusterTmp = RecursiveParentLookup.findSimilarLocatores(similarityDataset.rdd).collect()

    val superClusterTmp1 = superClusterTmp
      .map(r => (r.ancestor.get.security_1, (r.ancestor.get.security, r.ancestor.get.size, r.child)))
      .groupBy(_._1)
      .map { case (_, rows) => rows.toSeq }
      .flatten

    val superClusters = superClusterTmp1
      .map(x => MergedLocator(x._1, x._2._1, x._2._2, x._2._3.id, x._2._3.security_1,
        x._2._3.content_Locator_sha1_20c.get, x._2._3.security, x._2._3.raw_score,
        x._2._3.relative_score, x._2._3.size))
      .toSeq
      .distinct
    val prop = new Properties()
    prop.setProperty("user", M_DB_USER)
    prop.setProperty("password", M_DB_PASSWORD)
    prop.setProperty("driver", "org.postgresql.Driver")
    superClusters.toDS().coalesce(100).write
      .mode(SaveMode.Append)
      .jdbc(s"jdbc:postgresql://$M_DB_HOST:$M_DB_PORT/$M_DATABASE", MERGED_TABLE, prop)

    sparkSession.stop()
  }
}
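Why this runs out of memory without caching: every ++ and join in climbOneGeneration extends the lineage of the result RDD, and because the .cache() call is commented out, each new generation recomputes all earlier ones (and the JDBC read behind them) from scratch. Below is a minimal sketch of the usual remedy, persisting each generation and checkpointing periodically to truncate the lineage. It assumes a simplified Node(key, parent) record rather than the gist's Locator; the LineageSafeClimb object, the climb helper, and the checkpoint interval of 5 are illustrative, not part of the original code.

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object LineageSafeClimb {

  // hypothetical record: `key` identifies a row, `parent` points one generation up
  case class Node(key: String, parent: Option[String])

  // state per element: (original node, highest ancestor found so far)
  def climb(byKey: RDD[(String, Node)],
            generation: RDD[(Node, Node)],
            depth: Int = 0): RDD[(Node, Node)] = {

    // persist each generation so the two filters and the union below reuse it
    // instead of recomputing the entire chain back to the source RDD
    val cached = generation.persist(StorageLevel.MEMORY_AND_DISK)

    // every few generations, checkpoint to truncate the growing lineage;
    // requires sc.setCheckpointDir(...) to have been called once up front
    if (depth % 5 == 4) cached.checkpoint()

    val done     = cached.filter { case (_, ancestor) => ancestor.parent.isEmpty }
    val climbing = cached.filter { case (_, ancestor) => ancestor.parent.isDefined }

    if (climbing.isEmpty()) {
      cached // every row has reached a root
    } else {
      // follow each pending parent pointer one generation up
      val next = climbing
        .map { case (node, ancestor) => (ancestor.parent.get, node) }
        .join(byKey)
        .map { case (_, (node, grandparent)) => (node, grandparent) }
      done ++ climb(byKey, next, depth + 1)
    }
  }
}

// sketch of how it would be driven:
//   sc.setCheckpointDir("/tmp/spark-checkpoints")
//   val byKey = nodes.map(n => (n.key, n))
//   val roots = climb(byKey, nodes.map(n => (n, n))) // each node starts as its own ancestor

The checkpoint interval is a trade-off: checkpointing more often costs extra writes to stable storage but keeps both the DAG depth and the number of live persisted generations bounded. This only addresses the executor-side recomputation; the collect() in main remains a separate driver-side memory risk.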