Lab 15: Writing unit test cases for MapReduce

Hi Hadoopers,

Here is the next interesting post. We already know (or at least we assume we do!) that JUnit is the unit testing framework for Java. Building on JUnit, Apache MRUnit lets us write unit test cases for MapReduce code: it provides MapDriver, ReduceDriver and MapReduceDriver classes to exercise a mapper, a reducer, or the complete pipeline in memory, with no cluster needed.

Here is a sample.

Our input is a tab-delimited feed record (the tabs show up as plain spaces below):

Blogger http://www.blogger.com      BSNLTeleServices | BSNL Broadband Plans, Bill Payment Selfcare Portal   noreply@blogger.com (BSNL TeleServices) http://www.bsnlteleservices.com/        Sat Sep 24 02:42:36 MYT 2016     BSNL 599 Broadband Plan | MP Combo Internet Tariff     noreply@blogger.com (BSNL TeleServices) http://feedproxy.google.com/~r/BSNLTeleservices/~3/3KqPtw353PM/bsnl-599-broadband-plan-mp-combo.html    Sat Sep 24 02:14:11 MYT 2016    [BSNL Broadband Plans, Broadband Tariff]

This MapReduce job will pick out the author of this entry, noreply@blogger.com (BSNL TeleServices), and count how many entries each author has written.

Mapper

The mapper accepts a tab-delimited line, parses it into a FeedEntryRecord, and emits the author name with a count of 1.

package org.grassfield.nandu.author;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.grassfield.hadoop.FeedCategoryCountMapper.MapperRCheck;
import org.grassfield.hadoop.entity.FeedEntryRecord;
import org.grassfield.hadoop.util.ParseUtil;

public class AuthorMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
    String dateFormat = "EEE MMM dd HH:mm:ss z yyyy";
    SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
    Logger logger = Logger.getLogger(AuthorMapper.class);
    IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line, "\t");
        int countTokens = st.countTokens();
        // track record quality: a valid feed record carries 10 tab-separated fields
        if (countTokens!=10){
            context.getCounter(MapperRCheck.INVALID).increment(1);
        }else{
            context.getCounter(MapperRCheck.VALID).increment(1);
        }
        
        try {
            FeedEntryRecord record = ParseUtil.populateRecord(line, sdf);
            String name = record.getEntryAuthor();
            if (name==null){
                // fall back to the feed-level author when the entry has no author
                name = record.getFeedAuthor();
            }
            context.write(new Text(name), one);
        } catch (Exception e) {
            logger.error("Error while mapping", e);
            context.getCounter(MapperRCheck.INVALID).increment(1);
        }
    }
}
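
We will test the whole mapper and reducer pipeline at the end of this post, but MRUnit's MapDriver also lets us poke the mapper on its own. Here is a minimal sketch; the class name AuthorMapperTest is my own, and it assumes ParseUtil.populateRecord cannot build a record out of the garbage line, so the mapper writes nothing and only bumps the INVALID counter.

package org.grassfield.nandu.author;

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.grassfield.hadoop.FeedCategoryCountMapper.MapperRCheck;
import org.junit.Before;
import org.junit.Test;

public class AuthorMapperTest {
    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;

    @Before
    public void setUp() throws Exception {
        mapDriver = MapDriver.newMapDriver(new AuthorMapper());
    }

    @Test
    public void testMalformedLine() throws Exception {
        // this line does not have 10 tab-separated fields
        mapDriver.withInput(new LongWritable(), new Text("this is not a feed record"));
        // no withOutput() call: we expect the mapper to emit nothing
        mapDriver.runTest();
        // the mapper should have flagged the record as invalid
        assertTrue(mapDriver.getCounters()
                .findCounter(MapperRCheck.INVALID).getValue() > 0);
    }
}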

Reducer

The reducer sums up the occurrences of each author name.

package org.grassfield.nandu.author;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AuthorReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        
        int sum=0;
        // add up the 1s emitted by the mapper for this author
        for(IntWritable value:values){
            sum+=value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
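
The reducer has no external dependencies, so it can also be verified in isolation with MRUnit's ReduceDriver. Here is a small sketch; the class name AuthorReducerTest and the sample author value are just illustrative.

package org.grassfield.nandu.author;

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class AuthorReducerTest {
    private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;

    @Before
    public void setUp() throws Exception {
        reduceDriver = ReduceDriver.newReduceDriver(new AuthorReducer());
    }

    @Test
    public void testSum() throws Exception {
        // three occurrences of the same author should be reduced to a count of 3
        reduceDriver.withInput(new Text("noreply@blogger.com (BSNL TeleServices)"),
                Arrays.asList(new IntWritable(1), new IntWritable(1), new IntWritable(1)));
        reduceDriver.withOutput(new Text("noreply@blogger.com (BSNL TeleServices)"), new IntWritable(3));
        reduceDriver.runTest();
    }
}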

Driver

Here is the driver class, which wires the mapper and reducer together and submits the job.

package org.grassfield.nandu.author;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AuthorDriver extends Configured 
    implements Tool{

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // let GenericOptionsParser consume the generic Hadoop options (-D, -files, ...);
        // whatever remains should be our input and output paths
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();
        
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        
        Job job = Job.getInstance(conf, "Author count");
        job.setJarByClass(this.getClass());
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(AuthorMapper.class);
        job.setReducerClass(AuthorReducer.class);
        job.setNumReduceTasks(1);
        
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        
        return job.waitForCompletion(true)?0:1;
    }
    
    public static void main(String [] args) throws Exception{
        System.exit(ToolRunner.run(new Configuration(), new AuthorDriver(), args));
    }

}

Test Case

Let’s write the unit test case for this job using MRUnit’s MapReduceDriver, which pushes the input through the mapper, the in-memory shuffle and sort, and the reducer.

package org.grassfield.nandu.author;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class AuthorDriverTest {
    private Mapper<LongWritable, Text, Text, IntWritable> mapper;
    private Reducer<Text, IntWritable, Text, IntWritable> reducer;
    private MapReduceDriver<LongWritable, Text,  Text, IntWritable,  Text, IntWritable> driver;
    
    @Before
    public void setUp() throws Exception{
        mapper = new AuthorMapper();
        reducer = new AuthorReducer();
        driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }
    
    @Test
    public void testRun() throws Exception{
        // note: the fields of this line must be separated by real tab characters,
        // since the mapper tokenizes on "\t"
        String line = "Blogger http://www.blogger.com      BSNLTeleServices | BSNL Broadband Plans, Bill Payment Selfcare Portal   noreply@blogger.com (BSNL TeleServices) http://www.bsnlteleservices.com/        Sat Sep 24 02:42:36 MYT 2016     BSNL 599 Broadband Plan | MP Combo Internet Tariff     noreply@blogger.com (BSNL TeleServices) http://feedproxy.google.com/~r/BSNLTeleservices/~3/3KqPtw353PM/bsnl-599-broadband-plan-mp-combo.html    Sat Sep 24 02:14:11 MYT 2016    [BSNL Broadband Plans, Broadband Tariff]";
        driver.withInput(new LongWritable(), new Text(line));
        // the expected key is the author name the mapper extracts from the sample line
        driver.withOutput(new Text("noreply@blogger.com (BSNL TeleServices)"), new IntWritable(1));
        driver.runTest();
    }
}
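
runTest() makes MRUnit compare the pipeline's actual output with what we declared through withOutput(). If you prefer plain JUnit assertions, the driver also exposes run(), which hands back the output records as a list of Pair objects. Here is a sketch of an extra test method for the same class; it assumes the sample line has been moved into a field named line, that the expected author is the one ParseUtil extracts from that line, and that java.util.List, org.apache.hadoop.mrunit.types.Pair and org.junit.Assert.assertEquals are imported as well.

    @Test
    public void testRunAndInspect() throws Exception {
        driver.withInput(new LongWritable(), new Text(line));
        // run() executes mapper, shuffle/sort and reducer, and returns the reducer output
        List<Pair<Text, IntWritable>> output = driver.run();
        assertEquals(1, output.size());
        assertEquals(new Text("noreply@blogger.com (BSNL TeleServices)"), output.get(0).getFirst());
        assertEquals(new IntWritable(1), output.get(0).getSecond());
    }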

Let’s run this as a JUnit test case.

[Screenshot: hadoop036-lab-15-mrunit, the JUnit run of the MRUnit test]
