Lab 19: Reducer side join

Hi Hadoopers,

I wrote about the map-side join in my previous post. Here is the code for the reducer-side join.

When we do a reducer-side join, both mappers must emit their records under a common key, so that matching records from the two tables meet in the same reduce call.

I have two input files (tables) as given below.


Employee Mapper

Employee.csv

$ hadoop fs -cat /user/hadoop/lab19/employee.csv
101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
102,Bheema,Pandu,Kunti,Hidimbi

This file is handled by the following Mapper class.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeMapper
        extends Mapper<Object, Text, Text, Text> {

    @Override
    protected void map(Object key, Text value,
            Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // input record: 101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
        String eRecord  = value.toString();
        String[] part = eRecord.split(",");
        System.out.println("part:"+part.length);
        // key: employee id; value: record tagged with "emp" so the reducer can tell the two tables apart
        context.write(new Text(part[0]), new Text("emp\t"+part[1]+"\t"+part[2]+"\t"+part[3]+"\t"+part[4]));
    }
}

Salary Mapper

Following is the second table we are going to use.

$ hadoop fs -cat /user/hadoop/lab19/salary.csv
101,2000
102,1500
101,4000
102,3000

This is handled by the SalaryMapper given below.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalaryMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // input record: 101,2000
        String line = value.toString();
        String [] split = line.split(",");
        // key: employee id; value: salary tagged with "salary"
        context.write(new Text(split[0]), new Text("salary\t"+split[1]));
    }
}

Mapper Output

The output of both mappers is given below. Notice that both emit the same keys (the employee id).

$ hadoop fs -ls /user/hadoop/lab19/01
Found 3 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-08 17:42 /user/hadoop/lab19/01/_SUCCESS
-rw-r--r--   3 hadoop supergroup         87 2016-10-08 17:42 /user/hadoop/lab19/01/part-m-00000
-rw-r--r--   3 hadoop supergroup         64 2016-10-08 17:42 /user/hadoop/lab19/01/part-m-00001

$ hadoop fs -cat /user/hadoop/lab19/01/*m*
101     emp     Duryodhana      Dhritarashtra   Gandhari        Bhanumati
102     emp     Bheema  Pandu   Kunti   Hidimbi
101     salary  2000
102     salary  1500
101     salary  4000
102     salary  3000

We use the employee id as the key to join the two tables.

Employee Reducer

Following is the output of the reducer. You can see that the two tables have been joined on the employee id key.

$ hadoop jar FeedCategoryCount-19.jar org.grassfield.hadoop.dc.EmployeeJob /user/hadoop/lab19/employee.csv /user/hadoop/lab19/salary.csv /user/hadoop/lab19/01

$ hadoop fs -cat /user/hadoop/lab19/01/part-r-00000
Duryodhana      Dhritarashtra   Gandhari        Bhanumati       6000
Bheema  Pandu   Kunti   Hidimbi 4500

Here is the reducer code.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        System.out.println("processing emp id "+key);
        int count=0;
        String name=null;
        for (Text t:values){
            String record = t.toString();
            String[] split = record.split("\t");
            if (split[0].equals("salary")){
                count+=Integer.parseInt(split[1]);
            }else if (split[0].equals("emp")){
                name = split[1]+"\t"+split[2]+"\t"+split[3]+"\t"+split[4];
            }
        }
        context.write(new Text(name), new IntWritable(count));
    }

}
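
The command above runs org.grassfield.hadoop.dc.EmployeeJob, but that driver is not listed in this post. Below is a minimal sketch of how such a driver could look, assuming it uses MultipleInputs to route employee.csv to EmployeeMapper and salary.csv to SalaryMapper; treat it as an illustration, not the exact class used here.

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class EmployeeJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // args[0] = employee.csv, args[1] = salary.csv, args[2] = output directory
        Job job = Job.getInstance(getConf(), "Reducer side join");
        job.setJarByClass(getClass());

        // each input file gets its own mapper; both emit the employee id as the key
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmployeeMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SalaryMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setReducerClass(EmployeeReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new EmployeeJob(), args));
    }
}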

See you in another interesting post.

 

Lab 18: Map Side Join with DistributedCache

Hi Hadoopers,

This post shows how to do the join in the mapper. I have two tables; one of them must be small enough to fit in memory for this to work.

Employee.csv – This has employee id and name

$ hadoop fs -cat /user/hadoop/lab18/dc/employee.csv
101,Duryodhana
102,Bheema

Salary.csv – This has employee id and salary

$ hadoop fs -cat /user/hadoop/lab18/dc/salary.csv
101,2000
102,1500

Let’s join them with this MR code. Using the DistributedCache, I’ll keep the employee table in memory and use the salary table as the mapper input.

Here is the output.

$ hadoop fs -cat /user/hadoop/lab18/01/part-m-00000
0       101,Duryodhana,2000
9       102,Bheema,1500


Mapper:

package org.grassfield.hadoop.dc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

/**
 * Map-side join: joins salary records against the employee
 * table kept in the DistributedCache.
 * @author pandian
 *
 */
public class DcFileMapper
        extends Mapper<LongWritable, Text, LongWritable, Text> {
    private Map<String, String> employeeMap = new HashMap<String, String>();

    @Override
    protected void setup(
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        //get all the files shipped via the DistributedCache
        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (localCacheFiles!=null){
            System.out.println("cached files:"+localCacheFiles.length);
            for (Path path:localCacheFiles){
                if (path.getName().toString().equals("employee.csv")){
                    System.out.println("path:"+path+" * "+path.toString());
                    //prepare employee map
                    populateEmployeeMap(path);
                }
            }
        }
    }
    private void populateEmployeeMap(Path path)
            throws FileNotFoundException, IOException {
        // strip the scheme prefix (e.g. "file:") so FileReader gets a plain local path
        String filePath = path.toString();
        String localPath = filePath.substring(filePath.indexOf(":")+1);
        BufferedReader br = new BufferedReader(new FileReader(localPath));
        String line = null;
        while((line=br.readLine())!=null){
            //101,Duryodhana
            String [] employee = line.split(",");
            employeeMap.put(employee[0].trim(), employee[1].trim());
        }
        br.close();
    }
    
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        
        // input record: 101,2000
        String line = value.toString();
        String [] salary = line.split(",");
        // look up the employee name loaded in setup() and emit the joined record
        String name = this.employeeMap.get(salary[0]);
        context.write(key, new Text(salary[0]+","+name+","+salary[1]));
    }
}
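
A side note on the API: the DistributedCache class used above is deprecated on Hadoop 2.x. A minimal sketch of an alternative setup() that uses the non-deprecated context.getCacheFiles() is shown below; it assumes the cached file is symlinked into the task's working directory under its own name (the default behavior on YARN), and it needs an extra import for java.net.URI.

    @Override
    protected void setup(
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // original HDFS URIs of the files registered in the distributed cache
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null)
            return;
        for (URI uri : cacheFiles) {
            if (new Path(uri.getPath()).getName().equals("employee.csv")) {
                // read the localized copy via its symlink in the task working directory
                populateEmployeeMap(new Path("employee.csv"));
            }
        }
    }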

Driver

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DcFileJob extends Configured implements Tool {

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(super.getConf());
        Configuration conf = job.getConfiguration();
        job.setJobName("Sideways join using dc");
        DistributedCache.addCacheFile(new Path("/user/hadoop/lab18/dc/employee.csv").toUri(), conf);
        job.setJarByClass(this.getClass());
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DcFileMapper.class);
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DcFileJob(), args));
    }

}
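
DistributedCache.addCacheFile() is likewise deprecated on Hadoop 2.x. For reference, here is a sketch of the same driver written against the newer Job API (job.addCacheFile()); the class name DcFileJobV2 is just a placeholder.

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DcFileJobV2 extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "Sideways join using dc");
        job.setJarByClass(getClass());

        // non-deprecated replacement for DistributedCache.addCacheFile()
        job.addCacheFile(new Path("/user/hadoop/lab18/dc/employee.csv").toUri());

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DcFileMapper.class);
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DcFileJobV2(), args));
    }
}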

Let’s execute the original DcFileJob.

$ hadoop jar FeedCategoryCount-18.jar org.grassfield.hadoop.dc.DcFileJob /user/hadoop/lab18/dc/salary.csv /user/hadoop/lab18/01

In the job logs, you will see that employee.csv is localized (downloaded to the local machine):

16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Creating symlink: /tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv <- /opt/hadoop-2.6.4/jars/employee.csv
16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Localized hdfs://gandhari:9000/user/hadoop/lab18/dc/employee.csv as file:/tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv

Refer to the beginning of this post for the output.