Lab 19: Reducer-side join

Hi Hadoopers,

I covered the mapper-side join in my previous post. Here is the code for the reducer-side join.

When we do a reducer-side join, all the mappers must emit a common key. The shuffle phase then groups the records from both tables by that key, so they arrive together at a single reduce call, where we can join them.

I have two input files (tables) as given below.


Employee Mapper

Employee.csv

$ hadoop fs -cat /user/hadoop/lab19/employee.csv
101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
102,Bheema,Pandu,Kunti,Hidimbi

This is handled by the following Mapper class.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeMapper
        extends Mapper<Object, Text, Text, Text> {

    @Override
    protected void map(Object key, Text value,
            Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // input record: 101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
        String eRecord = value.toString();
        String[] part = eRecord.split(",");
        // emit the employee id as the join key; tag the value with "emp"
        // so the reducer can tell employee records from salary records
        context.write(new Text(part[0]),
                new Text("emp\t" + part[1] + "\t" + part[2] + "\t" + part[3] + "\t" + part[4]));
    }
}

Salary Mapper

Following is the second table we are going to use.

$ hadoop fs -cat /user/hadoop/lab19/salary.csv
101,2000
102,1500
101,4000
102,3000

This is handled by the SalaryMapper, given below.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalaryMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // input record: 101,2000
        String line = value.toString();
        String[] split = line.split(",");
        // same join key (employee id) as EmployeeMapper, tagged with "salary"
        context.write(new Text(split[0]), new Text("salary\t" + split[1]));
    }
}

Mapper Output

The output of both mappers is given below. Notice that both of them emit the same keys: the employee ids.

$ hadoop fs -ls /user/hadoop/lab19/01
Found 3 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-08 17:42 /user/hadoop/lab19/01/_SUCCESS
-rw-r--r--   3 hadoop supergroup         87 2016-10-08 17:42 /user/hadoop/lab19/01/part-m-00000
-rw-r--r--   3 hadoop supergroup         64 2016-10-08 17:42 /user/hadoop/lab19/01/part-m-00001

$ hadoop fs -cat /user/hadoop/lab19/01/*m*
101     emp     Duryodhana      Dhritarashtra   Gandhari        Bhanumati
102     emp     Bheema  Pandu   Kunti   Hidimbi
101     salary  2000
102     salary  1500
101     salary  4000
102     salary  3000

We use the employee id as the key to join the two tables.
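
To see why this works, consider what the reducer receives for key 101 after the shuffle and sort: every tagged value that either mapper emitted for that key, grouped together. (The order of values within the group is not guaranteed, which is why the reducer below checks the tag rather than relying on position.) Conceptually:

101 -> [ "emp\tDuryodhana\tDhritarashtra\tGandhari\tBhanumati", "salary\t2000", "salary\t4000" ]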

Employee Reducer

Following are the job submission and the output of the reducer. You can see the two tables joined together on the employee id key.

$ hadoop jar FeedCategoryCount-19.jar org.grassfield.hadoop.dc.EmployeeJob /user/hadoop/lab19/employee.csv /user/hadoop/lab19/salary.csv /user/hadoop/lab19/01

$ hadoop fs -cat /user/hadoop/lab19/01/part-r-00000
Duryodhana      Dhritarashtra   Gandhari        Bhanumati       6000
Bheema  Pandu   Kunti   Hidimbi 4500

Here is the reducer code.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // all tagged records for one employee id arrive together in this call
        int salary = 0;
        String name = null;
        for (Text t : values) {
            String record = t.toString();
            String[] split = record.split("\t");
            if (split[0].equals("salary")) {
                // accumulate the total salary for this employee
                salary += Integer.parseInt(split[1]);
            } else if (split[0].equals("emp")) {
                // the employee record carries the name fields
                name = split[1] + "\t" + split[2] + "\t" + split[3] + "\t" + split[4];
            }
        }
        context.write(new Text(name), new IntWritable(salary));
    }
}
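
Employee Job

The command above runs a driver class, EmployeeJob, which is not listed in this post. Below is a minimal sketch of how such a driver could be wired up, assuming MultipleInputs is used to attach each input file to its own mapper; the class and path arguments follow the command line above, but the configuration details are illustrative, not the original code.

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver sketch; the original EmployeeJob is not shown in the post
public class EmployeeJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "reducer side join");
        job.setJarByClass(EmployeeJob.class);

        // each input file gets its own mapper; both emit (employee id, tagged record)
        MultipleInputs.addInputPath(job, new Path(args[0]),
                TextInputFormat.class, EmployeeMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                TextInputFormat.class, SalaryMapper.class);

        job.setReducerClass(EmployeeReducer.class);

        // mapper output types (Text, Text) differ from the final (Text, IntWritable)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

MultipleInputs is what lets two different mapper classes feed the same reducer: each input path is bound to its own mapper, and the shuffle merges their output by key.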

See you in another interesting post.

 
