Lab 14: Sending MapReduce output to JDBC

Hi Hadoopers,

Unfortunately I couldn’t post on time, as I’ve been hit with the flu. Here is the post for today. In this post, let’s see how to send the output of the Reducer to a database over JDBC. I’ll take the Lab 08 – MapReduce using custom class as Key post and modify it.

Mapper

We have no change in the Mapper. It accepts LongWritable and Text as input and emits the custom key EntryCategory with an IntWritable value.
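The Mapper code itself is not repeated here; for orientation, its shape is roughly as follows. This is only a sketch: the actual RSS parsing and the VALID/INVALID record counters live in the Lab 08 post, and the setCategory() setter on EntryCategory is assumed here for illustration.

package org.grassfield.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.grassfield.hadoop.entity.EntryCategory;

/**
 * Sketch of the Lab 08 mapper: emits one (EntryCategory, 1) pair per category.
 * The real parsing logic lives in the Lab 08 post.
 */
public class FeedCategoryCountMapper
        extends Mapper<LongWritable, Text, EntryCategory, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Placeholder parsing: wrap the category text in the custom key
        // and emit a count of 1 for the reducer to sum.
        EntryCategory category = new EntryCategory();
        category.setCategory(value.toString().trim());  // assumed setter
        context.write(category, ONE);
    }
}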

Reducer

The Reducer accepts the Mapper’s output as its input: EntryCategory as the key and IntWritable as the value. It emits DBOutputWritable as the key and NullWritable as the value.

/**
 * 
 */
package org.grassfield.hadoop;

import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.grassfield.hadoop.entity.DBOutputWritable;
import org.grassfield.hadoop.entity.EntryCategory;

/**
 * Reducer for the feed category count job
 * @author pandian
 *
 */
public class FeedCategoryReducer extends 
    Reducer<EntryCategory, IntWritable, DBOutputWritable, NullWritable> {

    @Override
    protected void reduce(EntryCategory key, Iterable<IntWritable> values, Context context) {
        // Sum the counts emitted for this category
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Build one output row: parse date, category and total count
        DBOutputWritable db = new DBOutputWritable();
        db.setParseDate(new java.sql.Date(new Date().getTime()));
        db.setCategory(key.getCategory());
        db.setCount(sum);
        try {
            context.write(db, NullWritable.get());
        } catch (IOException | InterruptedException e) {
            System.err.println("Error while updating record in database");
            e.printStackTrace();
        }
    }
}

 

DBOutputWritable

Our bean DBOutputWritable must implement the Writable and DBWritable interfaces so that DBOutputFormat can write it to the database. Note that the parameter indices used in write(PreparedStatement) must follow the same column order that the driver passes to DBOutputFormat.setOutput().

package org.grassfield.hadoop.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

/**
 * Bean for the feed_category table in the feed_analytics database
 * @author pandian
 *
 */
public class DBOutputWritable implements Writable, DBWritable {
    private Date parseDate;
    private String category;
    private int count;

    public Date getParseDate() {
        return parseDate;
    }

    public void setParseDate(Date parseDate) {
        this.parseDate = parseDate;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public void readFields(ResultSet arg0) throws SQLException {
        // Not needed: this bean is only written via DBOutputFormat, never read back
        throw new RuntimeException("not implemented");
    }

    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setDate(1, this.parseDate);
        ps.setString(2, this.category);
        ps.setInt(3, this.count);
    }

    @Override
    public void readFields(DataInput arg0) throws IOException {
        throw new RuntimeException("not implemented");

    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        throw new RuntimeException("not implemented");
    }
}

Driver

The driver is where I specify the database details. Note the changes in the output key and value classes.

package org.grassfield.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.grassfield.hadoop.entity.DBOutputWritable;
import org.grassfield.hadoop.entity.EntryCategory;

/**
 * A driver program to count the categories in an RSS XML file. This may not
 * be the right approach to parse the XML; it is only for demo purposes.
 * 
 * @author pandian
 *
 */
public class FeedCategoryCountDriver extends Configured
        implements Tool {

    @Override
    public int run(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        Configuration conf = getConf();
        DBConfiguration.configureDB(
                conf, 
                "com.mysql.jdbc.Driver", 
                "jdbc:mysql://localhost:3306/feed_analytics?useUnicode=true&characterEncoding=UTF-8",
                "feed_analytics",
                "P@ssw0rd");
        GenericOptionsParser parser = new GenericOptionsParser(conf,
                args);
        args = parser.getRemainingArgs();

        Path input = new Path(args[0]);

        Job job = Job.getInstance(conf, "Feed Category Count");
        job.setJarByClass(getClass());

        job.setMapOutputKeyClass(EntryCategory.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(DBOutputWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        
        job.setMapperClass(FeedCategoryCountMapper.class);
        job.setPartitionerClass(FeedCategoryPartitioner.class);
        job.setCombinerClass(FeedCategoryCombiner.class);
        job.setReducerClass(FeedCategoryReducer.class);
        job.setNumReduceTasks(3);
        
        FileInputFormat.setInputPaths(job, input);
        DBOutputFormat.setOutput(job,
                "feed_category", //table name
                new String[]{"parseDate", "category", "count"}    //fields
        );

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(),
                new FeedCategoryCountDriver(), args));
    }
}

Add the MySQL JDBC driver to your Maven dependencies. If you don’t use Maven, add the library as an external jar dependency.

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

Copy the jar to HadoopHome/lib/native and HadoopHome/share/hadoop/mapreduce/lib/.

Restart the Hadoop daemons.

Table Structure & DB setup

Let’s create our table first.


CREATE TABLE `feed_category` (
`id` bigint(20) NOT NULL,
`parseDate` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
`category` varchar(100) COLLATE utf8_bin NOT NULL,
`count` int(11) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

ALTER TABLE `feed_category`
ADD PRIMARY KEY (`id`);

ALTER TABLE `feed_category`
MODIFY `id` bigint(20) NOT NULL AUTO_INCREMENT, AUTO_INCREMENT=151;
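Optionally, you can sanity-check the table and the credentials with a small standalone JDBC snippet before running the job. This is only a sketch and not part of the lab code (the class name is just for illustration); it reuses the driver class, URL, user and password configured in the driver above and inserts a single test row in the same column order DBOutputFormat will use.

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class FeedCategoryInsertCheck {
    public static void main(String[] args) throws Exception {
        // Same driver class, URL and credentials as in FeedCategoryCountDriver
        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://localhost:3306/feed_analytics"
                + "?useUnicode=true&characterEncoding=UTF-8";
        try (Connection con = DriverManager.getConnection(url, "feed_analytics", "P@ssw0rd");
             PreparedStatement ps = con.prepareStatement(
                     "INSERT INTO feed_category (parseDate, category, count) VALUES (?, ?, ?)")) {
            // Column order matches DBOutputFormat.setOutput() in the driver
            ps.setDate(1, new Date(System.currentTimeMillis()));
            ps.setString(2, "test-category");
            ps.setInt(3, 1);
            ps.executeUpdate();  // remove this test row afterwards if you like
        }
    }
}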

Execution

Let’s execute now.

hadoop@gandhari:/opt/hadoop-2.6.4/jars$ hadoop jar FeedCategoryCount-14.jar org.grassfield.hadoop.FeedCategoryCountDriver /user/hadoop/feed/2016-09-24

16/09/24 08:35:46 INFO mapreduce.Job: Counters: 40
        File System Counters
                FILE: Number of bytes read=128167
                FILE: Number of bytes written=1162256
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1948800
                HDFS: Number of bytes written=0
                HDFS: Number of read operations=12
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=0
        Map-Reduce Framework
                Map input records=1107
                Map output records=623
                Map output bytes=19536
                Map output materialized bytes=4279
                Input split bytes=113
                Combine input records=623
                Combine output records=150
                Reduce input groups=150
                Reduce shuffle bytes=4279
                Reduce input records=150
                Reduce output records=150
                Spilled Records=300
                Shuffled Maps =3
                Failed Shuffles=0
                Merged Map outputs=3
                GC time elapsed (ms)=0
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
                Total committed heap usage (bytes)=1885339648
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=487200
        File Output Format Counters
                Bytes Written=0
        org.grassfield.hadoop.FeedCategoryCountMapper$MapperRCheck
                INVALID=35
                VALID=1072

So, is my table populated?

[Screenshot: feed_category table populated with the category counts]

Yes, it is.

If you instead hit com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure, check that MySQL is running and reachable, and that the connector jar was copied and the daemons restarted as described above.

Have a good weekend guys. Let me take some rest before moving to MRUnit.
