Saturday, July 25, 2015

Writing your first Storm Topology

This blog contains multiple posts on Storm: its installation, shell scripts for setting up a Storm cluster, and integration of Storm with HBase and RDBMS. But if you're new to Storm, this one is for you. It shows how to write a simple topology in Storm, much like a "Hello World" program in Java. So let's get rolling!

Logical Components of a Storm Topology 


The task submitted to a Storm cluster is called a Topology. It is a workflow of linked processing units that together define the task and the order in which it is executed. These processing units can be of the following two types:
  1. Spout: This component of the topology is responsible for fetching the data stream from a source and forwarding it, after any filtering or pre-processing required, to the bolts where the actual processing of the data occurs. A topology can contain multiple spouts, each consuming data from a different source.
  2. Bolt: As mentioned above, the bolts make up the computational part of the topology. A "stream" is an unbounded sequence of tuples; each tuple is a small unit of data that arrives at a bolt, where it gets processed. A topology is not restricted to a workflow with just a single spout and a single bolt. It can be a chain of multiple bolts consuming the tuples emitted by spouts or by preceding bolts in the workflow, as shown below:

A simplistic view of a Storm Topology and its components


This post walks you through the basics of writing Storm topologies and their components in Java.

Creating Spouts


Although bolts are the processing units where the actual computation occurs, spouts are the connecting links between the Storm topology and the external sources of data. For every different data source (for example Kafka, HBase, MongoDB, Cassandra, etc.), we need a specialized spout that can connect to that source and start consuming data in order to emit a stream of tuples.
Bolts are simpler to implement, since they contain the core processing logic very much as a plain Java program would, and are abstracted from the source of the data.

A spout in Java is a class implementing the IRichSpout interface. Of the various methods to override, the following two are of prime interest to us:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    // Called only once, when the spout is initialized.
    // Initialization goes here: setting up variables, creating log files, and opening
    // connections to remote servers if required (for example to a Kafka broker, HBase Master, etc.)
}

public void nextTuple() {
    // Called repeatedly until the topology is killed or stopped.
    // On every call, a tuple is emitted using the emit() method of the SpoutOutputCollector,
    // so this is where you define what values the next tuple should carry.
}

To make this clearer, here is an example of a simple spout that does not connect to any external source but keeps emitting strings as tuples, cycling through a fixed list.

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    // Keep a reference to the collector so that nextTuple() can emit through it.
    // "_collector" and "count" have already been declared as class variables.
    _collector = collector;
    count = 0;
}

@Override
public void nextTuple() {
    // Emit one tuple per second, cycling through the lists below.
    Utils.sleep(1000);
    String[] words = new String[] {"hello", "india", "how", "are", "you"};
    Integer[] numbers = new Integer[] {11, 22, 33, 44, 55};

    String word = words[count];
    int number = numbers[count];
    count = (count + 1) % words.length;
    _collector.emit(new Values(word, number));
}
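
Besides open() and nextTuple(), every spout also has to declare the fields of the tuples it emits by overriding declareOutputFields(), which is likewise part of the IRichSpout interface. For the spout above, which emits a word and a number per tuple, a minimal version could look like the snippet below (the field names "word" and "number" are just my choice for this example):

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // The two values emitted in nextTuple() become accessible downstream under these field names.
    declarer.declare(new Fields("word", "number"));
}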

If you need to connect to an external source to obtain your stream data, you would define the code for connecting to it in the open() method and then keep fetching data from that source in the nextTuple() method to emit it further, as sketched below.
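
For instance, here is a minimal sketch of that pattern, using a local file as a stand-in for the external source. The class name FileLineSpout and the path "input.txt" are just placeholders for this illustration, and it extends Storm's BaseRichSpout convenience class rather than implementing IRichSpout directly:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

// A hypothetical spout that treats a local file as the "external source":
// the connection is opened once in open(), and nextTuple() emits one line at a time.
public class FileLineSpout extends BaseRichSpout {

    private SpoutOutputCollector _collector;
    private BufferedReader _reader;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // The "connection" to the source is set up once, here.
        _collector = collector;
        try {
            _reader = new BufferedReader(new FileReader("input.txt"));
        } catch (IOException e) {
            throw new RuntimeException("Could not open the input file", e);
        }
    }

    @Override
    public void nextTuple() {
        // Fetch the next piece of data from the source and emit it as a tuple.
        try {
            String line = _reader.readLine();
            if (line != null) {
                _collector.emit(new Values(line));
            } else {
                // Nothing new to emit right now; back off briefly instead of busy-looping.
                Utils.sleep(100);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error while reading from the input file", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Each emitted tuple carries a single field named "line".
        declarer.declare(new Fields("line"));
    }
}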

Creating Bolts


A bolt is a Java class implementing the "IBasicBolt" interface. Just as there are two methods of prime interest among the spout's overridden methods, there are two such methods for a bolt as well:

public void prepare(Map stormConf, TopologyContext context) {
    // Similar to the open() method of a spout.
    // Called only once, when the bolt is initialized; contains all the initialization logic.
}

public void execute(Tuple input, BasicOutputCollector collector) {
    // Called repeatedly until the topology is killed or stopped.
    // Invoked every time a tuple is received; the tuple is processed as defined in this method.
}

Continuing with the sample spout above, which emits strings from a given list, here we define a bolt that appends "!!!" to the string it receives in each tuple. Here is the Java code for the two methods:

public void prepare(Map stormConf, TopologyContext context) {
    // Create an output log file where the results will be logged.
    try {
        String logFileName = logFileLocation;
        // "file" and "outputFile" have already been declared as class variables.
        file = new FileWriter(logFileName);
        outputFile = new PrintWriter(file);
        outputFile.println("In the prepare() method of bolt");
    } catch (IOException e) {
        System.out.println("An exception has occurred");
        e.printStackTrace();
    }
}

public void execute(Tuple input, BasicOutputCollector collector) {
    // Get the string that needs to be processed from the tuple.
    String inputMsg = input.getString(0);
    inputMsg = inputMsg + "!!!";
    outputFile.println("Received the message: " + inputMsg);
    outputFile.flush();
    // Emit the modified string for any downstream bolts.
    collector.emit(tuple(inputMsg));
}
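
Note also that since this bolt emits the modified string further, it too needs to declare its output fields via declareOutputFields(), which comes from the IComponent interface that IBasicBolt extends. A minimal sketch, with "message" as an assumed field name:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // The appended string emitted in execute() is exposed downstream under this field name.
    declarer.declare(new Fields("message"));
}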

I am sure you have already figured out that this simple string processing can be replaced by whatever complex computation your problem statement requires.

Creating Topologies


After you have built the blocks of your topology, you need to link them into a workflow that defines the sequence in which they will execute; in other words, you need to create a topology.

What we need here is a Java class that creates instances of all the spout and bolt classes and links them together using a TopologyBuilder object. Have a look at the code snippet below, which illustrates how to create a topology with the above spout and bolt as part of it:

public static void main(String[] args) {

    // Number of workers to be used for the topology
    int numberOfWorkers = 2;

    // Number of executors to be used for the spout
    int numberOfExecutorsSpout = 1;

    // Number of executors to be used for the bolt
    int numberOfExecutorsBolt = 1;

    // IP of the Storm cluster node on which Nimbus runs
    String nimbusHost = "your_nimbus_host_ip";

    TopologyBuilder builder = new TopologyBuilder();
    Config conf = new Config();

    // Set the spout for the topology
    builder.setSpout("spout", new TestSpout(false), numberOfExecutorsSpout);

    // Set the bolt for the topology and subscribe it to the spout's stream
    builder.setBolt("bolt", new TestBolt(), numberOfExecutorsBolt).shuffleGrouping("spout");

    // Job configuration for the remote Storm cluster
    conf.setNumWorkers(numberOfWorkers);
    conf.put(Config.NIMBUS_HOST, nimbusHost);
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627L);

    // Submit the topology to the remote Storm cluster
    try {
        StormSubmitter.submitTopology("test_topology", conf, builder.createTopology());
    } catch (AlreadyAliveException e) {
        System.out.println("A topology with the same name is already running on the cluster.");
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        System.out.println("The topology seems to be invalid.");
        e.printStackTrace();
    }
}

The above code would launch a topology with name “test_topology” on the Storm cluster.
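
If you just want to try the topology out on your own machine before submitting it to a remote cluster, Storm also provides a local mode. Here is a minimal sketch (my addition, assuming the same builder and conf objects created above) that runs the topology in-process for a short while and then shuts it down:

// Run the topology in local mode for quick testing instead of submitting it remotely.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test_topology", conf, builder.createTopology());

// Let it run for 30 seconds, then kill it and shut the local cluster down.
Utils.sleep(30000);
cluster.killTopology("test_topology");
cluster.shutdown();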

Hope it helped. Cheers!
