@dapangmao
Last active August 29, 2015 14:09

Chapter 2. Downloading and Getting Started

In this chapter we will walk through the process of downloading and running Spark in local mode on a single computer. This chapter was written for anyone who is new to Spark, including both data scientists and engineers.

Spark can be used from Python, Java or Scala. To benefit from this book, you don’t need to be an expert programmer, but we do assume that you are comfortable with the basic syntax of at least one of these languages. We will include examples in all languages wherever possible.

Spark itself is written in Scala, and runs on the Java Virtual Machine (JVM). To run Spark on either your laptop or a cluster, all you need is an installation of Java 6 (or newer). If you wish to use the Python API, you will also need a Python interpreter (version 2.6 or newer). Spark does not yet work with Python 3.

Downloading Spark

The first step to using Spark is to download and unpack it into a usable form. Let’s start by downloading a recent precompiled release of Spark. Visit http://spark.apache.org/downloads.html, then under “Pre-built packages”, next to “For Hadoop 1 (HDP1, CDH3)”, click “direct file download”. This will download a compressed tar file, or “tarball,” called spark-1.0.0-bin-hadoop1.tgz.

If you want to use Spark with another Hadoop version, those are also available from http://spark.apache.org/downloads.html but will have slightly different file names. Building from source is also possible, and you can find the latest source code on GitHub at https://github.com/apache/spark.

  • Note

    Most Unix and Linux variants, including Mac OS X, come with a command-line tool called tar that can be used to unpack tar files. If your operating system does not have the tar command installed, try searching the Internet for a free tar extractor—for example, on Windows, you may wish to try 7-Zip.

Now that we have downloaded Spark, let’s unpack it and take a look at what comes with the default Spark distribution. To do that, open a terminal, change to the directory where you downloaded Spark, and untar the file. This will create a new directory with the same name but without the final .tgz suffix. Change into the directory, and see what’s inside. You can use the following commands to accomplish all of that.

cd ~
tar -xf spark-1.0.0-bin-hadoop1.tgz
cd spark-1.0.0-bin-hadoop1
ls

In the line containing the tar command above, the x flag tells tar we are extracting files, and the f flag specifies the name of the tarball. The ls command lists the contents of the Spark directory. Let’s briefly consider the names and purposes of some of the more important files and directories that come with Spark.

- README.md - Contains short instructions for getting started with Spark.
- bin - Contains executable files that can be used to interact with Spark in various ways, e.g. the spark-shell, which we will cover later in this chapter.
- core, streaming, python - Contain the source code of major components of the Spark project.
- examples - Contains some helpful Spark standalone jobs that you can look at and run to learn about the Spark API.
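If the tar command is not available, the unpack-and-list step above can also be sketched with Python’s standard tarfile module. This is a minimal illustration, not part of Spark: the helper names unpack_and_list and make_demo_tarball are hypothetical, and the demo builds a tiny stand-in archive instead of using the real Spark download.

```python
import os
import tarfile
import tempfile


def unpack_and_list(tarball_path, dest_dir):
    """Extract a .tgz archive into dest_dir and list the unpacked directory."""
    os.makedirs(dest_dir, exist_ok=True)
    # The default open mode detects gzip compression automatically.
    with tarfile.open(tarball_path) as archive:
        archive.extractall(dest_dir)
    # The archive unpacks into a directory named after the tarball minus ".tgz".
    unpacked_name = os.path.basename(tarball_path)[: -len(".tgz")]
    return sorted(os.listdir(os.path.join(dest_dir, unpacked_name)))


def make_demo_tarball(work_dir):
    """Build a tiny stand-in archive (hypothetical contents, not a Spark release)."""
    root = os.path.join(work_dir, "spark-demo")
    os.makedirs(os.path.join(root, "bin"))
    with open(os.path.join(root, "README.md"), "w") as f:
        f.write("# Apache Spark\n")
    tarball_path = os.path.join(work_dir, "spark-demo.tgz")
    with tarfile.open(tarball_path, "w:gz") as archive:
        archive.add(root, arcname="spark-demo")
    return tarball_path


with tempfile.TemporaryDirectory() as work_dir:
    tarball = make_demo_tarball(work_dir)
    print(unpack_and_list(tarball, os.path.join(work_dir, "out")))
    # prints: ['README.md', 'bin']
```

For the real download, you would pass the path to spark-1.0.0-bin-hadoop1.tgz and a destination directory of your choice.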

Don’t worry about the large number of directories and files the Spark project comes with; we will cover most of these in the rest of this book. For now, let’s dive in right away and try out Spark’s Python and Scala shells. We will start by running some of the examples that come with Spark. Then we will write, compile and run a simple Spark Job of our own.

All of the work we will do in this chapter will be with Spark running in “local mode”, i.e. non-distributed mode, which only uses a single machine. Spark can run in a variety of different modes, or environments. Beyond local mode, Spark can also be run on Mesos, on YARN, or on top of a Standalone Scheduler that is included in the Spark distribution. We will cover the various deployment modes in detail in chapter (to come).

Introduction to Spark’s Python and Scala Shells

Spark comes with interactive shells that make ad-hoc data analysis easy. Spark’s shells will feel familiar if you have used other shells such as those in R, Python, and Scala, or operating system shells like Bash or the Windows command prompt.

Unlike most other shells, however, which let you manipulate data using the disk and memory on a single machine, Spark’s shells allow you to interact with data that is distributed on disk or in memory across many machines, and Spark takes care of automatically distributing this processing.

Because Spark can load data into memory, many distributed computations, even ones that process terabytes of data across dozens of machines, can finish running in a few seconds. This makes the sort of iterative, ad-hoc, and exploratory analysis commonly done in shells a good fit for Spark. Spark provides both Python and Scala shells that have been augmented to support connecting to a cluster.

  • Note

    Most of this book includes code in all of Spark’s languages, but interactive shells are only available in Python and Scala. Because a shell is very useful for learning the API, we recommend using one of these languages for these examples even if you are a Java developer. The API is the same in every language.

The easiest way to demonstrate the power of Spark’s shells is to start using one of them for some simple data analysis. Let’s walk through the example from the Quick Start Guide in the official Spark documentation [5].

The first step is to open up one of Spark’s shells. To open the Python version of the Spark Shell, which we also refer to as the PySpark Shell, go into your Spark directory and type:

bin/pyspark

(Or bin\pyspark on Windows.) To open the Scala version of the shell, type:

bin/spark-shell

The shell prompt should appear within a few seconds. When the shell starts, you will notice a lot of log messages. You may need to hit [Enter] once to clear the log output and get to a shell prompt. Figure 2-1 shows what the PySpark shell looks like when you open it.

Figure 2-1. The PySpark Shell With Default Logging Output

You may find the logging statements that get printed in the shell distracting. You can control the verbosity of the logging by creating a file in the conf directory called log4j.properties. The Spark developers already include a template for this file called log4j.properties.template. To make the logging less verbose, make a copy of conf/log4j.properties.template called conf/log4j.properties and find the following line:

log4j.rootCategory=INFO, console

Then lower the log level so that only messages at WARN level and above are shown by changing it to the following:

log4j.rootCategory=WARN, console

When you re-open the shell, you should see less output.
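The copy-and-edit step above can be scripted. The following is a minimal sketch in plain Python, assuming the template contains the log4j.rootCategory line quoted above; the helper name lower_log_level is hypothetical, and the demo runs against a stand-in conf directory, not a real Spark installation.

```python
import os
import shutil
import tempfile


def lower_log_level(conf_dir, level="WARN"):
    """Copy the log4j template in conf_dir and set the root logger to `level`."""
    template = os.path.join(conf_dir, "log4j.properties.template")
    target = os.path.join(conf_dir, "log4j.properties")
    shutil.copyfile(template, target)

    with open(target) as f:
        lines = f.readlines()
    with open(target, "w") as f:
        for line in lines:
            # Only the root logger line is rewritten; everything else is kept.
            if line.startswith("log4j.rootCategory="):
                line = "log4j.rootCategory=%s, console\n" % level
            f.write(line)
    return target


# Demo against a stand-in conf directory (hypothetical template contents).
demo_conf = tempfile.mkdtemp()
with open(os.path.join(demo_conf, "log4j.properties.template"), "w") as f:
    f.write("# Set everything to be logged to the console\n")
    f.write("log4j.rootCategory=INFO, console\n")

with open(lower_log_level(demo_conf)) as f:
    print(f.read())
```

Against a real installation, you would pass the path of the conf directory inside your Spark directory.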

Figure 2-2. The PySpark Shell With Less Logging Output

  • Using IPython

    IPython is an enhanced Python shell that many Python users prefer, offering features such as tab completion. You can find instructions for installing it at http://ipython.org. You can use IPython with Spark by setting the IPYTHON environment variable to 1: IPYTHON=1 ./bin/pyspark

    To use the IPython Notebook, which is a web browser based version of IPython, use: IPYTHON_OPTS="notebook" ./bin/pyspark

    On Windows, set the environment variable and run the shell as follows:

    set IPYTHON=1
    bin\pyspark
    

In Spark we express our computation through operations on distributed collections that are automatically parallelized across the cluster. These collections are called Resilient Distributed Datasets, or RDDs. RDDs are Spark’s fundamental abstraction for distributed data and computation.

Before we say more about RDDs, let’s create one in the shell from a local text file and do some very simple ad-hoc analysis by following the example below.

Example 2-1. Python line count

>>> lines = sc.textFile("README.md") # Create an RDD called lines

>>> lines.count() # Count the number of items in this RDD
127
>>> lines.first() # First item in this RDD, i.e. first line of README.md
u'# Apache Spark'

Example 2-2. Scala line count

scala> val lines = sc.textFile("README.md") // Create an RDD called lines
lines: spark.RDD[String] = MappedRDD[...]

scala> lines.count() // Count the number of items in this RDD
res0: Long = 127

scala> lines.first() // First item in this RDD, i.e. first line of README.md
res1: String = # Apache Spark

To exit the shell, you can press Control+D.

In the example above, the variables called lines are RDDs, created here from a text file on our local machine. We can run various parallel operations on the RDDs, such as counting the number of elements in the dataset (here lines of text in the file) or printing the first one. We will discuss RDDs in great depth in later chapters, but before we go any further, let’s take a moment now to introduce basic Spark concepts.

Introduction to Core Spark Concepts

Now that you have run your first Spark code using the shell, it’s time to learn about programming with Spark in more detail.

At a high level, every Spark application consists of a driver program that launches various parallel operations on a cluster. The driver program contains your application’s main function and defines distributed datasets on the cluster, then applies operations to them. In the examples above, the driver program was the Spark shell itself, and you could just type in the operations you wanted to run.

Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. In the shell, a SparkContext is automatically created for you, as the variable called sc. Try printing out sc to see its type:

>>> sc
<pyspark.context.SparkContext object at 0x1025b8f90>

Once you have a SparkContext, you can use it to build resilient distributed datasets, or RDDs. In the example above, we called SparkContext.textFile to create an RDD representing the lines of text in a file. We can then run various operations on these lines, such as count().

To run these operations, driver programs typically manage a number of nodes called executors. For example, if we were running the count() above on a cluster, different machines might count lines in different ranges of the file. Because we just ran the Spark shell locally, it executed all its work on a single machine—but you can connect the same shell to a cluster to analyze data in parallel. Figure 2-3 shows how Spark executes on a cluster.

Figure 2-3. Components for distributed execution in Spark
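To make the idea of different machines counting different ranges of the file concrete, here is a conceptual sketch in plain Python. It does not use Spark and is not how Spark is implemented: local threads stand in for executors, and the names split_into_ranges and distributed_count are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_ranges(items, num_ranges):
    """Split a list into contiguous chunks of near-equal size."""
    chunk = max(1, -(-len(items) // num_ranges))  # ceiling division
    return [items[i:i + chunk] for i in range(0, len(items), chunk)]


def distributed_count(items, num_workers=4):
    """Count items by summing per-range counts, one range per worker."""
    ranges = split_into_ranges(items, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Each worker counts only its own range of the data.
        partial_counts = list(pool.map(len, ranges))
    # The driver combines the partial results into the final answer.
    return sum(partial_counts)


lines = ["line %d" % i for i in range(127)]  # stand-in for the 127 README lines
print(distributed_count(lines))  # prints: 127
```

The driver only sees the partial counts, not the data itself, which is what lets the same pattern scale when the ranges live on separate machines.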

Finally, a lot of Spark’s API revolves around passing functions to its operators to run them on the cluster. For example, we could extend our README example by filtering the lines in the file that contain a word, such as “Python”:

Example 2-3. Python filtering example

>>> lines = sc.textFile("README.md")

>>> pythonLines = lines.filter(lambda line: "Python" in line)

>>> pythonLines.first()
u'## Interactive Python Shell'

Example 2-4. Scala filtering example

scala> val lines = sc.textFile("README.md") // Create an RDD called lines
lines: spark.RDD[String] = MappedRDD[...]

scala> val pythonLines = lines.filter(line => line.contains("Python"))
pythonLines: spark.RDD[String] = FilteredRDD[...]

scala> pythonLines.first()
res0: String = ## Interactive Python Shell

  • Note

    If you are unfamiliar with the lambda or => syntax above, it is a shorthand way to define functions inline in Python and Scala. When using Spark in these languages, you can also define a function separately and then pass its name to Spark. For example, in Python:

    def hasPython(line):
        return "Python" in line
    
    pythonLines = lines.filter(hasPython)
    

    Passing functions to Spark is also possible in Java, but in this case they are defined as classes, implementing an interface called Function. For example:

    JavaRDD<String> pythonLines = lines.filter(
      new Function<String, Boolean>() {
        // Interface methods must be declared public when implemented
        public Boolean call(String line) { return line.contains("Python"); }
      }
    );
    

    Java 8 introduces shorthand syntax called “lambdas” that looks similar to Python and Scala. Here is how the code would look with this syntax:

    JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));

    We discuss passing functions further in Passing Functions to Spark.

While we will cover the Spark API in more detail later, a lot of its magic is that function-based operations like filter also parallelize across the cluster. That is, Spark automatically takes your function (e.g. line.contains("Python")) and ships it to executor nodes. Thus, you can write code in a single driver program and automatically have parts of it run on multiple nodes. Chapter 3 covers the RDD API in more detail.
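As a rough mental model of passing a function to an operator, the plain-Python sketch below applies a caller-supplied predicate to each partition of the data separately. It is a toy illustration and not the Spark API: filter_partitions and first are hypothetical helpers, and the sample lines are stand-ins for README.md.

```python
def has_python(line):
    """Named predicate, equivalent to: lambda line: "Python" in line."""
    return "Python" in line


def filter_partitions(partitions, predicate):
    """Run predicate over each partition separately, as each executor would."""
    return [[line for line in part if predicate(line)] for part in partitions]


def first(partitions):
    """Return the first element of the first non-empty partition."""
    for part in partitions:
        if part:
            return part[0]
    raise ValueError("no elements")


# Hypothetical stand-in for README.md, already split into two partitions.
partitions = [
    ["# Apache Spark", "Spark is a fast engine"],
    ["## Interactive Python Shell", "Run bin/pyspark to use Python"],
]

by_name = filter_partitions(partitions, has_python)
by_lambda = filter_partitions(partitions, lambda line: "Python" in line)
print(first(by_name))  # prints: ## Interactive Python Shell
print(by_name == by_lambda)  # prints: True
```

The operator never needs to know what the predicate does; it only needs to be able to call it on each element, which is why a lambda and a named function are interchangeable here.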

Standalone Applications

The final piece missing in this quick tour of Spark is how to use it in standalone programs. Apart from running interactively, Spark can be linked into standalone applications in either Java, Scala or Python. The main difference from using it in the shell is that you need to initialize your own SparkContext. After that, the API is the same.

The process of linking to Spark varies by language. In Java and Scala, you give your application a Maven dependency on the spark-core artifact published by Apache. At the time of writing, the latest Spark version is 1.0.0, and the Maven coordinates for that are:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.0.0

If you are unfamiliar with Maven, it is a popular package management tool for Java-based languages that lets you link to libraries in public repositories. You can use Maven itself to build your project, or use other tools that can talk to the Maven repositories, including Scala’s SBT tool or Gradle. Popular integrated development environments like Eclipse also allow you to directly add a Maven dependency to a project.

In Python, you simply write applications as Python scripts, but you must run them using the bin/spark-submit script included in Spark rather than the plain Python interpreter. This script sets up the environment for Spark’s Python API to function. Simply run your script with: bin/spark-submit my_script.py

(Note that you will have to use backslashes instead of forward slashes on Windows.)

  • Note

    In Spark versions before 1.0, use bin/pyspark my_script.py to run Python applications instead.

For detailed examples of linking applications to Spark, refer to the Quick Start Guide [6] in the official Spark documentation. In a final version of the book, we will also include full examples in an appendix.

Initializing a SparkContext

Once you have linked an application to Spark, you need to import the Spark packages in your program and create a SparkContext. This is done by first creating a SparkConf object to configure your application, and then building a SparkContext for it. Here is a short example in each supported language:

Example 2-5. Initializing Spark in Python

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)  # pass conf by keyword; the first positional argument is the master URL

Example 2-6. Initializing Spark in Java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);

Example 2-7. Initializing Spark in Scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

These examples show the minimal way to initialize a SparkContext, where you pass two parameters:

  • A cluster URL, namely “local” in these examples, which tells Spark how to connect to a cluster. “local” is a special value that runs Spark on one thread on the local machine, without connecting to a cluster.
  • An application name, namely “My App” in these examples. This will identify your application on the cluster manager’s UI if you connect to a cluster.

Additional parameters exist for configuring how your application executes or adding code to be shipped to the cluster, but we will cover these in later chapters of the book.

After you have initialized a SparkContext, you can use all the methods we showed before to create RDDs (e.g. from a text file) and manipulate them.

Finally, to shut down Spark, you can either call the stop() method on your SparkContext, or simply exit the application (e.g. with System.exit(0) or sys.exit()).
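Putting the pieces above together, a complete standalone script might look like the following sketch. The file name my_script.py, the helper has_python, and the choice to count lines mentioning “Python” are illustrative assumptions; the script also returns early if the input file is missing or if pyspark cannot be imported, which can happen when it is run with the plain Python interpreter instead of bin/spark-submit.

```python
import os


def has_python(line):
    """Predicate passed to filter(): keep lines that mention Python."""
    return "Python" in line


def main(path="README.md"):
    if not os.path.exists(path):
        print("Input file not found: %s" % path)
        return None
    try:
        # bin/spark-submit sets up the environment for Spark's Python API.
        from pyspark import SparkConf, SparkContext
    except ImportError:
        print("pyspark is not importable; run this script with bin/spark-submit")
        return None

    conf = SparkConf().setMaster("local").setAppName("My App")
    sc = SparkContext(conf=conf)
    try:
        lines = sc.textFile(path)  # create an RDD from the text file
        python_line_count = lines.filter(has_python).count()
        print("Lines mentioning Python: %d" % python_line_count)
        return python_line_count
    finally:
        sc.stop()  # shut Spark down even if the job fails


if __name__ == "__main__":
    main()
```

Saved as my_script.py in the Spark directory, this would be launched with bin/spark-submit my_script.py.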

This quick overview should be enough to let you run a standalone Spark application on your laptop. For more advanced configuration, a later chapter in the book will cover how to connect your application to a cluster, including packaging your application so that its code is automatically shipped to worker nodes. For now, please refer to the Quick Start Guide [7] in the official Spark documentation.

Conclusion

In this chapter, we have covered downloading Spark, running it locally on your laptop, and using it either interactively or from a standalone application. We gave a quick overview of the core concepts involved in programming with Spark: a driver program creates a SparkContext and RDDs, and then runs parallel operations on them. In the next chapter, we will dive more deeply into how RDDs operate.


[5] http://spark.apache.org/docs/latest/quick-start.html

[6] http://spark.apache.org/docs/latest/quick-start.html

[7] http://spark.apache.org/docs/latest/quick-start.html
