Lab 30: Hive Queries

Hi hadoopers,

I have a program that extracts RSS feeds from different sources into a tab-delimited text file. I used Hive to do some mining on it today. Let's see the results.


The file has 12 fields separated by tabs. Here is the table description.


CREATE external TABLE IF NOT EXISTS feed_article (
    feedgenerator STRING, feedtitle STRING, feed_author STRING, feed_url STRING,
    feed_time STRING, item_subject STRING, item_author STRING, itemurl STRING,
    itemdate STRING, category STRING, DescriptionFile STRING, uniqueId bigint)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/hadoop/lab27';

hive> describe feed_article;
OK
feedgenerator           string
feedtitle               string
feed_author             string
feed_url                string
feed_time               string
item_subject            string
item_author             string
itemurl                 string
itemdate                string
category                string
descriptionfile         string
uniqueid                bigint
Time taken: 0.062 seconds, Fetched: 12 row(s)

Count how many articles were published today.


hive> select count(*) from feed_article;
OK
3699
Time taken: 1.624 seconds, Fetched: 1 row(s)

List the distinct authors seen today.


hive> select distinct item_author from feed_article;
,Alok Deshpande
-தி.இன்பராஜ்-
-பா.ராஜா
A.D.Balasubramaniyan
A.T.S Pandian
AFP
AP
Aekaanthan
Aishwarya Parikh
Akanksha Jain
Alex Barile

Let's see which sites have a lot of articles.


hive> select feedtitle, count(*) from feed_article group by feedtitle;
NULL    139
A Wandering Mind        1
APMdigest Hot Topics: APM       2
Application Performance Monitoring Blog | AppDynamics   1
BSNLTeleServices | BSNL Broadband Plans, Bill Payment Selfcare Portal   3
Bangalore Aviation      1
Blog Feed       1
Cloudera Engineering Blog       1
DailyThanthi.com        20

Who wrote many articles today?

hive> select item_author, count (*) from feed_article group by item_author order by item_author desc limit 5;
OK
ஹாவேரி, 1
ஹரி கிருஷ்ணன்     14
ஹரன் பிரசன்னா     2
ஸ்கிரீனன்  4
ஷங்கர்    2
Time taken: 2.476 seconds, Fetched: 5 row(s)
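The query above orders by the author's name, not by the count. If the goal is the most prolific authors, a variant along these lines (a sketch I have not run against this dataset) should order by the count instead:

hive> select item_author, count(*) as cnt from feed_article group by item_author order by cnt desc limit 5;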

Authors of which website wrote many articles today?

hive> select item_author, feedtitle, count (*) from feed_article group by item_author, feedtitle order by item_author desc limit 10;
ஹாவேரி, Dinamani - பெங்களூரு - http://www.dinamani.com/all-editions/edition-bangalore/ 1
ஹரி கிருஷ்ணன்     Dinamani - தினந்தோறும் திருப்புகழ் - http://www.dinamani.com/specials/dinanthorum-thirupugal/     14
ஹரன் பிரசன்னா     ஹரன் பிரசன்னா     2
ஸ்கிரீனன்  தி இந்து - முகப்பு        1
ஸ்கிரீனன்  தி இந்து - தமிழ் சினிமா   1
ஸ்கிரீனன்  தி இந்து - சினிமா        2
ஷங்கர்    தி இந்து - சினிமா        1
ஷங்கர்    தி இந்து - முகப்பு        1
வெங்கடேசன். ஆர்    Dinamani - வேலைவாய்ப்பு - http://www.dinamani.com/employment/  32
வெங்கடேசன். ஆர்    Dinamani - விவசாயம் - http://www.dinamani.com/agriculture/    2
Time taken: 2.493 seconds, Fetched: 10 row(s)

Which feed-generator software was used to publish the articles?


hive> select feedgenerator, count (*) from feed_article group by feedgenerator order by feedgenerator desc limit 10;
https://wordpress.org/?v=4.6.1  5
https://wordpress.org/?v=4.5.4  80
https://wordpress.org/?v=4.5.2  1
http://wordpress.org/?v=4.2.10  2
http://wordpress.org/?v=4.1.4   7
http://wordpress.org/?v=3.5.1   10
http://wordpress.org/?v=3.0     1
http://wordpress.com/   13
application/rss+xml     3434
Jive Engage 8.0.2.0  (http://jivesoftware.com/products/)        1
Time taken: 2.473 seconds, Fetched: 10 row(s)

Lab 29: External tables in Hive

Hi Hadoopers,

My previous post demonstrated how to create tables in Hive and how to retrieve content from them. All of those tables are 'internal' tables, i.e., Hive-managed tables. If you drop such a table or database, you will see it disappear from the warehouse folder.

But in practice we work a lot with external tables. When you drop an external table, only the metastore information gets deleted; the content is still available in HDFS.

Hence this post will show you how to create external tables.
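As a quick illustration of that behaviour, here is a sketch (reusing the employee_ext table and location created below; not a verified transcript): dropping the external table removes only the metadata, while the file stays in HDFS.

hive> drop table employee_ext;
hive> show tables;                          -- employee_ext is gone from the metastore
$ hadoop fs -ls /user/hadoop/lab28/input    # employee.csv should still be listed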


Table creation

I have a CSV file in /user/hadoop/lab28/input. I’ll use it to create the external table.

$ hadoop fs -cat /user/hadoop/lab28/input/employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

Let's create the table employee_ext as given below. The structure is the same as the one I used in my earlier blog post, except for the location.

hive> CREATE external TABLE IF NOT EXISTS employee_ext (eid INT, ename STRING, eage INT, edesig STRING) ROW FORMAT DELIMITED     FIELDS TERMINATED BY ','     STORED AS TEXTFILE     LOCATION '/user/hadoop/lab28/input';
OK
Time taken: 0.086 seconds

Let’s verify if the table is created.

hive> show tables;
OK
demo1
employee_ext
Time taken: 0.08 seconds, Fetched: 2 row(s)

Let’s check the meta store now.

mysql> select * from TBLS;
+--------+-------------+-------+------------------+--------+-----------+-------+--------------+----------------+--------------------+--------------------+----------------+
| TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER  | RETENTION | SD_ID | TBL_NAME     | TBL_TYPE       | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | LINK_TARGET_ID |
+--------+-------------+-------+------------------+--------+-----------+-------+--------------+----------------+--------------------+--------------------+----------------+
|      1 |  1471989527 |     1 |                0 | hadoop |         0 |     1 | demo1        | MANAGED_TABLE  | NULL               | NULL               |           NULL |
|      7 |  1476505836 |     6 |                0 | hadoop |         0 |     7 | employee     | MANAGED_TABLE  | NULL               | NULL               |           NULL |
|     11 |  1476519584 |     1 |                0 | hadoop |         0 |    11 | employee_ext | EXTERNAL_TABLE | NULL               | NULL               |           NULL |
+--------+-------------+-------+------------------+--------+-----------+-------+--------------+----------------+--------------------+--------------------+----------------+
3 rows in set (0.00 sec)

The table employee_ext is registered in meta store. Let’s start querying it now.

Querying

Show all records from employee_ext

hive> select * from employee_ext;
OK
1        Dharma NULL     Sr Manager
2        Bheema NULL     Cook
3        Arjuna NULL     Instructor
4        Nakula NULL     Jr Instructor
5        Sahadeva       NULL     Jr Instructor
Time taken: 0.123 seconds, Fetched: 5 row(s)

Notice that eage shows up as NULL: the CSV values have a leading space (e.g. " 45"), which cannot be parsed as an INT, so Hive returns NULL for that column. Now let's describe the table.

hive> describe employee_ext;
OK
eid                     int
ename                   string
eage                    int
edesig                  string
Time taken: 0.044 seconds, Fetched: 4 row(s)

hive> describe formatted employee_ext;
OK
# col_name              data_type               comment

eid                     int
ename                   string
eage                    int
edesig                  string

# Detailed Table Information
Database:               default
Owner:                  hadoop
CreateTime:             Sat Oct 15 16:26:18 MYT 2016
LastAccessTime:         UNKNOWN
Retention:              0
Location:               hdfs://gandhari:9000/user/hadoop/lab28/input
Table Type:             EXTERNAL_TABLE
Table Parameters:
EXTERNAL                TRUE
transient_lastDdlTime   1476519978

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
field.delim             ,
serialization.format    ,
Time taken: 0.053 seconds, Fetched: 30 row(s)

Let's populate one table from another query's result. First, let's create a new table; this one is a Hive-managed table.

hive> CREATE TABLE IF NOT EXISTS employee_new (eid INT, ename STRING, eage INT, edesig STRING) ROW FORMAT DELIMITED     FIELDS TERMINATED BY ','     STORED AS TEXTFILE;
OK
Time taken: 0.101 seconds

Let’s update the new table now.

hive> INSERT OVERWRITE TABLE employee_new SELECT * FROM employee_ext limit 2;
hive> select * from employee_new;
OK
2        Bheema NULL     Cook
1        Dharma NULL     Sr Manager
Time taken: 0.078 seconds, Fetched: 2 row(s)

It is updated as expected. Let's see what happened in HDFS: the warehouse folder contains the MapReduce result.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -ls /user/hive/warehouse/employee_new;
Found 1 items
-rwxrwxr-x   3 hadoop supergroup         44 2016-10-15 16:38 /user/hive/warehouse/employee_new/000000_0

Let's cat the file. Note that Hive writes NULL values as \N in its text output.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -cat /user/hive/warehouse/employee_new/000000_0
2, Bheema,\N, Cook
1, Dharma,\N, Sr Manager

Lab 28: Getting started with Apache Hive

Hi Hadoopers,

We had an interesting journey into Apache Pig. I’m happy to start another blog post series on Apache Hive today.

I'm running short of time as I'm rushing to conclude my training schedule, so I do not have much time to draw the architecture diagram and explain it; I may add it when I revisit this topic. Let me write about my first lab exercise in this post.

pighive3

As usual, the data lives in HDFS, but Hive presents it in a virtual tabular format using a metastore database. It maintains a local or remote database to store this mapping and uses it to read the HDFS data. Hence this blog post will show the Hive commands and the corresponding metastore contents in parallel.

Show Databases

The command to see the list of databases is given below.

hive>  show databases;
OK
default
Time taken: 1.225 seconds, Fetched: 1 row(s)

The database information is stored in the DBS table in the metastore. Let's see the content of DBS.

mysql> use metastore;
mysql> show tables;
+---------------------------+
| Tables_in_metastore       |
+---------------------------+
| BUCKETING_COLS            |
| CDS                       |
| COLUMNS_V2                |
| DATABASE_PARAMS           |
| DBS                       |
| DB_PRIVS                  |
| DELEGATION_TOKENS         |
| FUNCS                     |
| FUNC_RU                   |
| GLOBAL_PRIVS              |
| IDXS                      |
| INDEX_PARAMS              |
| MASTER_KEYS               |
| NUCLEUS_TABLES            |
| PARTITIONS                |
| PARTITION_EVENTS          |
| PARTITION_KEYS            |
| PARTITION_KEY_VALS        |
| PARTITION_PARAMS          |
| PART_COL_PRIVS            |
| PART_COL_STATS            |
| PART_PRIVS                |
| ROLES                     |
| ROLE_MAP                  |
| SDS                       |
| SD_PARAMS                 |
| SEQUENCE_TABLE            |
| SERDES                    |
| SERDE_PARAMS              |
| SKEWED_COL_NAMES          |
| SKEWED_COL_VALUE_LOC_MAP  |
| SKEWED_STRING_LIST        |
| SKEWED_STRING_LIST_VALUES |
| SKEWED_VALUES             |
| SORT_COLS                 |
| TABLE_PARAMS              |
| TAB_COL_STATS             |
| TBLS                      |
| TBL_COL_PRIVS             |
| TBL_PRIVS                 |
| TYPES                     |
| TYPE_FIELDS               |
| VERSION                   |
+---------------------------+
43 rows in set (0.00 sec)
mysql> select * from DBS;
+-------+-----------------------+------------------------------------------+---------+------------+------------+
| DB_ID | DESC                  | DB_LOCATION_URI                          | NAME    | OWNER_TYPE | OWNER_NAME |
+-------+-----------------------+------------------------------------------+---------+------------+------------+
|     1 | Default Hive database | hdfs://gandhari:9000/user/hive/warehouse | default | ROLE       | public     |
+-------+-----------------------+------------------------------------------+---------+------------+------------+
1 row in set (0.00 sec)

Database Creation

Let's create a database named mydatabase.

 hive> create database mydatabase;
 OK
 Time taken: 0.286 seconds

mysql> select * from DBS;
+-------+-----------------------+--------------------------------------------------------+------------+------------+------------+
| DB_ID | DESC                  | DB_LOCATION_URI                                        | NAME       | OWNER_TYPE | OWNER_NAME |
+-------+-----------------------+--------------------------------------------------------+------------+------------+------------+
|     1 | Default Hive database | hdfs://gandhari:9000/user/hive/warehouse               | default    | ROLE       | public     |
|     6 | NULL                  | hdfs://gandhari:9000/user/hive/warehouse/mydatabase.db | mydatabase | USER       | hadoop     |
+-------+-----------------------+--------------------------------------------------------+------------+------------+------------+
2 rows in set (0.00 sec)

The new database is created with DB_ID 6.

Here is the change on the HDFS side: you will see a .db folder created in the Hive warehouse folder.

hadoop@gandhari:/opt/hadoop-2.6.4$ hadoop fs -ls /user/hive/warehouse
Found 2 items
drwxrwxr-x   - hadoop supergroup          0 2016-08-24 05:58 /user/hive/warehouse/demo1
drwxrwxr-x   - hadoop supergroup          0 2016-10-15 12:30 /user/hive/warehouse/mydatabase.db

Table Creation

Let’s create a table now.

 hive> use mydatabase;
 OK
 Time taken: 0.016 seconds
 hive>

 hive> CREATE TABLE IF NOT EXISTS employee (eid INT, ename STRING, eage INT, edesig STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;
 OK
 Time taken: 0.103 seconds

We created the employee table. What changed in the metastore?

mysql> select * from TBLS;
+--------+-------------+-------+------------------+--------+-----------+-------+----------+---------------+--------------------+--------------------+----------------+
| TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER  | RETENTION | SD_ID | TBL_NAME | TBL_TYPE      | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | LINK_TARGET_ID |
+--------+-------------+-------+------------------+--------+-----------+-------+----------+---------------+--------------------+--------------------+----------------+
|      1 |  1471989527 |     1 |                0 | hadoop |         0 |     1 | demo1    | MANAGED_TABLE | NULL               | NULL               |           NULL |
|      7 |  1476505836 |     6 |                0 | hadoop |         0 |     7 | employee | MANAGED_TABLE | NULL               | NULL               |           NULL |
+--------+-------------+-------+------------------+--------+-----------+-------+----------+---------------+--------------------+--------------------+----------------+
2 rows in set (0.00 sec)

A new row is added to the TBLS table for employee.

What about the change on HDFS? You can see another directory created in the warehouse folder.

hadoop@gandhari:/opt/hadoop-2.6.4$ hadoop fs -ls /user/hive/warehouse/mydatabase.db
Found 1 items
drwxrwxr-x   - hadoop supergroup          0 2016-10-15 12:30 /user/hive/warehouse/mydatabase.db/employee

Loading the content

We have a local CSV file for employees as given below.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ cat employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

To load the file from the Unix file system into Hive, we use the statement given below.

 hive> LOAD DATA LOCAL INPATH '/opt/hadoop-2.6.4/hivefarm/employee.csv' OVERWRITE INTO TABLE employee;
 Loading data to table mydatabase.employee
 OK
 Time taken: 0.664 seconds

The CSV file is uploaded to HDFS.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -ls /user/hive/warehouse/mydatabase.db/employee
Found 1 items
-rwxrwxr-x   3 hadoop supergroup        264 2016-10-15 12:51 /user/hive/warehouse/mydatabase.db/employee/employee.csv

Querying

Let’s start querying this.

 hive> select * from employee;
 OK
 1        Dharma NULL     Sr Manager
 2        Bheema NULL     Cook
 3        Arjuna NULL     Instructor
 4        Nakula NULL     Jr Instructor
 5        Sahadeva       NULL     Jr Instructor
 1        Dharma NULL     Sr Manager
 2        Bheema NULL     Cook
 3        Arjuna NULL     Instructor
 4        Nakula NULL     Jr Instructor
 5        Sahadeva       NULL     Jr Instructor
 Time taken: 2.612 seconds, Fetched: 10 row(s)

 hive> select * from employee limit 5;
 OK
 1        Dharma NULL     Sr Manager
 2        Bheema NULL     Cook
 3        Arjuna NULL     Instructor
 4        Nakula NULL     Jr Instructor
 5        Sahadeva       NULL     Jr Instructor
 Time taken: 0.136 seconds, Fetched: 5 row(s)

Querying with Map Reduce

Aggregation functions use Map Reduce to give you the results. Here is one example.

 hive> select count(*) from employee;
 Total jobs = 1
 Launching Job 1 out of 1
 Number of reduce tasks determined at compile time: 1
 In order to change the average load for a reducer (in bytes):
   set hive.exec.reducers.bytes.per.reducer=<number>
 In order to limit the maximum number of reducers:
   set hive.exec.reducers.max=<number>
 In order to set a constant number of reducers:
   set mapreduce.job.reduces=<number>
 Job running in-process (local Hadoop)
 2016-10-15 12:55:39,909 Stage-1 map = 100%,  reduce = 100%
 Ended Job = job_local1840713229_0001
 MapReduce Jobs Launched:
 Stage-Stage-1:  HDFS Read: 1584 HDFS Write: 528 SUCCESS
 Total MapReduce CPU Time Spent: 0 msec
 OK
 10
 Time taken: 2.535 seconds, Fetched: 1 row(s)

Store the output

The following statement saves the output to the local Unix file system.

 hive> INSERT OVERWRITE LOCAL DIRECTORY '/opt/hadoop/hivefarm/mr-result.csv' SELECT * FROM employee;
 WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
 Query ID = hadoop_20161015125721_f2ac0333-f444-47af-b3c7-57a079d6ca2a
 Total jobs = 1
 Launching Job 1 out of 1
 Number of reduce tasks is set to 0 since there's no reduce operator
 Job running in-process (local Hadoop)
 2016-10-15 12:57:23,074 Stage-1 map = 100%,  reduce = 0%
 Ended Job = job_local783745060_0002
 Moving data to local directory /opt/hadoop/hivefarm/mr-result.csv
 MapReduce Jobs Launched:
 Stage-Stage-1:  HDFS Read: 1056 HDFS Write: 264 SUCCESS
 Total MapReduce CPU Time Spent: 0 msec
 OK
 Time taken: 1.44 seconds

Let’s verify this output.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ cat mr-result.csv/000000_0
1 Dharma\N Sr Manager
2 Bheema\N Cook
3 Arjuna\N Instructor
4 Nakula\N Jr Instructor
5 Sahadeva\N Jr Instructor
1 Dharma\N Sr Manager
2 Bheema\N Cook
3 Arjuna\N Instructor
4 Nakula\N Jr Instructor
5 Sahadeva\N Jr Instructor

Now let’s store the output to HDFS itself.

hive> INSERT OVERWRITE DIRECTORY '/user/hadoop/lab27' SELECT * FROM employee;

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -ls /user/hadoop/lab27
Found 1 items
-rwxr-xr-x   3 hadoop supergroup        254 2016-10-15 13:05 /user/hadoop/lab27/000000_0

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -cat /user/hadoop/lab27/000000_0
1 Dharma\N Sr Manager
2 Bheema\N Cook
3 Arjuna\N Instructor
4 Nakula\N Jr Instructor
5 Sahadeva\N Jr Instructor
1 Dharma\N Sr Manager
2 Bheema\N Cook
3 Arjuna\N Instructor
4 Nakula\N Jr Instructor
5 Sahadeva\N Jr Instructor

Drop

Finally, we can drop a table or database just as in any other SQL dialect.

hive> drop table employee;
OK
Time taken: 2.405 seconds
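To clean up the database created earlier as well, a drop with CASCADE should work (a sketch; CASCADE removes any tables still left inside the database):

hive> drop database mydatabase cascade;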

Lab 27: Filtering operators in Pig

Hi Hadoopers,

This blog post demonstrates the filtering capabilities of Pig.


Filter by

This is similar to the WHERE clause in SQL. We have an employee table as below.

grunt> fs -cat employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

The relation for employee is defined as below.

grunt> employee = load 'employee.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

Let’s get the employee whose id is 1.

e1 = filter employee by eid==1;
dump e1;
(1, Dharma,45, Sr Manager)

Let’s get the employee whose name is Nakula.

e2 = filter employee by ename == ' Nakula';
dump e2;
(4, Nakula,35, Jr Instructor)
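Filter conditions can also be combined with and, or and not. For example, here is a sketch (not run as part of this lab) that keeps staff aged 40 or above who are not cooks; note the leading space in ' Cook', since the CSV values carry one:

grunt> e3 = filter employee by (eage >= 40) and not (edes == ' Cook');
grunt> dump e3;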

Distinct

As the name implies, let’s select the distinct records. Following is our input file.


grunt> fs -cat employee3.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

Let's create a relation for the employee data.

grunt> employee = load 'employee3.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

We are creating another relation for the distinct records. Here you go.


grunt> edist = distinct employee;

grunt> dump edist;
(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)
(3, Arjuna,41, Instructor)
(4, Nakula,35, Jr Instructor)
(5, Sahadeva,33, Jr Instructor)

For-each

And finally, foreach.

Let’s iterate through the edist relation I have shown above.


grunt> foreachdata = foreach edist generate eid, ename;

(1, Dharma)
(2, Bheema)
(3, Arjuna)
(4, Nakula)
(5, Sahadeva)

Lab 26: Achieving Partitioner with Apache Pig – Split

Hi Hadoopers,

The partitioner is a powerful concept in MapReduce. I have given an example in Lab 06 – A simple hadoop partitioner.

Let's see how to implement the same concept using Pig. It's simple: use split!


Let's take a simple employee table as below.


grunt> fs  -cat employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

Let’s create the employee relation as given below.


grunt> employee = load 'employee.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

Now I'll split the employees based on age: 40 and above, and below 40.


grunt> split employee into employee1 if eage>=40, employee2 if (eage<40);

grunt> dump employee1
(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)
(3, Arjuna,41, Instructor)

grunt> dump employee2
(4, Nakula,35, Jr Instructor)
(5, Sahadeva,33, Jr Instructor)

Note: you must spell out the condition for each branch; rows that match none of the conditions are simply dropped.
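Alternatively, newer Pig releases accept an OTHERWISE branch in SPLIT that catches everything the earlier conditions missed; a sketch (not verified on this cluster):

grunt> split employee into employee1 if eage >= 40, employee2 otherwise;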

Lab 25: Combiners using Pig Union

Hi Friends,

I archive blog entries and news entries on a daily basis using one of my projects, jatomrss. It saves the news articles as tab-delimited text.

$ hadoop fs -ls /user/hadoop/feed/*.jatomrss.log
-rw-r--r--   3 hadoop supergroup    5933545 2016-10-01 21:44 /user/hadoop/feed/2016-10-01.jatomrss.log
-rw-r--r--   3 hadoop supergroup    6313692 2016-10-02 17:03 /user/hadoop/feed/2016-10-02.jatomrss.log
-rw-r--r--   3 hadoop supergroup     596174 2016-10-03 06:37 /user/hadoop/feed/2016-10-03.jatomrss.log
-rw-r--r--   3 hadoop supergroup     827974 2016-10-04 06:53 /user/hadoop/feed/2016-10-04.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1367507 2016-10-05 07:41 /user/hadoop/feed/2016-10-05.jatomrss.log
-rw-r--r--   3 hadoop supergroup      10927 2016-10-06 07:29 /user/hadoop/feed/2016-10-06.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1536870 2016-10-07 06:24 /user/hadoop/feed/2016-10-07.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1719126 2016-10-08 07:13 /user/hadoop/feed/2016-10-08.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1870073 2016-10-09 09:36 /user/hadoop/feed/2016-10-09.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1376982 2016-10-11 05:11 /user/hadoop/feed/2016-10-11.jatomrss.log

Let's use the union operator to combine the feeds of Oct 1st and 2nd.


The relation for October 1st is given as below.


feedEntryRecord20161001 = load '/user/hadoop/feed/2016-10-01.jatomrss.log' using PigStorage ('\t') as (generator:chararray, feedTitle:chararray, feedAuthor:chararray, feedUrl:chararray, feed_time:chararray, entrySubject:chararray, entryAuthor:chararray, entryUrl:chararray, entryDate:chararray, categorySet:chararray, descriptionFile:chararray, uniqueId:chararray);

The relation for October 2nd is given as below.


feedEntryRecord20161002 = load '/user/hadoop/feed/2016-10-02.jatomrss.log' using PigStorage ('\t') as (generator:chararray, feedTitle:chararray, feedAuthor:chararray, feedUrl:chararray, feed_time:chararray, entrySubject:chararray, entryAuthor:chararray, entryUrl:chararray, entryDate:chararray, categorySet:chararray, descriptionFile:chararray, uniqueId:chararray);

Let’s describe them.


grunt> describe feedEntryRecord20161001;
feedEntryRecord20161001: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}
grunt> describe feedEntryRecord20161002
feedEntryRecord20161002: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}

Here is the combiner! To make the combiner (union) work, we need identical columns in both datasets. Yes, we do have them.


union_data = union feedEntryRecord20161001, feedEntryRecord20161002;

grunt> describe union_data;
union_data: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}

Let's store it to check if the combiner works as expected.

store union_data into '/user/hadoop/lab25/union_data' using PigStorage ('\t');

Input(s):
Successfully read 10612 records from: "/user/hadoop/feed/2016-10-02.jatomrss.log"
Successfully read 10295 records from: "/user/hadoop/feed/2016-10-01.jatomrss.log"

Output(s):
Successfully stored 20907 records (206484670 bytes) in: "/user/hadoop/lab25/union_data"

Counters:
Total records written : 20907
Total bytes written : 206484670
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local1564505665_0014

Lab 24: Joins using Pig

Let’s make some joins today, dear hadoopers.


Join

We have a table for employee.

grunt> dump employee;
 (1, Dharma,45, Sr Manager)
 (2, Bheema,43, Cook)
 (3, Arjuna,41, Instructor)
 (4, Nakula,35, Jr Instructor)
 (5, Sahadeva,33, Jr Instructor)

We have another table salary as given below.

grunt> dump salary;

(1,2000)
 (2,1500)
 (3,1000)
 (4,500)
 (5,500)

Let's join the salary table with the employee table on the emp_id field.

grunt> salary_table = JOIN employee by emp_id, salary by emp_id;

grunt> dump salary_table;
 Input(s):
 Successfully read 5 records from: "hdfs://gandhari:9000/user/hadoop/lab23/salary.csv"
 Successfully read 5 records from: "hdfs://gandhari:9000/user/hadoop/lab23/employee.csv"

Output(s):
 Successfully stored 5 records (16134806 bytes) in: "hdfs://gandhari:9000/tmp/temp-2101978857/tmp596113376"

Counters:
 Total records written : 5
 Total bytes written : 16134806
 Spillable Memory Manager spill count : 0
 Total bags proactively spilled: 0
 Total records proactively spilled: 0

(1, Dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,3,1000)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Inner Join

The employee table looks like this.

(1, dharma,45, Sr Manager)
 (2, Bheema,43, Cook)
 (3, Arjuna,41, Instructor)
 (4, Nakula,35, Jr Instructor)
 (5, Sahadeva,33, Jr Instructor)

grunt> employee = load 'employee.csv' using PigStorage (',') as (emp_id:int,emp_name:chararray,emp_age:int,emp_desig:chararray);

grunt> describe employee;
 employee: {emp_id: int,emp_name: chararray,emp_age: int,emp_desig: chararray}

There is a change in the salary table: no salary for Arjuna this month 🙂 so id 3 is missing from the salary table.

grunt> dump salary;
 (1,2000)
 (2,1500)
 (4,500)
 (5,500)

grunt> salary = load 'salary.csv' using PigStorage (',') as (emp_id:int,emp_salary:int);

grunt> describe salary;
 salary: {emp_id: int,emp_salary: int}

Let's do an inner join now.

grunt> employee_salary = join employee by emp_id, salary by emp_id;

grunt> describe employee_salary;
 employee_salary: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> illustrate employee_salary;

--------------------------------------------------------------------------------------------------
 | employee     | emp_id:int    | emp_name:chararray    | emp_age:int    | emp_desig:chararray    |
 --------------------------------------------------------------------------------------------------
 |              | 5             |  Sahadeva             | 33             |  Jr Instructor         |
 |              | 5             |  Sahadeva             | 33             |  Jr Instructor         |
 --------------------------------------------------------------------------------------------------
 --------------------------------------------------
 | salary     | emp_id:int    | emp_salary:int    |
 --------------------------------------------------
 |            | 5             | 500               |
 |            | 5             | 500               |
 --------------------------------------------------
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 | employee_salary     | employee::emp_id:int    | employee::emp_name:chararray    | employee::emp_age:int    | employee::emp_desig:chararray    | salary::emp_id:int    | salary::emp_salary:int    |
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

grunt> dump employee_salary;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Left Outer Join

grunt> left_outer = join employee by emp_id left outer, salary by emp_id;

grunt> describe left_outer;
 left_outer: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> dump left_outer;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,,)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

We get all the records from the employee table, with nulls for the rows that have no matching value in the salary table.

Right Outer Join

Let's do a right outer join now.

grunt> right_outer = join salary by emp_id right, employee by emp_id;

grunt> describe right_outer;
 right_outer: {salary::emp_id: int,salary::emp_salary: int,employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray}

grunt> dump right_outer;

(1,2000,1, dharma,45, Sr Manager)
 (2,1500,2, Bheema,43, Cook)
 (,,3, Arjuna,41, Instructor)
 (4,500,4, Nakula,35, Jr Instructor)
 (5,500,5, Sahadeva,33, Jr Instructor)

We can see all the records from the right table even when there is no match in the left table.

Full outer join

Last topic of this blog post: let's do a full outer join, which returns the records from both tables and fills in nulls where there is no match.

grunt> full_outer = join employee by emp_id full outer, salary by emp_id;
 grunt> describe full_outer;
 full_outer: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> dump full_outer;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,,)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Can we join by two keys? Yes: join employee by (emp_id, emp_name)!
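To spell that out, a two-key join could look like the sketch below. The payroll relation and payroll.csv are hypothetical, invented only so that both sides carry emp_id and emp_name:

grunt> payroll = load 'payroll.csv' using PigStorage (',') as (emp_id:int,emp_name:chararray,emp_salary:int);
grunt> emp_payroll = join employee by (emp_id, emp_name), payroll by (emp_id, emp_name);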

Cross

Let's do a cross to build the Cartesian product of two tables. To keep it simple, I have taken less data in our employee table this time.

grunt> employee = load 'employee.csv' using PigStorage (',') as (emp_id:int,emp_name:chararray,emp_age:int,emp_desig:chararray);

grunt> dump employee;

(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)

grunt> leave = load 'leave.csv' using PigStorage (',') as (emp_id:int,emp_leave_date:chararray);

grunt> dump leave;

(1,2016-10-11)
(2,2016-10-12)

grunt> cross_data = cross employee, leave;

grunt> describe cross_data;
cross_data: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,leave::emp_id: int,leave::emp_leave_date: chararray}

grunt> dump cross_data;

(2, Bheema,43, Cook,2,2016-10-12)
(2, Bheema,43, Cook,1,2016-10-11)
(1, Dharma,45, Sr Manager,2,2016-10-12)
(1, Dharma,45, Sr Manager,1,2016-10-11)

See you in another interesting post.

 

Lab 23: Apache Pig Basics

Hi ETL enthusiasts,

This post will talk about important concepts in Pig.


Pig Conventions

( )   tuple data type   e.g. (John,18,4.0F)
{ }   bag data type     e.g. (1,{(1,2,3)}), (4,{(4,2,1),(4,3,3)}), (8,{(8,3,4)})
[ ]   map data type     e.g. [name#John,phone#5551212]

Relations, Bags, Tuples, Fields

A field is a piece of data, e.g. Jessica.

A tuple is an ordered set of fields, e.g. (Jessica, F, 35, NY).

A bag is a collection of tuples, e.g. {(Jessica, F, 35, NY),(Nathan, M, 35, NJ)}.

A relation is an outer bag, i.e. a bag of tuples referred to by an alias.

Data Types

Simple types:

int        Signed 32-bit integer; e.g. 10
long       Signed 64-bit integer; e.g. 10L or 10l (displayed as 10L)
float      32-bit floating point; e.g. 10.5F, 10.5f, 10.5e2f or 10.5E2F (displayed as 10.5F or 1050.0F)
double     64-bit floating point; e.g. 10.5, 10.5e2 or 10.5E2 (displayed as 10.5 or 1050.0)
chararray  Character array (string) in Unicode UTF-8 format; e.g. hello world
bytearray  Byte array (blob)
boolean    Boolean; true/false (case insensitive)
datetime   Datetime; e.g. 1970-01-01T00:00:00.000+00:00

Complex types:

tuple      An ordered set of fields; e.g. (19,2)
bag        A collection of tuples; e.g. {(19,2), (18,1)}
map        A set of key-value pairs; e.g. [open#apache]

Nulls, Operators, and Functions

How operators and functions interact with nulls:

Comparison operators (==, !=, >, <, >=, <=): if either subexpression is null, the result is null.

matches: if either the string being matched against or the string defining the match is null, the result is null.

Arithmetic operators (+, -, *, /, % modulo, ? : bincond): if either subexpression is null, the resulting expression is null.

is null: if the tested value is null, returns true; otherwise, returns false.

is not null: if the tested value is not null, returns true; otherwise, returns false.

Dereference operators, tuple (.) or map (#): if the de-referenced tuple or map is null, returns null.

COGROUP, GROUP, JOIN: these operators handle nulls differently (the cogroup example later in this post shows empty bags for unmatched keys).

Cast operator: casting a null from one type to another type results in a null.

AVG, MIN, MAX, SUM, COUNT: these functions ignore nulls.

COUNT_STAR: counts all values, including nulls.

CONCAT: if either subexpression is null, the resulting expression is null.

SIZE: if the tested object is null, returns null.
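To make the COUNT vs COUNT_STAR difference concrete, here is a small hypothetical sketch (the people relation and people.csv are invented for illustration):

grunt> people = load 'people.csv' using PigStorage (',') as (name:chararray, age:int);
grunt> grouped = group people all;
-- COUNT skips null ages; COUNT_STAR counts them as well
grunt> counts = foreach grouped generate COUNT(people.age), COUNT_STAR(people.age);
grunt> dump counts;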

Operators

Following are the operators available in Pig; many of them are demonstrated in this post.

Loading and Storing
LOAD        Load data from the file system (local/HDFS) into a relation.
STORE       Save a relation to the file system (local/HDFS).

Filtering
FILTER      Remove unwanted rows from a relation.
DISTINCT    Remove duplicate rows from a relation.
FOREACH, GENERATE   Generate data transformations based on columns of data.
STREAM      Transform a relation using an external program.

Grouping and Joining
JOIN        Join two or more relations.
COGROUP     Group the data in two or more relations.
GROUP       Group the data in a single relation.
CROSS       Create the cross product of two or more relations.

Sorting
ORDER       Arrange a relation in sorted order based on one or more fields (ascending or descending).
LIMIT       Get a limited number of tuples from a relation.

Combining and Splitting
UNION       Combine two or more relations into a single relation.
SPLIT       Split a single relation into two or more relations.

Diagnostic Operators
DUMP        Print the contents of a relation on the console.
DESCRIBE    Describe the schema of a relation.
EXPLAIN     View the logical, physical, or MapReduce execution plans used to compute a relation.
ILLUSTRATE  View the step-by-step execution of a series of statements.

Pig Latin statements

Let’s use the following as a sample table

emp_id  name      age  desig
1       Dharma    45   Sr Manager
2       Bheema    43   Cook
3       Arjuna    41   Instructor
4       Nakula    35   Jr Instructor
5       Sahadeva  33   Jr Instructor

grunt> fs -cat lab23/employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor


Load, Describe, illustrate, dump

grunt> employee = load 'lab23/employee.csv' using PigStorage(',') as (emp_id:int,emp_name:chararray,emp_age:int,emp_desig:chararray);

Relation name: employee
Input file path: lab23/employee.csv (I have used a relative path)
Storage function: we have used the PigStorage() function. It loads and stores data as structured text files. The default delimiter is \t; we use a comma here.

grunt> describe employee;
employee: {emp_id: int,emp_name: chararray,emp_age: int,emp_desig: chararray}

grunt> illustrate employee
--------------------------------------------------------------------------------------------------
| employee     | emp_id:int    | emp_name:chararray    | emp_age:int    | emp_desig:chararray    |
--------------------------------------------------------------------------------------------------
|              | 5             |  Sahadeva             |  33            |  Jr Instructor         |
--------------------------------------------------------------------------------------------------

grunt> dump employee;
(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)
(3, Arjuna,41, Instructor)
(4, Nakula,35, Jr Instructor)
(5, Sahadeva,33, Jr Instructor)

This will execute a MapReduce job to read data from HDFS and print the content on the screen.

grunt> explain employee;

#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-119
Map Plan
employee: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-118
|
|---employee: New For Each(false,false,false,false)[bag] - scope-117
|   |
|   Cast[int] - scope-106
|   |
|   |---Project[bytearray][0] - scope-105
|   |
|   Cast[chararray] - scope-109
|   |
|   |---Project[bytearray][1] - scope-108
|   |
|   Cast[int] - scope-112
|   |
|   |---Project[bytearray][2] - scope-111
|   |
|   Cast[chararray] - scope-115
|   |
|   |---Project[bytearray][3] - scope-114
|
|---employee: Load(hdfs://gandhari:9000/user/hadoop/lab23/employee.csv:PigStorage(',')) - scope-104--------
Global sort: false
----------------

Group

grunt> edesig = group employee by emp_desig;

grunt> dump edesig;
( Cook,{(2, Bheema,43, Cook)})
( Instructor,{(3, Arjuna,41, Instructor)})
( Sr Manager,{(1, Dharma,45, Sr Manager)})
( Jr Instructor,{(5, Sahadeva,33, Jr Instructor),(4, Nakula,35, Jr Instructor)})

I'm just thinking of this in terms of MapReduce: by this point I would have had to write a mapper extending Mapper, a reducer extending Reducer and a driver 🙂

Let’s do one more grouping by age

grunt> eage = group employee by emp_age;

grunt> dump eage;
(33,{(5, Sahadeva,33, Jr Instructor)})
(35,{(4, Nakula,35, Jr Instructor)})
(41,{(3, Arjuna,41, Instructor)})
(43,{(2, Bheema,43, Cook)})
(45,{(1, Dharma,45, Sr Manager)})

Let’s do a grouping based on both the columns.

grunt> e_age_desig = group employee by (emp_age, emp_desig);

grunt> dump e_age_desig;
((33, Jr Instructor),{(5, Sahadeva,33, Jr Instructor)})
((35, Jr Instructor),{(4, Nakula,35, Jr Instructor)})
((41, Instructor),{(3, Arjuna,41, Instructor)})
((43, Cook),{(2, Bheema,43, Cook)})
((45, Sr Manager),{(1, Dharma,45, Sr Manager)})

Yes, writing MapReduce for simple tasks is time consuming. Instead we should deploy the right tool to get the job done.

Co-group

In addition to employee, we have one other table, student, as given below.

grunt> fs -cat lab23/student.csv;
1, Duryodhana, 15, 11
2, Dushasana, 14, 10
3, Dushala, 13, 9
4, Dronacharya,45, 12

grunt> student = load 'lab23/student.csv' using PigStorage(',') as (stud_id:int,stud_name:chararray,stud_age:int,stud_class:chararray);

grunt> illustrate student;
-------------------------------------------------------------------------------------------------
| student     | stud_id:int   | stud_name:chararray   | stud_age:int   | stud_class:chararray   |
-------------------------------------------------------------------------------------------------
|             | 3             |  Dushala              |  13            |  9                     |
-------------------------------------------------------------------------------------------------

grunt> cogroupdata = COGROUP student by stud_age, employee by emp_age;

(13,{(3, Dushala,13, 9)},{})
(14,{(2, Dushasana,14, 10)},{})
(15,{(1, Duryodhana,15, 11)},{})
(33,{},{(5, Sahadeva,33, Jr Instructor)})
(35,{},{(4, Nakula,35, Jr Instructor)})
(41,{},{(3, Arjuna,41, Instructor)})
(43,{},{(2, Bheema,43, Cook)})
(45,{(4, Dronacharya,45, 12)},{(1, Dharma,45, Sr Manager)})

Cogroup is similar to the group operator, but it works across two (or more) different relations.

 

Ref: http://pig.apache.org

Lab 21: MapReduce with Sequence File

Hi Hadoopers,

This post is a continuation of my previous post on sequence files; the output of that post is read by this MapReduce program.

This program will accept a sequence file as input and emit a text file as output.

Mapper:

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeqFileReadMapper
        extends Mapper<Text, Text, Text, Text> {

    @Override
    protected void map(Text key, Text value,
            Mapper<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        System.out.println("key:"+key+" "+key.getClass());
        System.out.println("value:"+value.toString()+" "+value.getClass());
        context.write(key, value);
    }
}

Reducer

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeqFileReadReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        for(Text record:values){
            context.write(key, record);
        }
    }
}

Driver

package org.grassfield.nandu.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SeqFileReadJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new SeqFileReadJob(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJarByClass(this.getClass());
        job.setJobName("SeqFileReadJob");
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.setMapperClass(SeqFileReadMapper.class);
        job.setReducerClass(SeqFileReadReducer.class);
        
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
        return 0;
    }

}

Execution

$ hadoop jar FeedCategoryCount-21.jar org.grassfield.nandu.etl.SeqFileReadJob /user/hadoop/lab21/input/ /user/hadoop/lab21/19

$ hadoop fs -ls /user/hadoop/lab21/19
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-09 00:54 /user/hadoop/lab21/19/_SUCCESS
-rw-r--r--   3 hadoop supergroup        130 2016-10-09 00:54 /user/hadoop/lab21/19/part-r-00000

$ hadoop fs -cat /user/hadoop/lab21/19/part-r-00000
0       101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
0       101,2000
18      101,4000
27      102,3000
48      102,Bheema,Pandu,Kunti,Hidimbi
9       102,1500

Lab 20: Sequential File Creation

Hi hadoopers,

I have been told that sequence files can be built from the many small chunks of files placed in HDFS. I have a lot of such files in my feed analytics project, so I hope this will help me free up considerable space in HDFS that is currently blocked by small HTML files.


So, this program accepts a directory as input. All files inside the directory will be put inside a sequence file.

This folder is my input.

$ hadoop fs -ls /user/hadoop/lab20/input
Found 2 items
-rw-r--r--   3 hadoop supergroup         79 2016-10-08 19:32 /user/hadoop/lab20/input/employee.csv
-rw-r--r--   3 hadoop supergroup         36 2016-10-08 19:32 /user/hadoop/lab20/input/salary.csv

Here is the mapper

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SeqFileMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // the key supplied by TextInputFormat is the byte offset of the line;
        // emit it as Text so that both key and value in the sequence file are Text
        context.write(new Text(key.toString()), value);
    }
}

Here is the reducer

package org.grassfield.nandu.etl;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SeqFileReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        for (Text value:values){
            context.write(key, value);
        }
    }

}

Here is the Driver

package org.grassfield.nandu.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SeqFileJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        job.setJarByClass(this.getClass());
        job.setJobName("Sequential File Job");
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SeqFileMapper.class);
        job.setReducerClass(SeqFileReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new SeqFileJob(), args);

    }

}

Let’s execute it.

$ hadoop jar FeedCategoryCount-20.jar org.grassfield.nandu.etl.SeqFileJob /user/hadoop/lab20/input /user/hadoop/lab20/02

$ hadoop fs -ls /user/hadoop/lab20/02
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-08 19:36 /user/hadoop/lab20/02/_SUCCESS
-rw-r--r--   3 hadoop supergroup        256 2016-10-08 19:36 /user/hadoop/lab20/02/part-r-00000
And here is the output!

$ hadoop fs -cat /user/hadoop/lab20/02/part-r-00000
SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text㜄▒Ӛ▒▒▒▒#▒▒▒▒#
                                                                   101,200020/101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
1101,4000
2102,3000"48102,Bheema,Pandu,Kunti,Hidimbi
                                          102,1500
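The SEQ header and the control characters appear because part-r-00000 is a binary sequence file, so cat prints it raw. As far as I know, the FS shell's -text option can decode a sequence file into readable key/value pairs:

$ hadoop fs -text /user/hadoop/lab20/02/part-r-00000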