Lab 30: Hive Queries

Hi hadoopers,

I have a program that extracts RSS feeds from different sources into a tab-delimited text file. I used Hive to do some mining on it today. Let’s see the results.

The file has 12 fields separated by tabs. Here is the table definition.


CREATE EXTERNAL TABLE IF NOT EXISTS feed_article (feedgenerator STRING, feedtitle STRING, feed_author STRING, feed_url STRING, feed_time STRING, item_subject STRING, item_author STRING, itemurl STRING, itemdate STRING, category STRING, DescriptionFile STRING, uniqueId BIGINT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/hadoop/lab27';

hive> describe feed_article;
OK
feedgenerator           string
feedtitle               string
feed_author             string
feed_url                string
feed_time               string
item_subject            string
item_author             string
itemurl                 string
itemdate                string
category                string
descriptionfile         string
uniqueid                bigint
Time taken: 0.062 seconds, Fetched: 12 row(s)

Count how many articles were published today.


hive> select count(*) from feed_article;
OK
3699
Time taken: 1.624 seconds, Fetched: 1 row(s)
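
The table holds a single day’s crawl, so a plain count is effectively today’s total. If several days accumulated under the same location, you would filter on the item date instead. A sketch, assuming itemdate holds a timestamp string that Hive’s to_date() can parse:

hive> select count(*) from feed_article where to_date(itemdate) = '2016-10-15';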

List of distinct authors today.


hive> select distinct item_author from feed_article;
,Alok Deshpande
-தி.இன்பராஜ்-
-பா.ராஜா
A.D.Balasubramaniyan
A.T.S Pandian
AFP
AP
Aekaanthan
Aishwarya Parikh
Akanksha Jain
Alex Barile

Let’s see which site has the most articles.


hive> select feedtitle, count(*) from feed_article group by feedtitle;
NULL    139
A Wandering Mind        1
APMdigest Hot Topics: APM       2
Application Performance Monitoring Blog | AppDynamics   1
BSNLTeleServices | BSNL Broadband Plans, Bill Payment Selfcare Portal   3
Bangalore Aviation      1
Blog Feed       1
Cloudera Engineering Blog       1
DailyThanthi.com        20

Who wrote the most articles today?

hive> select item_author, count (*) from feed_article group by item_author order by item_author desc limit 5;
OK
ஹாவேரி, 1
ஹரி கிருஷ்ணன்     14
ஹரன் பிரசன்னா     2
ஸ்கிரீனன்  4
ஷங்கர்    2
Time taken: 2.476 seconds, Fetched: 5 row(s)
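
Strictly speaking, the query above sorts by author name (descending), not by volume. To truly rank authors by article count, a sketch that orders by the aggregate instead:

hive> select item_author, count(*) as cnt from feed_article group by item_author order by cnt desc limit 5;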

Authors of which website wrote the most articles today?

hive> select item_author, feedtitle, count (*) from feed_article group by item_author, feedtitle order by item_author desc limit 10;
ஹாவேரி, Dinamani - பெங்களூரு - http://www.dinamani.com/all-editions/edition-bangalore/ 1
ஹரி கிருஷ்ணன்     Dinamani - தினந்தோறும் திருப்புகழ் - http://www.dinamani.com/specials/dinanthorum-thirupugal/     14
ஹரன் பிரசன்னா     ஹரன் பிரசன்னா     2
ஸ்கிரீனன்  தி இந்து - முகப்பு        1
ஸ்கிரீனன்  தி இந்து - தமிழ் சினிமா   1
ஸ்கிரீனன்  தி இந்து - சினிமா        2
ஷங்கர்    தி இந்து - சினிமா        1
ஷங்கர்    தி இந்து - முகப்பு        1
வெங்கடேசன். ஆர்    Dinamani - வேலைவாய்ப்பு - http://www.dinamani.com/employment/  32
வெங்கடேசன். ஆர்    Dinamani - விவசாயம் - http://www.dinamani.com/agriculture/    2
Time taken: 2.493 seconds, Fetched: 10 row(s)

Which feed-generator software was used to publish the articles?


hive> select feedgenerator, count (*) from feed_article group by feedgenerator order by feedgenerator desc limit 10;
https://wordpress.org/?v=4.6.1  5
https://wordpress.org/?v=4.5.4  80
https://wordpress.org/?v=4.5.2  1
http://wordpress.org/?v=4.2.10  2
http://wordpress.org/?v=4.1.4   7
http://wordpress.org/?v=3.5.1   10
http://wordpress.org/?v=3.0     1
http://wordpress.com/   13
application/rss+xml     3434
Jive Engage 8.0.2.0  (http://jivesoftware.com/products/)        1
Time taken: 2.473 seconds, Fetched: 10 row(s)
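
The WordPress feeds above differ only by version string, so each version is counted separately. To collapse them into a single bucket, a sketch using Hive’s regexp_extract() (the pattern is an assumption about how the generator strings look):

hive> select regexp_extract(feedgenerator, 'wordpress\\.[a-z]+', 0) as wp, count(*) from feed_article where feedgenerator like '%wordpress%' group by regexp_extract(feedgenerator, 'wordpress\\.[a-z]+', 0);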

Lab 29: External tables in Hive

Hi Hadoopers,

My previous post demonstrated how to create tables in Hive and how to retrieve content from them. All of those were ‘internal’ tables, i.e., Hive-managed tables. If you drop such a table or database, it gets deleted from the warehouse folder.

But in practice we work a lot with external tables. When you drop an external table, only the metastore information gets deleted; the content remains available in HDFS.

Hence this post will show you how to create external tables.

Table creation

I have a CSV file in /user/hadoop/lab28/input. I’ll use it to create the external table.

$ hadoop fs -cat /user/hadoop/lab28/input/employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

Let’s create the table employee_ext as given below. The structure is the same as the one I used in my earlier blog post, except for the location.

hive> CREATE EXTERNAL TABLE IF NOT EXISTS employee_ext (eid INT, ename STRING, eage INT, edesig STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hadoop/lab28/input';
OK
Time taken: 0.086 seconds

Let’s verify if the table is created.

hive> show tables;
OK
demo1
employee_ext
Time taken: 0.08 seconds, Fetched: 2 row(s)

Let’s check the metastore now.

mysql> select * from TBLS;
+--------+-------------+-------+------------------+--------+-----------+-------+--------------+----------------+--------------------+--------------------+----------------+
| TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER  | RETENTION | SD_ID | TBL_NAME     | TBL_TYPE       | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | LINK_TARGET_ID |
+--------+-------------+-------+------------------+--------+-----------+-------+--------------+----------------+--------------------+--------------------+----------------+
|      1 |  1471989527 |     1 |                0 | hadoop |         0 |     1 | demo1        | MANAGED_TABLE  | NULL               | NULL               |           NULL |
|      7 |  1476505836 |     6 |                0 | hadoop |         0 |     7 | employee     | MANAGED_TABLE  | NULL               | NULL               |           NULL |
|     11 |  1476519584 |     1 |                0 | hadoop |         0 |    11 | employee_ext | EXTERNAL_TABLE | NULL               | NULL               |           NULL |
+--------+-------------+-------+------------------+--------+-----------+-------+--------------+----------------+--------------------+--------------------+----------------+
3 rows in set (0.00 sec)

The table employee_ext is registered in the metastore. Let’s start querying it now.

Querying

Show all records from employee_ext

hive> select * from employee_ext;
OK
1        Dharma NULL     Sr Manager
2        Bheema NULL     Cook
3        Arjuna NULL     Instructor
4        Nakula NULL     Jr Instructor
5        Sahadeva       NULL     Jr Instructor
Time taken: 0.123 seconds, Fetched: 5 row(s)
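
The eage column comes back NULL because every field after the first carries a leading space in the CSV, and Hive will not convert ' 45' to an INT (no automatic trimming is done). One workaround is to declare the padded columns as STRING and trim/cast at query time; a sketch, using a hypothetical staging table employee_raw over the same location:

hive> CREATE EXTERNAL TABLE IF NOT EXISTS employee_raw (eid INT, ename STRING, eage STRING, edesig STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '/user/hadoop/lab28/input';
hive> select eid, trim(ename) as ename, cast(trim(eage) as int) as eage, trim(edesig) as edesig from employee_raw;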

Describing the table.

hive> describe employee_ext;
OK
eid                     int
ename                   string
eage                    int
edesig                  string
Time taken: 0.044 seconds, Fetched: 4 row(s)

hive> describe formatted employee_ext;
OK
# col_name              data_type               comment

eid                     int
ename                   string
eage                    int
edesig                  string

# Detailed Table Information
Database:               default
Owner:                  hadoop
CreateTime:             Sat Oct 15 16:26:18 MYT 2016
LastAccessTime:         UNKNOWN
Retention:              0
Location:               hdfs://gandhari:9000/user/hadoop/lab28/input
Table Type:             EXTERNAL_TABLE
Table Parameters:
EXTERNAL                TRUE
transient_lastDdlTime   1476519978

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
field.delim             ,
serialization.format    ,
Time taken: 0.053 seconds, Fetched: 30 row(s)

Let’s populate one table from another query’s result. First, let’s create a new table; this one is a Hive-managed table.

hive> CREATE TABLE IF NOT EXISTS employee_new (eid INT, ename STRING, eage INT, edesig STRING) ROW FORMAT DELIMITED     FIELDS TERMINATED BY ','     STORED AS TEXTFILE;
OK
Time taken: 0.101 seconds

Let’s update the new table now.

hive> INSERT OVERWRITE TABLE employee_new SELECT * FROM employee_ext limit 2;
hive> select * from employee_new;
OK
2        Bheema NULL     Cook
1        Dharma NULL     Sr Manager
Time taken: 0.078 seconds, Fetched: 2 row(s)

It is updated as expected. Let’s see what happened in HDFS: the warehouse folder contains an MR result.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -ls /user/hive/warehouse/employee_new;
Found 1 items
-rwxrwxr-x   3 hadoop supergroup         44 2016-10-15 16:38 /user/hive/warehouse/employee_new/000000_0

Let’s cat the file. Note that \N is Hive’s default text representation of NULL.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -cat /user/hive/warehouse/employee_new/000000_0
2, Bheema,\N, Cook
1, Dharma,\N, Sr Manager
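
To close the loop on the external-table behaviour described at the top of this post, you could drop employee_ext and confirm that only the metastore entry disappears. A quick sketch (not run in this lab):

hive> drop table employee_ext;

$ hadoop fs -ls /user/hadoop/lab28/input

The ls should still list employee.csv, while the employee_ext row would be gone from TBLS in the metastore.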

Lab 28: Getting started with Apache Hive

Hi Hadoopers,

We had an interesting journey into Apache Pig. I’m happy to start another blog post series on Apache Hive today.

I’m running short of time as I’m rushing to conclude my training schedule, so I do not have much time to draw the architecture diagram and explain it; I may add it when I revisit this series. Let me write about my first lab exercise in this post.

As usual, the data lives in HDFS. Hive presents it in a virtual tabular format using a metastore: it maintains a local or remote database that stores the mapping between tables and the HDFS data, and reads the data through that mapping. Hence this blog post will show you the Hive commands and the metastore contents in parallel.

Show Databases

The command to see the list of databases is given below.

hive>  show databases;
OK
default
Time taken: 1.225 seconds, Fetched: 1 row(s)

The database information is stored in the DBS table in the metastore. Let’s see the content of DBS.

mysql> use metastore;
mysql> show tables;
+---------------------------+
| Tables_in_metastore       |
+---------------------------+
| BUCKETING_COLS            |
| CDS                       |
| COLUMNS_V2                |
| DATABASE_PARAMS           |
| DBS                       |
| DB_PRIVS                  |
| DELEGATION_TOKENS         |
| FUNCS                     |
| FUNC_RU                   |
| GLOBAL_PRIVS              |
| IDXS                      |
| INDEX_PARAMS              |
| MASTER_KEYS               |
| NUCLEUS_TABLES            |
| PARTITIONS                |
| PARTITION_EVENTS          |
| PARTITION_KEYS            |
| PARTITION_KEY_VALS        |
| PARTITION_PARAMS          |
| PART_COL_PRIVS            |
| PART_COL_STATS            |
| PART_PRIVS                |
| ROLES                     |
| ROLE_MAP                  |
| SDS                       |
| SD_PARAMS                 |
| SEQUENCE_TABLE            |
| SERDES                    |
| SERDE_PARAMS              |
| SKEWED_COL_NAMES          |
| SKEWED_COL_VALUE_LOC_MAP  |
| SKEWED_STRING_LIST        |
| SKEWED_STRING_LIST_VALUES |
| SKEWED_VALUES             |
| SORT_COLS                 |
| TABLE_PARAMS              |
| TAB_COL_STATS             |
| TBLS                      |
| TBL_COL_PRIVS             |
| TBL_PRIVS                 |
| TYPES                     |
| TYPE_FIELDS               |
| VERSION                   |
+---------------------------+
43 rows in set (0.00 sec)
mysql> select * from DBS;
+-------+-----------------------+------------------------------------------+---------+------------+------------+
| DB_ID | DESC                  | DB_LOCATION_URI                          | NAME    | OWNER_TYPE | OWNER_NAME |
+-------+-----------------------+------------------------------------------+---------+------------+------------+
|     1 | Default Hive database | hdfs://gandhari:9000/user/hive/warehouse | default | ROLE       | public     |
+-------+-----------------------+------------------------------------------+---------+------------+------------+
1 row in set (0.00 sec)

Database Creation

Let’s create a database named mydatabase.

 hive> create database mydatabase;
 OK
 Time taken: 0.286 seconds

mysql> select * from DBS;
+-------+-----------------------+--------------------------------------------------------+------------+------------+------------+
| DB_ID | DESC                  | DB_LOCATION_URI                                        | NAME       | OWNER_TYPE | OWNER_NAME |
+-------+-----------------------+--------------------------------------------------------+------------+------------+------------+
|     1 | Default Hive database | hdfs://gandhari:9000/user/hive/warehouse               | default    | ROLE       | public     |
|     6 | NULL                  | hdfs://gandhari:9000/user/hive/warehouse/mydatabase.db | mydatabase | USER       | hadoop     |
+-------+-----------------------+--------------------------------------------------------+------------+------------+------------+
2 rows in set (0.00 sec)

The new database is created with DB_ID 6.

Here is the change on the HDFS side. You will see a .db folder created in the warehouse folder of Hive.

hadoop@gandhari:/opt/hadoop-2.6.4$ hadoop fs -ls /user/hive/warehouse
Found 2 items
drwxrwxr-x   - hadoop supergroup          0 2016-08-24 05:58 /user/hive/warehouse/demo1
drwxrwxr-x   - hadoop supergroup          0 2016-10-15 12:30 /user/hive/warehouse/mydatabase.db

Table Creation

Let’s create a table now.

 hive> use mydatabase;
 OK
 Time taken: 0.016 seconds
 hive>

 hive> CREATE TABLE IF NOT EXISTS employee (eid INT, ename STRING, eage INT, edesig STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;
 OK
 Time taken: 0.103 seconds

We created the employee table. What changed in the metastore?

mysql> select * from TBLS;
+--------+-------------+-------+------------------+--------+-----------+-------+----------+---------------+--------------------+--------------------+----------------+
| TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER  | RETENTION | SD_ID | TBL_NAME | TBL_TYPE      | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | LINK_TARGET_ID |
+--------+-------------+-------+------------------+--------+-----------+-------+----------+---------------+--------------------+--------------------+----------------+
|      1 |  1471989527 |     1 |                0 | hadoop |         0 |     1 | demo1    | MANAGED_TABLE | NULL               | NULL               |           NULL |
|      7 |  1476505836 |     6 |                0 | hadoop |         0 |     7 | employee | MANAGED_TABLE | NULL               | NULL               |           NULL |
+--------+-------------+-------+------------------+--------+-----------+-------+----------+---------------+--------------------+--------------------+----------------+
2 rows in set (0.00 sec)

A new row is added to the TBLS table for employee.

What about the change on HDFS? You can see another directory created in the warehouse folder.

hadoop@gandhari:/opt/hadoop-2.6.4$ hadoop fs -ls /user/hive/warehouse/mydatabase.db
Found 1 items
drwxrwxr-x   - hadoop supergroup          0 2016-10-15 12:30 /user/hive/warehouse/mydatabase.db/employee

Loading the content

We have a local CSV file for employees as given below.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ cat employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

To load the file from the Unix file system into Hive, we use the statement given below.

 hive> LOAD DATA LOCAL INPATH '/opt/hadoop-2.6.4/hivefarm/employee.csv' OVERWRITE INTO TABLE employee;
 Loading data to table mydatabase.employee
 OK
 Time taken: 0.664 seconds

The CSV file is copied into HDFS.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -ls /user/hive/warehouse/mydatabase.db/employee
Found 1 items
-rwxrwxr-x   3 hadoop supergroup        264 2016-10-15 12:51 /user/hive/warehouse/mydatabase.db/employee/employee.csv

Querying

Let’s start querying this.

 hive> select * from employee;
 OK
 1        Dharma NULL     Sr Manager
 2        Bheema NULL     Cook
 3        Arjuna NULL     Instructor
 4        Nakula NULL     Jr Instructor
 5        Sahadeva       NULL     Jr Instructor
 1        Dharma NULL     Sr Manager
 2        Bheema NULL     Cook
 3        Arjuna NULL     Instructor
 4        Nakula NULL     Jr Instructor
 5        Sahadeva       NULL     Jr Instructor
 Time taken: 2.612 seconds, Fetched: 10 row(s)

 hive> select * from employee limit 5;
 OK
 1        Dharma NULL     Sr Manager
 2        Bheema NULL     Cook
 3        Arjuna NULL     Instructor
 4        Nakula NULL     Jr Instructor
 5        Sahadeva       NULL     Jr Instructor
 Time taken: 0.136 seconds, Fetched: 5 row(s)

Querying with MapReduce

Aggregate functions use MapReduce to produce their results. Here is one example.

 hive> select count(*) from employee;
 Total jobs = 1
 Launching Job 1 out of 1
 Number of reduce tasks determined at compile time: 1
 In order to change the average load for a reducer (in bytes):
   set hive.exec.reducers.bytes.per.reducer=<number>
 In order to limit the maximum number of reducers:
   set hive.exec.reducers.max=<number>
 In order to set a constant number of reducers:
   set mapreduce.job.reduces=<number>
 Job running in-process (local Hadoop)
 2016-10-15 12:55:39,909 Stage-1 map = 100%,  reduce = 100%
 Ended Job = job_local1840713229_0001
 MapReduce Jobs Launched:
 Stage-Stage-1:  HDFS Read: 1584 HDFS Write: 528 SUCCESS
 Total MapReduce CPU Time Spent: 0 msec
 OK
 10
 Time taken: 2.535 seconds, Fetched: 1 row(s)

Store the output

The following statement saves the output to the local Unix file system.

 hive> INSERT OVERWRITE LOCAL DIRECTORY '/opt/hadoop/hivefarm/mr-result.csv' SELECT * FROM employee;
 WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
 Query ID = hadoop_20161015125721_f2ac0333-f444-47af-b3c7-57a079d6ca2a
 Total jobs = 1
 Launching Job 1 out of 1
 Number of reduce tasks is set to 0 since there's no reduce operator
 Job running in-process (local Hadoop)
 2016-10-15 12:57:23,074 Stage-1 map = 100%,  reduce = 0%
 Ended Job = job_local783745060_0002
 Moving data to local directory /opt/hadoop/hivefarm/mr-result.csv
 MapReduce Jobs Launched:
 Stage-Stage-1:  HDFS Read: 1056 HDFS Write: 264 SUCCESS
 Total MapReduce CPU Time Spent: 0 msec
 OK
 Time taken: 1.44 seconds

Let’s verify this output. Note that mr-result.csv is created as a directory containing the job’s output file.

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ cat mr-result.csv/000000_0
1 Dharma\N Sr Manager
2 Bheema\N Cook
3 Arjuna\N Instructor
4 Nakula\N Jr Instructor
5 Sahadeva\N Jr Instructor
1 Dharma\N Sr Manager
2 Bheema\N Cook
3 Arjuna\N Instructor
4 Nakula\N Jr Instructor
5 Sahadeva\N Jr Instructor
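
The output above uses Hive’s default text serialization: ^A as the field delimiter and \N for NULLs. On Hive 0.11 and later you can request a friendlier delimiter when writing to a directory; a sketch (the output path is illustrative):

hive> INSERT OVERWRITE LOCAL DIRECTORY '/opt/hadoop/hivefarm/mr-result-csv' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' SELECT * FROM employee;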

Now let’s store the output to HDFS itself.

hive> INSERT OVERWRITE DIRECTORY '/user/hadoop/lab27' SELECT * FROM employee;

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -ls /user/hadoop/lab27
Found 1 items
-rwxr-xr-x   3 hadoop supergroup        254 2016-10-15 13:05 /user/hadoop/lab27/000000_0

hadoop@gandhari:/opt/hadoop-2.6.4/hivefarm$ hadoop fs -cat /user/hadoop/lab27/000000_0
1 Dharma\N Sr Manager
2 Bheema\N Cook
3 Arjuna\N Instructor
4 Nakula\N Jr Instructor
5 Sahadeva\N Jr Instructor
1 Dharma\N Sr Manager
2 Bheema\N Cook
3 Arjuna\N Instructor
4 Nakula\N Jr Instructor
5 Sahadeva\N Jr Instructor

Drop

Finally, we can drop a table or database just as in any other SQL dialect.

hive> drop table employee;
OK
Time taken: 2.405 seconds

Lab 27: Filtering operators in Pig

Hi Hadoopers,

This blog post demonstrates the filtering capabilities of Pig.

Filter by

This is similar to the WHERE clause in SQL. We have an employee table as below.

grunt> fs -cat employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

The relation for employee is defined below.

grunt> employee = load 'employee.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

Let’s get the employee whose id is 1.

e1 = filter employee by eid==1;
dump e1;
(1, Dharma,45, Sr Manager)

Let’s get the employee whose name is Nakula. Note the leading space in the literal: the CSV fields carry a space after each comma, so the stored value is ' Nakula'.

e2 = filter employee by ename == ' Nakula';
dump e2;
(4, Nakula,35, Jr Instructor)
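
Filters can also combine conditions with and/or and pattern-match with matches. A sketch that picks instructors aged 35 or above (Arjuna and Nakula, given the data above):

grunt> e3 = filter employee by (eage >= 35) and (edes matches '.*Instructor.*');
grunt> dump e3;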

Distinct

As the name implies, let’s select the distinct records. Following is our input file.


grunt> fs -cat employee3.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

Let’s create a relation for the employee data.

grunt> employee = load 'employee3.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

We are creating another relation for the distinct records. Here you go.


grunt> edist = distinct employee;

grunt> dump edist;
(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)
(3, Arjuna,41, Instructor)
(4, Nakula,35, Jr Instructor)
(5, Sahadeva,33, Jr Instructor)

For-each

And finally, foreach.

Let’s iterate through the edist relation I have shown above.


grunt> foreachdata = foreach edist generate eid, ename;

grunt> dump foreachdata;
(1, Dharma)
(2, Bheema)
(3, Arjuna)
(4, Nakula)
(5, Sahadeva)

Lab 26: Achieving Partitioner with Apache Pig – Split

Hi Hadoopers,

The partitioner is a powerful concept in MapReduce. I have given an example in Lab 06 – A simple hadoop partitioner.

Let’s see how to implement the same concept using Pig. It’s simple: use split!

Let’s take a simple employee table as below.


grunt> fs  -cat employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

Let’s create the employee relation as given below.


grunt> employee = load 'employee.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

Now I’ll split the employees based on age: 40 and above, and below 40.


grunt> split employee into employee1 if eage>=40, employee2 if (eage<40);

grunt> dump employee1
(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)
(3, Arjuna,41, Instructor)

grunt> dump employee2
(4, Nakula,35, Jr Instructor)
(5, Sahadeva,33, Jr Instructor)

Note – split has no implicit else; you must spell out the condition for every branch.
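
On Pig 0.10 and later, the OTHERWISE keyword can catch everything the earlier conditions missed; a sketch equivalent to the split above:

grunt> split employee into employee1 if eage >= 40, employee2 otherwise;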

Lab 25: Combiners using Pig Union

Hi Friends,

I archive blog entries and news entries on a daily basis using one of my projects, jatomrss. It saves the news articles as tab-delimited text.

$ hadoop fs -ls /user/hadoop/feed/*.jatomrss.log
-rw-r--r--   3 hadoop supergroup    5933545 2016-10-01 21:44 /user/hadoop/feed/2016-10-01.jatomrss.log
-rw-r--r--   3 hadoop supergroup    6313692 2016-10-02 17:03 /user/hadoop/feed/2016-10-02.jatomrss.log
-rw-r--r--   3 hadoop supergroup     596174 2016-10-03 06:37 /user/hadoop/feed/2016-10-03.jatomrss.log
-rw-r--r--   3 hadoop supergroup     827974 2016-10-04 06:53 /user/hadoop/feed/2016-10-04.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1367507 2016-10-05 07:41 /user/hadoop/feed/2016-10-05.jatomrss.log
-rw-r--r--   3 hadoop supergroup      10927 2016-10-06 07:29 /user/hadoop/feed/2016-10-06.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1536870 2016-10-07 06:24 /user/hadoop/feed/2016-10-07.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1719126 2016-10-08 07:13 /user/hadoop/feed/2016-10-08.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1870073 2016-10-09 09:36 /user/hadoop/feed/2016-10-09.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1376982 2016-10-11 05:11 /user/hadoop/feed/2016-10-11.jatomrss.log

Let’s use the union operator to combine the feeds of Oct 1st and 2nd.

The relation for October 1st is given below.


feedEntryRecord20161001 = load '/user/hadoop/feed/2016-10-01.jatomrss.log' using PigStorage ('\t') as (generator:chararray, feedTitle:chararray, feedAuthor:chararray, feedUrl:chararray, feed_time:chararray, entrySubject:chararray, entryAuthor:chararray, entryUrl:chararray, entryDate:chararray, categorySet:chararray, descriptionFile:chararray, uniqueId:chararray);

The relation for October 2nd is given below.


feedEntryRecord20161002 = load '/user/hadoop/feed/2016-10-02.jatomrss.log' using PigStorage ('\t') as (generator:chararray, feedTitle:chararray, feedAuthor:chararray, feedUrl:chararray, feed_time:chararray, entrySubject:chararray, entryAuthor:chararray, entryUrl:chararray, entryDate:chararray, categorySet:chararray, descriptionFile:chararray, uniqueId:chararray);

Let’s describe them.


grunt> describe feedEntryRecord20161001;
feedEntryRecord20161001: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}
grunt> describe feedEntryRecord20161002;
feedEntryRecord20161002: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}

Here is the combiner! For the union to work, both relations need identical columns. And indeed they do.


union_data = union feedEntryRecord20161001, feedEntryRecord20161002;

grunt> describe union_data;
union_data: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}

Let's store it to check if the combiner works as expected.

store union_data into '/user/hadoop/lab25/union_data' using PigStorage ('\t');

Input(s):
Successfully read 10612 records from: "/user/hadoop/feed/2016-10-02.jatomrss.log"
Successfully read 10295 records from: "/user/hadoop/feed/2016-10-01.jatomrss.log"

Output(s):
Successfully stored 20907 records (206484670 bytes) in: "/user/hadoop/lab25/union_data"

Counters:
Total records written : 20907
Total bytes written : 206484670
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local1564505665_0014
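
One caveat: unlike SQL’s UNION, Pig’s union keeps duplicate records (it behaves like UNION ALL). If the two days’ feeds overlapped, a distinct pass would de-duplicate them; a sketch (the output path is illustrative):

grunt> dedup = distinct union_data;
grunt> store dedup into '/user/hadoop/lab25/union_dedup' using PigStorage ('\t');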

Lab 24: Joins using Pig

Let’s make some joins today, dear hadoopers.

pig-on-elephant

Join

We have a table for employee.

grunt> dump employee;
 (1, Dharma,45, Sr Manager)
 (2, Bheema,43, Cook)
 (3, Arjuna,41, Instructor)
 (4, Nakula,35, Jr Instructor)
 (5, Sahadeva,33, Jr Instructor)

We have another table salary as given below.

grunt> dump salary;

(1,2000)
 (2,1500)
 (3,1000)
 (4,500)
 (5,500)

Let’s join the salary table with the employee table on the emp_id field.

grunt> salary_table = JOIN employee by emp_id, salary by emp_id;

grunt> dump salary_table;
 Input(s):
 Successfully read 5 records from: "hdfs://gandhari:9000/user/hadoop/lab23/salary.csv"
 Successfully read 5 records from: "hdfs://gandhari:9000/user/hadoop/lab23/employee.csv"

Output(s):
 Successfully stored 5 records (16134806 bytes) in: "hdfs://gandhari:9000/tmp/temp-2101978857/tmp596113376"

Counters:
 Total records written : 5
 Total bytes written : 16134806
 Spillable Memory Manager spill count : 0
 Total bags proactively spilled: 0
 Total records proactively spilled: 0

(1, Dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,3,1000)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Inner Join

The employee table looks like this.

(1, dharma,45, Sr Manager)
 (2, Bheema,43, Cook)
 (3, Arjuna,41, Instructor)
 (4, Nakula,35, Jr Instructor)
 (5, Sahadeva,33, Jr Instructor)

grunt> employee = load 'employee.csv' using PigStorage (',') as (emp_id:int,emp_name:chararray,emp_age:int,emp_desig:chararray);

grunt> describe employee;
 employee: {emp_id: int,emp_name: chararray,emp_age: int,emp_desig: chararray}

There is a change in the salary table: no salary for Arjuna this month 🙂 ID 3 is missing from the salary table.

grunt> dump salary;
 (1,2000)
 (2,1500)
 (4,500)
 (5,500)

grunt> salary = load 'salary.csv' using PigStorage (',') as (emp_id:int,emp_salary:int);

grunt> describe salary;
 salary: {emp_id: int,emp_salary: int}

Let’s do an inner join now.

grunt> employee_salary = join employee by emp_id, salary by emp_id;

grunt> describe employee_salary;
 employee_salary: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> illustrate employee_salary;

--------------------------------------------------------------------------------------------------
 | employee     | emp_id:int    | emp_name:chararray    | emp_age:int    | emp_desig:chararray    |
 --------------------------------------------------------------------------------------------------
 |              | 5             |  Sahadeva             | 33             |  Jr Instructor         |
 |              | 5             |  Sahadeva             | 33             |  Jr Instructor         |
 --------------------------------------------------------------------------------------------------
 --------------------------------------------------
 | salary     | emp_id:int    | emp_salary:int    |
 --------------------------------------------------
 |            | 5             | 500               |
 |            | 5             | 500               |
 --------------------------------------------------
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 | employee_salary     | employee::emp_id:int    | employee::emp_name:chararray    | employee::emp_age:int    | employee::emp_desig:chararray    | salary::emp_id:int    | salary::emp_salary:int    |
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

grunt> dump employee_salary;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Left Outer Join

grunt> left_outer = join employee by emp_id left outer, salary by emp_id;

grunt> describe left_outer;
 left_outer: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> dump left_outer;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,,)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

We get all the records from the employee table, with nulls for those that have no matching value in the salary table.

Right Outer Join

Let’s do a right outer join now.

grunt> right_outer = join salary by emp_id right, employee by emp_id;

grunt> describe right_outer;
 right_outer: {salary::emp_id: int,salary::emp_salary: int,employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray}

grunt> dump right_outer;

(1,2000,1, dharma,45, Sr Manager)
 (2,1500,2, Bheema,43, Cook)
 (,,3, Arjuna,41, Instructor)
 (4,500,4, Nakula,35, Jr Instructor)
 (5,500,5, Sahadeva,33, Jr Instructor)

We see all the records from the right table (employee) even when there is no match in the left table (salary).

Full outer join

Last topic of this blog post: let’s do a full outer join, which returns every record from both tables, with nulls where there is no match.

grunt> full_outer = join employee by emp_id full outer, salary by emp_id;
 grunt> describe full_outer;
 full_outer: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> dump full_outer;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,,)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Can we join on two keys? Yes. See the sketch below.
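
A sketch, assuming a hypothetical payroll relation that also carries emp_id and emp_name (the salary relation in this lab does not):

grunt> two_key = join employee by (emp_id, emp_name), payroll by (emp_id, emp_name);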

Cross

Let’s do a cross to produce the Cartesian product of two tables. To keep it simpler, I have taken less data in our employee table.

grunt> employee = load 'employee.csv' using PigStorage (',') as (emp_id:int,emp_name:chararray,emp_age:int,emp_desig:chararray);

grunt> dump employee;

(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)

grunt> leave = load 'leave.csv' using PigStorage (',') as (emp_id:int,emp_leave_date:chararray);

grunt> dump leave;

(1,2016-10-11)
(2,2016-10-12)

grunt> cross_data = cross employee, leave;

grunt> describe cross_data;
cross_data: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,leave::emp_id: int,leave::emp_leave_date: chararray}

grunt> dump cross_data;

(2, Bheema,43, Cook,2,2016-10-12)
(2, Bheema,43, Cook,1,2016-10-11)
(1, Dharma,45, Sr Manager,2,2016-10-12)
(1, Dharma,45, Sr Manager,1,2016-10-11)

See you in another interesting post.