Lab 10 – Make use of MapReduce Counters

Dear Hadoopers,

Today’s lecture is about how we can make use of MapReduce counters. As the name implies, counters are helpful whenever we want to track something across a job, for example how many records were valid or invalid.

These counters are global: the framework aggregates them from all tasks, so you can track what the MR process is doing even though its tasks run across the cluster. Let’s modify the Mapper code we saw in Lab 08 – MapReduce using custom class as Key to include the counters.
Here is the updated Mapper code.

package org.grassfield.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.grassfield.hadoop.entity.EntryCategory;

/**
 * A Mapper Program to count the categories in RSS XML file This may not be the
 * right approach to parse the XML. Only for demo purpose
 * 
 * @author pandian
 *
 */
public class FeedCategoryCountMapper extends Mapper<LongWritable, Text, EntryCategory, IntWritable> {
    private IntWritable one = new IntWritable(1);

    /** Custom counters; Hadoop creates one counter per enum constant. */
    public enum RCheck {
        ISVALID, ISINVALID
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /*
         * Expected tab-separated fields:
         *  1. feed generator   2. feed title   3. feed author
         *  4. feed url         5. feed time    6. item subject
         *  7. item author      8. item url     9. item date
         * 10. category
         */
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line, "\t");
        int countTokens = st.countTokens();
        if (countTokens != 10) {
            System.err.println("Incorrect record " + line);
            context.getCounter(RCheck.ISINVALID).increment(1);
            return;
        } else {
            context.getCounter(RCheck.ISVALID).increment(1);
        }

        // Skip the first nine fields; only the trailing category list is needed.
        for (int i = 0; i < 9; i++) {
            st.nextToken();
        }
        String catCsv = st.nextToken();
        // Strip the surrounding brackets from the category list.
        catCsv = catCsv.substring(1, catCsv.length() - 1);
        st = new StringTokenizer(catCsv, ",");
        while (st.hasMoreTokens()) {
            EntryCategory category = new EntryCategory();
            category.setCategory(st.nextToken().trim());
            context.write(category, one);
        }
    }
}
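
Besides enum counters, the MapReduce API also offers dynamic counters addressed by group and name strings. A minimal sketch (the group and counter names here are my own, not part of the lab code):

// Inside map(): string-named counters are created on first use,
// so no enum declaration is required.
context.getCounter("RCheck", "ISVALID").increment(1);

// In the driver, after the job finishes, the same counter can be read back:
long valid = job.getCounters().findCounter("RCheck", "ISVALID").getValue();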

In the Mapper above, the counter-related changes are the RCheck enum and the two context.getCounter(...).increment(1) calls. Let’s see the changes in the Driver.

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        GenericOptionsParser parser = new GenericOptionsParser(conf,
                args);
        args = parser.getRemainingArgs();

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        Job job = Job.getInstance(conf, "Feed Category Count");
        job.setJarByClass(getClass());

        job.setMapOutputKeyClass(EntryCategory.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(EntryCategory.class);
        job.setOutputValueClass(IntWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        job.setPartitionerClass(FeedCategoryPartitioner.class);
        job.setCombinerClass(FeedCategoryCombiner.class);
        job.setReducerClass(FeedCategoryReducer.class);
        job.setNumReduceTasks(3);

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(FeedCategoryCountMapper.class);
        job.waitForCompletion(true);

        // read the custom counter used in the mapper to count the valid
        // records
        Counter validCounter = job.getCounters()
                .findCounter(RCheck.ISVALID);
        System.out.println(validCounter.getDisplayName() + " "
                + validCounter.getValue());

        // print the invalid record count
        Counter invalidCounter = job.getCounters()
                .findCounter(RCheck.ISINVALID);
        System.out.println(invalidCounter.getDisplayName() + " "
                + invalidCounter.getValue());

        // print all counters
        for (CounterGroup group : job.getCounters()) {
            System.out.println(
                    group.getDisplayName() + group.getName());
            System.out.println(group.size() + " items found");
            for (Counter counter : group) {
                System.out.println("\t" + counter.getDisplayName()
                        + " " + counter.getName() + " "
                        + counter.getValue());
            }
        }
        return 0;
    }
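
The run() method above comes from Hadoop’s Tool interface. For completeness, here is a minimal sketch of the main() method that launches it (I’m assuming the driver class is named FeedCategoryCountDriver; use the actual class name from the earlier labs):

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic options and invokes run() above.
        int exitCode = ToolRunner.run(new Configuration(),
                new FeedCategoryCountDriver(), args);
        System.exit(exitCode);
    }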

When I visualize how this code runs across the cluster, I’m so excited. 6 out of 2518 records are invalid!

Here you go.

16/09/17 11:34:17 INFO mapreduce.Job: Counters: 40
        File System Counters
                FILE: Number of bytes read=424450
                FILE: Number of bytes written=1659767
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=3676776
                HDFS: Number of bytes written=139412
                HDFS: Number of read operations=38
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=16
        Map-Reduce Framework
                Map input records=2518
                Map output records=6765
                Map output bytes=163836
                Map output materialized bytes=100676
                Input split bytes=114
                Combine input records=6765
                Combine output records=3741
                Reduce input groups=3741
                Reduce shuffle bytes=100676
                Reduce input records=3741
                Reduce output records=3741
                Spilled Records=7482
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=0
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
                Total committed heap usage (bytes)=1870659584
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=919194
        File Output Format Counters
                Bytes Written=82021
        org.grassfield.hadoop.FeedCategoryCountMapper$RCheck
                ISINVALID=6
                ISVALID=2512
ISVALID 2512
ISINVALID 6
File System Countersorg.apache.hadoop.mapreduce.FileSystemCounter
10 items found
        FILE: Number of bytes read FILE_BYTES_READ 424450
        FILE: Number of bytes written FILE_BYTES_WRITTEN 1659767
        FILE: Number of read operations FILE_READ_OPS 0
        FILE: Number of large read operations FILE_LARGE_READ_OPS 0
        FILE: Number of write operations FILE_WRITE_OPS 0
        HDFS: Number of bytes read HDFS_BYTES_READ 3676776
        HDFS: Number of bytes written HDFS_BYTES_WRITTEN 139412
        HDFS: Number of read operations HDFS_READ_OPS 38
        HDFS: Number of large read operations HDFS_LARGE_READ_OPS 0
        HDFS: Number of write operations HDFS_WRITE_OPS 16
Map-Reduce Frameworkorg.apache.hadoop.mapreduce.TaskCounter
20 items found
        Map input records MAP_INPUT_RECORDS 2518
        Map output records MAP_OUTPUT_RECORDS 6765
        Map output bytes MAP_OUTPUT_BYTES 163836
        Map output materialized bytes MAP_OUTPUT_MATERIALIZED_BYTES 100676
        Input split bytes SPLIT_RAW_BYTES 114
        Combine input records COMBINE_INPUT_RECORDS 6765
        Combine output records COMBINE_OUTPUT_RECORDS 3741
        Reduce input groups REDUCE_INPUT_GROUPS 3741
        Reduce shuffle bytes REDUCE_SHUFFLE_BYTES 100676
        Reduce input records REDUCE_INPUT_RECORDS 3741
        Reduce output records REDUCE_OUTPUT_RECORDS 3741
        Spilled Records SPILLED_RECORDS 7482
        Shuffled Maps  SHUFFLED_MAPS 3
        Failed Shuffles FAILED_SHUFFLE 0
        Merged Map outputs MERGED_MAP_OUTPUTS 3
        GC time elapsed (ms) GC_TIME_MILLIS 0
        CPU time spent (ms) CPU_MILLISECONDS 0
        Physical memory (bytes) snapshot PHYSICAL_MEMORY_BYTES 0
        Virtual memory (bytes) snapshot VIRTUAL_MEMORY_BYTES 0
        Total committed heap usage (bytes) COMMITTED_HEAP_BYTES 1870659584
Shuffle ErrorsShuffle Errors
6 items found
        BAD_ID BAD_ID 0
        CONNECTION CONNECTION 0
        IO_ERROR IO_ERROR 0
        WRONG_LENGTH WRONG_LENGTH 0
        WRONG_MAP WRONG_MAP 0
        WRONG_REDUCE WRONG_REDUCE 0
File Input Format Counters org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
1 items found
        Bytes Read BYTES_READ 919194
File Output Format Counters org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
1 items found
        Bytes Written BYTES_WRITTEN 82021
org.grassfield.hadoop.FeedCategoryCountMapper$RCheckorg.grassfield.hadoop.FeedCategoryCountMapper$RCheck
2 items found
        ISINVALID ISINVALID 6
        ISVALID ISVALID 2512
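
One practical use of these custom counters is to let the driver judge whether the job output is trustworthy. A small sketch, using an arbitrary 1% threshold of my own choosing:

        // After job.waitForCompletion(true) in run():
        long valid = job.getCounters().findCounter(RCheck.ISVALID).getValue();
        long invalid = job.getCounters().findCounter(RCheck.ISINVALID).getValue();

        // Fail the run if more than 1% of the input records were malformed.
        if (invalid * 100 > (valid + invalid)) {
            System.err.println("Too many invalid records: " + invalid);
            return 1;
        }
        return 0;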
