How to manage state in Trident Storm topologies

Code for this example@ https://github.com/sumitchawla/storm-examples

Trident API is in Storm Topologies is just another abstraction on how “Stream” of data is processed in Storm.

Basic Storm stream processing guarantees “at least once” message processing, whereas Trident API guarantees “exactly once” message processing.  In simple terms, that means,  basic stream processing makes sure that no message is ever lost. To achieve that, storm might replay the same message again and again, until it is certain that the message is processed successfully.   There is no direct way to figure out if the message has been played first time, or its being replayed due to an error or failure.   Trident API solves this problem partially by grouping this message into a batch.  If Trident API, needs to replay the same message again, it will come back with same Batch Id.  The application receiving this message will have to keep track of this Batch Id (this will become more clearer once we go through the example).

Lets’s dive into the example, and we will discuss the concepts as we encounter them in the example.

US Presidential Election

In this example, we use the Trident Topologies to keep live track of the US Presidential Election.  The election is between two parties Republicans and Democrats.  We will be keeping track of votes coming from 100 different constituencies, and will be maintaining aggregates on who is leading the election at a particular point of time.  Disclaimer: This is a purely random simulation, so results are unpredictable.  Any party could be leading at any point of time 🙂

We start by creating a RandomVoteSpout This spout represents a particular constituency where voting is being held.  The spout produces a batch of 100 votes per second, and randomly divides these votes between two parties.

topology.newStream("voting-events", new RandomVoteSpout())
//Lets call this stream the VoteStream
.name("VoteStream")
//Each spout will act like a different constituency.
//For this election we will have 100 constituencies.
.parallelismHint(100)

RandomVoteSpout implements the IBatchSpout interface.  The spout emits a new batch of votes everytime the emitBatch method is called.  Every spout gets a unique id which is used as a constituency id here.

//RandomVoteSpout keeps emitting batches of 100 random votes
// in that constituency. Each batch has  random number of votes
// for either parties. So results are unpredictable 🙂
public static class RandomVoteSpout implements IBatchSpout {
    Random _rand;
    Integer _constituencyId;

    public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector collector) {
    }

    public void open(Map map, 
                      TopologyContext topologyContext) {
        _rand = new Random();
        _constituencyId = topologyContext.getThisTaskIndex();
    }

    public void emitBatch(long batchId, 
                           TridentCollector collector) {
        Utils.sleep(1000);
        //Wait for 1 sec, and then emit a new batch of
        // 100 random votes for this constituency.
        for(Integer count = 0; count< 100; count++){
            PartyName[] partyNames = PartyName.values();
            collector.emit(new Values(this._constituencyId,
                    partyNames[_rand.nextInt(partyNames.length)]
                            .toString()));
        }
    }

    public void ack(long l) {
    }

    public void close() {
    }

    public Map getComponentConfiguration() {
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("ConstituencyId", "PartyName");
    }
}

Next, we partition our Vote Stream by ConstituencyId.  This is to make sure that all the batches generated by a particular Constituency go to same aggregator. These partitions are then passed to VoteCountAggregator bolts.  Each VoteCountAggregator does a very simple vote count by party in the batch.

 Grouping partitonGroup = new Grouping();
 List<String> fields = new ArrayList<String>();
 fields.add("ConstituencyId");
 partitonGroup.set_fields(fields);
//Votes generated by a particular constituency
// should go to a same count aggregator.
.partition(partitonGroup)
//VoteCountAggregator handles batches of votes.
// Its does a very simple of aggregation.
// Maintains a separate count for each party
//PartyName, and ConstituencyId are the keys for partitioning
// to the VoteCountAggregator, and ConstituencyId, count is the
// output of the aggregator
.partitionAggregate(new Fields("ConstituencyId", "PartyName"),
new VoteCountAggregator(),
new Fields("ConstituencyId", "count"))
.name("CountAggregator")
//Lets have 10 centers doing VoteCountAggregator for
// 100 constituency.
// On Average each aggregator handles 10 constituencies
.parallelismHint(10)

