Storm is a distributed, fault-tolerant realtime computation system.
###Features of Storm:
- Scalable and robust
- Fault-tolerant (automatic reassignment of tasks)
- Reliable (all messages are processed at least once)
- Fast
- Transactional (exactly once messaging semantics)
###Components used by Storm:
- ZooKeeper (stores metadata)
- 0MQ (Message transport layer)
- Thrift (RPC bridge)
- LMAX Disruptor (High performance queue shared by threads)
- Kryo (Serialization framework)
###Components of Storm Cluster:
- Topologies - the jobs. A topology processes messages forever, until you kill it.
- Nimbus - the master node, responsible for distributing code, assigning tasks, and monitoring for failures.
- Supervisor - runs on each worker node. It starts and stops worker processes as necessary, based on what Nimbus has assigned to it. Each worker process executes a subset of a topology.
- ZooKeeper - coordinates between Nimbus and the Supervisors.

The Nimbus and Supervisor daemons are fail-fast and stateless; all state is kept in ZooKeeper or on local disk.
###Terminology:
- Tuples are ordered lists of elements
- Streams are unbounded sequences of tuples
- Spouts are sources of streams
- Bolts process input streams and produce new streams or persist data
- A topology is a directed graph of spouts and bolts
- Tasks are instances of spouts and bolts, managed by the Supervisor
###Storm Use Cases
- Stream Processing - process messages and update databases
- Continuous Computation - run continuous queries on data streams and stream the results to clients
- Distributed RPC - parallelize an intense query, such as a search, on the fly
###Modes of operation
- Local mode - develop and test topologies locally
    - In local mode, Storm executes completely in process by simulating worker nodes with threads.
- Remote mode - submit topologies for execution on a cluster of machines
    - When you submit a topology to the master, you also submit all the code necessary to run the topology. The master takes care of distributing your code and allocating workers to run your topology. If workers go down, the master reassigns them somewhere else.
###Topologies
A topology is a graph of computation. Each node in a topology contains processing logic, and the links between nodes indicate how data should be passed between them.
Running a topology:

    storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

This runs the class `backtype.storm.MyTopology` with the arguments `arg1` and `arg2`. The main function of the class defines the topology and submits it to Nimbus. The `storm jar` part takes care of connecting to Nimbus and uploading the jar.
###Streams:
The core abstraction in Storm is the stream. A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way.

Stream transformation primitives:
- A spout is a source of streams.
    - A spout may read tuples from a queue and emit them as a stream.
    - A spout may connect to the Twitter API and emit a stream of tweets.
- A bolt consumes any number of input streams, does some processing, and possibly emits new streams.
    - Bolts can do anything: run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases, and more.

Networks of spouts and bolts are packaged into a "topology", which is the top-level abstraction that you submit to Storm clusters for execution.

Links between nodes in your topology indicate how tuples should be passed around. For example, if there is a link from Spout A to Bolt B, a link from Spout A to Bolt C, and a link from Bolt B to Bolt C, then every time Spout A emits a tuple, it sends the tuple to both Bolt B and Bolt C. All of Bolt B's output tuples go to Bolt C as well.
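The Spout A / Bolt B / Bolt C example can be traced with a small plain-Java model. This is only a sketch: `TopologyLinksSketch` and its methods are hypothetical names, not part of the Storm API, and it assumes each bolt emits exactly one output tuple per input tuple and that the graph has no cycles.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of topology links; not part of the Storm API.
class TopologyLinksSketch {
    // component name -> components subscribed to its output stream
    private final Map<String, List<String>> subscribers = new LinkedHashMap<>();
    // component name -> tuples it has received so far
    private final Map<String, List<String>> received = new LinkedHashMap<>();

    void link(String from, String to) {
        subscribers.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
    }

    // Deliver a tuple emitted by `from` to every subscriber. Each receiving
    // component then emits one derived tuple (an assumption of this sketch),
    // which is routed along its own outgoing links in the same way.
    void emit(String from, String tuple) {
        for (String to : subscribers.getOrDefault(from, new ArrayList<>())) {
            received.computeIfAbsent(to, k -> new ArrayList<>()).add(tuple);
            emit(to, to + "(" + tuple + ")");
        }
    }

    List<String> receivedBy(String component) {
        return received.getOrDefault(component, new ArrayList<>());
    }

    // Builds the three links described in the text and emits one tuple.
    static TopologyLinksSketch example() {
        TopologyLinksSketch t = new TopologyLinksSketch();
        t.link("spoutA", "boltB");
        t.link("spoutA", "boltC");
        t.link("boltB", "boltC");
        t.emit("spoutA", "t1");
        return t;
    }

    public static void main(String[] args) {
        TopologyLinksSketch t = example();
        System.out.println("boltB received " + t.receivedBy("boltB"));
        System.out.println("boltC received " + t.receivedBy("boltC"));
    }
}
```

In this model Bolt B sees the spout's tuple once, while Bolt C sees two tuples: the original from Spout A and the one Bolt B derived from it.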
###Data Model:
Storm uses tuples as its data model. A tuple is a named list of values, and a field in a tuple can be an object of any type. Out of the box, Storm supports primitive types, strings, and byte arrays as tuple field values. To use an object of another type, you need to implement a serializer for that type.

Every node in a topology must declare the output fields for the tuples it emits.
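To make "a named list of values" concrete, here is a minimal plain-Java sketch. `NamedTupleSketch` is a hypothetical stand-in, not Storm's own tuple class; it only shows how declared output fields give names to positions in the value list.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a tuple: a list of values whose positions are
// named by the output fields that the emitting component declared.
class NamedTupleSketch {
    private final List<String> fields;
    private final List<Object> values;

    NamedTupleSketch(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("one value is required per declared field");
        }
        this.fields = fields;
        this.values = values;
    }

    // Look a value up by the name of its declared field.
    Object getValueByField(String field) {
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("undeclared field: " + field);
        }
        return values.get(index);
    }

    // Look a value up by its position in the tuple.
    Object getValue(int index) {
        return values.get(index);
    }

    // A component that declares the output fields ("word", "count")
    // and emits one tuple with a value for each of them.
    static NamedTupleSketch example() {
        List<String> declaredFields = Arrays.asList("word", "count");
        return new NamedTupleSketch(declaredFields, Arrays.<Object>asList("storm", 3));
    }

    public static void main(String[] args) {
        NamedTupleSketch tuple = example();
        System.out.println("word  = " + tuple.getValueByField("word"));
        System.out.println("count = " + tuple.getValueByField("count"));
    }
}
```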
###Stream Groupings
A stream grouping tells a topology how to send tuples between two components.
When a task for Bolt A emits a tuple to Bolt B, which task should it send the tuple to?
A "stream grouping" answers this question by telling Storm how to send tuples between sets of tasks.

Types of stream grouping:
- Shuffle grouping
    - sends each tuple to a random task, which has the effect of evenly distributing the work of processing the tuples across all of the bolt's tasks
- Fields grouping
    - lets you group a stream by a subset of its fields
    - causes equal values for that subset of fields to go to the same task
    - is implemented using mod hashing
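The two groupings above can be sketched as task-selection functions in plain Java. This is an illustration of random selection and of mod hashing, not Storm's actual implementation; `GroupingSketch` and its method names are hypothetical, and the choice of `List.hashCode()` as the hash is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical task-selection helpers; not part of the Storm API.
class GroupingSketch {
    // Shuffle grouping: pick the target task uniformly at random, so work
    // spreads evenly across tasks over many tuples.
    static int shuffleTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    // Fields grouping via mod hashing: hash the values of the grouping fields
    // and take the result modulo the number of tasks. floorMod keeps the index
    // non-negative even when the hash code is negative.
    static int fieldsTask(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        List<Object> userA = Arrays.<Object>asList("user-a");
        // Equal grouping values always map to the same task index.
        System.out.println("user-a -> task " + fieldsTask(userA, numTasks));
        System.out.println("user-a -> task " + fieldsTask(userA, numTasks));
        System.out.println("shuffle -> task " + shuffleTask(new Random(), numTasks));
    }
}
```

Because the task index depends only on the grouping values and the task count, a fields grouping keeps all tuples with the same key on one task, which is what makes per-key aggregation possible.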
###Storm parallelism
Storm distinguishes between three main entities that are used to run a topology in a Storm cluster:
- Worker processes - A machine in a Storm cluster may run one or more worker processes for one or more topologies. Each worker process runs executors for a specific topology.
- Executors (threads) - One or more executors may run within a single worker process, with each executor being a thread spawned by the worker process. Each executor runs one or more tasks of the same component (spout or bolt).
- Tasks - A task performs the actual data processing.
Notes:
- The number of executor threads can be changed after the topology has been started (`rebalance`).
- The number of tasks of a topology is static.
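The effect of these two notes can be sketched in plain Java: the task count stays fixed while the tasks are spread over a different number of executors. `TaskAssignmentSketch` is a hypothetical helper, and the round-robin placement is an assumption made for illustration, not a description of Storm's scheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; the round-robin placement is an assumption.
class TaskAssignmentSketch {
    // Spread task ids 0..numTasks-1 over the executors round-robin.
    // The executor count is capped at the task count, because an executor
    // without a task would have nothing to run.
    static List<List<Integer>> assign(int numTasks, int numExecutors) {
        int executors = Math.min(numExecutors, numTasks);
        List<List<Integer>> assignment = new ArrayList<>();
        for (int e = 0; e < executors; e++) {
            assignment.add(new ArrayList<>());
        }
        for (int task = 0; task < numTasks; task++) {
            assignment.get(task % executors).add(task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Four tasks on two executors: each thread runs two tasks.
        System.out.println("before rebalance: " + assign(4, 2));
        // After changing to four executors, the same four tasks run one per thread.
        System.out.println("after rebalance:  " + assign(4, 4));
    }
}
```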
Configuring parallelism:

The parallelism hint specifies the initial number of executor threads of a component.

Property | Description | Configuration option | Code |
---|---|---|---|
Worker processes | How many worker processes to create for the topology across machines in the cluster | TOPOLOGY_WORKERS | Config.setNumWorkers() |
Executors (threads) | How many executors to spawn per component | - | TopologyBuilder.setSpout() and TopologyBuilder.setBolt() |
Tasks | How many tasks to create per component | TOPOLOGY_TASKS | ComponentConfigurationDeclarer.setNumTasks() |
Example topology illustrating parallelism:

    Config conf = new Config();
    conf.setNumWorkers(2); // use two worker processes

    TopologyBuilder topologyBuilder = new TopologyBuilder();

    // parallelism hint of 2: two executors for the spout
    topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2);

    // two executors that share four tasks
    topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
                   .setNumTasks(4)
                   .shuffleGrouping("blue-spout");

    // parallelism hint of 6: six executors
    topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
                   .shuffleGrouping("green-bolt");

    StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );
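The numbers in this example can be added up with a short plain-Java calculation. It is a sketch only: `ParallelismSummarySketch` is a hypothetical name, and it assumes that a component without an explicit `setNumTasks` call gets one task per executor and that executors are spread evenly over the worker processes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical summary of the example topology; not part of the Storm API.
class ParallelismSummarySketch {
    static final int WORKERS = 2;   // conf.setNumWorkers(2)
    static final int EXECUTORS = 0; // column index: parallelism hint
    static final int TASKS = 1;     // column index: number of tasks

    // component name -> {executors, tasks}
    static Map<String, int[]> exampleComponents() {
        Map<String, int[]> components = new LinkedHashMap<>();
        components.put("blue-spout", new int[] {2, 2});  // assumed: one task per executor
        components.put("green-bolt", new int[] {2, 4});  // setNumTasks(4)
        components.put("yellow-bolt", new int[] {6, 6}); // assumed: one task per executor
        return components;
    }

    // Sum one column (EXECUTORS or TASKS) over all components.
    static int total(Map<String, int[]> components, int column) {
        int sum = 0;
        for (int[] counts : components.values()) {
            sum += counts[column];
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, int[]> components = exampleComponents();
        int executors = total(components, EXECUTORS);
        int tasks = total(components, TASKS);
        System.out.println("total executors: " + executors);
        System.out.println("total tasks: " + tasks);
        System.out.println("executors per worker: " + (executors / WORKERS));
    }
}
```

Under these assumptions the topology runs 10 executors and 12 tasks in total, so each of the two worker processes hosts 5 executor threads.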
###More Info:
###Storm code examples: