
Download and Install Spark

  1. Download the Spark distribution
mkdir -p /opt/software
cd /opt/software
wget https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz

tar -zxf spark-3.5.6-bin-hadoop3.tgz -C /opt/
mv /opt/spark-3.5.6-bin-hadoop3 /opt/spark
  2. Configure the Spark environment variables
vim /etc/profile

# Append the following to the end of the file
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

Apply the environment variables

source /etc/profile

Verify the installation

spark-submit --version
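
If the command is not found, the environment variables were likely not applied in the current shell; a quick check, assuming the paths used above:

echo $SPARK_HOME
which spark-submit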

Configure Spark

  1. Create the configuration files
cd /opt/spark/conf

cp spark-env.sh.template spark-env.sh
cp spark-defaults.conf.template spark-defaults.conf
cp workers.template workers
  2. Configure spark-env.sh
vim spark-env.sh

# Add the following
export JAVA_HOME=/opt/java8
export HADOOP_HOME=/opt/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

# Spark cluster manager settings
export SPARK_MASTER_HOST=master
export SPARK_MASTER_PORT=7077

# Resource settings - sized for a 3-node cluster with 4 cores / 8 GB RAM per node (see the note below)
export SPARK_WORKER_CORES=3
export SPARK_WORKER_MEMORY=6g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g 
export SPARK_EXECUTOR_CORES=1 
export SPARK_EXECUTOR_INSTANCES=2

# History server settings
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs:///spark-history -Dspark.history.retainedApplications=20"
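
These values assume 4 cores and 8 GB of RAM per node: each Worker offers 3 cores and 6g so that roughly 1 core and 2 GB remain for the OS and Hadoop daemons, and two 2g / 1-core executors fit inside each Worker. You can confirm a node's actual resources before adjusting:

nproc
free -h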
  3. Configure spark-defaults.conf
vim spark-defaults.conf

# Add the following
spark.master                     yarn
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-history
spark.history.fs.logDirectory    hdfs:///spark-history

# Resource settings - reasonable allocation for YARN mode
spark.driver.memory              2g
spark.executor.memory            2g
spark.executor.cores             1
spark.executor.instances         6
spark.dynamicAllocation.enabled  true
spark.dynamicAllocation.initialExecutors  2
spark.dynamicAllocation.minExecutors  2
spark.dynamicAllocation.maxExecutors  6

# Performance tuning
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max  128m
spark.network.timeout            600s
spark.local.dir                  /tmp/spark-temp
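
Note: on YARN, dynamic allocation also needs either the external shuffle service or shuffle tracking enabled, otherwise Spark cannot safely release executors. A minimal addition, assuming the YARN external shuffle service is not set up:

spark.dynamicAllocation.shuffleTracking.enabled  true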
  4. Configure the workers file
vim workers

# List the hostnames of all nodes
master
slave1
slave2
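
The start scripts and the scp commands below assume passwordless SSH from master to every host listed here; a quick check:

ssh slave1 hostname
ssh slave2 hostname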

Sync Spark to All Nodes

  1. Create the HDFS directories on the master node
# Create the Spark history server log directory
hdfs dfs -mkdir -p /spark-history
hdfs dfs -chmod -R 777 /spark-history

# Create the directories used by Spark jobs
hdfs dfs -mkdir -p /spark
hdfs dfs -mkdir -p /spark/jars
hdfs dfs -chmod -R 777 /spark
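
Confirm the directories were created:

hdfs dfs -ls /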
  2. Create and upload the Spark dependency archive (see the note after the commands)
# Bundle all Spark jars into a single archive
cd /opt/spark
jar cvf /opt/software/spark-libs.jar -C /opt/spark/jars/ .

# Upload it to HDFS
hdfs dfs -put -f /opt/software/spark-libs.jar /spark/jars/
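
This upload only takes effect if Spark is told about it; otherwise spark-submit re-uploads the local jars on every run. Assuming the paths above, add to spark-defaults.conf:

spark.yarn.archive               hdfs:///spark/jars/spark-libs.jar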
  3. Distribute Spark to the other nodes
# Copy the Spark directory and /etc/profile to slave1
scp -r /opt/spark slave1:/opt/
scp -r /etc/profile slave1:/etc/

# Copy the Spark directory and /etc/profile to slave2
scp -r /opt/spark slave2:/opt/
scp -r /etc/profile slave2:/etc/
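
On each slave node, apply the copied environment variables (new login shells pick them up automatically):

source /etc/profile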

Start the Spark Services

  1. Start the Spark history server

/opt/spark/sbin/start-history-server.sh

  2. Verify that the history server is running

jps | grep HistoryServer
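
The history server web UI should also respond on the port configured in spark-env.sh (expect an HTTP 200):

curl -s -o /dev/null -w "%{http_code}\n" http://master:18080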

  3. Confirm that Spark jobs can be submitted to YARN
spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 2g \
    --executor-memory 2g \
    --executor-cores 1 \
    --num-executors 3 \
    $SPARK_HOME/examples/jars/spark-examples_2.12-3.5.6.jar 10
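
In cluster mode the driver runs inside YARN, so the Pi result is written to the application logs rather than the console. Retrieve it with the application ID reported by spark-submit (placeholder below):

yarn logs -applicationId <application_id> | grep "Pi is roughly"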

Run the Spark WordCount Example (PySpark)

  1. Prepare the input data
cat > /tmp/words.txt << EOF
Hello Spark Hadoop Spark Hello
Big Data Processing with Spark
Spark is faster than MapReduce
Python API for Spark makes data processing easy
PySpark provides powerful analytics capabilities
Distributed computing with Spark and Python
EOF

# Upload to HDFS
hdfs dfs -mkdir -p /spark/input
hdfs dfs -put /tmp/words.txt /spark/input/
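
Verify the upload:

hdfs dfs -cat /spark/input/words.txt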
  2. Run WordCount in the interactive PySpark shell

Start the PySpark shell

pyspark --master yarn

Enter the following Python code in the PySpark shell

text_file = spark.read.text("hdfs:///spark/input/words.txt")

counts = text_file.rdd.flatMap(lambda line: line.value.split(" ")) \
                     .filter(lambda word: word.strip() != "") \
                     .map(lambda word: (word, 1)) \
                     .reduceByKey(lambda a, b: a + b) \
                     .toDF(["word", "count"])

counts.show()

counts.write.csv("hdfs:///spark/output/wordcount-py-result")

exit()
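
Equivalently, the count can stay in the DataFrame API instead of dropping to the RDD layer; a sketch against the same input file, runnable in the same PySpark session:

from pyspark.sql.functions import explode, split, col

# Split each line into words and flatten into one row per word
words = spark.read.text("hdfs:///spark/input/words.txt") \
             .select(explode(split(col("value"), " ")).alias("word"))
# Drop empty tokens and count occurrences per word
counts = words.filter(col("word") != "").groupBy("word").count()
counts.show()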
  3. Submit PySpark WordCount as a script

Create the Python script

cat > /tmp/wordcount.py << 'EOF'
from pyspark.sql import SparkSession

# Initialize the SparkSession
spark = SparkSession.builder \
    .appName("PySpark WordCount Example") \
    .getOrCreate()

# Read the input file
text_file = spark.read.text("hdfs:///spark/input/words.txt")

# Run the WordCount
counts = text_file.rdd.flatMap(lambda line: line.value.split(" ")) \
                     .filter(lambda word: word.strip() != "") \
                     .map(lambda word: (word, 1)) \
                     .reduceByKey(lambda a, b: a + b)

# Convert to a DataFrame and save
counts_df = counts.toDF(["word", "count"])
counts_df.write.mode("overwrite").csv("hdfs:///spark/output/wordcount-py-script")

# Print a completion message
print("WordCount finished; results saved to HDFS")
spark.stop()
EOF

Submit the script to YARN

spark-submit --master yarn \
             --deploy-mode cluster \
             --driver-memory 2g \
             --executor-memory 2g \
             --executor-cores 1 \
             --num-executors 3 \
             /tmp/wordcount.py
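
Because the job runs in cluster mode, the script's print output appears in the YARN container logs (retrievable with the yarn logs command shown earlier), not in the submitting terminal.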
  4. View the WordCount results

List the output directories

hdfs dfs -ls /spark/output/wordcount-py-result
hdfs dfs -ls /spark/output/wordcount-py-script

View the actual results

hdfs dfs -cat /spark/output/wordcount-py-result/part-*.csv
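
For the sample input above the output should contain rows such as Spark,6 and Hello,2; the row order is not deterministic.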

Run the Spark Pi Example

spark-submit --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode client \
  --driver-memory 2g \
  --executor-memory 2g \
  --executor-cores 1 \
  --num-executors 3 \
  $SPARK_HOME/examples/jars/spark-examples_2.12-3.5.6.jar 9
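
In client mode the driver runs locally, so the line "Pi is roughly ..." is printed directly to the console; the trailing argument (9 here) is the number of partitions used for sampling.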