Apache Access Log analysis with Apache Pig

So far I have documented some of the key functions in Apache Pig. Today, let's write a simple Pig Latin script to parse and analyse Apache's access log. Here you go.

$ cat accessLogETL.pig
-- demo script javashine.wordpress.com
-- extract user IPs
access = load '/user/cloudera/pig/access.log' using PigStorage (' ') as (hostIp:chararray, clientId:chararray, userId:chararray, reqTime:chararray, reqTimeZone:chararray, reqMethod:chararray, reqLine:chararray, reqProt:chararray, statusCode:chararray, respLength:int, referrer:chararray, userAgentMozilla:chararray, userAgentPf1:chararray, userAgentPf2:chararray, userAgentPf3:chararray, userAgentRender:chararray, userAgentBrowser:chararray);
hostIpList = FOREACH access GENERATE hostIp;
hostIpList = DISTINCT hostIpList;
STORE hostIpList INTO '/user/cloudera/pig/hostIpList' USING PigStorage('\t');

hostIpUrlList = FOREACH access GENERATE (hostIp,reqTime,reqTimeZone,reqLine);
hostIpUrlList = DISTINCT hostIpUrlList;
STORE hostIpUrlList INTO '/user/cloudera/pig/hostIpUrlList' USING PigStorage('\t');

hostIpBandwidthList = FOREACH access GENERATE (hostIp), respLength;
groupByIp = GROUP hostIpBandwidthList BY hostIp;
bandwidthByIp = FOREACH groupByIp GENERATE hostIpBandwidthList.hostIp, SUM(hostIpBandwidthList.respLength);
STORE bandwidthByIp INTO '/user/cloudera/pig/bandwidthByIp' USING PigStorage('\t');

$ pig -x mapreduce accessLogETL.pig
Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MedianMapTime   MaxReduceTime   MinReduceTime   AvgReduceTime   MedianReducetime        Alias   Feature Outputs
job_1485688219066_0027  1       1       12      12      12      12      8       8       8       8       access,hostIpList,hostIpUrlList DISTINCT,MULTI_QUERY    /user/cloudera/pig/hostIpList,/user/cloudera/pig/hostIpUrlList,
job_1485688219066_0028  1       1       7       7       7       7       9       9       9       9       bandwidthByIp,groupByIp,hostIpBandwidthList     GROUP_BY        /user/cloudera/pig/bandwidthByIp,

Input(s):
Successfully read 470749 records (59020755 bytes) from: "/user/cloudera/pig/access.log"

Output(s):
Successfully stored 877 records (12342 bytes) in: "/user/cloudera/pig/hostIpList"
Successfully stored 449192 records (31075055 bytes) in: "/user/cloudera/pig/hostIpUrlList"
Successfully stored 877 records (6729928 bytes) in: "/user/cloudera/pig/bandwidthByIp"

Counters:
Total records written : 450946
Total bytes written : 37817325
Spillable Memory Manager spill count : 7
Total bags proactively spilled: 3
Total records proactively spilled: 213378

Let's see our results now.

The above script yields three outputs.
The first is the list of user IPs that accessed the web server.

$ hadoop fs -ls /user/cloudera/pig/hostIpList
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-01-31 12:43 /user/cloudera/pig/hostIpList/_SUCCESS
-rw-r--r--   1 cloudera cloudera      12342 2017-01-31 12:43 /user/cloudera/pig/hostIpList/part-r-00000
[cloudera@quickstart pig]$ hadoop fs -cat /user/cloudera/pig/hostIpList/part-r-00000
::1
10.1.1.5
107.21.1.8
14.134.7.6
37.48.94.6
46.4.90.68
46.4.90.86

The second output gives us the list of user IPs, their access times and the URLs they accessed.

$ hadoop fs -ls /user/cloudera/pig/hostIpUrlList
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-01-31 12:43 /user/cloudera/pig/hostIpUrlList/_SUCCESS
-rw-r--r--   1 cloudera cloudera   31075055 2017-01-31 12:43 /user/cloudera/pig/hostIpUrlList/part-r-00000
[cloudera@quickstart pig]$ hadoop fs -cat /user/cloudera/pig/hostIpUrlList/part-r-00000
(10.1.1.5,[22/Jan/2017:17:51:34,+0000],/egcrm)
(10.1.1.5,[22/Jan/2017:17:51:34,+0000],/egcrm2/)
(10.1.1.5,[22/Jan/2017:17:51:34,+0000],/egcrm/helloWorld.action)

And finally, the bandwidth consumed by each user IP.

$ hadoop fs -ls /user/cloudera/pig/bandwidthByIp
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-01-31 12:44 /user/cloudera/pig/bandwidthByIp/_SUCCESS
-rw-r--r--   1 cloudera cloudera    6729928 2017-01-31 12:44 /user/cloudera/pig/bandwidthByIp/part-r-00000
$ hadoop fs -cat /user/cloudera/pig/bandwidthByIp/part-r-00000
{(193.138.219.245)}     1313
{(193.138.219.250),(193.138.219.250),(193.138.219.250)} 3939
{(195.154.181.113)}     496
{(195.154.181.168)}     1026


Lab 27: Filtering operators in Pig

Hi Hadoopers,

This blog post demonstrates the filtering capabilities of Pig.


Filter by

This is similar to the WHERE clause in SQL. We have an employee table as below.

grunt> fs -cat employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

The relation for employee is defined as below.

grunt> employee = load 'employee.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

Let’s get the employee whose id is 1.

e1 = filter employee by eid==1;
dump e1;
(1, Dharma,45, Sr Manager)

Let's get the employee whose name is Nakula. Note the leading space in ' Nakula': PigStorage(',') splits only on the comma, so the space that follows each comma in the input file stays in the field value.

e2 = filter employee by ename == ' Nakula';
dump e2;
(4, Nakula,35, Jr Instructor)
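
FILTER also accepts compound conditions and regular expressions via matches. A quick sketch against the same employee relation (the exact regex is just illustrative):

-- employees aged 40 or above whose designation contains 'Instructor'
-- with this sample data, only Arjuna should survive the filter
e3 = filter employee by (eage >= 40) and (edes matches '.*Instructor.*');
dump e3;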

Distinct

As the name implies, let’s select the distinct records. Following is our input file.


grunt> fs -cat employee3.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

Let's create a relation for the employee.

grunt> employee = load 'employee3.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

We create another relation holding only the distinct records. Here you go.


grunt> edist = distinct employee;

grunt> dump edist;
(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)
(3, Arjuna,41, Instructor)
(4, Nakula,35, Jr Instructor)
(5, Sahadeva,33, Jr Instructor)

For-each

And finally, FOREACH.

Let's iterate through the edist relation shown above.


grunt> foreachdata = foreach edist generate eid, ename;

grunt> dump foreachdata;
(1, Dharma)
(2, Bheema)
(3, Arjuna)
(4, Nakula)
(5, Sahadeva)
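
FOREACH can also compute new fields rather than just project existing ones. A minimal sketch using the built-in UPPER function on the same relation:

-- project the id plus an upper-cased copy of the name
loud = foreach edist generate eid, UPPER(ename) as loud_name;
dump loud;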

Lab 26: Achieving Partitioner with Apache Pig – Split

Hi Hadoopers,

The partitioner is a powerful concept in MapReduce. I have given an example in Lab 06 – A simple Hadoop partitioner.

Let's see how to implement the same concept using Pig. It's simple: use SPLIT!


Let's take a simple employee table as below.


grunt> fs  -cat employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor

Let’s create the employee relation as given below.


grunt> employee = load 'employee.csv' using PigStorage (',') as (eid:int,ename:chararray,eage:int,edes:chararray);

Now I'll split the employees by age: 40 and above, and below 40.


grunt> split employee into employee1 if eage>=40, employee2 if (eage<40);

grunt> dump employee1
(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)
(3, Arjuna,41, Instructor)

grunt> dump employee2
(4, Nakula,35, Jr Instructor)
(5, Sahadeva,33, Jr Instructor)

Note: there is no implicit else branch, so you must spell out the condition for every output relation yourself.
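
For completeness, newer Pig releases (0.10 onwards) also accept an OTHERWISE branch that catches whatever the earlier conditions did not match. A minimal sketch of the same split:

-- same partitioning, with the second branch expressed as a catch-all
split employee into employee1 if eage >= 40, employee2 otherwise;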

Lab 25: Combiners using Pig Union

Hi Friends,

I archive blog entries and news entries on a daily basis using one of my projects, jatomrss. It saves the news articles as tab-delimited text.

$ hadoop fs -ls /user/hadoop/feed/*.jatomrss.log
-rw-r--r--   3 hadoop supergroup    5933545 2016-10-01 21:44 /user/hadoop/feed/2016-10-01.jatomrss.log
-rw-r--r--   3 hadoop supergroup    6313692 2016-10-02 17:03 /user/hadoop/feed/2016-10-02.jatomrss.log
-rw-r--r--   3 hadoop supergroup     596174 2016-10-03 06:37 /user/hadoop/feed/2016-10-03.jatomrss.log
-rw-r--r--   3 hadoop supergroup     827974 2016-10-04 06:53 /user/hadoop/feed/2016-10-04.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1367507 2016-10-05 07:41 /user/hadoop/feed/2016-10-05.jatomrss.log
-rw-r--r--   3 hadoop supergroup      10927 2016-10-06 07:29 /user/hadoop/feed/2016-10-06.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1536870 2016-10-07 06:24 /user/hadoop/feed/2016-10-07.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1719126 2016-10-08 07:13 /user/hadoop/feed/2016-10-08.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1870073 2016-10-09 09:36 /user/hadoop/feed/2016-10-09.jatomrss.log
-rw-r--r--   3 hadoop supergroup    1376982 2016-10-11 05:11 /user/hadoop/feed/2016-10-11.jatomrss.log

Let's use the UNION operator to combine the feeds of Oct 1st and 2nd.


The relation for October 1st is given as below.


feedEntryRecord20161001 = load '/user/hadoop/feed/2016-10-01.jatomrss.log' using PigStorage ('\t') as (generator:chararray, feedTitle:chararray, feedAuthor:chararray, feedUrl:chararray, feed_time:chararray, entrySubject:chararray, entryAuthor:chararray, entryUrl:chararray, entryDate:chararray, categorySet:chararray, descriptionFile:chararray, uniqueId:chararray);

The relation for October 2nd is given as below.


feedEntryRecord20161002 = load '/user/hadoop/feed/2016-10-02.jatomrss.log' using PigStorage ('\t') as (generator:chararray, feedTitle:chararray, feedAuthor:chararray, feedUrl:chararray, feed_time:chararray, entrySubject:chararray, entryAuthor:chararray, entryUrl:chararray, entryDate:chararray, categorySet:chararray, descriptionFile:chararray, uniqueId:chararray);

Let’s describe them.


grunt> describe feedEntryRecord20161001;
feedEntryRecord20161001: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}
grunt> describe feedEntryRecord20161002;
feedEntryRecord20161002: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}

Here is the combiner! For the union to work, both relations need identical schemas. Yes, we do have them.


union_data = union feedEntryRecord20161001, feedEntryRecord20161002;

grunt> describe union_data;
union_data: {generator: chararray,feedTitle: chararray,feedAuthor: chararray,feedUrl: chararray,feed_time: chararray,entrySubject: chararray,entryAuthor: chararray,entryUrl: chararray,entryDate: chararray,categorySet: chararray,descriptionFile: chararray,uniqueId: chararray}

Let's store it to check if the combiner works as expected.

store union_data into '/user/hadoop/lab25/union_data' using PigStorage ('\t');

Input(s):
Successfully read 10612 records from: "/user/hadoop/feed/2016-10-02.jatomrss.log"
Successfully read 10295 records from: "/user/hadoop/feed/2016-10-01.jatomrss.log"

Output(s):
Successfully stored 20907 records (206484670 bytes) in: "/user/hadoop/lab25/union_data"

Counters:
Total records written : 20907
Total bytes written : 206484670
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local1564505665_0014
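
Since the same article can be fetched on more than one day, it may be worth de-duplicating the combined relation before storing it. A hedged sketch; the union_dedup output path is made up for illustration:

-- drop exact duplicate rows, then store the de-duplicated union
union_dedup = distinct union_data;
store union_dedup into '/user/hadoop/lab25/union_dedup' using PigStorage ('\t');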

Lab 24: Joins using Pig

Let’s make some joins today, dear hadoopers.


Join

We have an employee table.

grunt> dump employee;
 (1, Dharma,45, Sr Manager)
 (2, Bheema,43, Cook)
 (3, Arjuna,41, Instructor)
 (4, Nakula,35, Jr Instructor)
 (5, Sahadeva,33, Jr Instructor)

We have another table, salary, as given below.

grunt> dump salary;

(1,2000)
 (2,1500)
 (3,1000)
 (4,500)
 (5,500)

Let's join the salary table with the employee table on the emp_id field.

grunt> salary_table = JOIN employee by emp_id, salary by emp_id;

grunt> dump salary_table;
 Input(s):
 Successfully read 5 records from: "hdfs://gandhari:9000/user/hadoop/lab23/salary.csv"
 Successfully read 5 records from: "hdfs://gandhari:9000/user/hadoop/lab23/employee.csv"

Output(s):
 Successfully stored 5 records (16134806 bytes) in: "hdfs://gandhari:9000/tmp/temp-2101978857/tmp596113376"

Counters:
 Total records written : 5
 Total bytes written : 16134806
 Spillable Memory Manager spill count : 0
 Total bags proactively spilled: 0
 Total records proactively spilled: 0

(1, Dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,3,1000)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Inner Join

The employee table looks like this.

(1, dharma,45, Sr Manager)
 (2, Bheema,43, Cook)
 (3, Arjuna,41, Instructor)
 (4, Nakula,35, Jr Instructor)
 (5, Sahadeva,33, Jr Instructor)

grunt> employee = load 'employee.csv' using PigStorage (',') as (emp_id:int,emp_name:chararray,emp_age:int,emp_desig:chararray);

grunt> describe employee;
 employee: {emp_id: int,emp_name: chararray,emp_age: int,emp_desig: chararray}

There is a change in the salary table. No salary for Arjuna this month 🙂 Id 3 is missing in the salary table.

grunt> dump salary;
 (1,2000)
 (2,1500)
 (4,500)
 (5,500)

grunt> salary = load 'salary.csv' using PigStorage (',') as (emp_id:int,emp_salary:int);

grunt> describe salary;
 salary: {emp_id: int,emp_salary: int}

Let's do an inner join now.

grunt> employee_salary = join employee by emp_id, salary by emp_id;

grunt> describe employee_salary;
 employee_salary: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> illustrate employee_salary;

--------------------------------------------------------------------------------------------------
 | employee     | emp_id:int    | emp_name:chararray    | emp_age:int    | emp_desig:chararray    |
 --------------------------------------------------------------------------------------------------
 |              | 5             |  Sahadeva             | 33             |  Jr Instructor         |
 |              | 5             |  Sahadeva             | 33             |  Jr Instructor         |
 --------------------------------------------------------------------------------------------------
 --------------------------------------------------
 | salary     | emp_id:int    | emp_salary:int    |
 --------------------------------------------------
 |            | 5             | 500               |
 |            | 5             | 500               |
 --------------------------------------------------
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 | employee_salary     | employee::emp_id:int    | employee::emp_name:chararray    | employee::emp_age:int    | employee::emp_desig:chararray    | salary::emp_id:int    | salary::emp_salary:int    |
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 |                     | 5                       |  Sahadeva                       | 33                       |  Jr Instructor                   | 5                     | 500                       |
 -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

grunt> dump employee_salary;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Left Outer Join

grunt> left_outer = join employee by emp_id left outer, salary by emp_id;

grunt> describe left_outer;
 left_outer: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> dump left_outer;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,,)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

We get all the records from the employee table, with nulls for the rows that have no matching value in the salary table.

Right Outer Join

Let's do a right outer join now.

grunt> right_outer = join salary by emp_id right, employee by emp_id;

grunt> describe right_outer;
 right_outer: {salary::emp_id: int,salary::emp_salary: int,employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray}

grunt> dump right_outer;

(1,2000,1, dharma,45, Sr Manager)
 (2,1500,2, Bheema,43, Cook)
 (,,3, Arjuna,41, Instructor)
 (4,500,4, Nakula,35, Jr Instructor)
 (5,500,5, Sahadeva,33, Jr Instructor)

We can see all the records from the right table (employee), even where there is no match in the left table (salary).

Full outer join

Let's now do a full outer join, which returns all records from both tables and fills in nulls where there is no match.

grunt> full_outer = join employee by emp_id full outer, salary by emp_id;
 grunt> describe full_outer;
 full_outer: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,salary::emp_id: int,salary::emp_salary: int}

grunt> dump full_outer;

(1, dharma,45, Sr Manager,1,2000)
 (2, Bheema,43, Cook,2,1500)
 (3, Arjuna,41, Instructor,,)
 (4, Nakula,35, Jr Instructor,4,500)
 (5, Sahadeva,33, Jr Instructor,5,500)

Can we join by two keys? Yes, by listing both fields in the BY clause, as sketched below.
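
Here is a sketch of what a two-key join could look like, assuming a hypothetical payroll relation that also carries emp_id and emp_name:

-- composite-key join; both relations must expose both fields
emp_payroll = join employee by (emp_id, emp_name), payroll by (emp_id, emp_name);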

Cross

Let's do a CROSS to produce the Cartesian product of the two tables. To keep it simple, I have reduced the data in our employee table.

grunt> employee = load 'employee.csv' using PigStorage (',') as (emp_id:int,emp_name:chararray,emp_age:int,emp_desig:chararray);

grunt> dump employee;

(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)

grunt> leave = load 'leave.csv' using PigStorage (',') as (emp_id:int,emp_leave_date:chararray);

grunt> dump leave;

(1,2016-10-11)
(2,2016-10-12)

grunt> cross_data = cross employee, leave;

grunt> describe cross_data;
cross_data: {employee::emp_id: int,employee::emp_name: chararray,employee::emp_age: int,employee::emp_desig: chararray,leave::emp_id: int,leave::emp_leave_date: chararray}

grunt> dump cross_data;

(2, Bheema,43, Cook,2,2016-10-12)
(2, Bheema,43, Cook,1,2016-10-11)
(1, Dharma,45, Sr Manager,2,2016-10-12)
(1, Dharma,45, Sr Manager,1,2016-10-11)

See you in another interesting post.

 

Lab 23: Apache Pig Basics

Hi ETL enthusiasts,

This post will talk about important concepts in Pig.


Pig Conventions

( )  tuple data type   e.g. (John,18,4.0F)
{ }  bag data type     e.g. (1,{(1,2,3)}), (4,{(4,2,1),(4,3,3)}), (8,{(8,3,4)})
[ ]  map data type     e.g. [name#John,phone#5551212]

Relations, Bags, Tuples, Fields

A field is a piece of data, e.g. Jessica.

A tuple is an ordered set of fields, e.g. (Jessica, F, 35, NY).

A bag is a collection of tuples, e.g. {(Jessica, F, 35, NY),(Nathan, M, 35, NJ)}.

A relation is an outer bag of tuples; it is what Pig Latin statements operate on.

Data Types

Simple types:
int        Signed 32-bit integer, e.g. 10
long       Signed 64-bit integer, e.g. 10L or 10l (displayed as 10L)
float      32-bit floating point, e.g. 10.5F, 10.5f, 10.5e2f or 10.5E2F (displayed as 10.5F or 1050.0F)
double     64-bit floating point, e.g. 10.5, 10.5e2 or 10.5E2 (displayed as 10.5 or 1050.0)
chararray  Character array (string) in Unicode UTF-8 format, e.g. hello world
bytearray  Byte array (blob)
boolean    true/false (case insensitive)
datetime   e.g. 1970-01-01T00:00:00.000+00:00

Complex types:
tuple      An ordered set of fields, e.g. (19,2)
bag        A collection of tuples, e.g. {(19,2), (18,1)}
map        A set of key-value pairs, e.g. [open#apache]
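
Complex types can be declared directly in a LOAD schema. A minimal sketch, with a made-up file name and field names:

-- one tuple, one bag of tuples and one untyped map per record
rec = load 'complex.dat' as (t:tuple(a:int, b:int), bg:bag{tp:tuple(x:int)}, m:map[]);
describe rec;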

Nulls, Operators, and Functions

How operators and functions interact with nulls:

Comparison operators (==, !=, >, <, >=, <=): if either subexpression is null, the result is null.

Comparison operator (matches): if either the string being matched against or the string defining the match is null, the result is null.

Arithmetic operators (+, -, *, /, % modulo, ?: bincond): if either subexpression is null, the resulting expression is null.

Null operator (is null): if the tested value is null, returns true; otherwise, returns false.

Null operator (is not null): if the tested value is not null, returns true; otherwise, returns false.

Dereference operators (tuple (.) or map (#)): if the de-referenced tuple or map is null, returns null.

Operators (COGROUP, GROUP, JOIN): these operators handle nulls differently; see the examples in the Pig documentation.

Function (COUNT_STAR): counts all values, including nulls.

Cast operator: casting a null from one type to another type results in a null.

Functions (AVG, MIN, MAX, SUM, COUNT): these functions ignore nulls.

Function (CONCAT): if either subexpression is null, the resulting expression is null.

Function (SIZE): if the tested object is null, returns null.
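
To see a couple of these rules together, here is a small sketch that combines is not null with the bincond operator, written against the employee schema used later in this post:

-- keep rows whose age is present, then label them with bincond
withAge = filter employee by emp_age is not null;
banded = foreach withAge generate emp_name, (emp_age >= 40 ? 'senior' : 'junior') as band;
dump banded;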

Operators

Following are the operators available in Pig. Many of them are used in this post.

Loading and Storing
LOAD        Load data from the file system (local/HDFS) into a relation.
STORE       Save a relation to the file system (local/HDFS).

Filtering
FILTER      Remove unwanted rows from a relation.
DISTINCT    Remove duplicate rows from a relation.
FOREACH, GENERATE  Generate data transformations based on columns of data.
STREAM      Transform a relation using an external program.

Grouping and Joining
JOIN        Join two or more relations.
COGROUP     Group the data in two or more relations.
GROUP       Group the data in a single relation.
CROSS       Create the cross product of two or more relations.

Sorting
ORDER       Arrange a relation in sorted order based on one or more fields (ascending or descending).
LIMIT       Get a limited number of tuples from a relation.

Combining and Splitting
UNION       Combine two or more relations into a single relation.
SPLIT       Split a single relation into two or more relations.

Diagnostic Operators
DUMP        Print the contents of a relation on the console.
DESCRIBE    Describe the schema of a relation.
EXPLAIN     View the logical, physical, or MapReduce execution plans used to compute a relation.
ILLUSTRATE  View the step-by-step execution of a series of statements.

Pig Latin statements

Let’s use the following as a sample table

emp_id  name      age  designation
1       Dharma    45   Sr Manager
2       Bheema    43   Cook
3       Arjuna    41   Instructor
4       Nakula    35   Jr Instructor
5       Sahadeva  33   Jr Instructor

grunt> fs -cat lab23/employee.csv
1, Dharma, 45, Sr Manager
2, Bheema, 43, Cook
3, Arjuna, 41, Instructor
4, Nakula, 35, Jr Instructor
5, Sahadeva, 33, Jr Instructor


Load, Describe, illustrate, dump

grunt> employee = load 'lab23/employee.csv' using PigStorage(',') as (emp_id:int,emp_name:chararray,emp_age:int,emp_desig:chararray);

Relation name: employee
Input file path: lab23/employee.csv (I have used a relative path)
Storage function: we have used the PigStorage() function, which loads and stores data as structured text files. The default delimiter is \t; we use a comma here.

grunt> describe employee;
employee: {emp_id: int,emp_name: chararray,emp_age: int,emp_desig: chararray}

grunt> illustrate employee
--------------------------------------------------------------------------------------------------
| employee     | emp_id:int    | emp_name:chararray    | emp_age:int    | emp_desig:chararray    |
--------------------------------------------------------------------------------------------------
|              | 5             |  Sahadeva             |  33            |  Jr Instructor         |
--------------------------------------------------------------------------------------------------

grunt> dump employee;
(1, Dharma,45, Sr Manager)
(2, Bheema,43, Cook)
(3, Arjuna,41, Instructor)
(4, Nakula,35, Jr Instructor)
(5, Sahadeva,33, Jr Instructor)

This will execute a MapReduce job to read data from HDFS and print the content on the screen.

grunt> explain employee;

#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-119
Map Plan
employee: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-118
|
|---employee: New For Each(false,false,false,false)[bag] - scope-117
|   |
|   Cast[int] - scope-106
|   |
|   |---Project[bytearray][0] - scope-105
|   |
|   Cast[chararray] - scope-109
|   |
|   |---Project[bytearray][1] - scope-108
|   |
|   Cast[int] - scope-112
|   |
|   |---Project[bytearray][2] - scope-111
|   |
|   Cast[chararray] - scope-115
|   |
|   |---Project[bytearray][3] - scope-114
|
|---employee: Load(hdfs://gandhari:9000/user/hadoop/lab23/employee.csv:PigStorage(',')) - scope-104--------
Global sort: false
----------------

Group

grunt> edesig = group employee by emp_desig;

grunt> dump edesig;
( Cook,{(2, Bheema,43, Cook)})
( Instructor,{(3, Arjuna,41, Instructor)})
( Sr Manager,{(1, Dharma,45, Sr Manager)})
( Jr Instructor,{(5, Sahadeva,33, Jr Instructor),(4, Nakula,35, Jr Instructor)})

Thinking in terms of plain MapReduce, by this time I would have had to write a mapper extending Mapper, a reducer extending Reducer and a driver 🙂

Let's do one more grouping, this time by age.

grunt> eage = group employee by emp_age;

grunt> dump eage;
(33,{(5, Sahadeva,33, Jr Instructor)})
(35,{(4, Nakula,35, Jr Instructor)})
(41,{(3, Arjuna,41, Instructor)})
(43,{(2, Bheema,43, Cook)})
(45,{(1, Dharma,45, Sr Manager)})

Let’s do a grouping based on both the columns.

grunt> e_age_desig = group employee by (emp_age, emp_desig);

grunt> dump e_age_desig;
((33, Jr Instructor),{(5, Sahadeva,33, Jr Instructor)})
((35, Jr Instructor),{(4, Nakula,35, Jr Instructor)})
((41, Instructor),{(3, Arjuna,41, Instructor)})
((43, Cook),{(2, Bheema,43, Cook)})
((45, Sr Manager),{(1, Dharma,45, Sr Manager)})
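
Once a relation is grouped, aggregates come almost for free. A quick sketch counting employees per designation using the edesig relation above:

-- the group key surfaces as 'group'; the bag keeps the original relation's name
desig_count = foreach edesig generate group as designation, COUNT(employee) as headcount;
dump desig_count;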

Yes, writing MapReduce for simple tasks is time consuming. We should instead deploy the right tools to get the job done.

Co-group

In addition to employee, we have one other table, student, as given below.

grunt> fs -cat lab23/student.csv;
1, Duryodhana, 15, 11
2, Dushasana, 14, 10
3, Dushala, 13, 9
4, Dronacharya,45, 12

grunt> student = load 'lab23/student.csv' using PigStorage(',') as (stud_id:int,stud_name:chararray,stud_age:int,stud_class:chararray);

grunt> illustrate student;
-------------------------------------------------------------------------------------------------
| student     | stud_id:int   | stud_name:chararray   | stud_age:int   | stud_class:chararray   |
-------------------------------------------------------------------------------------------------
|             | 3             |  Dushala              |  13            |  9                     |
-------------------------------------------------------------------------------------------------

grunt> cogroupdata = COGROUP student by stud_age, employee by emp_age;

grunt> dump cogroupdata;
(13,{(3, Dushala,13, 9)},{})
(14,{(2, Dushasana,14, 10)},{})
(15,{(1, Duryodhana,15, 11)},{})
(33,{},{(5, Sahadeva,33, Jr Instructor)})
(35,{},{(4, Nakula,35, Jr Instructor)})
(41,{},{(3, Arjuna,41, Instructor)})
(43,{},{(2, Bheema,43, Cook)})
(45,{(4, Dronacharya,45, 12)},{(1, Dharma,45, Sr Manager)})

COGROUP is similar to the GROUP operator, but it works across two or more relations.
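
A common follow-up is to keep only the ages that occur in both relations, which amounts to a small semi-join. A sketch using the built-in IsEmpty:

-- ages present in both student and employee
common_ages = filter cogroupdata by not IsEmpty(student) and not IsEmpty(employee);
dump common_ages;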

 

Ref: http://pig.apache.org

Lab 22: Getting started with Apache Pig

Hi Hadoopers,

I'm happy to start another series of blog posts in Big Data: Apache Pig!

Pig – Eats everything!

Here are the basic commands

If you are looking for installation, you can find it here.


Version

$ pig -i
Apache Pig version 0.16.0 (r1746530)
compiled Jun 01 2016, 23:10:49

Launch Pig

The following command launches Pig in local mode, which works with the local file system.

$ pig -l /tmp -x local

... ... ...

2016-10-09 03:02:04,064 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///

... ... ...

grunt> ls
file:/opt/hadoop-2.6.4/pigfarm/pig_1475951593891.log       2615

The following command launches Pig in mapreduce mode, where it connects to HDFS.

$ pig -l /tmp -x mapreduce

2016-10-09 03:03:24,768 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://gandhari:9000

Some of the basic commands

grunt> fs -ls 

Found 20 items 
drwxr-xr-x   - hadoop supergroup          0 2016-10-06 06:07 dc 
drwxr-xr-x   - hadoop supergroup          0 2016-10-08 07:13 feed 
drwxr-xr-x   - hadoop supergroup          0 2016-09-10 10:51 lab01 
drwxr-xr-x   - hadoop supergroup          0 2016-09-10 15:20 lab03 
drwxr-xr-x   - hadoop supergroup          0 2016-09-12 14:04 lab07 
drwxr-xr-x   - hadoop supergroup          0 2016-09-15 05:39 lab08 
drwxr-xr-x   - hadoop supergroup          0 2016-09-17 07:58 lab09 
drwxr-xr-x   - hadoop supergroup          0 2016-09-17 15:47 lab10 
drwxr-xr-x   - hadoop supergroup          0 2016-09-22 21:07 lab13 
drwxr-xr-x   - hadoop supergroup          0 2016-09-25 00:40 lab15 
drwxr-xr-x   - hadoop supergroup          0 2016-10-02 18:17 lab16 
drwxr-xr-x   - hadoop supergroup          0 2016-10-08 09:25 lab17 
drwxr-xr-x   - hadoop supergroup          0 2016-10-08 11:27 lab18 
drwxr-xr-x   - hadoop supergroup          0 2016-10-08 19:35 lab19 
drwxr-xr-x   - hadoop supergroup          0 2016-10-08 19:36 lab20 
drwxr-xr-x   - hadoop supergroup          0 2016-10-09 00:54 lab21 
drwxr-xr-x   - hadoop supergroup          0 2016-09-11 09:41 output 
drwxr-xr-x   - hadoop supergroup          0 2016-08-27 08:40 share 
drwxr-xr-x   - hadoop supergroup          0 2016-09-04 15:41 trial1 
drwxr-xr-x   - hadoop supergroup          0 2016-09-04 16:00 trial2 
grunt> pwd
hdfs://gandhari:9000/user/hadoop
grunt> fs -cp /user/hadoop/lab20/input/*.csv lab22 
grunt> fs -cat /user/hadoop/lab20/input/employee.csv 
101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati 
102,Bheema,Pandu,Kunti,Hidimbi

Load

Creating a relation for the employee.csv table.

grunt> a = load '/user/hadoop/lab20/input/employee.csv' using PigStorage(',') as (empid:int,emp_name:chararray,fathers_name:chararray,mothers_name:chararray,wifes_name:chararray);
2016-10-09 03:32:44,930 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS

Describe

It describes the schema of the employee relation.

grunt> describe a;
a: {empid: int,emp_name: chararray,fathers_name: chararray,mothers_name: chararray,wifes_name: chararray}

Dump

It dumps the data on the screen.

grunt> dump a;
(101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati)
(102,Bheema,Pandu,Kunti,Hidimbi)

Explain

This is the execution plan.

grunt> explain a;
2016-10-09 03:41:37,719 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2016-10-09 03:41:37,720 [main] WARN  org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already been initialized
2016-10-09 03:41:37,721 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator, GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}
#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
a: (Name: LOStore Schema: empid#31:int,emp_name#32:chararray,fathers_name#33:chararray,mothers_name#34:chararray,wifes_name#35:chararray)
|
|---a: (Name: LOForEach Schema: empid#31:int,emp_name#32:chararray,fathers_name#33:chararray,mothers_name#34:chararray,wifes_name#35:chararray)
    |   |
    |   (Name: LOGenerate[false,false,false,false,false] Schema: empid#31:int,emp_name#32:chararray,fathers_name#33:chararray,mothers_name#34:chararray,wifes_name#35:chararray)ColumnPrune:OutputUids=[32, 33, 34, 35, 31]ColumnPrune:InputUids=[32, 33, 34, 35, 31]
    |   |   |
    |   |   (Name: Cast Type: int Uid: 31)
    |   |   |
    |   |   |---empid:(Name: Project Type: bytearray Uid: 31 Input: 0 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 32)
    |   |   |
    |   |   |---emp_name:(Name: Project Type: bytearray Uid: 32 Input: 1 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 33)
    |   |   |
    |   |   |---fathers_name:(Name: Project Type: bytearray Uid: 33 Input: 2 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 34)
    |   |   |
    |   |   |---mothers_name:(Name: Project Type: bytearray Uid: 34 Input: 3 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 35)
    |   |   |
    |   |   |---wifes_name:(Name: Project Type: bytearray Uid: 35 Input: 4 Column: (*))
    |   |
    |   |---(Name: LOInnerLoad[0] Schema: empid#31:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[1] Schema: emp_name#32:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[2] Schema: fathers_name#33:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[3] Schema: mothers_name#34:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[4] Schema: wifes_name#35:bytearray)
    |
    |---a: (Name: LOLoad Schema: empid#31:bytearray,emp_name#32:bytearray,fathers_name#33:bytearray,mothers_name#34:bytearray,wifes_name#35:bytearray)RequiredFields:null
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
a: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---a: New For Each(false,false,false,false,false)[bag] - scope-35
    |   |
    |   Cast[int] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |   |
    |   Cast[chararray] - scope-27
    |   |
    |   |---Project[bytearray][2] - scope-26
    |   |
    |   Cast[chararray] - scope-30
    |   |
    |   |---Project[bytearray][3] - scope-29
    |   |
    |   Cast[chararray] - scope-33
    |   |
    |   |---Project[bytearray][4] - scope-32
    |
    |---a: Load(/user/hadoop/lab20/input/employee.csv:PigStorage(',')) - scope-19

2016-10-09 03:41:37,731 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2016-10-09 03:41:37,733 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2016-10-09 03:41:37,733 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-37
Map Plan
a: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---a: New For Each(false,false,false,false,false)[bag] - scope-35
    |   |
    |   Cast[int] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |   |
    |   Cast[chararray] - scope-27
    |   |
    |   |---Project[bytearray][2] - scope-26
    |   |
    |   Cast[chararray] - scope-30
    |   |
    |   |---Project[bytearray][3] - scope-29
    |   |
    |   Cast[chararray] - scope-33
    |   |
    |   |---Project[bytearray][4] - scope-32
    |
    |---a: Load(/user/hadoop/lab20/input/employee.csv:PigStorage(',')) - scope-19--------
Global sort: false
----------------

Store

grunt> store a into '/user/hadoop/lab22/01' using PigStorage(',');

Input(s):
Successfully read 2 records (10756671 bytes) from: "/user/hadoop/lab20/input/employee.csv"

Output(s):
Successfully stored 2 records (10756592 bytes) in: "/user/hadoop/lab22/01"

Counters:
Total records written : 2
Total bytes written : 10756592
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local1725228402_0002

grunt> fs -ls /user/hadoop/lab22/01
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2016-10-09 08:00 /user/hadoop/lab22/01/_SUCCESS
-rw-r--r--   3 hadoop supergroup         79 2016-10-09 08:00 /user/hadoop/lab22/01/part-m-00000

grunt> fs -cat /user/hadoop/lab22/01/part-m-00000
101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati
102,Bheema,Pandu,Kunti,Hidimbi

Filter by

grunt> b = filter a by empid<102;

grunt> dump b;

(101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati)

Order by

grunt> describe a;
a: {empid: int,emp_name: chararray,fathers_name: chararray,mothers_name: chararray,wifes_name: chararray}
grunt> c = order a by emp_name;

grunt> dump c;

(102,Bheema,Pandu,Kunti,Hidimbi)
(101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati)
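
ORDER also takes DESC and pairs naturally with LIMIT; a small sketch on the same relation:

grunt> c_desc = order a by emp_name desc;
grunt> top1 = limit c_desc 1;
grunt> dump top1;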

Group by

grunt> d = group a by fathers_name;

grunt> dump d;

(Pandu,{(102,Bheema,Pandu,Kunti,Hidimbi)})
(Dhritarashtra,{(101,Duryodhana,Dhritarashtra,Gandhari,Bhanumati)})

Wow. Pig saves my time like anything!

See you in another interesting post.

Hadoop Eco System Installation – Contents

Here is the list of pages, that can help you to install Hadoop and its ecosystem products

Distributed HBASE & ZooKeeper Installation and Configuration

Hue Installation and Configuration

Pig Installation and Configuration


Apache Pig is a high-level platform for creating programs that run on Apache Hadoop. The language for this platform is called Pig Latin. Pig can execute its Hadoop jobs in MapReduce, Apache Tez, or Apache Spark. Pig Latin abstracts the programming from the Java MapReduce idiom into a notation which makes MapReduce programming high level, similar to that of SQL for RDBMSs. Pig Latin can be extended using User Defined Functions (UDFs) which the user can write in Java, Python, JavaScript, Ruby or Groovy and then call directly from the language.

This is the continuation of my Hadoop series and follows the folder structures created earlier. Please refer to the following posts.

Download and Extract

hadoop@gandhari:~$ wget http://download.nus.edu.sg/mirror/apache/pig/pig-0.16.0/pig-0.16.0.tar.gz
hadoop@gandhari:~$ gunzip pig-0.16.0.tar.gz
hadoop@gandhari:~$ tar -xvf pig-0.16.0.tar
hadoop@gandhari:~$ ln -s pig-0.16.0 pig

.bashrc Changes

Add the following environmental variables to your ~/.bashrc

#PIG VARIABLES
export PIG_CONF_DIR=/etc/hadoop/conf
export PIG_CLASSPATH=/etc/hadoop/conf
export PIG_HOME=/opt/hadoop/pig
export PATH=$PATH:/opt/hadoop/pig/bin

Sample Pig Job

Let’s copy a file using hadoop and read it with Pig

hadoop@gandhari:~$ ls -alt>test.txt
hadoop@gandhari:~$ hadoop fs -mkdir /pigdata
hadoop@gandhari:~$ hadoop fs -put test.txt /pigdata

Let's ensure the Hadoop daemons are running.

hadoop@gandhari:~$ jps
7156 ResourceManager
6788 DataNode
6998 SecondaryNameNode
7846 Jps
7276 NodeManager
6671 NameNode

hadoop@gandhari:~$ pig

grunt> a = LOAD '/pigdata/test.txt' as (name:chararray);

grunt> dump a;

2016-08-24 14:01:27,845 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(total 452532)
(drwxr-xr-x 15 hadoop hadoop      4096 Aug 24 06:00 .)
(-rw-rw-r--  1 hadoop hadoop         0 Aug 24 06:00 text.txt)
(drwxrwxr-x  3 hadoop hadoop      4096 Aug 24 05:58 logs)
(-rw-rw-r--  1 hadoop hadoop       139 Aug 23 16:09 .hivehistory)
(-rw-r--r--  1 hadoop hadoop      4567 Aug 23 15:59 .bashrc)