Hadoop – GC overhead limit exceeded error

In our Hadoop setup, we ended up with more than 1 million files in a single folder. The folder had so many files that any hdfs dfs command on it, such as -ls or -copyToLocal, failed with the following error:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOf(Arrays.java:2367)
        at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
        at java.lang.StringBuffer.append(StringBuffer.java:237)
        at java.net.URI.appendAuthority(URI.java:1852)
        at java.net.URI.appendSchemeSpecificPart(URI.java:1890)
        at java.net.URI.toString(URI.java:1922)
        at java.net.URI.<init>(URI.java:749)
        at org.apache.hadoop.fs.Path.initialize(Path.java:203)
        at org.apache.hadoop.fs.Path.<init>(Path.java:116)
        at org.apache.hadoop.fs.Path.<init>(Path.java:94)
        at org.apache.hadoop.hdfs.protocol.HdfsFileStatus.getFullPath(HdfsFileStatus.java:230)
        at org.apache.hadoop.hdfs.protocol.HdfsFileStatus.makeQualified(HdfsFileStatus.java:263)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:732)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
        at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
        at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
        at org.apache.hadoop.fs.shell.PathData.getDirectoryContents(PathData.java:268)
        at org.apache.hadoop.fs.shell.Command.recursePath(Command.java:347)
        at org.apache.hadoop.fs.shell.CommandWithDestination.recursePath(CommandWithDestination.java:291)
        at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:308)
        at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:278)
        at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:243)
        at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:260)
        at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:244)
        at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:220)
        at org.apache.hadoop.fs.shell.Command.processRawArguments(Command.java:190)
        at org.apache.hadoop.fs.shell.Command.run(Command.java:154)
        at org.apache.hadoop.fs.FsShell.run(FsShell.java:287)

After some research, we added the following environment variable to update the Hadoop runtime options.

export HADOOP_OPTS="-XX:-UseGCOverheadLimit"

Adding this option got rid of the GC error, but the commands then started failing with the following error, citing a lack of Java heap space.

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1351)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1413)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1524)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1533)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:557)
        at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy15.getListing(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
        at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
        at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
        at org.apache.hadoop.fs.shell.PathData.getDirectoryContents(PathData.java:268)
        at org.apache.hadoop.fs.shell.Command.recursePath(Command.java:347)
        at org.apache.hadoop.fs.shell.CommandWithDestination.recursePath(CommandWithDestination.java:291)
        at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:308)
        at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:278)
        at org.apache.hadoop.fs.shell.CommandWithDestination.processPathArgument(CommandWithDestination.java:243)
        at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:260)
        at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:244)
        at org.apache.hadoop.fs.shell.CommandWithDestination.processArguments(CommandWithDestination.java:220)
        at org.apache.hadoop.fs.shell.Command.processRawArguments(Command.java:190)
        at org.apache.hadoop.fs.shell.Command.run(Command.java:154)
        at org.apache.hadoop.fs.FsShell.run(FsShell.java:287)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)

We modified the above export and tried the following instead. Note that instead of HADOOP_OPTS, we needed to set HADOOP_CLIENT_OPTS to fix this error, because all hadoop shell commands run as a client. HADOOP_OPTS modifies the runtime options of the actual Hadoop daemons, whereas HADOOP_CLIENT_OPTS modifies the runtime options of the Hadoop command-line client.

export HADOOP_CLIENT_OPTS="-XX:-UseGCOverheadLimit -Xmx4096m"
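
The heap pressure comes from the client buffering a status object for every file in the directory. If you run into the same limit from your own Java client code, an incremental listing avoids holding the whole directory listing in memory. Below is a minimal sketch using the standard FileSystem API; the path /data/huge-folder is just a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class HugeDirLister {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // listFiles returns a RemoteIterator, so statuses are fetched
        // in pages instead of being buffered all at once.
        RemoteIterator<LocatedFileStatus> it =
                fs.listFiles(new Path("/data/huge-folder"), false);
        long count = 0;
        while (it.hasNext()) {
            LocatedFileStatus status = it.next();
            // Process one file at a time here instead of collecting them.
            count++;
        }
        System.out.println("Files seen: " + count);
    }
}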

 

How to fix NameNode – SafeModeException

When you try to do any HDFS write operation, you get the following exception:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): 
Cannot create directory /user/hadoopuser/dir/in. 
Name node is in safe mode.

What is Safe Mode in Hadoop?

Safe Mode in Hadoop is a maintenance state of the NameNode. During Safe Mode, the HDFS cluster is read-only and does not allow any changes to the namespace; it also does not replicate or delete blocks.

When the NameNode starts, it automatically enters Safe Mode and performs the following initialization tasks:

  • Loads the file system namespace from the last saved fsimage.
  • Loads the edit log file.
  • Applies the edit log changes to the fsimage, creating a new in-memory file system namespace.
  • Receives block reports containing block location information from all DataNodes.

To leave Safe Mode, the NameNode must collect block reports for at least a configurable threshold percentage of blocks, and those blocks must satisfy the minimum replication condition. Even though this threshold may be reached quickly, Safe Mode is extended for a configurable amount of time to make sure the remaining DataNodes check in before the NameNode starts replicating missing blocks or deleting over-replicated blocks. After this block maintenance activity completes, the NameNode leaves Safe Mode automatically.

 

You can check whether your Hadoop cluster is in Safe Mode by running the following command:

hdfs dfsadmin -safemode get

If you just restarted your cluster, give it ample time to leave Safe Mode on its own; the time needed varies with the size of your cluster. If the cluster stays stuck in that state, you can force the NameNode out of Safe Mode with the following command:

hdfs dfsadmin -safemode leave
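
If you need to perform the same check or action from Java code instead of the shell, the HDFS client exposes equivalent calls. Below is a rough sketch for Hadoop 2.x; it assumes fs.defaultFS points at your HDFS cluster, SAFEMODE_GET only queries the state, and the commented SAFEMODE_LEAVE line is the programmatic equivalent of -safemode leave.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class SafeModeCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes fs.defaultFS is an hdfs:// URI, so the cast below is valid.
        DistributedFileSystem dfs =
                (DistributedFileSystem) FileSystem.get(conf);
        // SAFEMODE_GET only reads the current state; it changes nothing.
        boolean inSafeMode =
                dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
        System.out.println("NameNode in safe mode: " + inSafeMode);
        // To force the NameNode out of safe mode (same as -safemode leave):
        // dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
    }
}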

How to parse argument parameters in bash shell?

This script will help you understand how to parse command-line arguments in a bash shell script.

Let's start by declaring default values for the parameters. This handles the case in which the arguments are not passed to the script.

#!/bin/bash
export ALPHA="DEFAULT_ALPHA"
export BETA="DEFAULT_BETA"

 

Next we add a case statement and handle each argument explicitly. When we find a match on the parameter name, we set the environment variable to the next token in the input. The shift operation then removes the argument name and its value from the argument list, and the loop proceeds to the next argument.

while true ; do
  case "$1" in
    --alpha) export ALPHA="$2" ; shift 2 ;;
    --beta) export BETA="$2" ; shift 2 ;;
    *) break ;;
  esac
done

Next, let's print the values once parsing is done. Save the complete script to a file named bash_command_parsing.sh, and change its mode so that it can be run as an executable.

echo "GOT ALPHA $ALPHA"
echo "GOT BETA $BETA"

 

Let's run the script. First we run it without passing any arguments, and then we pass it both arguments.

$> ./bash_command_parsing.sh
GOT ALPHA DEFAULT_ALPHA
GOT BETA DEFAULT_BETA
$> ./bash_command_parsing.sh --alpha 0.4 --beta 0.2
GOT ALPHA 0.4
GOT BETA 0.2

 

Note that the argument parsing in this example is very unforgiving and bails out as soon as it encounters any unhandled parameter. In the first example below, the script parses the first argument correctly and then bails out when it sees an unhandled parameter. In the second example, the first parameter itself is unhandled, so the script does not even try to parse the second parameter.

$> ./bash_command_parsing.sh --alpha 0.4 --beta1 0.2
GOT ALPHA 0.4
GOT BETA DEFAULT_BETA
$> ./bash_command_parsing.sh --alpha1 0.4 --beta 0.2
GOT ALPHA DEFAULT_ALPHA
GOT BETA DEFAULT_BETA

VM warning: Insufficient space for shared memory file

While we were experimenting with MapReduce programs on our Hadoop cluster, we started noticing the following errors.

Java HotSpot(TM) 64-Bit Server VM warning: Insufficient space for shared memory file:
/tmp/hsperfdata_hdfs/28099
Try using the -Djava.io.tmpdir= option to select an alternate temp location.

Exception in thread "main" java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:80)
at org.apache.hadoop.util.RunJar.unJar(RunJar.java:107)
at org.apache.hadoop.util.RunJar.unJar(RunJar.java:81)
at org.apache.hadoop.util.RunJar.run(RunJar.java:209)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

At first glance it seemed as if the disk was full and that was causing the jobs to fail. Further analysis showed that the /tmp directory was mounted with just 300 MB of allocated space. Remounting /tmp with 2 GB of space solved the problem.

sudo mount -o remount,size=2G /tmp

How to manage state in Trident Storm topologies

Code for this example: https://github.com/sumitchawla/storm-examples

The Trident API is another abstraction over how a “Stream” of data is processed in Storm topologies.

Basic Storm stream processing guarantees “at least once” message processing, whereas the Trident API guarantees “exactly once” message processing. In simple terms, basic stream processing makes sure that no message is ever lost; to achieve that, Storm may replay the same message again and again until it is certain the message has been processed successfully. There is no direct way to figure out whether a message is being played for the first time or is being replayed due to an error or failure. The Trident API solves this problem partially by grouping messages into batches. If the Trident API needs to replay a message, the message comes back with the same Batch Id. The application receiving the messages has to keep track of this Batch Id (this will become clearer as we go through the example).

Let's dive into the example and discuss the concepts as we encounter them.

US Presidential Election

In this example, we use a Trident topology to keep a live tally of the US Presidential Election. The election is between two parties, Republicans and Democrats. We track votes coming from 100 different constituencies and maintain aggregates on who is leading the election at any point in time. Disclaimer: this is a purely random simulation, so the results are unpredictable. Either party could be leading at any point in time 🙂

We start by creating a RandomVoteSpout. This spout represents a particular constituency where voting is being held. The spout produces a batch of 100 votes per second and randomly divides these votes between the two parties.

topology.newStream("voting-events", new RandomVoteSpout())
//Let's call this stream the VoteStream
.name("VoteStream")
//Each spout will act like a different constituency.
//For this election we will have 100 constituencies.
.parallelismHint(100)

RandomVoteSpout implements the IBatchSpout interface. The spout emits a new batch of votes every time the emitBatch method is called. Every spout task gets a unique task index, which is used as the constituency id here.

//RandomVoteSpout keeps emitting batches of 100 random votes
// for its constituency. Each batch has a random number of votes
// for each party, so the results are unpredictable 🙂
public static class RandomVoteSpout implements IBatchSpout {
    Random _rand;
    Integer _constituencyId;

    public void open(Map conf,
                     TopologyContext topologyContext) {
        _rand = new Random();
        _constituencyId = topologyContext.getThisTaskIndex();
    }

    public void emitBatch(long batchId, 
                           TridentCollector collector) {
        Utils.sleep(1000);
        //Wait for 1 sec, and then emit a new batch of
        // 100 random votes for this constituency.
        for(Integer count = 0; count< 100; count++){
            PartyName[] partyNames = PartyName.values();
            collector.emit(new Values(this._constituencyId,
                    partyNames[_rand.nextInt(partyNames.length)]
                            .toString()));
        }
    }

    public void ack(long l) {
    }

    public void close() {
    }

    public Map getComponentConfiguration() {
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("ConstituencyId", "PartyName");
    }
}
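
The snippets here also reference a PartyName enum and a ConstituencyState holder that are not shown in this post. A minimal sketch of what they might look like follows; the actual definitions are in the linked repository.

// Assumed shapes for the helper types used above; see the repository
// for the real definitions.
public enum PartyName {
    Democratic,
    Republican
}

public static class ConstituencyState implements java.io.Serializable {
    // Running vote counts for one constituency.
    public long DemocratsVotes = 0;
    public long RepublicanVotes = 0;
}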

Next, we partition our Vote Stream by ConstituencyId. This makes sure that all the batches generated by a particular constituency go to the same aggregator. These partitions are then passed to the VoteCountAggregator. Each VoteCountAggregator does a very simple per-party vote count within a batch.

 Grouping partitonGroup = new Grouping();
 List<String> fields = new ArrayList<String>();
 fields.add("ConstituencyId");
 partitonGroup.set_fields(fields);
//Votes generated by a particular constituency
// should go to the same count aggregator.
.partition(partitonGroup)
//VoteCountAggregator handles batches of votes.
// It does a very simple aggregation,
// maintaining a separate count for each party.
//ConstituencyId and PartyName are the input fields
// of the VoteCountAggregator; ConstituencyId and count
// are its output fields.
.partitionAggregate(new Fields("ConstituencyId", "PartyName"),
new VoteCountAggregator(),
new Fields("ConstituencyId", "count"))
.name("CountAggregator")
//Let's have 10 counting centers running VoteCountAggregator
// for the 100 constituencies.
// On average each aggregator handles 10 constituencies.
.parallelismHint(10)

VoteCountAggregator handles all votes coming from a particular constituency. It extends BaseAggregator and implements the following methods:

  • init – This method is called every time a new batch comes in, and it receives the id of that batch. Here you can check whether you have already handled this batch id; if so, you could simply discard each incoming tuple of the batch. For our example, we create a new CountState object (wrapping a ConstituencyState) that will hold the aggregates for this particular batch.
  • aggregate – This method is called for each tuple in the batch. For our example, we simply increment the vote count of the party that received the vote.
  • complete – This method is called as soon as the batch is complete. Here we simply emit the ConstituencyState object, which holds the vote counts by party.

//VoteCountAggregator handles all votes coming
// for a particular constituency.
public static class VoteCountAggregator
        extends BaseAggregator<VoteCountAggregator.CountState> {
    private static final Logger LOG =
            Logger.getLogger(VoteCountAggregator.class);
    private Object _batchId;
    static class CountState {
        Integer constituencyId;
        ConstituencyState state = new ConstituencyState();
    }

    public CountState init(Object batchId,
                           TridentCollector collector) {
        this._batchId = batchId;
        CountState state = new CountState();
        return state;
    }

    // Aggregate function will be called for
    // each vote generated by a Constituency
    // in a particular batch
    public void aggregate(CountState state,
                          TridentTuple tuple,
                          TridentCollector collector) {
        Integer constituencyId =
                tuple.getIntegerByField("ConstituencyId");
        state.constituencyId = constituencyId;
        PartyName partyName =
                PartyName.valueOf(
                        tuple.getStringByField("PartyName"));
        LOG.debug(String.format("Got tuple %s %s %s",
                constituencyId, partyName,
                state.state.DemocratsVotes));
        if (partyName == PartyName.Democratic) {
            state.state.DemocratsVotes++;
        } else if (partyName == PartyName.Republican) {
            state.state.RepublicanVotes++;
        }
    }

    // complete is called as soon as all
    // the votes in the particular batch are processed.
    public void complete(CountState state,
                         TridentCollector collector) {
        LOG.debug(String.format("Emitting counts " +
                        "for Constituency %d" +
                        " - Democracts %d " +
                        "Republican %d for Batch %s",
                state.constituencyId,
                state.state.DemocratsVotes,
                state.state.RepublicanVotes,
                _batchId));
        collector.emit(new Values(state.constituencyId, state.state));
    }
}

Excellent! So now we know how to handle a batch of tuples using the Trident API. Our next step is to learn how to aggregate across multiple batches. For this we are going to use three new classes:

  • ElectionState – This is the uber class that holds the overall results and sums up all incoming batches. For our example we do an in-memory aggregation; this class could write to a database, memcached, or Redis depending on your requirements.
  • StateUpdater – This class receives a collection of batch ids and their aggregate tuples. These are the tuples emitted by VoteCountAggregator in its complete method, so each tuple carries a ConstituencyState. You get a single tuple per batch here.
  • StateUpdaterFactory – This is just a factory class that creates the ElectionState object for our example. Since we use a static ElectionState field here, a single ElectionState object is shared across the entire topology.

Order of Batch State Updates:

The Storm documentation mentions that the StateUpdater is called for batches strictly in order. For example, let's say Batches 2 and 3 are already done and VoteCountAggregator has already emitted their ConstituencyState, but Batch 1 is still processing. Storm will not call the StateUpdater for Batches 2 and 3 until Batch 1 is completed. As soon as Batch 1 completes, Storm calls the StateUpdater for Batch 1, followed by Batch 2 and then Batch 3. In other words, you always receive batch updates in sequence, never out of sequence.

//Finally we have a Single Authority maintaining
//overall state of the election.
.partitionPersist(new StateUpdaterFactory(),
new Fields("ConstituencyId", "count"),
new StateUpdater());
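
For orientation, the fragments above chain together roughly as follows. This is a sketch reassembled from the snippets in this post, not a copy of the repository code, so minor details may differ.

TridentTopology topology = new TridentTopology();

Grouping partitonGroup = new Grouping();
List<String> fields = new ArrayList<String>();
fields.add("ConstituencyId");
partitonGroup.set_fields(fields);

TridentState electionState = topology
        .newStream("voting-events", new RandomVoteSpout())
        .name("VoteStream")
        .parallelismHint(100)
        .partition(partitonGroup)
        .partitionAggregate(new Fields("ConstituencyId", "PartyName"),
                new VoteCountAggregator(),
                new Fields("ConstituencyId", "count"))
        .name("CountAggregator")
        .parallelismHint(10)
        .partitionPersist(new StateUpdaterFactory(),
                new Fields("ConstituencyId", "count"),
                new StateUpdater());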

As mentioned earlier, the StateUpdater receives the aggregate tuples per batch and updates the static ElectionState.

//Handles incoming aggregators from each batch,
// and updates the uber Election state
public static class StateUpdater
        extends BaseStateUpdater<ElectionState> {
    public void updateState(ElectionState state,
                            List<TridentTuple> tuples,
                            TridentCollector collector) {
        List<Integer> ids = new ArrayList<Integer>();
        List<ConstituencyState> states = 
            new ArrayList<ConstituencyState>();
        for(TridentTuple t: tuples){
            ids.add(t.getInteger(0));
            states.add((ConstituencyState) t.get(1));
        }
        state.setStateValues(ids, states);
    }
}

public static class StateUpdaterFactory
        implements StateFactory {
    //We keep a single static object to keep track of all the states
    static ElectionState electionState = new ElectionState();
    public storm.trident.state.State
        makeState(Map conf, IMetricsContext metricsContext,
                  int partitionIndex, int numPartitions) {
        return electionState;
    }
}

public static class ElectionState implements State {
    private static final Logger LOG =
            Logger.getLogger(ElectionState.class);

    private final Map<Integer, ConstituencyState> totalCount;

    public ElectionState() {
        totalCount = new HashMap<Integer, ConstituencyState>();
    }


    public void setStateValues(List<Integer> constituencyList,
                               List<ConstituencyState> batchStates){
        for(int i=0; i<constituencyList.size(); i++) {
            Integer constituencyId = constituencyList.get(i);
            ConstituencyState batchUpdateForConstituency =
                    batchStates.get(i);
            if (!totalCount.containsKey(constituencyId)){
                totalCount.put(constituencyId,
                        new ConstituencyState());
            }

            ConstituencyState totalCountsForConstituency =
                    totalCount.get(constituencyId);
            totalCountsForConstituency.DemocratsVotes +=
                    batchUpdateForConstituency.DemocratsVotes;
            totalCountsForConstituency.RepublicanVotes+=
                    batchUpdateForConstituency.RepublicanVotes;
            LOG.debug(String.format("Commited counts for " +
                            "Constituency %d - Current Counts - "+
                            "Democrats %d Republican %d ",
                              constituencyId,
                    totalCountsForConstituency.DemocratsVotes,
                    totalCountsForConstituency.RepublicanVotes));
        }
    }

    public List<ConstituencyState>
        getCurrentCountsForConstituency(
            List<Integer> constituencyList) {
        List<ConstituencyState> states =
                new ArrayList<ConstituencyState>();
        for(int i=0; i<constituencyList.size(); i++) {
            Integer constituencyId = constituencyList.get(i);
            LOG.debug(String.format("GET for %d", constituencyId));
            if (totalCount.containsKey(constituencyId)) {
                states.add(totalCount.get(constituencyId));
            } else {
                states.add(new ConstituencyState());
            }
        }
        return states;
    }

    public List<ConstituencyState> getAllCounts() {
        return new ArrayList<ConstituencyState>(
                totalCount.values());
    }

    public void beginCommit(Long aLong) {

    }

    public void commit(Long aLong) {

    }
}

So we are done. Our ElectionState class maintains constituency-wise vote counts. The totalCount map holds the entire state of the election: it knows who is leading in which constituency and how many votes have been cast for each party in each constituency. This is a great place to persist that information; we could dump it into a database or push it into Redis, and our application could pull it from there to display the current trends and results.

For completeness of our example, we will use two QueryFunctions. These functions read information from the ElectionState class and tell us the current state of the election, which saves us from writing code to read a database or query Redis.

  • QueryConstituencyCount – This class receives a list of inputs containing constituency ids and returns the number of votes by party for those constituencies.
  • QueryElectionLeader – This class tells us how many constituencies each party is currently leading in. This information keeps updating as new votes arrive in new batches.

public static class QueryConstituencyCount
            extends BaseQueryFunction<ElectionState, Object> {
        public List<Object> batchRetrieve(
                ElectionState state,
                List<TridentTuple> inputs) {
            List<Integer> ids =
                    new ArrayList<Integer>();
            List<Object> result =
                    new ArrayList<Object>();
            for(TridentTuple input: inputs) {
                ids.add(Integer.valueOf(input.getString(0)));
            }
            List<ConstituencyState> currentStates =
                    state.getCurrentCountsForConstituency(ids);
            for(ConstituencyState constituencyState :
                    currentStates ) {
                HashMap<String,Long> counts =
                        new HashMap<String, Long>();
                counts.put(PartyName.Democratic.toString(),
                        constituencyState.DemocratsVotes);
                counts.put(PartyName.Republican.toString(),
                        constituencyState.RepublicanVotes);
                result.add(counts);
            }
            return result;
        }

        public void execute(TridentTuple tuple,
                            Object result,
                            TridentCollector collector) {
            collector.emit(new Values(result));
        }
    }

    public static class QueryElectionLeader
            extends BaseQueryFunction<ElectionState, Object> {
        public List<Object> batchRetrieve(
                ElectionState state,
                List<TridentTuple> inputs) {
            List<Integer> ids =
                    new ArrayList<Integer>();
            List<Object> result =
                    new ArrayList<Object>();
            List<ConstituencyState> currentStates =
                    state.getAllCounts();
            Long democratsLeadingCount = 0L;
            Long republicansLeadingCount = 0L;
            for(ConstituencyState constituencyState :
                    currentStates ) {
                if (constituencyState.DemocratsVotes >
                        constituencyState.RepublicanVotes) {
                    democratsLeadingCount++;
                } else {
                    republicansLeadingCount++;
                }
            }
            Map<String, Long> map = new HashMap<String, Long>();
            map.put(PartyName.Democratic.toString(),
                    democratsLeadingCount);
            map.put(PartyName.Republican.toString(),
                    republicansLeadingCount);
            result.add(map);
            return result;
        }

        public void execute(TridentTuple tuple, 
                Object result, 
                TridentCollector collector) {
            collector.emit(new Values(result));
        }
    }

The above query functions can be accessed through DRPC queries. You can get the entire code from https://github.com/sumitchawla/storm-examples.
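
As a rough sketch, wiring these query functions up as DRPC streams might look like the following. The DRPC function names and output field names here are illustrative, electionState is the TridentState returned by partitionPersist above, and drpc is a LocalDRPC instance; see the repository for the actual wiring.

topology.newDRPCStream("constituency-count", drpc)
        .stateQuery(electionState, new Fields("args"),
                new QueryConstituencyCount(), new Fields("counts"));

topology.newDRPCStream("election-leader", drpc)
        .stateQuery(electionState, new Fields("args"),
                new QueryElectionLeader(), new Fields("leaders"));

// With a LocalDRPC instance, the client side can then poll for results:
// String counts = drpc.execute("constituency-count", "1");
// String leaders = drpc.execute("election-leader", "");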

You can run the above example with the following command:

mvn compile exec:java

You should see output like the following. The first line shows the current vote counts from Constituency 1, and the second line shows, for each party, the number of constituencies in which that party currently has more votes than its opponent. As votes keep coming in, the counts keep changing:

Current Counts for Constituency [["1",{"Republican":2966,"Democratic":3034}]]
Current Election Counts [["",{"Republican":48,"Democratic":52}]]
Current Counts for Constituency [["1",{"Republican":2966,"Democratic":3034}]]
Current Election Counts [["",{"Republican":49,"Democratic":51}]]
Current Counts for Constituency [["1",{"Republican":2966,"Democratic":3034}]]
Current Election Counts [["",{"Republican":51,"Democratic":49}]]

...

Current Counts for Constituency [["1",{"Republican":5450,"Democratic":5550}]]
Current Election Counts [["",{"Republican":48,"Democratic":52}]]
Current Counts for Constituency [["1",{"Republican":5953,"Democratic":6047}]]
Current Election Counts [["",{"Republican":49,"Democratic":51}]]
Current Counts for Constituency [["1",{"Republican":5953,"Democratic":6047}]]
Current Election Counts [["",{"Republican":49,"Democratic":51}]]


Storm Kafka – Support of Wildcard Topics.

Storm is great for consuming “streams” of data, and it offers good support for consuming topic streams from Kafka. However, as of writing this article, the Storm Kafka module does not support wildcard topics. For example, let's say we want to consume all topics matching a particular pattern, e.g.:

clickstream.abc.log

clickstream.def.log

clickstream.xyz.log

To consume these topics you have to create multiple spouts, and if new topics get added, there is no way to pick them up dynamically. This was a big drawback of the Storm Kafka module for me. One option was to write my own Kafka spout implementation, but that would mean losing all the existing features of the Storm Kafka spout.

To overcome this, I decided to augment the Storm Kafka code with a wildcard feature. I have opened a pull request for this change at https://github.com/apache/storm/pull/561. Until it gets merged, you can consume the change directly from my repo at https://github.com/sumitchawla/storm. Here are the steps:

  1. Add the following repository. JitPack allows you to consume this GitHub repo as a Maven repository.
 <repository>
   <id>jitpack.io</id>
   <url>https://jitpack.io</url>
 </repository>

2. Add the following dependency to your Maven project. Here the version tag is the GitHub commit hash.

 <dependency>
  <groupId>com.github.sumitchawla</groupId>
  <artifactId>storm</artifactId>
  <version>fe7de58f09</version>
  <exclusions>
    <exclusion>
       <groupId>com.github.sumitchawla.storm</groupId>
       <artifactId>storm-core</artifactId>
    </exclusion>
    <exclusion>
       <groupId>com.github.sumitchawla.storm</groupId>
       <artifactId>storm-hive</artifactId>
    </exclusion>
  </exclusions>
 </dependency>

3. Finally, add the following config to your topology to enable wildcard topic support:

       config.put("kafka.topic.wildcard.match",true);

After this, if you pass a wildcard topic such as “clickstream.*.log” to the Kafka spout, it will match all topics matching that pattern. It discovers new topics dynamically, and you only need to create a single Kafka spout. You can use a parallelism hint so that the partitions of all matching topics get distributed evenly.
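
For reference, here is a minimal sketch of what the spout setup might look like with this change. The ZooKeeper address, zkRoot, and spout id are placeholders, and the storm.kafka/backtype.storm package names below are from the pre-1.0 Storm line, so adjust them for your Storm version.

import backtype.storm.Config;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class WildcardKafkaSpoutExample {
    public static void main(String[] args) {
        // "zookeeper:2181", "/kafka-clickstream" and "clickstream-reader"
        // are placeholders for your own environment.
        BrokerHosts hosts = new ZkHosts("zookeeper:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts,
                "clickstream.*.log",     // wildcard topic pattern
                "/kafka-clickstream",    // zkRoot used for offset storage
                "clickstream-reader");   // spout id
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        Config config = new Config();
        // Enable the wildcard matching added by the patch.
        config.put("kafka.topic.wildcard.match", true);
        // Attach kafkaSpout to a TopologyBuilder and submit with config as usual.
    }
}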

Update:

As of Nov 2nd, 2015, this change has been merged into the main branch of apache/storm:

https://github.com/apache/storm/commit/60d9f81ba1b16f7711c487f831f202e49eda258c


Install a Multi Node Hadoop Cluster on Ubuntu 14.04

This article is about the multi-node installation of a Hadoop cluster. You need a minimum of 2 Ubuntu machines or virtual images to complete a multi-node installation. If you just want to try out a single-node cluster, follow the article on Installing Hadoop on Ubuntu 14.04.

I used the stable Hadoop version 2.6.0 for this article and did this setup on a 3-node cluster. For simplicity, I will designate one node as the master and 2 nodes as slaves (slave-1 and slave-2). Make sure all slave nodes are reachable from the master node. To avoid any unreachable-host errors, add the slave hostnames and IP addresses to the /etc/hosts file on the master; similarly, the slave nodes should be able to resolve the master hostname.

Installing Java on Master and Slaves

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
# Update the Java runtime alternatives
$ sudo update-java-alternatives -s java-7-oracle

Disable IPv6

As of now, Hadoop does not support IPv6 and is tested to work only on IPv4 networks. If you are using IPv6, you need to switch the Hadoop host machines to use IPv4. The Hadoop wiki provides a one-liner to disable IPv6. If you are not using IPv6, skip this step:

sudo sed -i 's/net.ipv6.bindv6only\ =\ 1/net.ipv6.bindv6only\ =\ 0/' \
/etc/sysctl.d/bindv6only.conf && sudo invoke-rc.d procps restart

Setting up a Hadoop User

Hadoop talks to other nodes in the cluster using password-less SSH. By having Hadoop run under a specific user context, it becomes easy to distribute the SSH keys around the cluster. Let's create a user hadoopuser on the master as well as on the slave nodes.

# Create hadoopgroup
$ sudo addgroup hadoopgroup
# Create hadoopuser user
$ sudo adduser --ingroup hadoopgroup hadoopuser

Our next step is to generate an SSH key for password-less login between the master and slave nodes. Run the following commands on the master node only, repeating the last two commands once for each slave node. Password-less SSH must be working before you proceed with the remaining steps.

# Login as hadoopuser
$ su - hadoopuser
#Generate a ssh key for the user
$ ssh-keygen -t rsa -P ""
#Authorize the key to enable password less ssh 
$ cat /home/hadoopuser/.ssh/id_rsa.pub >> /home/hadoopuser/.ssh/authorized_keys
$ chmod 600 /home/hadoopuser/.ssh/authorized_keys
#Copy this key to slave-1 to enable password less ssh 
$ ssh-copy-id -i ~/.ssh/id_rsa.pub slave-1
#Make sure you can do a password less ssh using following command.
$ ssh slave-1

Download and Install Hadoop binaries on Master and Slave nodes

Pick the best Apache Hadoop mirror site and download stable/hadoop-2.6.0.tar.gz for your installation. Do this step on the master and every slave node. Alternatively, you can download the file once and distribute it to each slave node using scp.

$ cd /home/hadoopuser
$ wget http://www.webhostingjams.com/mirror/apache/hadoop/core/stable/hadoop-2.6.0.tar.gz
$ tar xvf hadoop-2.6.0.tar.gz
$ mv hadoop-2.6.0 hadoop

Setup Hadoop Environment on Master and Slave Nodes

Copy and paste the following lines into the .bashrc file under /home/hadoopuser. Do this step on the master and every slave node.

# Set HADOOP_HOME
export HADOOP_HOME=/home/hadoopuser/hadoop
# Set JAVA_HOME 
export JAVA_HOME=/usr/lib/jvm/java-7-oracle
# Add Hadoop bin and sbin directory to PATH
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

Update hadoop-env.sh on Master and Slave Nodes

Update JAVA_HOME in /home/hadoopuser/hadoop/etc/hadoop/hadoop-env.sh to the following. Do this step on the master and every slave node.

export JAVA_HOME=/usr/lib/jvm/java-7-oracle

Common Terminologies
Before we get into the configuration details, let's discuss some of the basic terminology used in Hadoop.

  • Hadoop Distributed File System: A distributed file system that provides high-throughput access to application data. An HDFS cluster primarily consists of a NameNode, which manages the file system metadata, and DataNodes, which store the actual data. If you compare HDFS to traditional storage structures (e.g. FAT, NTFS), the NameNode is analogous to the directory structure, and the DataNodes are analogous to the actual file storage blocks.
  • Hadoop YARN: A framework for job scheduling and cluster resource management.
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.

Update Configuration Files
Add/update core-site.xml on the master and slave nodes with the following options. Master and slave nodes should all use the same value for the property fs.defaultFS, and it should point to the master node only.

  /home/hadoopuser/hadoop/etc/hadoop/core-site.xml (Other Options)
<property>
  <name>hadoop.tmp.dir</name>
  <value>/home/hadoopuser/tmp</value>
  <description>Temporary Directory.</description>
</property>

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://master:54310</value>
  <description>Use HDFS as file storage engine</description>
</property>

 

Add/update mapred-site.xml on the master node only with the following options.

  /home/hadoopuser/hadoop/etc/hadoop/mapred-site.xml (Other Options)
<property>
 <name>mapreduce.jobtracker.address</name>
 <value>master:54311</value>
 <description>The host and port that the MapReduce job tracker runs
  at. If “local”, then jobs are run in-process as a single map
  and reduce task.
</description>
</property>
<property>
 <name>mapreduce.framework.name</name>
 <value>yarn</value>
 <description>The framework for running mapreduce jobs</description>
</property>

 

Add/update hdfs-site.xml on the master and slave nodes. We will add the following three entries to the file.

  • dfs.replication – Here I am using a replication factor of 2, which means every block stored in HDFS has one redundant copy on some other node in the cluster.
  • dfs.namenode.name.dir – This directory is used by the NameNode to store its metadata files. I manually created the directory /hadoop-data/hadoopuser/hdfs/namenode on the master and slave nodes and use that location for this configuration.
  • dfs.datanode.data.dir – This directory is used by the DataNode to store HDFS data blocks. I manually created the directory /hadoop-data/hadoopuser/hdfs/datanode on the master and slave nodes and use that location for this configuration.

  /home/hadoopuser/hadoop/etc/hadoop/hdfs-site.xml (Other Options)
<property>
 <name>dfs.replication</name>
 <value>2</value>
 <description>Default block replication.
  The actual number of replications can be specified when the file is created.
  The default is used if replication is not specified in create time.
 </description>
</property>
<property>
 <name>dfs.namenode.name.dir</name>
 <value>/hadoop-data/hadoopuser/hdfs/namenode</value>
 <description>Determines where on the local filesystem the DFS name node should store the name table(fsimage). If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy.
 </description>
</property>
<property>
 <name>dfs.datanode.data.dir</name>
 <value>/hadoop-data/hadoopuser/hdfs/datanode</value>
 <description>Determines where on the local filesystem an DFS data node should store its blocks. If this is a comma-delimited list of directories, then data will be stored in all named directories, typically on different devices. Directories that do not exist are ignored.
 </description>
</property>

 

Add yarn-site.xml on the master and slave nodes. This file is required for a node to work as a YARN node. Master and slave nodes should all use the same values for the following properties, and they should point to the master node only.

  /home/hadoopuser/hadoop/etc/hadoop/yarn-site.xml
<property>
 <name>yarn.nodemanager.aux-services</name>
 <value>mapreduce_shuffle</value>
</property>
<property>
 <name>yarn.resourcemanager.scheduler.address</name>
 <value>master:8030</value>
</property> 
<property>
 <name>yarn.resourcemanager.address</name>
 <value>master:8032</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address</name>
  <value>master:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.resource-tracker.address</name>
  <value>master:8031</value>
</property>
<property>
  <name>yarn.resourcemanager.admin.address</name>
  <value>master:8033</value>
</property>

 

Add/update the slaves file on the master node only. Add the hostnames or IP addresses of the master and all slave nodes. If the file has an entry for localhost, you can remove it. This file is just a helper file used by the Hadoop scripts to start the appropriate services on the master and slave nodes.

  /home/hadoopuser/hadoop/etc/hadoop/slaves
master
slave-1
slave-2

Format the NameNode
Before starting the cluster, we need to format the NameNode. Use the following command on the master node only:

$ hdfs namenode -format

Start the Distributed File System

Run the following command on the master node to start the DFS.

$ /home/hadoopuser/hadoop/sbin/start-dfs.sh

Observe the output to confirm that it starts the DataNode on the slave nodes one by one. To validate success, run the following commands on the master node and on each slave node.

$ su - hadoopuser
$ jps

The output of this command should list NameNode, SecondaryNameNode, and DataNode on the master node, and DataNode on all slave nodes. If you don't see the expected output, review the log files mentioned in the Troubleshooting section.

Start the YARN MapReduce Framework

Run the following command on the master node to start the YARN MapReduce framework.

$ /home/hadoopuser/hadoop/sbin/start-yarn.sh

To validate success, run the jps command again on the master node and on each slave node. The output should list NodeManager and ResourceManager on the master node, and NodeManager on all slave nodes. If you don't see the expected output, review the log files mentioned in the Troubleshooting section.

Review Yarn Web console

If all the services started successfully on all nodes, you should see all of your nodes listed under YARN nodes. You can open the following URL in your browser to verify:

http://master:8088/cluster/nodes

Let's execute a MapReduce example now

You should be all set to run a MapReduce example now. Run the following command:

$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar pi 30 100

Once the job is submitted, you can validate that it is running on the cluster by accessing the following URL:

http://master:8088/cluster/apps
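
If you also want to verify HDFS from application code, a minimal client sketch along the following lines should work. The fs.defaultFS value matches the core-site.xml above; the test file path is just an example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same value as fs.defaultFS in core-site.xml above.
        conf.set("fs.defaultFS", "hdfs://master:54310");
        FileSystem fs = FileSystem.get(conf);

        // Write a small test file, report its replication, and clean up.
        Path testFile = new Path("/tmp/hdfs-smoke-test.txt");
        FSDataOutputStream out = fs.create(testFile, true);
        out.writeUTF("hello from the new cluster");
        out.close();
        System.out.println("Created " + testFile + " with replication "
                + fs.getFileStatus(testFile).getReplication());
        fs.delete(testFile, false);
    }
}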

Troubleshooting
Hadoop writes its log files to the $HADOOP_HOME/logs directory. If you run into any issues with your installation, that should be the first place to look. If you need help with anything else, do leave me a comment.

Feedback and Questions?

If you have any feedback or questions, do leave a comment.

Related Articles

Installing Hadoop on Ubuntu 14.04 ( Single Node Installation)

Hadoop Java HotSpot execstack warning

References

http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html

 

Bash Shell Prompt (Linux/Mac) – How to shorten your bash prompt

I love using Terminator/iTerm2 rather than the standard OS terminals, and I love being able to run multiple terminals in a single window and run multiple commands at once. While these tools are awesome, they also leave you with very little screen real estate per terminal. Typing long commands can be really tedious, and it's not easy to read output when it wraps to the next line.

The bash prompt generally takes up about 20% of the screen width in your terminal. Looking closely, we can see it carries information that rarely changes. When I am working on my local laptop, I already know who I am and which machine I am on, so that information can easily be removed from the bash prompt. The following is a screenshot of the default bash prompt on my Mac OS X machine.

[Screenshot: default (full) bash prompt]

My first option is to show just my username and the current working directory. For that, you can set the following variable in your .bash_profile file.

PS1='\[\033[01;34m\]\u \[\033[01;31m\] \w > \[\033[m\] '

Here \u stands for the username and \w stands for the current working directory. The username is shown so that it's easy to differentiate between sudo and non-sudo terminal sessions. The values are color-coded to make them easy to tell apart: I am using \[\033[01;34m\] for the username and \[\033[01;31m\] for the directory. This changes your prompt to the following:

[Screenshot: shortened prompt showing username and directory]

I personally like to shorten it further and use the following prompt instead.

PS1='\[\033[01;31m\] \w > \[\033[m\] '

I just like to show my current working directory, and not the username. This gives me more real estate.

[Screenshot: directory-only prompt]

To differentiate sudo from non-sudo sessions, you can set different colors by adding the following small if condition to your /etc/bashrc:

if [[ ${EUID} == 0 ]] ; then
        # root (sudo) user: show the prompt in blue
        PS1='\[\033[01;34m\] \w> \[\033[m\]'
else
        # non-root user: show the prompt in red
        PS1='\[\033[01;31m\] \w> \[\033[m\]'
fi

 

WARNING: UNPROTECTED PRIVATE KEY FILE!

If you copy your .ssh private keys from one machine to another, you might see this error. You probably forgot to set the correct permissions on your private key after copying it into the .ssh directory. It's very important that these files be protected from any unauthorized access; only the owner of the key should have access to the key files.

The complete error message:

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@         WARNING: UNPROTECTED PRIVATE KEY FILE!          @
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
Permissions 0777 for '/home/sumitc/.ssh/id_rsa' are too open.
It is recommended that your private key files are NOT accessible by others.
This private key will be ignored.
bad permissions: ignore key: /home/sumitc/.ssh/id_rsa

To fix this, you need to reset the permissions on the key files back to 600, which means only the owner has read/write permission on the key files.

sudo chmod 600 ~/.ssh/id_rsa
sudo chmod 600 ~/.ssh/id_rsa.pub

This should fix the permission error, and you should be able to start an SSH session correctly.

Error: [ng:cpws] Can’t copy! Making copies of Window or Scope instances is not supported.

I started getting this error while developing an AngularJS component. The error from AngularJS sort of tells you what the problem is, but not where the problem is. It was confusing, and it took me some time to track down the cause.

Error: [ng:cpws] Can't copy! Making copies of Window or Scope instances is not supported.
http://errors.angularjs.org/1.2.16/ng/cpws
at https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:78:12
at copy (https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:844:11)
at copy (https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:875:28)
at copy (https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:858:23)
at copy (https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:875:28)
at copy (https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:858:23)
at Scope.$get.Scope.$digest (https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:12250:47)
at Scope.$get.Scope.$apply (https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:12516:24)
at HTMLDivElement. (https://ajax.googleapis.com/ajax/libs/angularjs/1.2.16/angular.js:18626:21)
at HTMLDivElement.m.event.dispatch (https://ajax.googleapis.com/ajax/libs/jquery/1.11.1/jquery.min.js:3:8436)

My code was not making any explicit copies of the scope. Rather, it was the AngularJS runtime that was making copies of the parent scope while expanding an ng-repeat tag. Further digging revealed that I had stored a reference to a popup window (window.popup) on my parent scope. AngularJS does not support copying the Window object, to avoid cyclic references. Removing this Window reference from the scope fixed the error.