Lab 18: Map Side Join with DistributedCache

Hi Hadoopers,

This post shows how to do a join on the map side. I have two tables; for a map-side join to work, one of them should be small enough to be held in memory.

Employee.csv – This has employee id and name

$ hadoop fs -cat /user/hadoop/lab18/dc/employee.csv
101,Duryodhana
102,Bheema

Salary.csv – This has employee id and salary

$ hadoop fs -cat /user/hadoop/lab18/dc/salary.csv
101,2000
102,1500

Let’s join them with the MR code below. Using DistributedCache, I’ll keep the employee table in memory and use the salary table as the mapper input.

Here is the output.

$ hadoop fs -cat /user/hadoop/lab18/01/part-m-00000
0       101,Duryodhana,2000
9       102,Bheema,1500

Mapper:

package org.grassfield.hadoop.dc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

/**
 * Map-side join mapper using DistributedCache
 * @author pandian
 *
 */
public class DcFileMapper
        extends Mapper<LongWritable, Text, LongWritable, Text> {
    private Map<String, String> employeeMap = new HashMap<String, String>();

    @Override
    protected void setup(
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        //get all the files distributed via the DistributedCache
        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (localCacheFiles!=null){
            System.out.println("cached files:"+localCacheFiles.length);
            for (Path path:localCacheFiles){
                if (path.getName().equals("employee.csv")){
                    System.out.println("path:"+path+" * "+path.toString());
                    //prepare employee map
                    populateEmployeeMap(path);
                }
            }
        }
    }
    private void populateEmployeeMap(Path path)
            throws FileNotFoundException, IOException {
        String filePath = path.toString();
        //strip the URI scheme (e.g. "file:") so FileReader gets a plain local path
        String substring = filePath.substring(filePath.indexOf(":")+1);
        BufferedReader br=new BufferedReader(new FileReader(substring));
        String line = null;
        while((line=br.readLine())!=null){
            //101,Duryodhana
            String [] employee = line.split(",");
            employeeMap.put(employee[0].trim(), employee[1].trim());
        }
        br.close();
    }
    
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        
        //101,2000
        String line = value.toString();
        String [] salary = line.split(",");
        String name = this.employeeMap.get(salary[0]);
        context.write(key, new Text(salary[0]+","+name+","+salary[1]));
    }
}
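
Side note: the DistributedCache class used above is deprecated in Hadoop 2.x. Below is a minimal sketch (untested here) of an equivalent setup() using the newer context.getCacheFiles() API, assuming the cached file is symlinked by name into the task's working directory, as the localization log further below suggests. It needs an extra import java.net.URI.

    //Sketch only: non-deprecated alternative for setup() (Hadoop 2.x).
    //getCacheFiles() returns the URIs registered in the driver; the files
    //themselves are symlinked by name into the task's working directory.
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null)
            return;
        for (URI uri : cacheFiles) {
            if (uri.getPath().endsWith("employee.csv")) {
                BufferedReader br = new BufferedReader(new FileReader("employee.csv"));
                String line;
                while ((line = br.readLine()) != null) {
                    String[] employee = line.split(",");
                    employeeMap.put(employee[0].trim(), employee[1].trim());
                }
                br.close();
            }
        }
    }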

Driver:

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DcFileJob extends Configured implements Tool {

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(super.getConf());
        Configuration conf = job.getConfiguration();
        job.setJobName("Sideways join using dc");
        DistributedCache.addCacheFile(new Path("/user/hadoop/lab18/dc/employee.csv").toUri(), conf);
        job.setJarByClass(this.getClass());
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DcFileMapper.class);
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new DcFileJob(), args);
    }

}
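
On the driver side, the non-deprecated equivalent is a one-liner: Job.addCacheFile() replaces the DistributedCache.addCacheFile() call above. A minimal sketch, assuming the same HDFS path (it also needs import java.net.URI):

        //Sketch only: non-deprecated cache registration (Hadoop 2.x)
        Job job = Job.getInstance(getConf(), "Sideways join using dc");
        job.addCacheFile(new URI("/user/hadoop/lab18/dc/employee.csv"));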

Let’s execute it.

$ hadoop jar FeedCategoryCount-18.jar org.grassfield.hadoop.dc.DcFileJob /user/hadoop/lab18/dc/salary.csv /user/hadoop/lab18/01

You will see that employee.csv gets localized (downloaded and symlinked) on the node.

16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Creating symlink: /tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv <- /opt/hadoop-2.6.4/jars/employee.csv
16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Localized hdfs://gandhari:9000/user/hadoop/lab18/dc/employee.csv as file:/tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv

Refer to the beginning of this post for the output.

Lab 17 – Adding custom jars to MapReduce using Distributed Cache

Hi,

This is part of my blog post series on MapReduce. Let me tell you upfront: this doesn’t work for me. The job fails with a ClassNotFoundException. I’ll come back and update this post if I manage to fix it. Since I’ve already spent too much time on this, I’ll use the -libjars parameter with GenericOptionsParser for such requirements instead.

Another post, Including Third-Party Libraries in my Map-Reduce Job (using distributed cache), says the same thing: this doesn’t work. I really don’t know how it worked for others. Let’s put this on hold.
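
For reference, the -libjars route works because ToolRunner runs GenericOptionsParser over the arguments and ships the listed jar to the task classpath. A hypothetical invocation (the driver would then no longer need the jar as its third argument; input and output paths are placeholders):

$ hadoop jar FeedCategoryCount-17.jar org.grassfield.hadoop.dc.DcJob -libjars temp-0.0.1-SNAPSHOT.jar <input_path> <output_path>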

Driver:

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * hadoop jar FeedCategoryCount-17.jar org.grassfield.hadoop.dc.DcJob /user/hadoop/lab17/input/feedList.csv /user/hadoop/lab17/05 /user/hadoop/lab17/dc/temp-0.0.1-SNAPSHOT.jar
 * @author pandian
 *
 */
public class DcJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJarByClass(this.getClass());
        job.setJobName("DcJob");
        Path path = new Path(args[2]);
        DistributedCache.addFileToClassPath(path, conf);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DcMapper.class);
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
        return 0;
    }
    public static void main(String [] args) throws Exception{
        ToolRunner.run(new Configuration(), new DcJob(), args);
    }

}
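
If the deprecated org.apache.hadoop.filecache.DistributedCache call is part of the problem, the Hadoop 2.x Job API offers a non-deprecated equivalent that could replace it inside run(). A sketch only, untested here:

        //Sketch only: non-deprecated equivalent of DistributedCache.addFileToClassPath()
        job.addFileToClassPath(new Path(args[2]));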

Mapper:

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import temp.ToUpper;

/**
 * hadoop jar FeedCategoryCount-17.jar org.grassfield.hadoop.dc.DcJob /user/hadoop/lab17/input/feedList.csv /user/hadoop/lab17/05 /user/hadoop/lab17/dc/temp-0.0.1-SNAPSHOT.jar
 * @author pandian
 *
 */
public class DcMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {
    Path[] localCacheArchives;
    Path[] localCacheFiles;

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        context.write(new Text(ToUpper.toUpper(value.toString())), key);
    }

    @Override
    protected void setup(
            Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        localCacheArchives = DistributedCache.getLocalCacheArchives(context.getConfiguration());
        localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    }
}

I already have the dependency jar file in HDFS.

$ hadoop fs -ls /user/hadoop/lab17/dc
Found 1 items
-rw-r--r--   3 hadoop supergroup       4539 2016-10-08 07:36 /user/hadoop/lab17/dc/temp-0.0.1-SNAPSHOT.jar

Here are the contents of the jar:

$ jar -tvf temp-0.0.1-SNAPSHOT.jar
     0 Fri Oct 07 07:24:08 MYT 2016 META-INF/
   133 Fri Oct 07 07:24:06 MYT 2016 META-INF/MANIFEST.MF
     0 Fri Oct 07 07:24:08 MYT 2016 temp/
  1276 Fri Oct 07 07:24:08 MYT 2016 temp/CharTest.class
  1750 Fri Oct 07 07:24:08 MYT 2016 temp/DateParser.class
   896 Fri Oct 07 07:24:08 MYT 2016 temp/Jdbc.class
   464 Fri Oct 07 07:24:08 MYT 2016 temp/ToUpper.class
     0 Fri Oct 07 07:24:08 MYT 2016 META-INF/maven/
     0 Fri Oct 07 07:24:08 MYT 2016 META-INF/maven/temp/
     0 Fri Oct 07 07:24:08 MYT 2016 META-INF/maven/temp/temp/
   927 Fri Oct 07 07:23:26 MYT 2016 META-INF/maven/temp/temp/pom.xml
   107 Fri Oct 07 07:24:08 MYT 2016 META-INF/maven/temp/temp/pom.properties

Let’s hope this will be resolved.