Lab 18: Map Side Join with DistributedCache

Hi Hadoopers,

This post talks about how to do joins in the mapper. I have two tables; one of the tables must be small enough to fit in memory for this operation to work.

Employee.csv – This has employee id and name

$ hadoop fs -cat /user/hadoop/lab18/dc/employee.csv

Salary.csv – This has employee id and salary

$ hadoop fs -cat /user/hadoop/lab18/dc/salary.csv

Let’s join them with this MR code. Using DistributedCache, I’ll keep employee table in memory. I’ll use salary table as my mapper input.

Here is the output.

$ hadoop fs -cat /user/hadoop/lab18/01/part-m-00000
0       101,Duryodhana,2000
9       102,Bheema,1500



package org.grassfield.hadoop.dc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

 * Mapper Sideways join
 * @author pandian
public class DcFileMapper
        extends Mapper<LongWritable, Text, LongWritable, Text> {
    private Map<String, String> employeeMap = new HashMap<String, String>();

    protected void setup(
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        //get all the fiels from DC
        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (localCacheFiles!=null){
            System.out.println("cached files:"+localCacheFiles.length);
            for (Path path:localCacheFiles){
                if (path.getName().toString().equals("employee.csv")){
                    System.out.println("path:"+path+" * "+path.toString());
                    //prepare employee map
    private void populateEmployeeMap(Path path)
            throws FileNotFoundException, IOException {
        String filePath = path.toString();
        String substring = filePath.substring(filePath.indexOf(":")+1);
        BufferedReader br=new BufferedReader(new FileReader(substring));
        String line = null;
            String [] employee = line.split(",");
            employeeMap.put(employee[0].trim(), employee[1].trim());
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String [] salary = line.split(",");
        String name = this.employeeMap.get(salary[0]);
        context.write(key, new Text(salary[0]+","+name+","+salary[1]));


package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DcFileJob extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = new Job(super.getConf());
        Configuration conf = job.getConfiguration();
        job.setJobName("Sideways join using dc");
        DistributedCache.addCacheFile(new Path("/user/hadoop/lab18/dc/employee.csv").toUri(), conf);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return 0;

    public static void main(String[] args) throws Exception { Configuration(), new DcFileJob(), args);


Let’s execute it.

$ hadoop jar FeedCategoryCount-18.jar org.grassfield.hadoop.dc.DcFileJob /user/hadoop/lab18/dc/salary.csv /user/hadoop/lab18/01

You will find that employee.csv is downloaded to the local node:

16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Creating symlink: /tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv <- /opt/hadoop-2.6.4/jars/employee.csv
16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Localized hdfs://gandhari:9000/user/hadoop/lab18/dc/employee.csv as file:/tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv

Refer to the beginning of this post for the output.

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s