
MapReduce Job Execution Process – Map Task Execution

Hi Hadoopers,

The user has submitted his job, he has the required permissions, we have slots in the cluster, and job setup is completed. Now we look at the 4th circle given below – Map Task Execution.

hadoop037-job-submission-1

 

The diagram given below depicts Map Task execution.

hadoop041-map-task-execution

  1. The Task Tracker launches the Map task.
  2. The Map task reads the jar file given by the user. This is what we write in Eclipse; in the entire framework, this is our contribution🙂
    The Map task also reads the job configuration (input path, output path etc.). It gets everything from HDFS, as all of these were uploaded to HDFS initially.
  3. The Map task reads the input splits from HDFS.
  4. From the input splits, the Map task creates the records.
  5. The Map task invokes the user’s Mapper with each record (see the sketch after this list).
  6. The Mapper writes the intermediate output.
  7. The Map task sorts the intermediate output by key and flushes it to disk.
  8. The Map task informs the Task Tracker about the completion of the task.
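
Steps 4 and 5 are essentially what the Mapper base class does for us: the framework turns the split into key/value records and calls map() once per record. Here is a slightly simplified view of the new-API Mapper.run() loop (simplified for illustration, not the exact Hadoop source):

public void run(Context context) throws IOException, InterruptedException {
    setup(context);                                  // one-time initialisation
    while (context.nextKeyValue()) {                 // step 4: read the next record from the split
        map(context.getCurrentKey(),                 // step 5: hand the record to our map() code
            context.getCurrentValue(), context);
    }
    cleanup(context);                                // one-time teardown
}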

MapReduce Job Execution Process – Job scheduling

Hi Hadoopers,

We shall talk about the 3rd circle today, as we have already talked about Job submission and Job initialization.

hadoop037-job-submission-1

Scheduling the jobs is an interesting concept. I’m really excited to see the communication between the Scheduler, the Job Tracker and the Task Tracker.

hadoop040-job-schedule

  1. The Task Tracker keeps sending heartbeats to the Job Tracker with the status of its tasks. So it tells the Job Tracker that a task is completed and that it wants more tasks.
  2. The Job Tracker updates the task status and makes a note of the Task Tracker’s message.
  3. The Job Tracker goes to the Scheduler asking for tasks.
  4. The Scheduler updates its task scheduling record. Based on the job scheduling policy, it either makes the job client wait or processes the job; this depends on the execution policy, priority etc.
  5. The Job Tracker gets the task.
  6. It submits the task to the Task Tracker. A rough sketch of this loop is given below.
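
To make the conversation concrete, here is an illustrative sketch of that heartbeat cycle. The TaskTracker and Scheduler interfaces below are hypothetical stand-ins invented for this sketch – they are not real Hadoop classes – and only mirror the six steps above:

import java.util.List;

interface TaskTracker { String buildHeartbeat(); void launch(List<String> tasks); }
interface Scheduler   { List<String> assignTasks(String heartbeat); }

public class HeartbeatLoopSketch {
    // One heartbeat cycle, driven by the Task Tracker
    static void heartbeatCycle(TaskTracker taskTracker, Scheduler scheduler) {
        String heartbeat = taskTracker.buildHeartbeat();        // 1. task status + "I have free slots"
        recordStatus(heartbeat);                                // 2. the Job Tracker notes the message
        List<String> tasks = scheduler.assignTasks(heartbeat);  // 3-5. the Scheduler applies its policy and returns tasks
        taskTracker.launch(tasks);                              // 6. the Job Tracker hands the tasks back
    }

    static void recordStatus(String heartbeat) {
        // bookkeeping of the task status would go here
    }
}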

MapReduce Job Execution Process – Job initialization

Hi Hadoopers,

I wrote about the first step of the MR Job execution – Job Submission in my earlier post.

hadoop037-job-submission-1

In this post, we talk about the 2nd circle, which is Job initialization.

I got the job; how will I execute it? This is what the Hadoop elephant is thinking, with a yarn in its trunk!

hadoop039-job-init

  1. Once the job is submitted, it becomes the Job Tracker’s responsibility to initialize it.
  2. The job XML was uploaded to the staging directory created as described in my earlier post. The Job Tracker reads it and performs the validation.
  3. Once the XML validation is completed, it goes to the Scheduler for job validations. The Scheduler checks whether the user is authorized for this job, whether the content is allowed, etc.
  4. If the job validation is also successful, the job is added by the Scheduler and the schedule information is updated.
  5. The Job Scheduler initializes the job.
  6. It reads the number of splits needed for the job to get executed.
  7. Tasks are created to execute the job. If we have many splits, that many map tasks will be spawned (see the split arithmetic sketch after this list).
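
To make steps 6 and 7 concrete: the number of map tasks follows directly from the number of input splits. Here is a simplified sketch of the default split-size arithmetic used by FileInputFormat – the file and block sizes below are made-up example values:

public class SplitCountSketch {
    // Simplified form of FileInputFormat's default split sizing:
    // splitSize = max(minSize, min(maxSize, blockSize))
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long fileSize  = 300L * 1024 * 1024;                   // a 300 MB input file (example only)
        long blockSize = 128L * 1024 * 1024;                   // 128 MB HDFS block size
        long split     = splitSize(blockSize, 1L, Long.MAX_VALUE);
        long numSplits = (long) Math.ceil((double) fileSize / split);
        System.out.println(numSplits + " splits -> " + numSplits + " map tasks");  // prints 3
    }
}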

 


MapReduce Job Execution Process – Job Submission

Hi Hadoopers,

After publishing many posts about MapReduce code, we’ll now look at the MR internals: how an MR job is submitted and executed.

hadoop037-job-submission-1

This post talks about the first circle – Job Submission.

We have compiled the MR code and the jar is ready. We execute the job with hadoop jar xxxxxx. First the job is submitted to Hadoop; then the schedulers run the job based on cluster capacity and availability.

I want to scribble down quick notes on Job Submission using the Gantt diagram given below.

hadoop038-job-submission-2

  1. The user submits the job to the Job Client.
  2. The Job Client talks to the Job Tracker to get the job id.
  3. The Job Client creates a staging directory in HDFS. This is where all the files related to the job get uploaded.
  4. The MR code and configurations are uploaded to the staging directory, with 10 replicas of their blocks. The jar file of the job, the job splits, the split metadata and job.xml, which has the job description, are uploaded.
  5. Splits are computed automatically and the input is read.
  6. The metadata of the splits is uploaded to HDFS.
  7. The job is submitted and it is ready to execute (a client-side sketch is given below).
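
From the client side, all of this is kicked off by the Job API when we run hadoop jar. Here is a minimal sketch, assuming placeholder input/output paths and leaving the mapper/reducer settings out:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "submit sketch");     // the Job Client obtains a job id on submission
        job.setJarByClass(SubmitSketch.class);                // this jar is uploaded to the staging directory
        // mapper, reducer, input/output formats etc. are set here as in the earlier labs
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);     // computes splits, uploads job.xml and submits
    }
}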

Lab 15: Writing unit test cases for MapReduce

Hi Hadoopers,

Here is the next interesting post. We already know (or at least we assume we do!) that JUnit is the unit testing framework for Java. On top of that, Hadoop offers us MRUnit to write unit test cases for MapReduce.
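
MRUnit has to be on the test classpath. With Maven, a dependency along these lines should work – the version and the hadoop2 classifier are assumptions here, so pick whatever matches your Hadoop line:

        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>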

Here is a sample.

Our Input:

Blogger http://www.blogger.com      BSNLTeleServices | BSNL Broadband Plans, Bill Payment Selfcare Portal   noreply@blogger.com (BSNL TeleServices) http://www.bsnlteleservices.com/        Sat Sep 24 02:42:36 MYT 2016     BSNL 599 Broadband Plan | MP Combo Internet Tariff     noreply@blogger.com (BSNL TeleServices) http://feedproxy.google.com/~r/BSNLTeleservices/~3/3KqPtw353PM/bsnl-599-broadband-plan-mp-combo.html    Sat Sep 24 02:14:11 MYT 2016    [BSNL Broadband Plans, Broadband Tariff]

This job will identify the author of this article, as highlighted above.

Mapper

The Mapper accepts the tab-delimited line and finds the author name in it.

package org.grassfield.nandu.author;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.grassfield.hadoop.FeedCategoryCountMapper.MapperRCheck;
import org.grassfield.hadoop.entity.FeedEntryRecord;
import org.grassfield.hadoop.util.ParseUtil;

public class AuthorMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
    String dateFormat = "EEE MMM dd HH:mm:ss z yyyy";
    SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
    Logger logger = Logger.getLogger(AuthorMapper.class);
    IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer st = new StringTokenizer(line, "\t");
        int countTokens = st.countTokens();
        if (countTokens!=10){
            context.getCounter(MapperRCheck.INVALID).increment(1);
        }else{
            context.getCounter(MapperRCheck.VALID).increment(1);
        }
        
        try {
            FeedEntryRecord record = ParseUtil.populateRecord(line, sdf);
            String name = record.getEntryAuthor();
            if (name==null){
                name = record.getFeedAuthor();
            }
            context.write(new Text(name), one);
        } catch (Exception e) {
            logger.error("Error while mapping", e);
            context.getCounter(MapperRCheck.INVALID).increment(1);
        }
    }
}

Reducer

Reducer sums the occurrences of the author names.

package org.grassfield.nandu.author;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AuthorReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        
        int sum=0;
        for(IntWritable value:values){
            sum+=value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Driver

Here is the driver class:

package org.grassfield.nandu.author;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AuthorDriver extends Configured 
    implements Tool{

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();
        
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        
        Job job = new Job(conf, "Author count");
        job.setJarByClass(this.getClass());
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(AuthorMapper.class);
        job.setReducerClass(AuthorReducer.class);
        job.setNumReduceTasks(1);
        
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);
        
        return job.waitForCompletion(true)?0:1;
    }
    
    public static void main(String [] args) throws Exception{
        System.exit(ToolRunner.run(new Configuration(), new AuthorDriver(), args));
    }

}

Test Case

 

Let’s write the unit test case for this mapper and reducer pair.

package org.grassfield.nandu.author;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class AuthorDriverTest {
    private Mapper<LongWritable, Text, Text, IntWritable> mapper;
    private Reducer<Text, IntWritable, Text, IntWritable> reducer;
    private MapReduceDriver<LongWritable, Text,  Text, IntWritable,  Text, IntWritable> driver;
    
    @Before
    public void setUp() throws Exception{
        mapper = new AuthorMapper();
        reducer = new AuthorReducer();
        driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }
    
    @Test
    public void testRun() throws Exception{
        String line = "Blogger http://www.blogger.com      BSNLTeleServices | BSNL Broadband Plans, Bill Payment Selfcare Portal   noreply@blogger.com (BSNL TeleServices) http://www.bsnlteleservices.com/        Sat Sep 24 02:42:36 MYT 2016     BSNL 599 Broadband Plan | MP Combo Internet Tariff     noreply@blogger.com (BSNL TeleServices) http://feedproxy.google.com/~r/BSNLTeleservices/~3/3KqPtw353PM/bsnl-599-broadband-plan-mp-combo.html    Sat Sep 24 02:14:11 MYT 2016    [BSNL Broadband Plans, Broadband Tariff]";
        driver.withInput(new LongWritable(), new Text(line));
        driver.withOutput(new Text("author"), new IntWritable(1));
        driver.runTest();
    }
}

Let’s run this as a JUnit test case.

hadoop036-lab-15-mrunit
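
MRUnit also ships MapDriver and ReduceDriver for testing the mapper or the reducer in isolation. Here is a minimal sketch for the reducer alone – the author string and the counts are made-up values for illustration, not output from a real run:

package org.grassfield.nandu.author;

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class AuthorReducerTest {
    @Test
    public void testReduce() throws Exception {
        ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver =
                ReduceDriver.newReduceDriver(new AuthorReducer());
        // two occurrences of the same (hypothetical) author should sum to 2
        reduceDriver.withInput(new Text("noreply@blogger.com (BSNL TeleServices)"),
                Arrays.asList(new IntWritable(1), new IntWritable(1)));
        reduceDriver.withOutput(new Text("noreply@blogger.com (BSNL TeleServices)"),
                new IntWritable(2));
        reduceDriver.runTest();
    }
}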

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

java.lang.Exception: java.io.IOException: Incorrect string value: ‘\xE0\xAE\xB5\xE0\xAF\x87…’

Hi Hadoopers,

This is a nasty exception that failed my reducer task, which updates my MySQL table with the reducer output.

The reason behind this is a Unicode character issue.

The MySQL table was created with a non-Unicode Western encoding, and I’m trying to insert multilingual Unicode text. After changing the table collation (and, if needed, the column collation) to utf8_bin, it worked fine.

alter table FeedEntryRecord convert to character set utf8 collate utf8_bin;

 


Lab 14: Sending MapReduce output to JDBC

Hi Hadoopers,

Unfortunately I couldn’t post on time, as I’ve been hit with the flu. Here is the post for today. Let’s see how to send the output of the Reducer to JDBC in this post. I’ll take the Lab 08 – MapReduce using custom class as Key post and modify it.

Mapper

There is no change in the Mapper. It accepts LongWritable and Text as input and emits the custom key EntryCategory with an IntWritable value.

Reducer

The Reducer accepts the output of the Mapper as its input: EntryCategory as key and IntWritable as value. It emits DBOutputWritable as key and NullWritable as value.

/**
 * 
 */
package org.grassfield.hadoop;

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.grassfield.hadoop.entity.DBOutputWritable;
import org.grassfield.hadoop.entity.EntryCategory;

/**
 * Reducer for the Feed Category count
 * @author pandian
 *
 */
public class FeedCategoryReducer extends 
    Reducer<EntryCategory, IntWritable, DBOutputWritable, NullWritable> {

    @Override
    protected void reduce(EntryCategory key, Iterable<IntWritable> values, Context context) {
        int sum=0;
        for (IntWritable value:values){
            sum+=value.get();
        }
        DBOutputWritable db = new DBOutputWritable();
        db.setParseDate(new java.sql.Date(new Date().getTime()));
        db.setCategory(key.getCategory());
        db.setCount(sum);
        try {
            context.write(db, NullWritable.get());
        } catch (IOException | InterruptedException e) {
            System.err.println("Error while updating record in database");
            e.printStackTrace();
        }
    }
}

 

DBOutputWritable

Our bean DBOutputWritable should implement the Writable and DBWritable interfaces so that we can write it to the database.

package org.grassfield.hadoop.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

/**
 * Bean for table feed_analytics
 * @author pandian
 *
 */
public class DBOutputWritable implements Writable, DBWritable {
    private Date parseDate;
    private String category;
    private int count;

    public Date getParseDate() {
        return parseDate;
    }

    public void setParseDate(Date parseDate) {
        this.parseDate = parseDate;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public void readFields(ResultSet arg0) throws SQLException {
        throw new RuntimeException("not implemented");
    }

    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setDate(1, this.parseDate);
        ps.setString(2, this.category);
        ps.setInt(3, this.count);
    }

    @Override
    public void readFields(DataInput arg0) throws IOException {
        throw new RuntimeException("not implemented");

    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        throw new RuntimeException("not implemented");
    }
}

Driver

The Driver is where I specify my database details. Note the changes in the output key and value classes.

package org.grassfield.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.grassfield.hadoop.entity.DBOutputWritable;
import org.grassfield.hadoop.entity.EntryCategory;

/**
 * A Mapper Driver Program to count the categories in an RSS XML file. This may
 * not be the right approach to parse the XML. It is only for demo purposes.
 * 
 * @author pandian
 *
 */
public class FeedCategoryCountDriver extends Configured
        implements Tool {

    @Override
    public int run(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        Configuration conf = getConf();
        DBConfiguration.configureDB(
                conf, 
                "com.mysql.jdbc.Driver", 
                "jdbc:mysql://localhost:3306/feed_analytics?useUnicode=true&characterEncoding=UTF-8",
                "feed_analytics",
                "P@ssw0rd");
        GenericOptionsParser parser = new GenericOptionsParser(conf,
                args);
        args = parser.getRemainingArgs();

        Path input = new Path(args[0]);

        Job job = new Job(conf, "Feed Category Count");
        job.setJarByClass(getClass());

        job.setMapOutputKeyClass(EntryCategory.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(DBOutputWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        
        job.setMapperClass(FeedCategoryCountMapper.class);
        job.setPartitionerClass(FeedCategoryPartitioner.class);
        job.setCombinerClass(FeedCategoryCombiner.class);
        job.setReducerClass(FeedCategoryReducer.class);
        job.setNumReduceTasks(3);
        
        try {
            FileInputFormat.setInputPaths(job, input);
            DBOutputFormat.setOutput(job, 
                    "feed_category", //table name
                    new String[]{"parseDate", "category", "count"}    //fields
            );
        } catch (IOException e) {
            e.printStackTrace();
        }

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(),
                new FeedCategoryCountDriver(), args));
    }
}

Add the MySQL driver to the Maven dependencies. If you don’t use Maven, add the library as an external jar dependency.

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

Copy the jar to HadoopHome/lib/native and HadoopHome/share/hadoop/mapreduce/lib/.

Restart the Hadoop daemons.

Table Structure & DB setup

Let’s create our table first.


CREATE TABLE `feed_category` (
`id` bigint(20) NOT NULL,
`parseDate` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
`category` varchar(100) COLLATE utf8_bin NOT NULL,
`count` int(11) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

ALTER TABLE `feed_category`
ADD PRIMARY KEY (`id`);

ALTER TABLE `feed_category`
MODIFY `id` bigint(20) NOT NULL AUTO_INCREMENT, AUTO_INCREMENT=151;

Execution

Let’s execute now.

hadoop@gandhari:/opt/hadoop-2.6.4/jars$ hadoop jar FeedCategoryCount-14.jar org.grassfield.hadoop.FeedCategoryCountDriver /user/hadoop/feed/2016-09-24

16/09/24 08:35:46 INFO mapreduce.Job: Counters: 40
        File System Counters
                FILE: Number of bytes read=128167
                FILE: Number of bytes written=1162256
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1948800
                HDFS: Number of bytes written=0
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=0
        Map-Reduce Framework
                Map input records=1107
                Map output records=623
                Map output bytes=19536
                Map output materialized bytes=4279
                Input split bytes=113
                Combine input records=623
                Combine output records=150
                Reduce input groups=150
                Reduce shuffle bytes=4279
                Reduce input records=150
                Reduce output records=150
                Spilled Records=300
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=0
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
                Total committed heap usage (bytes)=1885339648
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=487200
        File Output Format Counters
                Bytes Written=0
        org.grassfield.hadoop.FeedCategoryCountMapper$MapperRCheck
                INVALID=35
                VALID=1072

So, is my table populated?

hadoop035-lab-14-jdbc-output

Yes, it is.


Have a good weekend, guys. Let me take some rest before moving on to MRUnit.