Hi Hadoopers,
Here is the next interesting post. We already know (or at least we assume we do!) that JUnit is the standard unit testing framework for Java. Building on JUnit, Hadoop offers us MRUnit to write unit test cases for MapReduce jobs.
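One setup note before the sample: MRUnit does not ship with the Hadoop distribution; it is a separate Apache project, so it has to be on your test classpath along with JUnit. If you build with Maven, the artifact is org.apache.mrunit:mrunit, and the hadoop2 classifier is the one that matches the new-API org.apache.hadoop.mapreduce classes used below.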
Here is a sample.
Our Input:
Blogger http://www.blogger.com BSNLTeleServices | BSNL Broadband Plans, Bill Payment Selfcare Portal noreply@blogger.com (BSNL TeleServices) http://www.bsnlteleservices.com/ Sat Sep 24 02:42:36 MYT 2016 BSNL 599 Broadband Plan | MP Combo Internet Tariff noreply@blogger.com (BSNL TeleServices) http://feedproxy.google.com/~r/BSNLTeleservices/~3/3KqPtw353PM/bsnl-599-broadband-plan-mp-combo.html Sat Sep 24 02:14:11 MYT 2016 [BSNL Broadband Plans, Broadband Tariff]
This job will identify the author of each feed entry, here noreply@blogger.com (BSNL TeleServices), and count how many times each author appears.
Mapper
The mapper accepts a tab-delimited line, extracts the author name, and emits it with a count of 1.
package org.grassfield.nandu.author;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.grassfield.hadoop.FeedCategoryCountMapper.MapperRCheck;
import org.grassfield.hadoop.entity.FeedEntryRecord;
import org.grassfield.hadoop.util.ParseUtil;

public class AuthorMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    String dateFormat = "EEE MMM dd HH:mm:ss z yyyy";
    SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
    Logger logger = Logger.getLogger(AuthorMapper.class);
    IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // each input line is one tab-delimited feed entry
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line, "\t");
        int countTokens = st.countTokens();
        if (countTokens != 10) {
            // malformed record: count it and skip it, so it isn't
            // counted as INVALID a second time by the catch block below
            context.getCounter(MapperRCheck.INVALID).increment(1);
            return;
        }
        context.getCounter(MapperRCheck.VALID).increment(1);
        try {
            FeedEntryRecord record = ParseUtil.populateRecord(line, sdf);
            // prefer the entry-level author; fall back to the feed author
            String name = record.getEntryAuthor();
            if (name == null) {
                name = record.getFeedAuthor();
            }
            context.write(new Text(name), one);
        } catch (Exception e) {
            logger.error("Error while mapping", e);
            context.getCounter(MapperRCheck.INVALID).increment(1);
        }
    }
}
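As an aside, the mapper can be exercised on its own with MRUnit's MapDriver before we wire up the full pipeline. The sketch below is hedged: the input line and the expected author key are placeholders, because the real values depend on what ParseUtil.populateRecord extracts from a 10-field feed record.

package org.grassfield.nandu.author;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

public class AuthorMapperTest {

    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;

    @Before
    public void setUp() {
        // MapDriver runs just the map() method; no reducer involved
        mapDriver = MapDriver.newMapDriver(new AuthorMapper());
    }

    @Test
    public void testMap() throws Exception {
        // placeholder: substitute a real 10-field tab-delimited feed record
        String line = "...";
        mapDriver.withInput(new LongWritable(0), new Text(line));
        // placeholder: the key must be the author parsed from the line
        mapDriver.withOutput(new Text("expected author"), new IntWritable(1));
        mapDriver.runTest();
    }
}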
Reducer
The reducer sums the occurrences of each author name emitted by the mapper.
package org.grassfield.nandu.author;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AuthorReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // add up the 1s emitted by the mapper for this author
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
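Since the reducer has no external dependencies, it can be verified on its own with MRUnit's ReduceDriver. Here is a minimal sketch (the author string is just sample data): two 1s for the same author should fold into a single count of 2.

package org.grassfield.nandu.author;

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class AuthorReducerTest {

    @Test
    public void testReduce() throws Exception {
        ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver =
                ReduceDriver.newReduceDriver(new AuthorReducer());
        // two occurrences of the same author should sum to 2
        reduceDriver.withInput(new Text("noreply@blogger.com (BSNL TeleServices)"),
                Arrays.asList(new IntWritable(1), new IntWritable(1)));
        reduceDriver.withOutput(new Text("noreply@blogger.com (BSNL TeleServices)"),
                new IntWritable(2));
        reduceDriver.runTest();
    }
}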
Driver
Here is the driver class:
package org.grassfield.nandu.author;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AuthorDriver extends Configured
        implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // ToolRunner has already applied the generic options; this
        // parser call simply strips them from args (a harmless no-op here)
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // Job.getInstance replaces the deprecated new Job(conf, name)
        Job job = Job.getInstance(conf, "Author count");
        job.setJarByClass(this.getClass());
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(AuthorMapper.class);
        job.setReducerClass(AuthorReducer.class);
        job.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AuthorDriver(), args));
    }
}
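Once the classes are packaged into a jar, the driver runs like any other MapReduce job, along the lines of hadoop jar feed.jar org.grassfield.nandu.author.AuthorDriver <input> <output> (the jar name here is just an example). But the point of this post is that we do not need a cluster to verify the logic: MRUnit can do it in memory.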
Test Case
Let’s write a unit test case for the whole mapper-reducer pipeline. MRUnit’s MapReduceDriver pushes our input through the mapper and the reducer in memory and compares the actual output against what we declare as expected.
package org.grassfield.nandu.author;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class AuthorDriverTest {

    private Mapper<LongWritable, Text, Text, IntWritable> mapper;
    private Reducer<Text, IntWritable, Text, IntWritable> reducer;
    private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver;

    @Before
    public void setUp() throws Exception {
        mapper = new AuthorMapper();
        reducer = new AuthorReducer();
        // chain mapper and reducer into one in-memory pipeline
        driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    @Test
    public void testRun() throws Exception {
        // NB: in the real input the fields below are tab-separated
        String line = "Blogger http://www.blogger.com BSNLTeleServices | BSNL Broadband Plans, Bill Payment Selfcare Portal noreply@blogger.com (BSNL TeleServices) http://www.bsnlteleservices.com/ Sat Sep 24 02:42:36 MYT 2016 BSNL 599 Broadband Plan | MP Combo Internet Tariff noreply@blogger.com (BSNL TeleServices) http://feedproxy.google.com/~r/BSNLTeleservices/~3/3KqPtw353PM/bsnl-599-broadband-plan-mp-combo.html Sat Sep 24 02:14:11 MYT 2016 [BSNL Broadband Plans, Broadband Tariff]";
        driver.withInput(new LongWritable(), new Text(line));
        // the expected key must match the author parsed from the line
        driver.withOutput(new Text("author"), new IntWritable(1));
        driver.runTest();
    }
}
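MRUnit can also verify the counters our mapper increments. As a sketch (assuming we import the same MapperRCheck enum the mapper uses), we could assert before runTest() that exactly one record was counted as valid:

// inside testRun(), before driver.runTest():
driver.withCounter(MapperRCheck.VALID, 1)    // one well-formed record
      .withCounter(MapperRCheck.INVALID, 0); // and no parse failures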
Let’s run this as a JUnit test case. If the pipeline’s actual output matches the declared expected output, the test passes; otherwise runTest() fails with a description of the mismatch.