VoteCountAggregator handles all votes coming for a particular constituency.   It extends BaseAggregator, and implements following methods:

  • init  –  This method is called every time a new batch comes in.  The method receives the id of the batch coming in.  Here you can check if you have already handled this batch id.  If yes, you could simply discard each incoming tuple for this batch.  For our example, we create a new ConstituencyState object which will hold aggregates for this particular batch.
  • aggregate – This method is called for each tuple in this batch.  For our example, we simply increment the vote count for the party for which we received this vote.
  • complete –  This method is called as soon as batch is complete.  Here we simply emit the ConstituencyState object. This object has counts of Votes by party.
//VoteCountAggregator handles all votes coming
// for a particular constituency.
public static class VoteCountAggregator
        extends BaseAggregator<VoteCountAggregator.CountState> {
    private static final Logger LOG =
            Logger.getLogger(VoteCountAggregator.class);
    private Object _batchId;
    static class CountState {
        Integer constituencyId;
        ConstituencyState state = new ConstituencyState();
    }

    public CountState init(Object batchId,
                           TridentCollector collector) {
        this._batchId = batchId;
        CountState state = new CountState();
        return state;
    }

    // Aggregate function will be called for
    // each vote generated by a Constituency
    // in a particular batch
    public void aggregate(CountState state,
                          TridentTuple tuple,
                          TridentCollector collector) {
        Integer constituencyId =
                tuple.getIntegerByField("ConstituencyId");
        state.constituencyId = constituencyId;
        PartyName partyName =
                PartyName.valueOf(
                        tuple.getStringByField("PartyName"));
        LOG.debug(String.format("Got tuple %s %s %s",
                constituencyId, partyName,
                state.state.DemocratsVotes));
        if (partyName == PartyName.Democratic) {
            state.state.DemocratsVotes++;
        } else if (partyName == partyName.Republican) {
            state.state.RepublicanVotes++;
        }
    }

    // complete is called as soon as all
    // the votes in the particular batch are processed.
    public void complete(CountState state,
                         TridentCollector collector) {
        LOG.debug(String.format("Emitting counts " +
                        "for Constituency %d" +
                        " - Democracts %d " +
                        "Republican %d for Batch %s",
                state.constituencyId,
                state.state.DemocratsVotes,
                state.state.RepublicanVotes,
                _batchId));
        collector.emit(new Values(state.constituencyId, state.state));
    }
}

Excellent, So now we know how to handle a batch of tuples using Trident API.  Our next step is to learn how to run the aggregator on multiple batches. For this we are going to use three new classes:

  •  ElectionState – Its just a uber class which holds the ultimate results, and sums up all incoming branches.  For our example, we are doing an in memory aggregation.  This class could write to database/memcache/redis based on your requirement.
  •  StateUpdater  – This class receives a collection of BatchIds and their aggregator tuples.   This is the tuple that was emitted by VoteCountAggregator in the complete method.  Hence we receive ConstituencyState in this tuple.  You would get a single tuple per batch here.
  • StateUpdaterFactory –  This is just factory class which creates an object of ElectionState for our example.  Since we are using a static ElectionState object here, we would get a single object of ElectionState created in our entire topology.

Order of Batch State Updates:

Storm documentation mentions that StateUpdater will be called only in a serial manner for the batches.  For example, lets say Batch 2 , 3 are already done and VoteCountAggregator has emitted ConstituencyState for these batches already.  However,  Batch 1 is still processing.  Storm would not call StateUpdater for Batch 2, 3 until Batch 1 is completed.  As soon as Batch 1 is completed, it would call StateUpdater for Batch 1 followed by Batch 2, and then Batch 3.  In other words,  you would always receive Batch updates in sequence, and never our of sequence.

//Finally we have a Single Authority maintaining
//overall state of the election.
.partitionPersist(new StateUpdaterFactory(),
new Fields("ConstituencyId", "count"),
new StateUpdater());

