Lab 21: MapReduce with Sequence File

Hi Hadoopers,

This post is a continuation of my previous post on Sequence Files. The output of that post is read by this MapReduce program.

This program accepts a sequence file as input and emits a text file as output.

Mapper:

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeqFileReadMapper
        extends Mapper<Text, Text, Text, Text> {

    @Override
    protected void map(Text key, Text value,
            Mapper<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        System.out.println("key:"+key+" "+key.getClass());
        System.out.println("value:"+value.toString()+" "+value.getClass());
        context.write(key, value);
    }
}

Reducer:

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeqFileReadReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        for(Text record:values){
            context.write(key, record);
        }
    }
}

Driver:

package org.grassfield.nandu.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SeqFileReadJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new SeqFileReadJob(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJarByClass(this.getClass());
        job.setJobName("SeqFileReadJob");
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.setMapperClass(SeqFileReadMapper.class);
        job.setReducerClass(SeqFileReadReducer.class);
        
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
        return 0;
    }

}

Execution

$ hadoop jar FeedCategoryCount-21.jar org.grassfield.nandu.etl.SeqFileReadJob /user/hadoop/lab21/input/ /user/hadoop/lab21/19

$ hadoop fs -ls /user/hadoop/lab21/19
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-09 00:54 /user/hadoop/lab21/19/_SUCCESS
-rw-r--r--   3 hadoop supergroup        130 2016-10-09 00:54 /user/hadoop/lab21/19/part-r-00000
The keys in the output below are the byte offsets that the Lab 20 job wrote as Text keys, which is why they are sorted lexicographically rather than numerically.

$ hadoop fs -cat /user/hadoop/lab21/19/part-r-00000
0       101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
0       101,2000
18      101,4000
27      102,3000
48      102,Bheema,Pandu,Kunti,Hidimbi
9       102,1500

Lab 20: Sequence File Creation

Hi hadoopers,

I have been told that Sequence files can be used to pack many small files placed in HDFS into a single file. I have a lot of such files in my Feed analytics project, so I hope this will help me free up considerable space in HDFS that is currently blocked by small HTML files.

So this program accepts a directory as input. All files inside the directory are written into a single sequence file.

This folder is my input.

$ hadoop fs -ls /user/hadoop/lab20/input
Found 2 items
-rw-r--r--   3 hadoop supergroup         79 2016-10-08 19:32 /user/hadoop/lab20/input/employee.csv
-rw-r--r--   3 hadoop supergroup         36 2016-10-08 19:32 /user/hadoop/lab20/input/salary.csv

Here is the mapper

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeqFileMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        context.write(new Text(key.toString()), value);
    }
}

Here is the reducer

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeqFileReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        for (Text value:values){
            context.write(key, value);
        }
    }

}

Here is the Driver

package org.grassfield.nandu.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SeqFileJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJarByClass(this.getClass());
        job.setJobName("Sequential File Job");
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SeqFileMapper.class);
        job.setReducerClass(SeqFileReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new SeqFileJob(), args);

    }

}

Let’s execute it.

$ hadoop jar FeedCategoryCount-20.jar org.grassfield.nandu.etl.SeqFileJob /user/hadoop/lab20/input /user/hadoop/lab20/02

$ hadoop fs -ls /user/hadoop/lab20/02
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-08 19:36 /user/hadoop/lab20/02/_SUCCESS
-rw-r--r--   3 hadoop supergroup        256 2016-10-08 19:36 /user/hadoop/lab20/02/part-r-00000

And here is the output!

$ hadoop fs -cat /user/hadoop/lab20/02/part-r-00000
SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text㜄▒Ӛ▒▒▒▒#▒▒▒▒#
                                                                   101,200020/101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
1101,4000
2102,3000"48102,Bheema,Pandu,Kunti,Hidimbi
                                          102,1500
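
Since this is a binary sequence file, hadoop fs -cat dumps the raw bytes, including the SEQ header and the key/value class names. To see the records decoded, hadoop fs -text should do the job (a quick note from me; this command was not part of the run above):

$ hadoop fs -text /user/hadoop/lab20/02/part-r-00000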

Lab 19: Reducer side join

Hi Hadoopers,

I wrote about map-side join in my previous post. Here is the code for a reduce-side join.

When we do a reduce-side join, both mappers must emit the same join key.

I have two input files (tables) as given below.

Employee Mapper

Employee.csv

$ hadoop fs -cat /user/hadoop/lab19/employee.csv
101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
102,Bheema,Pandu,Kunti,Hidimbi

This is handled by the following Mapper class.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EmployeeMapper
        extends Mapper<Object, Text, Text, Text> {

    @Override
    protected void map(Object key, Text value,
            Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        //101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
        String eRecord  = value.toString();
        String[] part = eRecord.split(",");
        System.out.println("part:"+part.length);
        context.write(new Text(part[0]), new Text("emp\t"+part[1]+"\t"+part[2]+"\t"+part[3]+"\t"+part[4]));
    }
}

Salary Mapper

Following is the second table we are going to use.

$ hadoop fs -cat /user/hadoop/lab19/salary.csv
101,2000
102,1500
101,4000
102,3000

This is handled by the SalaryMapper, given below.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalaryMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String [] split = line.split(",");
        context.write(new Text(split[0]), new Text("salary\t"+split[1]));
    }
}

Mapper Output

The output of both mappers is given below. Notice that both of them use the same keys (the employee id).

$ hadoop fs -ls /user/hadoop/lab19/01
Found 3 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-08 17:42 /user/hadoop/lab19/01/_SUCCESS
-rw-r--r--   3 hadoop supergroup         87 2016-10-08 17:42 /user/hadoop/lab19/01/part-m-00000
-rw-r--r--   3 hadoop supergroup         64 2016-10-08 17:42 /user/hadoop/lab19/01/part-m-00001

$ hadoop fs -cat /user/hadoop/lab19/01/*m*
101     emp     Duryodhana      Dhritarashtra   Gandhari        Bhanumati
102     emp     Bheema  Pandu   Kunti   Hidimbi
101     salary  2000
102     salary  1500
101     salary  4000
102     salary  3000

We use the employee id as the key to join the two tables.

Employee Reducer

Following are the job execution and the reducer output. You can see that the two tables are joined on the employee id key, with the salaries summed for each employee.

$ hadoop jar FeedCategoryCount-19.jar org.grassfield.hadoop.dc.EmployeeJob /user/hadoop/lab19/employee.csv /user/hadoop/lab19/salary.csv /user/hadoop/lab19/01

$ hadoop fs -cat /user/hadoop/lab19/01/part-r-00000
Duryodhana      Dhritarashtra   Gandhari        Bhanumati       6000
Bheema  Pandu   Kunti   Hidimbi 4500

Here is the reducer code.

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        System.out.println("processing emp id "+key);
        int count=0;
        String name=null;
        for (Text t:values){
            String record = t.toString();
            String[] split = record.split("\t");
            if (split[0].equals("salary")){
                count+=Integer.parseInt(split[1]);
            }else if (split[0].equals("emp")){
                name = split[1]+"\t"+split[2]+"\t"+split[3]+"\t"+split[4];
            }
        }
        context.write(new Text(name), new IntWritable(count));
    }

}
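
The driver class (EmployeeJob) is not listed in this post. Below is a minimal sketch of how it might look, assuming MultipleInputs is used to wire EmployeeMapper and SalaryMapper to the two input paths given on the command line; treat it as my assumption, not the original code.

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class EmployeeJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJarByClass(this.getClass());
        job.setJobName("Reducer side join");

        // args[0] = employee.csv, args[1] = salary.csv, args[2] = output directory
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmployeeMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SalaryMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setReducerClass(EmployeeReducer.class);
        // both mappers emit (Text empId, Text tagged record); the reducer emits (Text name, IntWritable total)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new EmployeeJob(), args);
    }
}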

See you in another interesting post.

 

Lab 18: Map Side Join with DistributedCache

Hi Hadoopers,

This post talks about how to do a join in the mapper. I have two tables; one of them should be small enough to fit in memory for this operation.

Employee.csv – This has employee id and name

$ hadoop fs -cat /user/hadoop/lab18/dc/employee.csv
101,Duryodhana
102,Bheema

Salary.csv – This has employee id and salary

$ hadoop fs -cat /user/hadoop/lab18/dc/salary.csv
101,2000
102,1500

Let’s join them with this MR code. Using the DistributedCache, I’ll keep the employee table in memory and use the salary table as the mapper input.

Here is the output.

$ hadoop fs -cat /user/hadoop/lab18/01/part-m-00000
0       101,Duryodhana,2000
9       102,Bheema,1500

Mapper:

package org.grassfield.hadoop.dc;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;

/**
 * Mapper Sideways join
 * @author pandian
 *
 */
public class DcFileMapper
        extends Mapper<LongWritable, Text, LongWritable, Text> {
    private Map<String, String> employeeMap = new HashMap<String, String>();

    @Override
    protected void setup(
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        //get all the files from the DistributedCache
        Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (localCacheFiles!=null){
            System.out.println("cached files:"+localCacheFiles.length);
            for (Path path:localCacheFiles){
                if (path.getName().toString().equals("employee.csv")){
                    System.out.println("path:"+path+" * "+path.toString());
                    //prepare employee map
                    populateEmployeeMap(path);
                }
            }
        }
    }
    private void populateEmployeeMap(Path path)
            throws FileNotFoundException, IOException {
        String filePath = path.toString();
        String substring = filePath.substring(filePath.indexOf(":")+1);
        BufferedReader br=new BufferedReader(new FileReader(substring));
        String line = null;
        while((line=br.readLine())!=null){
            //101,Duryodhana
            String [] employee = line.split(",");
            employeeMap.put(employee[0].trim(), employee[1].trim());
        }
        br.close();
    }
    
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        
        //101,2000
        String line = value.toString();
        String [] salary = line.split(",");
        String name = this.employeeMap.get(salary[0]);
        context.write(key, new Text(salary[0]+","+name+","+salary[1]));
    }
}

Driver:

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DcFileJob extends Configured implements Tool {

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(super.getConf());
        Configuration conf = job.getConfiguration();
        job.setJobName("Sideways join using dc");
        DistributedCache.addCacheFile(new Path("/user/hadoop/lab18/dc/employee.csv").toUri(), conf);
        job.setJarByClass(this.getClass());
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DcFileMapper.class);
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new DcFileJob(), args);
    }

}

Let’s execute it.

$ hadoop jar FeedCategoryCount-18.jar org.grassfield.hadoop.dc.DcFileJob /user/hadoop/lab18/dc/salary.csv /user/hadoop/lab18/01

You will find that employee.csv is downloaded (localized) to the local file system.

16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Creating symlink: /tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv <- /opt/hadoop-2.6.4/jars/employee.csv
16/10/08 11:22:22 INFO mapred.LocalDistributedCacheManager: Localized hdfs://gandhari:9000/user/hadoop/lab18/dc/employee.csv as file:/tmp/hadoop-hadoop/mapred/local/1475896942033/employee.csv

Refer to the beginning of this post for the output.
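
A side note from me, not from the original lab: the DistributedCache class is deprecated in Hadoop 2.x. The non-deprecated equivalents are Job.addCacheFile() in the driver and Context.getCacheFiles() in the mapper. A rough sketch, assuming the same file layout as above:

// Driver: replaces DistributedCache.addCacheFile(...)
job.addCacheFile(new Path("/user/hadoop/lab18/dc/employee.csv").toUri());

// Mapper setup(): replaces DistributedCache.getLocalCacheFiles(...)
java.net.URI[] cacheFiles = context.getCacheFiles();
// On YARN the cached file is also symlinked into the task's working directory,
// so it can typically be opened simply as new java.io.File("employee.csv").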

Lab 17: Adding custom jars to MapReduce using Distributed Cache

Hi,

This is part of my blog post series on MapReduce. But let me tell you upfront: this doesn't work. The job fails with a ClassNotFoundException. I'll come back and update this post if I manage to rectify it. Since I have already spent too much time on this, I'll use the -libjars parameter and GenericOptionsParser for such requirements instead.

Another post, Including Third-Party Libraries in my Map-Reduce Job (using distributed cache), also says this doesn't work. I really don't know how it worked for others. Let's put this on hold.
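
For reference, the -libjars route would look roughly like this. DcJob already runs through ToolRunner, so GenericOptionsParser picks up -libjars as long as it appears before the program arguments. The local jar path and the output directory below are placeholders, and the driver would also need to drop the args[2]/DistributedCache.addFileToClassPath step:

$ hadoop jar FeedCategoryCount-17.jar org.grassfield.hadoop.dc.DcJob -libjars /path/to/temp-0.0.1-SNAPSHOT.jar /user/hadoop/lab17/input/feedList.csv /user/hadoop/lab17/06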

Driver:

package org.grassfield.hadoop.dc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * hadoop jar FeedCategoryCount-17.jar org.grassfield.hadoop.dc.DcJob /user/hadoop/lab17/input/feedList.csv /user/hadoop/lab17/05 /user/hadoop/lab17/dc/temp-0.0.1-SNAPSHOT.jar
 * @author pandian
 *
 */
public class DcJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJarByClass(this.getClass());
        job.setJobName("DcJob");
        Path path = new Path(args[2]);
        DistributedCache.addFileToClassPath(path, conf);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(DcMapper.class);
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
        return 0;
    }
    public static void main(String [] args) throws Exception{
        ToolRunner.run(new Configuration(), new DcJob(), args);
    }

}

Mapper:

package org.grassfield.hadoop.dc;

import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import temp.ToUpper;

/**
 * hadoop jar FeedCategoryCount-17.jar org.grassfield.hadoop.dc.DcJob /user/hadoop/lab17/input/feedList.csv /user/hadoop/lab17/05 /user/hadoop/lab17/dc/temp-0.0.1-SNAPSHOT.jar
 * @author pandian
 *
 */
public class DcMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {
    Path[] localCacheArchives;
    Path[] localCacheFiles;

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        context.write(new Text(ToUpper.toUpper(value.toString())), key);
    }

    @Override
    protected void setup(
            Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        localCacheArchives = DistributedCache.getLocalCacheArchives(context.getConfiguration());
        localCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    }
}

I already have the dependency jar file in HDFS.

$ hadoop fs -ls /user/hadoop/lab17/dc
Found 1 items
-rw-r--r--   3 hadoop supergroup       4539 2016-10-08 07:36 /user/hadoop/lab17/dc/temp-0.0.1-SNAPSHOT.jar

Following is the content of the jar

$ jar -tvf temp-0.0.1-SNAPSHOT.jar
     0 Fri Oct 07 07:24:08 MYT 2016 META-INF/
   133 Fri Oct 07 07:24:06 MYT 2016 META-INF/MANIFEST.MF
     0 Fri Oct 07 07:24:08 MYT 2016 temp/
  1276 Fri Oct 07 07:24:08 MYT 2016 temp/CharTest.class
  1750 Fri Oct 07 07:24:08 MYT 2016 temp/DateParser.class
   896 Fri Oct 07 07:24:08 MYT 2016 temp/Jdbc.class
   464 Fri Oct 07 07:24:08 MYT 2016 temp/ToUpper.class
     0 Fri Oct 07 07:24:08 MYT 2016 META-INF/maven/
     0 Fri Oct 07 07:24:08 MYT 2016 META-INF/maven/temp/
     0 Fri Oct 07 07:24:08 MYT 2016 META-INF/maven/temp/temp/
   927 Fri Oct 07 07:23:26 MYT 2016 META-INF/maven/temp/temp/pom.xml
   107 Fri Oct 07 07:24:08 MYT 2016 META-INF/maven/temp/temp/pom.properties

Let’s hope this will be resolved.

 

java.io.IOException: Filesystem closed

Hi hadoopers,

Here is the exception that screwed me up on a Saturday night and failed my Mapper task.

  • The Mapper reads the input lines one by one and tokenizes each line.
  • The last token contains the path of a file in HDFS.
  • I need to open that file and read its contents.

For the above task, the following is the flow I followed in the Mapper.

[Figure: hadoop045-filesystem]

Worse, my mapper failed with the following exception.

org.apache.hadoop.mapred.MapTask: Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader@1cb3ec38
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:617)

The FileSystem object is meant to be shared: FileSystem.get() returns a cached instance, and the same instance is used by the framework to read the Mapper's input. When I closed the FileSystem, the Mapper's input stream was closed along with it, which broke the whole flow. Closing only the file stream, and not closing the FileSystem explicitly, resolved the problem.
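
Here is a minimal sketch of the corrected pattern inside the Mapper; the variable names are illustrative, not from my actual code. It uses org.apache.hadoop.fs.FileSystem, FSDataInputStream and Path, org.apache.hadoop.io.IOUtils, and java.io.BufferedReader/InputStreamReader.

// Inside map(): hdfsPath is the path taken from the last token of the input line.
FileSystem fs = FileSystem.get(context.getConfiguration()); // shared, cached instance
FSDataInputStream in = fs.open(new Path(hdfsPath));
try {
    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    String line;
    while ((line = reader.readLine()) != null) {
        // process the contents of the referenced file here
    }
} finally {
    IOUtils.closeStream(in); // close only the stream
    // do NOT call fs.close() - that would also close the stream feeding the Mapper's input
}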

Ref: https://github.com/linkedin/gobblin/issues/1219

 

MapReduce Job Execution Process – Job Cleanup

Hi Hadoopers,

So we are looking at the 7th circle today, which is the job cleanup.

 

[Figure: hadoop037-job-submission-1]

An MR job writes many intermediate results and temporary files during its operation. Once the job is completed, these files would keep occupying space on HDFS with no further benefit. Hence the cleanup task is launched.

[Figure: hadoop043-hadoop-job-cleanup]

  1. The Job Tracker informs all the Task Trackers to perform the cleanup.
  2. Each Task Tracker cleans up its work folders.
  3. They clean up the temporary directory.
  4. Once the cleanup task is successful, the Task Tracker ends the job by writing the _SUCCESS file (see the example below).
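
Downstream jobs and scripts commonly check for that _SUCCESS marker before consuming the output. For example, using the Lab 21 output directory from above:

$ hadoop fs -test -e /user/hadoop/lab21/19/_SUCCESS && echo "output is complete"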
