Lab 13 – Working with Hadoop serialization – MapReduce with custom key and custom value

Hi hadoopers,

Previously I used a custom class as the key; please see Lab 08 – MapReduce using custom class as Key. This post uses custom classes for both the key and the value, which will help you understand Hadoop serialization.

logo-mapreduce

Mapper

package org.grassfield.nandu.category;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.grassfield.hadoop.FeedCategoryCountMapper.MapperRCheck;
import org.grassfield.hadoop.entity.EntryCategory;
import org.grassfield.hadoop.entity.FeedEntryRecord;
import org.grassfield.hadoop.util.ParseUtil;

/**
 * Key: Entrycategory - custom class Value: FeedEntryRecord - custom class
 * 
 * @author pandian
 *
 */
public class CategoryToArticleMapper extends
        Mapper<LongWritable, Text, EntryCategory, FeedEntryRecord> {
    String dateFormat = "EEE MMM dd HH:mm:ss z yyyy";
    SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
    Logger logger = Logger.getLogger(CategoryToArticleMapper.class);

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, EntryCategory, FeedEntryRecord>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line, "\t");
        int countTokens = st.countTokens();
        
        //update hadoop counters
        if (countTokens != 10) {
            System.err.println("Incorrect record with " + countTokens
                    + " tokens. " + line);
            context.getCounter(MapperRCheck.INVALID).increment(1);
            return;
        } else {
            context.getCounter(MapperRCheck.VALID).increment(1);
        }
        FeedEntryRecord record = ParseUtil.populateRecord(line, sdf);

        Set<EntryCategory> categorySet = record.getCategorySet();
        if (categorySet == null || categorySet.isEmpty())
            logger.warn("no category received for "
                    + record.getFeedTitle() + "/"
                    + record.getEntrySubject());
        else {
            for (EntryCategory category : categorySet) {
                context.write(category, record);
            }
        }
    }
}

Reducer

Mapper emits EntryCategory and FeedEntryRecord as key and value pairs. So they are the inputs to our Reducer.

package org.grassfield.nandu.category;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
import org.grassfield.hadoop.FeedCategoryCountMapper.ReducerRCheck;
import org.grassfield.hadoop.entity.EntryCategory;
import org.grassfield.hadoop.entity.FeedEntryRecord;

/**
 * Key: EntryCategory - custom class Value: FeedEntryRecord - custom class
 * output Key: EntryCategory - custom class outut value: text writable
 * 
 * @author pandian
 *
 */
public class CategoryToArticleReducer extends
        Reducer<EntryCategory, FeedEntryRecord, EntryCategory, Text> {
    private static Logger logger = Logger
            .getLogger(CategoryToArticleReducer.class);

    @Override
    protected void reduce(EntryCategory key,
            Iterable<FeedEntryRecord> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (FeedEntryRecord record : values) {
            
            //update hadoop counters
            if (record == null || record.getEntryUrl() == null) {
                logger.warn(
                        "Skipping.. This record seems to be faulty "
                                + record);
                context.getCounter(ReducerRCheck.INVALID)
                        .increment(1);
                continue;
            }
            context.getCounter(ReducerRCheck.VALID).increment(1);
            sb.append(",");
            sb.append(record.toString());
        }
        context.write(key, new Text(sb.toString().substring(1)));
    }

}

 

Driver

Here is our Driver class.

hadoop_yarn

package org.grassfield.nandu.category;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.grassfield.hadoop.entity.EntryCategory;
import org.grassfield.hadoop.entity.FeedEntryRecord;

/**
 * Driver class for Category to Article mapping
 * 
 * @author pandian
 *
 */
public class CategoryToArticleDriver extends Configured
        implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        GenericOptionsParser parser = new GenericOptionsParser(conf,
                args);
        args = parser.getRemainingArgs();
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        Job job = new Job(conf, "Category to Article");
        job.setJarByClass(getClass());

        job.setMapOutputKeyClass(EntryCategory.class);
        job.setMapOutputValueClass(FeedEntryRecord.class);

        job.setOutputKeyClass(EntryCategory.class);
        job.setOutputKeyClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        job.setReducerClass(CategoryToArticleReducer.class);
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(CategoryToArticleMapper.class);
        job.waitForCompletion(true);

        return 0;
    }

    /**
     * hadoop jar FeedCategoryCount-13.jar
     * org.grassfield.nandu.category.CategoryToArticleDriver
     * 
     * @param args
     *            /user/hadoop/feed/2016-09-21 /user/hadoop/lab13
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(),
                new CategoryToArticleDriver(), args));
    }

}

Custom Key class

Here is our key class. Hadoop sorts the keys, so the key class needs to implement the WritableComparable interface. It is exciting to write and read objects using Hadoop serialization, which is different from standard Java serialization.

package org.grassfield.hadoop.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * This will be used as a key. So it should be writable and comparable
 * @author pandian
 *
 */