As mentioned earlier, StateUpdater receives the aggregation per batch, and updates the static ElectionState.

//Handles incoming aggregators from each batch,
// and updates the uber Election state
public static class StateUpdater
        extends BaseStateUpdater<ElectionState> {
    public void updateState(ElectionState state,
                            List<TridentTuple> tuples,
                            TridentCollector collector) {
        List<Integer> ids = new ArrayList<Integer>();
        List<ConstituencyState> states = 
            new ArrayList<ConstituencyState>();
        for(TridentTuple t: tuples){
            ids.add(t.getInteger(0));
            states.add((ConstituencyState) t.get(1));
        }
        state.setStateValues(ids, states);
    }
}

public static class StateUpdaterFactory
        implements StateFactory {
    //We keep a single static object to keep track of all the states
    static ElectionState electionState = new ElectionState();
    public storm.trident.state.State
        makeState(Map conf, IMetricsContext metricsContext,
                  int partitionIndex, int numPartitions) {
        return electionState;
    }
}

public static class ElectionState implements State {
    private static final Logger LOG =
            Logger.getLogger(ElectionState.class);

    private final Map<Integer, ConstituencyState> totalCount;

    public ElectionState() {
        totalCount = new HashMap<Integer, ConstituencyState>();
    }


    public void setStateValues(List<Integer> constituencyList,
                               List<ConstituencyState> batchStates){
        for(int i=0; i<constituencyList.size(); i++) {
            Integer constituencyId = constituencyList.get(i);
            ConstituencyState batchUpdateForConstituency =
                    batchStates.get(i);
            if (!totalCount.containsKey(constituencyId)){
                totalCount.put(constituencyId,
                        new ConstituencyState());
            }

            ConstituencyState totalCountsForConstituency =
                    totalCount.get(constituencyId);
            totalCountsForConstituency.DemocratsVotes +=
                    batchUpdateForConstituency.DemocratsVotes;
            totalCountsForConstituency.RepublicanVotes+=
                    batchUpdateForConstituency.RepublicanVotes;
            LOG.debug(String.format("Commited counts for " +
                            "Constituency %d - Current Counts - "+
                            "Democrats %d Republican %d ",
                              constituencyId,
                    totalCountsForConstituency.DemocratsVotes,
                    totalCountsForConstituency.RepublicanVotes));
        }
    }

    public List<ConstituencyState>
        getCurrentCountsForConstituency(
            List<Integer> constituencyList) {
        List<ConstituencyState> states =
                new ArrayList<ConstituencyState>();
        for(int i=0; i<constituencyList.size(); i++) {
            Integer constituencyId = constituencyList.get(i);
            LOG.debug(String.format("GET for %d", constituencyId));
            if (totalCount.containsKey(constituencyId)) {
                states.add(totalCount.get(constituencyId));
            } else {
                states.add(new ConstituencyState());
            }
        }
        return states;
    }

    public List<ConstituencyState> getAllCounts() {
        return new ArrayList<ConstituencyState>(
                totalCount.values());
    }

    public void beginCommit(Long aLong) {

    }

    public void commit(Long aLong) {

    }
}

So we are done. Our ElectionState class is maintaining constituency wise votes counts. The variable totalCounts hold the entire information about election. It knows who is leading in which constituency, and number of votes cast for each party in a constituency.  This is great place to track this information .  We can dump it into a database , or might as well push into Redis. Our application can pull this information, and display the current trends/results.

For completeness of our example,  we will use two QueryFunctions.  These functions can get information from ElectionState class and tell us what the state of election.  This will save us writing code to read database or query Redis.

  • QueryConstituencyState – This class received a list of inputs containing constituency ids, and returns the number of votes by party for those constituencies.
  • QueryElectionLeader – This class tells us which party is leading in how many constituency.  This information keeps updating as new votes are received in new batches.
