$ brew install apache-spark
A Python shell with a preconfigured SparkContext (available as sc). It is single node; in fact it seems to ignore --num-executors.
$ pyspark
$ IPYTHON=1 pyspark
$ PYSPARK_DRIVER_PYTHON=/path/to/python pyspark
$ PYSPARK_DRIVER_PYTHON=/path/to/python IPYTHON=1 pyspark
If required, explicitly state which Python executable to use:
- IPYTHON=1 will cause an embedded IPython shell (if available on the PATH) to be started.
- PYSPARK_DRIVER_PYTHON can be used to explicitly specify which Python executable should be used. It can also point directly at an ipython executable.
The Spark web UI is available at http://10.1.0.111:4040 (port 4040 on the driver).
See https://spark.apache.org/docs/latest/configuration.html
To pass Spark properties to pyspark:
$ PYSPARK_DRIVER_PYTHON=/path/to/python pyspark --conf 'user.country=GB' --conf 'user.name=rob'
Multiple --conf params can be passed, one property per flag.
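To check what the shell actually picked up, the SparkConf can be inspected from inside pyspark. A quick sketch, not specific to the properties above:
>>> # inspect a single property
>>> sc.getConf().get('spark.app.name')
>>> # or dump everything the shell was started with
>>> sc.getConf().getAll()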
It is sometimes useful to see exactly what command is executed when starting
the pyspark shell. To do so, use SPARK_PRINT_LAUNCH_COMMAND=1
$ SPARK_PRINT_LAUNCH_COMMAND=1 pyspark
Set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables to configure AWS authentication.
Load files with the s3n:// prefix.
>>> rdd = sc.textFile('s3n://bucket/data.txt')
Using Spark SQL we can load JSON-per-line files like this:
>>> dataframe = sqlContext.jsonFile('s3n://bucket/data.txt')
However, we have seen unexpected behaviour with in-memory caching of the data frame when loading directly like this. A workaround is to load the file into an RDD and construct a JSON data frame from that intermediate RDD.
>>> rdd = sc.textFile('s3n://bucket/data.txt')
>>> rdd.cache()
>>> dataframe = sqlContext.jsonRDD(rdd)
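Once loaded, the data frame can be inspected and queried as usual. A small sketch; the table name events is arbitrary:
>>> dataframe.printSchema()
>>> dataframe.registerTempTable('events')
>>> sqlContext.sql('SELECT count(*) FROM events').collect()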
Paths passed to sc.textFile (and the other loaders) support any glob pattern that Hadoop supports.
| Pattern | Description |
|---|---|
| ? | Matches any single character. |
| * | Matches zero or more characters. |
| [abc] | Matches a single character from character set {a,b,c}. |
| [a-b] | Matches a single character from the character range {a...b}. Note that character a must be lexicographically less than or equal to character b. |
| [^a] | Matches a single character that is not from character set or range {a}. Note that the ^ character must occur immediately to the right of the opening bracket. |
| \c | Removes (escapes) any special meaning of character c. |
| {ab,cd} | Matches a string from the string set {ab, cd}. |
| {ab,c{de,fh}} | Matches a string from the string set {ab, cde, cfh}. |
NOTE: Be careful with prefixes/paths. We discovered that .parquet in an S3 prefix terminates recursive file lookup, presumably because that string is expected only at the end of a path.
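For example, a glob can select a subset of files under a prefix (the bucket layout here is purely illustrative):
>>> # all January-June 2015 files under the logs/ prefix
>>> rdd = sc.textFile('s3n://bucket/logs/2015-0[1-6]-*.gz')
>>> rdd.count()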
There is no built-in support for loading csv files. Instead, load the file as a text file and map a parsing function to each line. There are numerous ways to do this. Something like:
>>> import csv
>>> headers = ['col_a', 'col_b', 'col_c']
>>> sc.textFile(filepath).map(
        lambda l: next(csv.DictReader([l], fieldnames=headers)))
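If a data frame is wanted rather than an RDD of dicts, the parsed lines can be converted via Row. A sketch along the same lines, reusing filepath and headers from above:
>>> from pyspark.sql import Row
>>> parsed = sc.textFile(filepath).map(
        lambda l: next(csv.DictReader([l], fieldnames=headers)))
>>> df = sqlContext.createDataFrame(parsed.map(lambda d: Row(**d)))
>>> df.take(1)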
If performance is critical, rumour has it that pandas.io.parsers.read_csv is much faster. This is untested but may offer some interesting benefits.
>>> d = pandas.io.parsers.read_csv(filepath, names=headers)
>>> df = sqlContext.createDataFrame(d)
There is a third-party package to add CSV support to .load and .save. It can be loaded by passing --packages to pyspark.
$ pyspark --packages com.databricks:spark-csv_2.11:1.0.3
>>> df = sqlContext.load(
path="s3n://data.api/metadata/merchants.tsv.gz"
source="com.databricks.spark.csv",
delimiter="\t",
header="true")
>>> df.take(1)
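The package also registers a writer for .save. A sketch of writing a frame back out as CSV (the output path is illustrative; check the package docs for the options supported by your version):
>>> df.save('s3n://bucket/out/merchants',
        source='com.databricks.spark.csv')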
According to the docs a jar of the package can be built for deployment to a cluster.
Since Spark 1.3 it is possible to load a table or SELECT statement into a data frame. Do so judiciously as we have not yet determined precisely how it loads data and what performance implications it may (or may not) have.
A jar of an appropriate JDBC driver is required. Load it by setting --driver-library-path and --driver-class-path.
NOTE: A MySQL JDBC driver seems to be pre-installed on EMR at /home/hadoop/spark/classpath/emr/mysql-connector-java-5.1.30.jar.
$ pyspark --driver-library-path '../mysql-connector-java-5.1.35.jar' --driver-class-path '../mysql-connector-java-5.1.35.jar'
Note that the docs suggest using the SPARK_CLASSPATH env var. This works but is deprecated.
Create a SQLContext
>>> from pyspark.sql import SQLContext
>>> sqlctx = SQLContext(sc)
>>> df = sqlctx.load(
        source='jdbc',
        driver='com.mysql.jdbc.Driver',
        url='jdbc:mysql://host.net/mugic?user=user&password=password',
        dbtable='table_or_select')
>>> df.take(2)
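dbtable accepts anything valid in a FROM clause, so a SELECT statement can be pushed down by wrapping it in parentheses and aliasing it (the table and column names below are invented):
>>> df = sqlctx.load(
        source='jdbc',
        driver='com.mysql.jdbc.Driver',
        url='jdbc:mysql://host.net/mugic?user=user&password=password',
        dbtable='(SELECT id, name FROM users WHERE active = 1) AS t')
>>> df.take(2)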
- Establish a SOCKS proxy to the master node.
- Use this proxy in your browser of choice.
- Connect to the Hadoop UI on port 8088, and navigate to the Spark app.
$ aws emr socks --cluster-id j-XXXXXXXXX --key-pair-file ~/.ssh/emr.pem
$ /Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome \
--proxy-server="socks5://localhost:8157" \
--host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" \
--user-data-dir=/tmp/
Go to the Hadoop web UI at {host}:9026, follow the ApplicationMaster link (right-hand side) and replace the internal IP with the hostname.
The hostname of the master node can be found in the management console (Master public DNS).
Within the UI, URLs are constructed using SPARK_PUBLIC_DNS, which can be set on the master node.
By default Spark logs are printed to stdout. To reduce the verbosity, change the log level in the log4j.properties file.
Look for log4j.rootCategory=INFO, console and change INFO to WARN.
The file can (probably) be found at ~/spark-x.x.x/conf/log4j.properties, or /usr/local/Cellar/apache-spark/x.x.x/libexec/conf/log4j.properties on OS X if you installed with Homebrew.
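On Spark 1.4+ the level can also be changed at runtime from within the shell, without editing log4j.properties:
>>> sc.setLogLevel('WARN')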
- Spark env vars can be set in /home/hadoop/spark/conf/spark-env.sh, commonly in a bootstrap step. Spark 1.3.1 requires /tmp/spark-events but it does not exist in AMI 3.8.0; create it in the bootstrap step.
- To get the IPs of all worker nodes:
hdfs dfsadmin -report | grep ^Name | cut -f2 -d: | cut -f2 -d' '
- To run a command on all nodes, use pssh:
hdfs dfsadmin -report | grep ^Name | cut -f2 -d: | cut -f2 -d' ' > hosts.txt
pssh -h hosts.txt -l hadoop -x "-oStrictHostKeyChecking=no" -o output.txt '{CMD}'
- It is often useful to use S3DistCp to copy data to HDFS on the cluster.
- Hadoop and HDFS can be configured in a bootstrap step.
- HDFS commands can be executed on the CLI with hdfs.
- Cluster disk usage:
hdfs dfs -df -h
- To move a local file into HDFS:
hdfs dfs -copyFromLocal /local /dst
- To view HDFS config:
hdfs getconf -confKey [key]
- HDFS site config is at /home/hadoop/conf/hdfs-site.xml
- http://tech.just-eat.com/2014/02/13/tips-for-using-ec2-spot-instances/
- Running Spark on EC2
- EMR Spark bootstrap scripts from AWS