Storm Tutorial (Japanese translation)

In this tutorial, you'll learn how to create Storm topologies and deploy them to a Storm cluster. Java is the main language used, but a few examples use Python to illustrate Storm's multi-language capabilities.

Preliminaries

This tutorial uses examples from the storm-starter project. It's recommended that you clone the project and follow along with the examples. Read [[Setting up development environment]] and [[Creating a new Storm project]] to get your machine set up.

Components of a Storm cluster

A Storm cluster is superficially similar to a Hadoop cluster. Whereas on Hadoop you run "MapReduce jobs", on Storm you run "topologies". "Jobs" and "topologies" are very different. One key difference is that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you kill it).

There are two kinds of nodes in a Storm cluster: the master node and the worker nodes. The master node runs a daemon called "Nimbus" that is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.

Each worker node runs a daemon called the "Supervisor". The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary, based on what Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

[Image: Storm cluster]

All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster. Additionally, the Nimbus and Supervisor daemons are fail-fast (translator's note: they detect errors early and immediately abort) and stateless; all state is kept in Zookeeper or on local disk. This means you can kill -9 Nimbus or the Supervisors and they'll start back up as if nothing happened. This design makes Storm clusters incredibly stable.

Topologies

To do realtime computation on Storm, you create what are called "topologies". A topology is a graph of computation. Each node in a topology contains processing logic, and the links between nodes indicate how data should be passed between them.

Running a topology is straightforward. First, you package all your code and its dependencies into a single jar file. Then, you run a command like the following:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

This runs the class backtype.storm.MyTopology with the arguments arg1 and arg2. The main function of the class defines the topology and submits it to Nimbus. The storm jar part takes care of connecting to Nimbus and uploading the jar.
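For concreteness, here's a minimal sketch of what such a main function might look like (the class body and the use of args[0] as the topology name are illustrative, not the actual storm-starter code):

public class MyTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... setSpout/setBolt calls defining the graph go here ...

        Config conf = new Config();
        // StormSubmitter connects to Nimbus and uploads the jar for you.
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}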

Since topology definitions are just Thrift structs, and Nimbus is a Thrift service, you can create and submit topologies in any language. The above example is the easiest way to do it from a JVM-based language. See [[Running topologies on a production cluster]] for more information on starting and stopping topologies.

Streams

The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.

The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.

A bolt consumes any number of input streams, does some processing, and possibly emits new streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts. Bolts can run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases, and much more.

Networks of spouts and bolts are packaged into a "topology", the top-level abstraction that you submit to a Storm cluster for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts subscribe to which streams. When a spout or bolt emits a tuple to a stream, the tuple is sent to every bolt that subscribes to that stream.

[Image: A Storm topology]

Links between nodes in your topology indicate how tuples should be passed around. For example, if there is a link between Spout A and Bolt B, a link from Spout A to Bolt C, and a link from Bolt B to Bolt C, then every time Spout A emits a tuple, it will send the tuple to both Bolt B and Bolt C. All of Bolt B's output tuples will go to Bolt C as well.
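As a sketch, that wiring could be expressed with the TopologyBuilder API covered in "A simple topology" below (SpoutA, BoltB, and BoltC are hypothetical classes standing in for the nodes just described):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout-a", new SpoutA());
builder.setBolt("bolt-b", new BoltB())
        .shuffleGrouping("spout-a");
builder.setBolt("bolt-c", new BoltC())
        .shuffleGrouping("spout-a")   // tuples from Spout A
        .shuffleGrouping("bolt-b");   // plus all of Bolt B's output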

Each node in a Storm topology executes in parallel. In your topology, you can specify how much parallelism you want for each node, and then Storm will spawn that number of threads across the cluster to do the execution.

A topology runs forever, or until you kill it. Storm will automatically reassign any failed tasks. Additionally, Storm guarantees that there will be no data loss, even if machines go down and messages are dropped.

Data model

Storm uses tuples as its data model. A tuple is a named list of values, and a field in a tuple can be an object of any type. Out of the box, Storm supports all the primitive types, strings, and byte arrays as tuple field values. To use an object of another type, you just need to implement a serializer for the type.
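A minimal sketch of such a registration, assuming Storm's Kryo-based serialization config (Person and PersonSerializer are hypothetical stand-ins for your type and its serializer):

Config conf = new Config();
// Register a custom Kryo serializer for the hypothetical Person type.
conf.registerSerialization(Person.class, PersonSerializer.class);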

Every node in a topology must declare the output fields for the tuples it emits. For example, this bolt declares that it emits 2-tuples with the fields "double" and "triple":

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

The declareOutputFields function declares the output fields ["double", "triple"] for the component. The rest of the bolt will be explained in the upcoming sections.

A simple topology

Let's take a look at a simple topology to explore the concepts more and see how the code shapes up. Let's look at the ExclamationTopology definition from storm-starter:

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

This topology contains a spout and two bolts. The spout emits words, and each bolt appends the string "!!!" to its input. The nodes are arranged in a line: the spout emits to the first bolt which then emits to the second bolt. If the spout emits the tuples ["bob"] and ["john"], then the second bolt will emit the words ["bob!!!!!!"] and ["john!!!!!!"].

This code defines the nodes using the setSpout and setBolt methods. These methods take as input a user-specified id, an object containing the processing logic, and the amount of parallelism you want for the node. In this example, the spout is given id "words" and the bolts are given ids "exclaim1" and "exclaim2".

The object containing the processing logic implements the IRichSpout interface for spouts and the IRichBolt interface for bolts.

The last parameter, how much parallelism you want for the node, is optional. It indicates how many threads should execute that component across the cluster. If you omit it, Storm will only allocate one thread for that node.

setBolt returns an InputDeclarer object that is used to define the inputs to the Bolt. Here, component "exclaim1" declares that it wants to read all the tuples emitted by component "words" using a shuffle grouping, and component "exclaim2" declares that it wants to read all the tuples emitted by component "exclaim1" using a shuffle grouping. "shuffle grouping" means that tuples should be randomly distributed from the input tasks to the bolt's tasks. There are many ways to group data between components. These will be explained in a few sections.

If you wanted component "exclaim2" to read all the tuples emitted by both component "words" and component "exclaim1", you would write component "exclaim2"'s definition like this:

builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

As you can see, input declarations can be chained to specify multiple sources for the Bolt.

Let's dig into the implementations of the spouts and bolts in this topology. Spouts are responsible for emitting new messages into the topology. TestWordSpout in this topology emits a random word from the list ["nathan", "mike", "jackson", "golda", "bertels"] as a 1-tuple every 100ms. The implementation of nextTuple() in TestWordSpout looks like this:

public void nextTuple() {
    Utils.sleep(100); // throttle: emit at most one tuple every 100ms
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word)); // emit the chosen word as a 1-tuple
}

As you can see, the implementation is very straightforward.

ExclamationBolt appends the string "!!!" to its input. Let's take a look at the full implementation for ExclamationBolt:

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
    
    public Map getComponentConfiguration() {
        return null;
    }
}

The prepare method provides the bolt with an OutputCollector that is used for emitting tuples from this bolt. Tuples can be emitted at any time from the bolt -- in the prepare, execute, or cleanup methods, or even asynchronously in another thread. This prepare implementation simply saves the OutputCollector as an instance variable to be used later on in the execute method.

The execute method receives a tuple from one of the bolt's inputs. The ExclamationBolt grabs the first field from the tuple and emits a new tuple with the string "!!!" appended to it. If you implement a bolt that subscribes to multiple input sources, you can find out which component the Tuple came from by using the Tuple#getSourceComponent method.
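For example, a bolt subscribed to two components could branch on the source like this (a sketch, not part of the storm-starter code; "words" and "numbers" are hypothetical component ids):

public void execute(Tuple tuple) {
    // getSourceComponent tells you which upstream component emitted this tuple.
    if ("words".equals(tuple.getSourceComponent())) {
        // handle tuples from the "words" component
    } else if ("numbers".equals(tuple.getSourceComponent())) {
        // handle tuples from the "numbers" component
    }
}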

There are a few other things going on in the execute method, namely that the input tuple is passed as the first argument to emit and the input tuple is acked on the final line. These are part of Storm's reliability API for guaranteeing no data loss and will be explained later in this tutorial.

The cleanup method is called when a Bolt is being shut down and should clean up any resources that were opened. There's no guarantee that this method will be called on the cluster: for example, if the machine the task is running on blows up, there's no way to invoke the method. The cleanup method is intended for when you run topologies in [[local mode]] (where a Storm cluster is simulated in process), and you want to be able to run and kill many topologies without suffering any resource leaks.

The declareOutputFields method declares that the ExclamationBolt emits 1-tuples with one field called "word".

The getComponentConfiguration method allows you to configure various aspects of how this component runs. This is a more advanced topic that is explained further on [[Configuration]].
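As a sketch, a per-component override might look like the following (this assumes the tick tuple config key added around Storm 0.8; it's just one example of a component-level setting):

@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    // Ask Storm to send this component a "tick" tuple every 60 seconds.
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
    return conf;
}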

Methods like cleanup and getComponentConfiguration are often not needed in a bolt implementation. You can define bolts more succinctly by using a base class that provides default implementations where appropriate. ExclamationBolt can be written more succinctly by extending BaseRichBolt, like so:

public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }    
}

Running ExclamationTopology in local mode

Let's see how to run the ExclamationTopology in local mode and see that it's working.

Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies. When you run the topologies in storm-starter, they'll run in local mode and you'll be able to see what messages each component is emitting. You can read more about running topologies in local mode on [[Local mode]].

In distributed mode, Storm operates as a cluster of machines. When you submit a topology to the master, you also submit all the code necessary to run the topology. The master will take care of distributing your code and allocating workers to run your topology. If workers go down, the master will reassign them somewhere else. You can read more about running topologies on a cluster on [[Running topologies on a production cluster]].

Here's the code that runs ExclamationTopology in local mode:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

First, the code defines an in-process cluster by creating a LocalCluster object. Submitting topologies to this virtual cluster is identical to submitting topologies to distributed clusters. It submits a topology to the LocalCluster by calling submitTopology, which takes as arguments a name for the running topology, a configuration for the topology, and then the topology itself.

The name is used to identify the topology so that you can kill it later on. A topology will run indefinitely until you kill it.
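In local mode, the killTopology call above is what stops it; on a real cluster, you pass the same name to the kill command:

storm kill test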

The configuration is used to tune various aspects of the running topology. The two configurations specified here are very common:

  1. TOPOLOGY_WORKERS (set with setNumWorkers) specifies how many processes you want allocated around the cluster to execute the topology. Each component in the topology executes as some number of threads. The number of threads allocated to a given component is configured through the setBolt and setSpout methods. Those threads exist within worker processes. Each worker process contains within it some number of threads for some number of components. For instance, you may have 300 threads specified across all your components and 50 worker processes specified in your config. Each worker process will execute 6 threads, each of which could belong to a different component. You tune the performance of Storm topologies by tweaking the parallelism for each component and the number of worker processes those threads should run within.
  2. TOPOLOGY_DEBUG (set with setDebug), when set to true, tells Storm to log every message emitted by a component. This is useful in local mode when testing topologies, but you probably want to keep this turned off when running topologies on the cluster.

There are many other configurations you can set for the topology. The various configurations are detailed in the Javadoc for Config.
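A few more commonly tuned settings, as a sketch (these setters exist on backtype.storm.Config; the values are arbitrary examples):

Config conf = new Config();
conf.setNumWorkers(4);            // TOPOLOGY_WORKERS
conf.setMaxSpoutPending(1000);    // cap on pending (un-acked) spout tuples
conf.setMessageTimeoutSecs(30);   // how long before a tuple counts as failed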

To learn about how to set up your development environment so that you can run topologies in local mode (such as in Eclipse), see [[Creating a new Storm project]].

Stream groupings

A stream grouping tells a topology how to send tuples between two components. Remember, spouts and bolts execute in parallel as many tasks across the cluster. If you look at how a topology is executing at the task level, it looks something like this:

[Image: Tasks in a topology]

When a task for Bolt A emits a tuple to Bolt B, which task should it send the tuple to?

A "stream grouping" answers this question by telling Storm how to send tuples between sets of tasks. Before we dig into the different kinds of stream groupings, let's take a look at another topology from storm-starter. This WordCountTopology reads sentences off of a spout and streams out of WordCountBolt the total number of times it has seen that word before:

TopologyBuilder builder = new TopologyBuilder();
        
builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

SplitSentence emits a tuple for each word in each sentence it receives, and WordCount keeps a map in memory from word to count. Each time WordCount receives a word, it updates its state and emits the new word count.
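A minimal sketch of that WordCount bolt (storm-starter's version is essentially this; BaseBasicBolt handles acking automatically):

public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        counts.put(word, ++count);          // update in-memory state
        collector.emit(new Values(word, count)); // emit the new count
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}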

There are a few different kinds of stream groupings.

The simplest kind of grouping is called a "shuffle grouping" which sends the tuple to a random task. A shuffle grouping is used in the WordCountTopology to send tuples from RandomSentenceSpout to the SplitSentence bolt. It has the effect of evenly distributing the work of processing the tuples across all of SplitSentence bolt's tasks.

A more interesting kind of grouping is the "fields grouping". A fields grouping is used between the SplitSentence bolt and the WordCount bolt. It is critical for the functioning of the WordCount bolt that the same word always go to the same task. Otherwise, more than one task will see the same word, and they'll each emit incorrect values for the count since each has incomplete information. A fields grouping lets you group a stream by a subset of its fields. This causes equal values for that subset of fields to go to the same task. Since WordCount subscribes to SplitSentence's output stream using a fields grouping on the "word" field, the same word always goes to the same task and the bolt produces the correct output.

Fields groupings are the basis of implementing streaming joins and streaming aggregations as well as a plethora of other use cases. Underneath the hood, fields groupings are implemented using mod hashing.
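In essence, the task is chosen by hashing the selected field values and taking the result modulo the number of target tasks. This is an illustrative sketch, not Storm's actual code:

// Illustrative only: pick a task index from the grouping fields' hash.
int chooseTask(List<Object> groupingFieldValues, int numTasks) {
    return Math.abs(groupingFieldValues.hashCode()) % numTasks;
}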

There are a few other kinds of stream groupings. You can read more about them on [[Concepts]].

Defining Bolts in other languages

Bolts can be defined in any language. Bolts written in another language are executed as subprocesses, and Storm communicates with those subprocesses with JSON messages over stdin/stdout. The communication protocol just requires an ~100 line adapter library, and Storm ships with adapter libraries for Ruby, Python, and Fancy.
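For a flavor of the protocol, a bolt emitting a tuple writes a JSON message like the following to its stdout (a simplified sketch; the full handshake and framing are described in the multilang protocol documentation):

{"command": "emit", "anchors": ["<id of the input tuple>"], "tuple": ["the", "emitted", "values"]}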

Here's the definition of the SplitSentence bolt from WordCountTopology:

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

SplitSentence extends ShellBolt and declares that it runs using python with the argument splitsentence.py. Here's the implementation of splitsentence.py:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

SplitSentenceBolt().run()

For more information on writing spouts and bolts in other languages, and to learn about how to create topologies in other languages (and avoid the JVM completely), see [[Using non-JVM languages with Storm]].

Guaranteeing message processing

Earlier on in this tutorial, we skipped over a few aspects of how tuples are emitted. Those aspects were part of Storm's reliability API: how Storm guarantees that every message coming off a spout will be fully processed. See [[Guaranteeing message processing]] for information on how this works and what you have to do as a user to take advantage of Storm's reliability capabilities.
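The anchoring and acking you saw in ExclamationBolt's execute method are the bolt-side half of that API. On the spout side, Storm calls back into ack and fail with the message id you attached when emitting. A sketch, assuming a hypothetical queue the spout reads from (poll, delete, and requeue are illustrative, not a real API):

public void nextTuple() {
    Object msg = queue.poll();          // hypothetical queue
    if (msg != null) {
        // The second argument is the message id Storm uses for tracking.
        _collector.emit(new Values(msg), msg);
    }
}

public void ack(Object msgId) {
    queue.delete(msgId);   // fully processed: safe to discard
}

public void fail(Object msgId) {
    queue.requeue(msgId);  // processing failed: replay later
}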

Transactional topologies

Storm guarantees that every message will be played through the topology at least once. A common question asked is "how do you do things like counting on top of Storm? Won't you overcount?" Storm has a feature called transactional topologies that let you achieve exactly-once messaging semantics for most computations. Read more about transactional topologies here.

Distributed RPC

This tutorial showed how to do basic stream processing on top of Storm. There's lots more things you can do with Storm's primitives. One of the most interesting applications of Storm is Distributed RPC, where you parallelize the computation of intense functions on the fly. Read more about Distributed RPC here.

Conclusion

This tutorial gave a broad overview of developing, testing, and deploying Storm topologies. The rest of the documentation dives deeper into all the aspects of using Storm.