public static class QueryConstituencyCount
            extends BaseQueryFunction<ElectionState, Object> {
        public List<Object> batchRetrieve(
                ElectionState state,
                List<TridentTuple> inputs) {
            List<Integer> ids =
                    new ArrayList<Integer>();
            List<Object> result =
                    new ArrayList<Object>();
            for(TridentTuple input: inputs) {
                ids.add(Integer.valueOf(input.getString(0)));
            }
            List<ConstituencyState> currentStates =
                    state.getCurrentCountsForConstituency(ids);
            for(ConstituencyState constituencyState :
                    currentStates ) {
                HashMap<String,Long> counts =
                        new HashMap<String, Long>();
                counts.put(PartyName.Democratic.toString(),
                        constituencyState.DemocratsVotes);
                counts.put(PartyName.Republican.toString(),
                        constituencyState.RepublicanVotes);
                result.add(counts);
            }
            return result;
        }

        public void execute(TridentTuple tuple,
                            Object result,
                            TridentCollector collector) {
            collector.emit(new Values(result));
        }
    }

    public static class QueryElectionLeader
            extends BaseQueryFunction<ElectionState, Object> {
        public List<Object> batchRetrieve(
                ElectionState state,
                List<TridentTuple> inputs) {
            List<Integer> ids =
                    new ArrayList<Integer>();
            List<Object> result =
                    new ArrayList<Object>();
            List<ConstituencyState> currentStates =
                    state.getAllCounts();
            Long democratsLeadingCount = 0L;
            Long republicansLeadingCount = 0L;
            for(ConstituencyState constituencyState :
                    currentStates ) {
                if (constituencyState.DemocratsVotes >
                        constituencyState.RepublicanVotes) {
                    democratsLeadingCount++;
                } else {
                    republicansLeadingCount++;
                }
            }
            Map<String, Long> map = new HashMap<String, Long>();
            map.put(PartyName.Democratic.toString(),
                    democratsLeadingCount);
            map.put(PartyName.Republican.toString(),
                    republicansLeadingCount);
            result.add(map);
            return result;
        }

        public void execute(TridentTuple tuple, 
                Object result, 
                TridentCollector collector) {
            collector.emit(new Values(result));
        }
    }

Above Query functions can be accessed by using a DRPC query.  You can  get the entire code from https://github.com/sumitchawla/storm-examples

You can run the above example by running following command:

mvn compile exec:java

You should see following as update.  The first line represents the current votes from Constituency 1, and second line represents party wise counts where the party has more votes than the opposition party.  As votes keep coming in the counts keep changing:

Current Counts for Constituency [[“1”,{“Republican”:2966,”Democratic”:3034}]]
Current Election Counts [[“”,{“Republican”:48,”Democratic”:52}]]
Current Counts for Constituency [[“1”,{“Republican”:2966,”Democratic”:3034}]]
Current Election Counts [[“”,{“Republican”:49,”Democratic”:51}]]
Current Counts for Constituency [[“1”,{“Republican”:2966,”Democratic”:3034}]]
Current Election Counts [[“”,{“Republican”:51,”Democratic”:49}]]

.

.

.

Current Counts for Constituency [[“1”,{“Republican”:5450,”Democratic”:5550}]]
Current Election Counts [[“”,{“Republican”:48,”Democratic”:52}]]
Current Counts for Constituency [[“1”,{“Republican”:5953,”Democratic”:6047}]]
Current Election Counts [[“”,{“Republican”:49,”Democratic”:51}]]
Current Counts for Constituency [[“1”,{“Republican”:5953,”Democratic”:6047}]]
Current Election Counts [[“”,{“Republican”:49,”Democratic”:51}]]

References:

2 thoughts on “How to manage state in Trident Storm topologies

  1. Good example, Sumit. There’s another blog by a certain Sunil which explains Trident using word counting, but your example is way better. The explanation here presents a unique solution and customization which helps a lot in understanding how Trident works. Thanks for putting this up!

    Like

Leave a comment