public class EntryCategory
        implements WritableComparable<EntryCategory> {
    private String category;

    public EntryCategory(String category) {
        this.category = category;
    }

    public EntryCategory() {

    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.category = dataInput.readUTF();

    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(category);
    }

    @Override
    public boolean equals(Object obj) {
        if (null == obj)
            return false;
        if (!(obj instanceof EntryCategory))
            return false;

        EntryCategory o = (EntryCategory) obj;

        return this.getCategory().equals(o.getCategory());
    }

    @Override
    public int hashCode() {
        return this.category.hashCode();
    }

    @Override
    public String toString() {
        return this.category;
    }

    @Override
    public int compareTo(EntryCategory obj) {
        if (obj == null)
            return 1;

        if (equals(obj))
            return 0;

        if (!(obj instanceof EntryCategory))
            return 1;

        EntryCategory o = (EntryCategory) obj;

        return this.category.compareTo(o.category);
    }

}

 

Custom Value Class

The value class should implement Writable to comply with the serialization standard. I have many fields in the value class. See the serialization in the code below: I read and write strings and longs.

package org.grassfield.hadoop.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;

import org.apache.hadoop.io.Writable;

/**
 * This will act as value. It is sufficient to implement Writable
 * @author pandian
 *
 */
public class FeedEntryRecord implements Writable {
    /*
     * 1. feed generator 2. feed title 3. feed_author 4. feed_url 5. feed_time
     * 6. item-subject 7. item-author 8. item url 9. item date 10. category
     */
    private String generator;
    private String feedTitle;
    private String feedAuthor;
    private String feedUrl;
    private Date feed_time;
    private String entrySubject;
    private String entryAuthor;
    private String entryUrl;
    private Date entryDate;
    private Set<EntryCategory> categorySet;

    @Override
    public void readFields(DataInput in) throws IOException {
        this.setCategorySet(in.readUTF());
        this.entryAuthor = in.readUTF();
        this.entryDate = new Date(in.readLong());
        this.entrySubject = in.readUTF();
        this.entryUrl = in.readUTF();
        this.feed_time = new Date(in.readLong());
        this.feedAuthor = in.readUTF();
        this.feedTitle = in.readUTF();
        this.feedUrl = in.readUTF();
        this.generator = in.readUTF();
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.categorySet.toString());
        dataOutput.writeUTF(this.entryAuthor);
        dataOutput.writeLong(this.entryDate.getTime());
        dataOutput.writeUTF(this.entrySubject);
        dataOutput.writeUTF(this.entryUrl);
        dataOutput.writeLong(this.feed_time.getTime());
        dataOutput.writeUTF(this.feedAuthor);
        dataOutput.writeUTF(this.feedTitle);
        dataOutput.writeUTF(this.feedUrl);
        dataOutput.writeUTF(this.generator);
    }

    public String getGenerator() {
        return generator;
    }

    public void setGenerator(String generator) {
        this.generator = generator;
    }

    public String getFeedTitle() {
        return feedTitle;
    }

    public void setFeedTitle(String feedTitle) {
        this.feedTitle = feedTitle;
    }

    public String getFeedAuthor() {
        return feedAuthor;
    }

    public void setFeedAuthor(String feedAuthor) {
        this.feedAuthor = feedAuthor;
    }

    public String getFeedUrl() {
        return feedUrl;
    }

    public void setFeedUrl(String feedUrl) {
        this.feedUrl = feedUrl;
    }

    public Date getFeed_time() {
        return feed_time;
    }

    public void setFeed_time(Date feed_time) {
        this.feed_time = feed_time;
    }

    public String getEntrySubject() {
        return entrySubject;
    }

    public void setEntrySubject(String entrySubject) {
        this.entrySubject = entrySubject;
    }

    public String getEntryAuthor() {
        return entryAuthor;
    }

    public void setEntryAuthor(String entryAuthor) {
        this.entryAuthor = entryAuthor;
    }

    public String getEntryUrl() {
        return entryUrl;
    }

    public void setEntryUrl(String entryUrl) {
        this.entryUrl = entryUrl;
    }

    public Date getEntryDate() {
        if (this.entryDate == null)
            return this.getFeed_time();
        return entryDate;
    }

    public void setEntryDate(Date entryDate) {
        this.entryDate = entryDate;
        if (this.getFeed_time() == null)
            this.setFeed_time(entryDate);
    }

    public Set<EntryCategory> getCategorySet() {
        return categorySet;
    }

    public void setCategorySet(String categoryCsv) {
        if (categoryCsv == null)
            return;
        if (categoryCsv.equals("[]"))
            return;

        categoryCsv = categoryCsv.substring(1);
        categoryCsv = categoryCsv.substring(0,
                categoryCsv.length() - 1);
        StringTokenizer st = new StringTokenizer(categoryCsv, ",");
        this.categorySet = new TreeSet<EntryCategory>();
        while (st.hasMoreTokens()) {
            EntryCategory category = new EntryCategory();
            category.setCategory(st.nextToken());
            categorySet.add(category);
        }
    }

    @Override
    public String toString() {
        return this.entryAuthor + "\t" + this.feedAuthor + "\t"
                + this.feedTitle + "\t" + this.feedUrl + "\t"
                + this.generator + "\t" + this.feed_time

                + "\t" + this.entryUrl + "\t" + this.entrySubject
                + "\t" + this.categorySet + "\t" + this.entryDate;
    }

}

Hope you had a good week. Have an enjoyable Friday!