Last active
November 25, 2021 10:27
-
-
Save ndolgov/62f40e8c58ad61315079 to your computer and use it in GitHub Desktop.
Running SparkSQL in standalone mode
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<properties>
  <!-- Single place to bump the Spark release used by every Spark artifact below. -->
  <spark.version>2.3.0</spark.version>
</properties>

<!-- Spark SQL compiled for Scala 2.11; the artifactId suffix must match the
     Scala binary version used by the rest of the build. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Standalone-mode cluster setup:
# 1) Unpack the Spark archive to $SPARK_HOME
# 2) cp conf/spark-env.sh.template conf/spark-env.sh
# 3) Edit conf/spark-env.sh
# 4) Make sure spark-env.sh is identical on all master and worker nodes
#
# 5) To actually start the Spark master and worker(s), run from the $SPARK_HOME directory:

# .. on the master node
# (the commands must NOT be wrapped in quotes: a quoted command line is a single
#  word to the shell and fails with "No such file or directory")
./sbin/start-master.sh

# .. on each worker node; the master URL must match SPARK_MASTER_IP:SPARK_MASTER_PORT
# from conf/spark-env.sh
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:8091
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Options for the daemons used in the standalone deploy mode.
# NOTE(review): SPARK_MASTER_IP was deprecated in Spark 2.0 in favour of
# SPARK_MASTER_HOST (this gist pins spark.version 2.3.0); both are exported so
# the script works with either name — confirm against the Spark release in use.
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_HOST=127.0.0.1
# Master RPC port; workers connect with spark://127.0.0.1:8091
export SPARK_MASTER_PORT=8091
# Master web UI, served on http://127.0.0.1:8090
export SPARK_MASTER_WEBUI_PORT=8090
# Resources each worker offers to executors
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=24g
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.SparkSession; | |
public class SparkDriver { | |
public static void main(String[] args) { | |
final SparkSession session = SparkSession.builder(). | |
appName("MySparkApp" + System.currentTimeMillis()). | |
master(SparkEnvCfg.sparkMasterUrl()). | |
config(SparkEnvCfg.SPARK_EXECUTOR_MEMORY, "1g"). | |
config(SparkEnvCfg.SPARK_SERIALIZER, SparkEnvCfg.KRYO). | |
config(SparkEnvCfg.SPARK_SQL_SHUFFLE_PARTITIONS, "2"). | |
config(SparkEnvCfg.SPARK_EXECUTOR_EXTRA_JAVA_OPTIONS, SparkEnvCfg.JMXREMOTE_ENABLED). | |
getOrCreate(); | |
// TODO use session to create and manipulate datasets | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File

import org.apache.spark.sql.SparkSession

/**
 * Builds a local-mode [[SparkSession]] suitable for tests: fixed driver
 * host/port, a target-dir warehouse, and a small shuffle-partition count.
 */
object SparkDriver {
  /**
   * @param name application name shown in the Spark UI
   * @return a shared (get-or-create) local SparkSession
   */
  def sparkSession(name : String) : SparkSession = {
    SparkSession.builder().
      appName(name).
      master("local").
      config(SparkCtxCfg.SPARK_EXECUTOR_MEMORY, "1g").
      config(SparkCtxCfg.SPARK_SERIALIZER, SparkCtxCfg.KRYO).
      // default of 200 shuffle partitions is wasteful for local tests
      config(SparkCtxCfg.SPARK_SQL_SHUFFLE_PARTITIONS, "2").
      config(SparkCtxCfg.SPARK_WAREHOUSE_DIR, "target/spark-warehouse").
      // no extra jars shipped; empty arguments yield an empty spark.jars value
      config(SparkCtxCfg.SPARK_JARS, SparkCtxCfg.toAbsolutePaths("", "")).
      // pin driver endpoint so executors can always reach it locally
      config(SparkCtxCfg.SPARK_DRIVER_HOST, "localhost").
      config(SparkCtxCfg.SPARK_DRIVER_PORT, "31000").
      getOrCreate()
  }
}
/**
 * Spark configuration key names and small helpers for assembling
 * configuration values from JVM system properties.
 */
object SparkCtxCfg {
  val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
  val SPARK_SERIALIZER = "spark.serializer"
  val ALLOW_MULTIPLE_CONTEXTS = "spark.driver.allowMultipleContexts"
  val SPARK_JARS = "spark.jars"
  val SPARK_WAREHOUSE_DIR = "spark.sql.warehouse.dir"
  val KRYO = "org.apache.spark.serializer.KryoSerializer"
  val SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
  val DEFAULT_SPARK_MASTER_URL = "spark://127.0.0.1:7077"
  val SPARK_DRIVER_HOST = "spark.driver.host"
  val SPARK_DRIVER_PORT = "spark.driver.port"

  /**
   * @param name system property to look up
   * @param otherwise fallback when the property is unset
   * @return the property value, or `otherwise` when absent
   */
  def envProperty(name : String, otherwise : String) : String =
    Option(System.getProperty(name)).getOrElse(otherwise)

  /** @return the number of processors available to this JVM, as a string */
  def availableProcessors() : String =
    Runtime.getRuntime.availableProcessors().toString

  /**
   * Prefixes every comma-separated jar name with `baseDir`.
   *
   * @param jarsString comma-separated jar file names; may be null or empty
   * @param baseDir directory to prepend; a trailing separator is added when missing
   *                (note: an empty baseDir therefore yields a bare `File.separator` prefix,
   *                matching the original behavior)
   * @return comma-separated prefixed paths, or "" for null/empty input
   */
  def toAbsolutePaths(jarsString: String, baseDir: String): String =
    if (jarsString == null || jarsString.isEmpty) ""
    else {
      val libDir =
        if (baseDir.endsWith(File.separator)) baseDir
        else baseDir + File.separator
      jarsString.split(",").map(libDir + _).mkString(",")
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Spark configuration key names plus helpers that resolve values from JVM
 * system properties with sensible defaults.
 */
public final class SparkEnvCfg {
    public static final String SPARK_EXECUTOR_MEMORY = "spark.executor.memory";
    public static final String SPARK_SERIALIZER = "spark.serializer";
    public static final String SPARK_EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
    public static final String KRYO = "org.apache.spark.serializer.KryoSerializer";
    /** JVM flags enabling an unauthenticated JMX endpoint on each executor (dev/test only). */
    public static final String JMXREMOTE_ENABLED = "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=" + executorJmxPort();
    public static final String SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions";

    /** @return master URL from {@code -Dspark.master.url}, defaulting to the local standalone master */
    public static String sparkMasterUrl() {
        return envProperty("spark.master.url", "spark://127.0.0.1:8091");
    }

    /** @return in-memory columnar cache batch size from {@code -Dspark.cache.batch} */
    public static String cacheBatchSize() {
        return envProperty("spark.cache.batch", "100000");
    }

    /** @return executor memory from {@code -Dspark.executor.memory} */
    public static String executorMemory() {
        return envProperty("spark.executor.memory", "24g");
    }

    /** @return executor JMX port from {@code -Dspark.executor.jmx} */
    private static String executorJmxPort() {
        return envProperty("spark.executor.jmx", "20000");
    }

    /**
     * @param name      system property to look up
     * @param otherwise fallback when the property is unset
     * @return the property value, or {@code otherwise} when absent
     */
    public static String envProperty(String name, String otherwise) {
        final String prop = System.getProperty(name);
        return prop == null ? otherwise : prop;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment