Lab 18: Map Side Join with DistributedCache

Hi Hadoopers,

This post shows how to perform a join on the map side. I have two tables; for a map-side join, one of them must be small enough to hold in memory.

Employee.csv – This has employee id and name

$ hadoop fs -cat /user/hadoop/lab18/dc/employee.csv
101,Duryodhana
102,Bheema

Salary.csv – This has employee id and salary

$ hadoop fs -cat /user/hadoop/lab18/dc/salary.csv
101,2000
102,1500

Let’s join them with the MR code below. Using DistributedCache, I’ll keep the employee table in memory and use the salary table as the mapper input.

Here is the output. The first column is the mapper’s input key, i.e. the byte offset assigned by TextInputFormat, which the mapper writes out unchanged.

$ hadoop fs -cat /user/hadoop/lab18/01/part-m-00000
0       101,Duryodhana,2000
9       102,Bheema,1500


Mapper:

package org.grassfield.hadoop.dc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

/**
 * Mapper for the map-side join. The employee table is loaded into memory
 * from the DistributedCache in setup(); each salary record is joined
 * against it in map().
 * @author pandian
 */
public class DcFileMapper
        extends Mapper<LongWritable, Text, LongWritable, Text> {
    private Map<String, String> employeeMap = new HashMap<String, String>();

    @Override
    protected void setup(
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        //get all the files localized by the DistributedCache
        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (localCacheFiles!=null){
            System.out.println("cached files:"+localCacheFiles.length);
            for (Path path:localCacheFiles){
                if (path.getName().toString().equals("employee.csv")){
                    System.out.println("path:"+path+" * "+path.toString());
                    //prepare employee map
                    populateEmployeeMap(path);
                }
            }
        }
    }
    private void populateEmployeeMap(Path path)
            throws FileNotFoundException, IOException {
        // strip the URI scheme (e.g. "file:") so FileReader gets a plain local path
        String filePath = path.toString();
        String localPath = filePath.substring(filePath.indexOf(":") + 1);
        BufferedReader br = new BufferedReader(new FileReader(localPath));
        String line = null;
        while ((line = br.readLine()) != null) {
            //101,Duryodhana
            String[] employee = line.split(",");
            employeeMap.put(employee[0].trim(), employee[1].trim());
        }
        br.close();
    }
    
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        
        //input record looks like: 101,2000
        String line = value.toString();
        String[] salary = line.split(",");
        //look up the employee name cached in setup() and emit the joined record
        String name = this.employeeMap.get(salary[0]);
        context.write(key, new Text(salary[0] + "," + name + "," + salary[1]));
    }
}
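The DistributedCache class used above is deprecated in Hadoop 2.x (hence the @SuppressWarnings in the driver). If you prefer the newer API, a setup() along these lines should do the same job. This is only a sketch under two assumptions: the driver registers the file with job.addCacheFile() (see the note after the driver below), and the framework symlinks each cached file into the task’s working directory under its own name, so it can be opened as a plain local file. It also needs an extra import of java.net.URI.

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        // cache files registered with job.addCacheFile() are exposed as URIs
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null)
            return;
        for (URI uri : cacheFiles) {
            if (uri.getPath().endsWith("employee.csv")) {
                // assumption: the cached file is symlinked into the task's working
                // directory, so its bare name resolves to the local copy
                BufferedReader br = new BufferedReader(new FileReader("employee.csv"));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] employee = line.split(",");
                    employeeMap.put(employee[0].trim(), employee[1].trim());
                }
                br.close();
            }
        }
    }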

Driver:

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DcFileJob extends Configured implements Tool {

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(super.getConf());
        Configuration conf = job.getConfiguration();
        job.setJobName("Sideways join using dc");
        // register employee.csv with the DistributedCache so every mapper can read it locally
        DistributedCache.addCacheFile(new Path("/user/hadoop/lab18/dc/employee.csv").toUri(), conf);
        job.setJarByClass(this.getClass());
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DcFileMapper.class);
        // map-only job: the join happens entirely in the mapper
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DcFileJob(), args));
    }

}
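
As mentioned above, the non-deprecated way to register the cache file in Hadoop 2.x is Job#addCacheFile. A minimal sketch of the equivalent lines inside run(), assuming the same HDFS path and an extra import of java.net.URI:

        Job job = Job.getInstance(getConf(), "Sideways join using dc");
        // ships employee.csv to every task node before the mappers start
        job.addCacheFile(new URI("/user/hadoop/lab18/dc/employee.csv"));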

Let’s execute it.

$ hadoop jar FeedCategoryCount-18.jar org.grassfield.hadoop.dc.DcFileJob /user/hadoop/lab18/dc/salary.csv /user/hadoop/lab18/01

In the job logs you will see that employee.csv is localized (downloaded) to the local filesystem and symlinked:

16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Creating symlink: /tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv <- /opt/hadoop-2.6.4/jars/employee.csv
16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Localized hdfs://gandhari:9000/user/hadoop/lab18/dc/employee.csv as file:/tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv

Refer to the beginning of this post for the output.
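
Since the driver goes through ToolRunner, there is also a generic-option route: drop the DistributedCache.addCacheFile() line from the driver and ship the file with -files instead. A sketch only; the local path to employee.csv is just an example, and the option must come before the job arguments.

$ hadoop jar FeedCategoryCount-18.jar org.grassfield.hadoop.dc.DcFileJob -files /home/hadoop/employee.csv /user/hadoop/lab18/dc/salary.csv /user/hadoop/lab18/01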
