@ottomata
Last active September 21, 2018 18:01
spark-shell with ammonite and default spark config loading
#!/usr/bin/env bash
export SPARK_HOME="${SPARK_HOME:-/usr/lib/spark2}"
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
source "${SPARK_HOME}/bin/load-spark-env.sh"
export HIVE_CONF_DIR="${SPARK_CONF_DIR}"

AMMONITE=./amm
SPARK_PREDEF=~/spark.predef.scala

# Quote "$@" so any extra arguments containing spaces pass through to ammonite intact.
"$AMMONITE" --class-based --predef "$SPARK_PREDEF" "$@"
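For reference, once this wrapper drops you into the Ammonite prompt, the `spark` session defined by the predef below should already be configured from spark-defaults.conf. A rough smoke test (assuming a local master and a reachable Hive metastore) might look like:

// Sanity checks at the Ammonite prompt, using the `spark` val defined in the predef.
spark.version                                 // should match the Spark under SPARK_HOME
spark.range(10).selectExpr("sum(id)").show()  // runs a trivial local job
spark.sql("SHOW DATABASES").show()            // exercises the Hive metastore configured via HIVE_CONF_DIR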
import ammonite.ops._
// Load all spark2 jars from SPARK_HOME/jars onto the REPL classpath
// (ammonite-ops: `|?` filters the listing, `|!` applies a side effect to each path).
ls.rec! Path(sys.env.get("SPARK_HOME").getOrElse("/usr/lib/spark2") + "/jars") |? { _.segments.last.endsWith(".jar") } |! { interp.load.cp(_) }
// Import the ammonite-spark dependency
import $ivy.`sh.almond::ammonite-spark:0.1.1`
import $ivy.`org.jupyter-scala::spark:0.4.2` // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)
// Cause the ammonite repl to run the above before the below
// so that the above dependencies are added to the classpath in time.
@
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.io._
import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import jupyter.spark.session._
import jupyter.spark.JupyterSparkContext
object Utils {
  /**
   * Load default Spark properties from the given file. If no file is provided,
   * use the common defaults file. This mutates state in the given SparkConf and
   * in this JVM's system properties if the config specified in the file is not
   * already set. Return the path of the properties file used.
   */
  def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
    val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
    Option(path).foreach { confFile =>
      getPropertiesFromFile(confFile).filter { case (k, v) =>
        k.startsWith("spark.")
      }.foreach { case (k, v) =>
        conf.setIfMissing(k, v)
        sys.props.getOrElseUpdate(k, v)
      }
    }
    path
  }

  /** Load properties present in the given file. */
  def getPropertiesFromFile(filename: String): Map[String, String] = {
    val file = new File(filename)
    require(file.exists(), s"Properties file $file does not exist")
    require(file.isFile(), s"Properties file $file is not a normal file")

    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
    try {
      val properties = new Properties()
      properties.load(inReader)
      properties.stringPropertyNames().asScala.map(
        k => (k, properties.getProperty(k).trim)).toMap
    } catch {
      case e: IOException =>
        throw new RuntimeException(s"Failed when loading Spark properties from $filename", e)
    } finally {
      inReader.close()
    }
  }

  /** Return the path of the default Spark properties file. */
  def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
    env.get("SPARK_CONF_DIR")
      .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
      .map { t => new File(s"$t${File.separator}spark-defaults.conf") }
      .filter(_.isFile)
      .map(_.getAbsolutePath)
      .orNull
  }

  /** Build a SparkConf seeded from spark-defaults.conf, optionally with jupyter-scala hooks. */
  def getSparkConf(jupyterize: Boolean = false) = {
    val sparkConf = new SparkConf()
    loadDefaultSparkProperties(sparkConf)
    if (jupyterize)
      JupyterSparkContext.withHooks(sparkConf)
    sparkConf
  }

  /** Register the jupyter-scala context and stop hooks on the given SparkContext. */
  def jupyterizeSparkContext(sc: SparkContext) = {
    JupyterSparkContext.applyContextHooks(sc)
    sc.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) =
        JupyterSparkContext.applyStopContextHooks(sc)
    })
  }
}
// Create a jupyter-aware Ammonite Spark session.
val spark = {
  AmmoniteSparkSession.builder()
    .config(Utils.getSparkConf(jupyterize = true))
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
}

Utils.jupyterizeSparkContext(spark.sparkContext)
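To confirm that values from spark-defaults.conf were actually applied by Utils.getSparkConf, a quick check at the prompt could be (the `spark.driver` prefix is only an illustrative filter; inspect whatever your defaults file sets):

// Print any driver-related settings picked up from spark-defaults.conf.
spark.conf.getAll.filterKeys(_.startsWith("spark.driver")).foreach(println)
// The master set explicitly in the builder above.
spark.sparkContext.getConf.get("spark.master")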
import ammonite.ops._
// Load spark2 jars
ls.rec! Path(sys.env.get("SPARK_HOME").getOrElse("/usr/lib/spark2") + "/jars") |? { _.segments.last.endsWith(".jar") } |! { interp.load.cp(_) }
// Import the ammonite-spark dependency
import $ivy.`sh.almond::ammonite-spark:0.1.1`
// Cause the ammonite repl to run the above before the below
// so that the above dependencies are added to the classpath in time.
@
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.io._
import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.SparkConf
object Utils {
  /**
   * Load default Spark properties from the given file. If no file is provided,
   * use the common defaults file. This mutates state in the given SparkConf and
   * in this JVM's system properties if the config specified in the file is not
   * already set. Return the path of the properties file used.
   */
  def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
    val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
    Option(path).foreach { confFile =>
      getPropertiesFromFile(confFile).filter { case (k, v) =>
        k.startsWith("spark.")
      }.foreach { case (k, v) =>
        conf.setIfMissing(k, v)
        sys.props.getOrElseUpdate(k, v)
      }
    }
    path
  }

  /** Load properties present in the given file. */
  def getPropertiesFromFile(filename: String): Map[String, String] = {
    val file = new File(filename)
    require(file.exists(), s"Properties file $file does not exist")
    require(file.isFile(), s"Properties file $file is not a normal file")

    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
    try {
      val properties = new Properties()
      properties.load(inReader)
      properties.stringPropertyNames().asScala.map(
        k => (k, properties.getProperty(k).trim)).toMap
    } catch {
      case e: IOException =>
        throw new RuntimeException(s"Failed when loading Spark properties from $filename", e)
    } finally {
      inReader.close()
    }
  }

  /** Return the path of the default Spark properties file. */
  def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
    env.get("SPARK_CONF_DIR")
      .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
      .map { t => new File(s"$t${File.separator}spark-defaults.conf") }
      .filter(_.isFile)
      .map(_.getAbsolutePath)
      .orNull
  }
}
val sparkConf = new SparkConf()
Utils.loadDefaultSparkProperties(sparkConf)

// Create a spark ammonite session
val spark = {
  AmmoniteSparkSession.builder()
    .config(sparkConf)
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
}
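Example usage once this predef has run; the input path here is hypothetical, substitute any file readable by the driver:

// Read a JSON-lines file, register it as a temp view, and query it with SQL.
val df = spark.read.json("/tmp/example.json")   // hypothetical input path
df.printSchema()
df.createOrReplaceTempView("example")
spark.sql("SELECT count(*) FROM example").show()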