Lab 21: MapReduce with Sequence File

Hi Hadoopers,

This post is a continuation of my previous post on sequence files. The output produced in that post is read by this MapReduce program.

This program will accept a sequence file as input and emit a text file as output.

Mapper:

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeqFileReadMapper
        extends Mapper<Text, Text, Text, Text> {

    @Override
    protected void map(Text key, Text value,
            Mapper<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        System.out.println("key:"+key+" "+key.getClass());
        System.out.println("value:"+value.toString()+" "+value.getClass());
        context.write(key, value);
    }
}

Reducer:

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeqFileReadReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // write every value received for this key to the text output
        for (Text record : values) {
            context.write(key, record);
        }
    }
}

Driver:

package org.grassfield.nandu.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SeqFileReadJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // propagate the job's exit status to the shell
        System.exit(ToolRunner.run(new Configuration(), new SeqFileReadJob(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(this.getClass());
        job.setJobName("SeqFileReadJob");
        
        // args[0]: directory containing the sequence file, args[1]: text output directory
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.setMapperClass(SeqFileReadMapper.class);
        job.setReducerClass(SeqFileReadReducer.class);
        
        // read key/value pairs from the sequence file and write the results as plain text
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setNumReduceTasks(1);
        return job.waitForCompletion(true) ? 0 : 1;
    }

}

Execution

$ hadoop jar FeedCategoryCount-21.jar org.grassfield.nandu.etl.SeqFileReadJob /user/hadoop/lab21/input/ /user/hadoop/lab21/19

$ hadoop fs -ls /user/hadoop/lab21/19
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-09 00:54 /user/hadoop/lab21/19/_SUCCESS
-rw-r--r--   3 hadoop supergroup        130 2016-10-09 00:54 /user/hadoop/lab21/19/part-r-00000

$ hadoop fs -cat /user/hadoop/lab21/19/part-r-00000
0       101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
0       101,2000
18      101,4000
27      102,3000
48      102,Bheema,Pandu,Kunti,Hidimbi
9       102,1500
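
For a quick sanity check outside MapReduce, the same sequence file can also be read directly with the SequenceFile.Reader API. This is only a minimal sketch: it assumes the key and value classes recorded in the file header are both Text (as produced by Lab 20), and the file name part-r-00000 under the Lab 21 input directory is an assumption.

package org.grassfield.nandu.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumed location of the sequence file produced by Lab 20
        Path path = new Path("/user/hadoop/lab21/input/part-r-00000");
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path))) {
            Text key = new Text();
            Text value = new Text();
            // iterate over every key/value record and print it
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        }
    }
}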

Lab 20: Sequence File Creation

Hi hadoopers,

I have been told that sequence files can be created from the many small chunks of files placed in HDFS. I have a lot of such files in my Feed analytics project, so I hope this will help me free up the considerable space in HDFS that is currently blocked by small HTML files.

So, this program accepts a directory as input. All files inside that directory will be written into a single sequence file.

This folder is my input:

$ hadoop fs -ls /user/hadoop/lab20/input
Found 2 items
-rw-r--r--   3 hadoop supergroup         79 2016-10-08 19:32 /user/hadoop/lab20/input/employee.csv
-rw-r--r--   3 hadoop supergroup         36 2016-10-08 19:32 /user/hadoop/lab20/input/salary.csv
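
For comparison, the same consolidation can be done without MapReduce by appending records straight into a sequence file with the SequenceFile.Writer API. This is only a sketch, assuming Text keys and values and a hypothetical output path; the sample records are taken from employee.csv.

package org.grassfield.nandu.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFileWriterDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical output location for this demo
        Path path = new Path("/user/hadoop/lab20/demo/employee.seq");
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class))) {
            // each append() adds one key/value record to the sequence file
            writer.append(new Text("0"), new Text("101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati"));
            writer.append(new Text("48"), new Text("102,Bheema,Pandu,Kunti,Hidimbi"));
        }
    }
}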

Here is the mapper:

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeqFileMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // key = byte offset of the line (as Text), value = the line itself
        context.write(new Text(key.toString()), value);
    }
}

Here is the reducer:

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeqFileReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // write each (offset, line) pair into the sequence file
        for (Text value : values) {
            context.write(key, value);
        }
    }

}

Here is the driver:

package org.grassfield.nandu.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SeqFileJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(this.getClass());
        job.setJobName("Sequential File Job");
        // args[0]: input directory of small files, args[1]: sequence file output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SeqFileMapper.class);
        job.setReducerClass(SeqFileReducer.class);
        // read plain text lines and write the results as a sequence file
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // propagate the job's exit status to the shell
        System.exit(ToolRunner.run(new Configuration(), new SeqFileJob(), args));
    }

}

Let’s execute it.

$ hadoop jar FeedCategoryCount-20.jar org.grassfield.nandu.etl.SeqFileJob /user/hadoop/lab20/input /user/hadoop/lab20/02

$ hadoop fs -ls /user/hadoop/lab20/02
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-08 19:36 /user/hadoop/lab20/02/_SUCCESS
-rw-r--r--   3 hadoop supergroup        256 2016-10-08 19:36 /user/hadoop/lab20/02/part-r-00000
And here is the output. Since a sequence file is a binary format, cat shows the SEQ header (which records the key and value classes) followed by raw bytes rather than readable text:

$ hadoop fs -cat /user/hadoop/lab20/02/part-r-00000
SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text㜄▒Ӛ▒▒▒▒#▒▒▒▒#
                                                                   101,200020/101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
1101,4000
2102,3000"48102,Bheema,Pandu,Kunti,Hidimbi
                                          102,1500
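
Since cat only dumps the raw bytes, a handier way to check the contents is hadoop fs -text, which understands sequence files and prints the decoded key/value pairs:

$ hadoop fs -text /user/hadoop/lab20/02/part-r-00000

This should show the six offset/record pairs in readable form, the same data that the Lab 21 job above writes out as plain text.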