Apache Pig with Examples

Hi Hadoopers,

I’m glad to convey my Mattu Pongal wishes to you. This post will help you to quickly go through the different operators and commands of Apache Pig.
Input files

Data Preparation

We have two input files. Following are the employee records.

$ cat employee
Naranbhai						 55					 Director							 Gujarat 1-1-1990
Prabhaben						 34					 Engineer							 Maharashtra				 1-2-1990
Sarania 47					 Manager Goa				 1-3-1990
Chintaman						 29					 Asst Engineer		 Karnataka						 1-4-1990
Sakuntala						 19					 Jr Engineer				 Tamilnadu						 1-5-1990

Each record contains 5 tab-separated fields: name, age, designation, state and date of joining.

Following are the student records.

$ cat student
Vikram	 13					 Grade VII						 Gujarat 1-1-2003
Dalpat	 15					 Grade X Maharashtra				 1-2-2005
Sarania 10					 Grade V Goa				 1-3-2007
Vasava	 8						 Grade III						 Karnataka						 1-5-2009
Chavan	 9						 Grade IV							 Tamilnadu						 1-4-2012

Each record contains 5 tab-separated fields: name, age, class, state and date of birth.

Copying the files to HDFS

Let’s create a folder in HDFS and copy our input files.

$ hadoop fs -mkdir /user/cloudera/pig
$ hadoop fs -put * /user/cloudera/pig
$ hadoop fs -ls /user/cloudera/pig
Found 2 items
-rw-r--r--		 1 cloudera cloudera							 205 2017-01-14 20:51 /user/cloudera/pig/employee
-rw-r--r--		 1 cloudera cloudera							 183 2017-01-14 20:51 /user/cloudera/pig/student

Let’s verify the data of the files.

$ hadoop fs -cat /user/cloudera/pig/employee
Naranbhai						 55					 Director							 Gujarat 1-1-1990
Prabhaben						 34					 Engineer							 Maharashtra				 1-2-1990
Sarania 47					 Manager Goa				 1-3-1990
Chintaman						 29					 Asst Engineer		 Karnataka						 1-4-1990
Sakuntala						 19					 Jr Engineer				 Tamilnadu						 1-5-1990

$ hadoop fs -cat /user/cloudera/pig/student
Vikram	 13					 Grade VII						 Gujarat 1-1-2003
Dalpat	 15					 Grade X Maharashtra				 1-2-2005
Sarania 10					 Grade V Goa				 1-3-2007
Vasava	 8						 Grade III						 Karnataka						 1-5-2009
Chavan	 9						 Grade IV							 Tamilnadu						 1-4-2012

Opening Grunt Shell

The following command will open the grunt shell in mapreduce mode.

$ pig -x mapreduce
grunt>

Loading the employee record in Pig – LOAD

Let’s define relations for our input files using the LOAD operator.

grunt> employee = LOAD '/user/cloudera/pig/employee' USING PigStorage ('\t') as (name:chararray,age:int,designation:chararray,state:chararray,doj:chararray);

grunt> describe employee;
employee: {name: chararray,age: int,designation: chararray,state: chararray,doj: chararray}

grunt> illustrate employee;
-------------------------------------------------------------------------------------------------------------------------
| employee                 | name:chararray                 | age:int                 | designation:chararray                 | state:chararray                 | doj:chararray                 |
-------------------------------------------------------------------------------------------------------------------------
|                                                     | Naranbhai                                     | 55                                     | Director                                                                     | Gujarat                                                 | 1-1-1990                                     |
-------------------------------------------------------------------------------------------------------------------------

grunt> dump employee;
(Naranbhai,55,Director,Gujarat,1-1-1990)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990)
(Sarania,47,Manager,Goa,1-3-1990)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)
(,,,,)

The last tuple, (,,,,), most likely comes from a blank line at the end of the employee file. Pig loads it as a record whose fields are all null, and it shows up again in several of the outputs below.

Loading the student record in Pig – LOAD

Here, we load the relation for the student.

grunt> student = LOAD '/user/cloudera/pig/student' USING PigStorage('\t') as (name:chararray,age:int,class:chararray,state:chararray,dob:chararray);

grunt> describe student;
student: {name: chararray,age: int,class: chararray,state: chararray,dob: chararray}

grunt> illustrate student;
-------------------------------------------------------------------------------------------------------------
| student				 | name:chararray			 | age:int			 | class:chararray			 | state:chararray			 | dob:chararray			 |
-------------------------------------------------------------------------------------------------------------
|												 | Vasava											 | 8									 | Grade III									 | Karnataka									 | 1-5-2009								 |
-------------------------------------------------------------------------------------------------------------

Grouping the student by their age – GROUP

We have defined the relations for the input. Let’s apply the transformations one by one. As a first example, we group the students by age.

grunt> groupStudentByAge = group student by age;

grunt> dump groupStudentByAge;
HadoopVersion         PigVersion                     UserId     StartedAt                         FinishedAt                     Features
2.6.0-cdh5.8.0     0.12.0-cdh5.8.0 cloudera                             2017-01-14 22:54:37                 2017-01-14 22:55:08                 GROUP_BY

Success!

Job Stats (time in seconds):
JobId         Maps             Reduces MaxMapTime                     MinMapTIme                     AvgMapTime                     MedianMapTime         MaxReduceTime         MinReduceTime         AvgReduceTime         MedianReducetime                             Alias         Feature Outputs
job_1484398440913_0005     1                         1                         4                         4                         4                         4                         4                         4                         4                         4                         groupStudentByAge,student                         GROUP_BY                             hdfs://quickstart.cloudera:8020/tmp/temp-676874408/tmp1007195946,

Input(s):
Successfully read 5 records (559 bytes) from: "/user/cloudera/pig/student"

Output(s):
Successfully stored 5 records (265 bytes) in: "hdfs://quickstart.cloudera:8020/tmp/temp-676874408/tmp1007195946"

Counters:
Total records written : 5
Total bytes written : 265
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1484398440913_0005

(8,{(Vasava,8,Grade III,Karnataka,1-5-2009)})
(9,{(Chavan,9,Grade IV,Tamilnadu,1-4-2012)})
(10,{(Sarania,10,Grade V,Goa,1-3-2007)})
(13,{(Vikram,13,Grade VII,Gujarat,1-1-2003)})
(15,{(Dalpat,15,Grade X,Maharashtra,1-2-2005)})
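
To make the shape of this output concrete, here is a minimal plain-Python sketch (not Pig) of what GROUP computes: one (key, bag) pair per distinct key. The `students` list is the sample data from above typed in by hand, and `group_by` is a hypothetical helper name used only for illustration.

```python
from collections import defaultdict

# Sample student records from above: (name, age, class, state, dob)
students = [
    ("Vikram", 13, "Grade VII", "Gujarat", "1-1-2003"),
    ("Dalpat", 15, "Grade X", "Maharashtra", "1-2-2005"),
    ("Sarania", 10, "Grade V", "Goa", "1-3-2007"),
    ("Vasava", 8, "Grade III", "Karnataka", "1-5-2009"),
    ("Chavan", 9, "Grade IV", "Tamilnadu", "1-4-2012"),
]

def group_by(records, key_index):
    """Return {key: [records...]}, mimicking the (group, bag) pairs of GROUP."""
    groups = defaultdict(list)
    for record in records:
        groups[record[key_index]].append(record)
    return dict(groups)

by_age = group_by(students, 1)
for age in sorted(by_age):
    print(age, by_age[age])
```

Every age occurs only once in the sample data, so each bag holds exactly one record, just as in the dump above.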

Similarly, let’s group the students by state.

grunt> groupStudentByState = group student by state;

grunt> dump groupStudentByState;
(Goa,{(Sarania,10,Grade V,Goa,1-3-2007)})
(Gujarat,{(Vikram,13,Grade VII,Gujarat,1-1-2003)})
(Karnataka,{(Vasava,8,Grade III,Karnataka,1-5-2009)})
(Tamilnadu,{(Chavan,9,Grade IV,Tamilnadu,1-4-2012)})
(Maharashtra,{(Dalpat,15,Grade X,Maharashtra,1-2-2005)})

Here is how we group using two fields.


grunt> groupStudentByAgeState = group student by (age, state);

grunt> dump groupStudentByAgeState;
((8,Karnataka),{(Vasava,8,Grade III,Karnataka,1-5-2009)})
((9,Tamilnadu),{(Chavan,9,Grade IV,Tamilnadu,1-4-2012)})
((10,Goa),{(Sarania,10,Grade V,Goa,1-3-2007)})
((13,Gujarat),{(Vikram,13,Grade VII,Gujarat,1-1-2003)})
((15,Maharashtra),{(Dalpat,15,Grade X,Maharashtra,1-2-2005)})

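COGROUP works like GROUP, but on two (or more) relations at once: for every key it returns one bag per relation. Here we co-group students and employees by age.

grunt> cogroupByAge = cogroup student by age, employee by age;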

grunt> dump cogroupByAge;
(8,{(Vasava,8,Grade III,Karnataka,1-5-2009)},{})
(9,{(Chavan,9,Grade IV,Tamilnadu,1-4-2012)},{})
(10,{(Sarania,10,Grade V,Goa,1-3-2007)},{})
(13,{(Vikram,13,Grade VII,Gujarat,1-1-2003)},{})
(15,{(Dalpat,15,Grade X,Maharashtra,1-2-2005)},{})
(19,{},{(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)})
(29,{},{(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)})
(34,{},{(Prabhaben,34,Engineer,Maharashtra,1-2-1990)})
(47,{},{(Sarania,47,Manager,Goa,1-3-1990)})
(55,{},{(Naranbhai,55,Director,Gujarat,1-1-1990)})
(,{},{(,,,,)})
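
The co-grouped output can be pictured with a small plain-Python sketch (not Pig). The `cogroup` helper is hypothetical, the records are shortened to (name, age), and the handling of null keys is deliberately left out.

```python
def cogroup(left, left_key, right, right_key):
    """Return {key: (left_bag, right_bag)}; a side with no records is an empty list."""
    result = {}
    for record in left:
        result.setdefault(record[left_key], ([], []))[0].append(record)
    for record in right:
        result.setdefault(record[right_key], ([], []))[1].append(record)
    return result

# Shortened sample records: (name, age)
students = [("Vasava", 8), ("Sarania", 10)]
employees = [("Sakuntala", 19), ("Sarania", 47)]

by_age = cogroup(students, 1, employees, 1)
for age in sorted(by_age):
    print(age, by_age[age])
```

No age is shared between the two relations, which is why one of the two bags is empty in every row.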

Inner join

Let’s try the join operations now.

We need another file to join with. The following salary file will do.

$ cat salary
Naranbhai						 5500
Prabhaben						 3000
Sarania								 3300
Chintaman						 2900
Sakuntala						 1900

We’ll join the employee table with the salary table. Let’s copy the salary file to HDFS first.

$ hadoop fs -put salary /user/cloudera/pig

Here is the definition for salary.

grunt> salary = load '/user/cloudera/pig/salary' USING PigStorage('\t') as (name:chararray,salary:int);

grunt> illustrate salary;
------------------------------------------------------
| salary     | name:chararray      | salary:int      |
------------------------------------------------------
|            | Naranbhai           | 5500            |
------------------------------------------------------

grunt> employeeSalary = join employee by name, salary by name;

grunt> describe employeeSalary;
employeeSalary: {employee::name: chararray,employee::age: int,employee::designation: chararray,employee::state: chararray,employee::doj: chararray,salary::name: chararray,salary::salary: int}

grunt> illustrate employeeSalary;
------------------------------------------------------------------------------------------------------------------------------
| employee     | name:chararray      | age:int      | designation:chararray      | state:chararray      | doj:chararray      |
------------------------------------------------------------------------------------------------------------------------------
|              | Chintaman           | 29           | Asst Engineer              | Karnataka            | 1-4-1990           |
|              | Chintaman           | 29           | Asst Engineer              | Karnataka            | 1-4-1990           |
------------------------------------------------------------------------------------------------------------------------------
------------------------------------------------------
| salary     | name:chararray      | salary:int      |
------------------------------------------------------
|            | Chintaman           | 2900            |
|            | Chintaman           | 2900            |
------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| employeeSalary     | employee::name:chararray      | employee::age:int      | employee::designation:chararray      | employee::state:chararray      | employee::doj:chararray      | salary::name:chararray      | salary::salary:int      |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|                    | Chintaman                     | 29                     | Asst Engineer                        | Karnataka                      | 1-4-1990                     | Chintaman                   | 2900                    |
|                    | Chintaman                     | 29                     | Asst Engineer                        | Karnataka                      | 1-4-1990                     | Chintaman                   | 2900                    |
|                    | Chintaman                     | 29                     | Asst Engineer                        | Karnataka                      | 1-4-1990                     | Chintaman                   | 2900                    |
|                    | Chintaman                     | 29                     | Asst Engineer                        | Karnataka                      | 1-4-1990                     | Chintaman                   | 2900                    |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

grunt> dump employeeSalary;
(Sarania,47,Manager,Goa,1-3-1990,Sarania,3300)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Chintaman,2900)
(Naranbhai,55,Director,Gujarat,1-1-1990,Naranbhai,5500)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Prabhaben,3000)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Sakuntala,1900)
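
Notice that the empty record does not appear in the result: in an inner join, records with a null key never match. Here is a minimal plain-Python sketch of that behaviour (not Pig). `inner_join` is a hypothetical helper, the records are shortened, and `None` stands for a null field.

```python
def inner_join(left, right):
    """Join on the first field; records whose key is None never match."""
    joined = []
    for l in left:
        for r in right:
            if l[0] is not None and l[0] == r[0]:
                joined.append(l + r)
    return joined

# Shortened sample records; the last one models the empty input line.
employees = [("Sarania", 47), ("Chintaman", 29), (None, None)]
salaries = [("Sarania", 3300), ("Chintaman", 2900), (None, None)]

joined = inner_join(employees, salaries)
for row in joined:
    print(row)
```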

Left Outer Join

A left outer join keeps all records from the left table and adds the matching records from the right table. Where nothing matches, the right-hand fields are null.

grunt> leftOuter = join employee by name left outer, salary by name;

grunt> dump leftOuter;
(Sarania,47,Manager,Goa,1-3-1990,Sarania,3300)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Chintaman,2900)
(Naranbhai,55,Director,Gujarat,1-1-1990,Naranbhai,5500)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Prabhaben,3000)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Sakuntala,1900)
(,,,,,,)

Right Outer Join

Let’s perform a right outer join, which keeps all records from the right table and adds the matching records from the left table.

grunt> rightJoin = join employee by name right, salary by name;

grunt> dump rightJoin;
(Sarania,47,Manager,Goa,1-3-1990,Sarania,3300)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Chintaman,2900)
(Naranbhai,55,Director,Gujarat,1-1-1990,Naranbhai,5500)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Prabhaben,3000)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Sakuntala,1900)
(,,,,,,)

Full Outer Join

Let’s perform a full outer join now. It keeps every record from both tables, whether or not there is a match, and fills the missing side with nulls.

grunt> fullOuter = join employee by name full outer, salary by name;
grunt> dump fullOuter;

(Sarania,47,Manager,Goa,1-3-1990,Sarania,3300)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Chintaman,2900)
(Naranbhai,55,Director,Gujarat,1-1-1990,Naranbhai,5500)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Prabhaben,3000)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Sakuntala,1900)
(,,,,,,)
(,,,,,,)
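
The two all-null rows can be explained with a small plain-Python sketch (not Pig): the empty record on each side matches nothing, so each is emitted once, padded with nulls. `full_outer_join` is a hypothetical helper, and the records are shortened to two fields per side.

```python
def full_outer_join(left, right, left_width, right_width):
    """Join on the first field, keeping unmatched records from both sides."""
    rows = []
    matched_right = set()
    for l in left:
        hits = [i for i, r in enumerate(right)
                if l[0] is not None and l[0] == r[0]]
        if hits:
            for i in hits:
                rows.append(l + right[i])
                matched_right.add(i)
        else:
            # Unmatched left record: pad the right-hand side with nulls.
            rows.append(l + (None,) * right_width)
    for i, r in enumerate(right):
        if i not in matched_right:
            # Unmatched right record: pad the left-hand side with nulls.
            rows.append((None,) * left_width + r)
    return rows

employees = [("Sarania", 47), (None, None)]
salaries = [("Sarania", 3300), (None, None)]

rows = full_outer_join(employees, salaries, 2, 2)
for row in rows:
    print(row)
```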

Crossing

Let’s compute the Cartesian product of the two tables employee and salary using the CROSS operator. Every employee record is paired with every salary record.

grunt> crossData = cross employee, salary;
grunt> dump crossData;
(,,,,,,)
(,,,,,Sakuntala,1900)
(,,,,,Chintaman,2900)
(,,,,,Sarania,3300)
(,,,,,Prabhaben,3000)
(,,,,,Naranbhai,5500)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,,)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Sakuntala,1900)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Chintaman,2900)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Sarania,3300)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Prabhaben,3000)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Naranbhai,5500)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,,)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Sakuntala,1900)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Chintaman,2900)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Sarania,3300)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Prabhaben,3000)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Naranbhai,5500)
(Sarania,47,Manager,Goa,1-3-1990,,)
(Sarania,47,Manager,Goa,1-3-1990,Sakuntala,1900)
(Sarania,47,Manager,Goa,1-3-1990,Chintaman,2900)
(Sarania,47,Manager,Goa,1-3-1990,Sarania,3300)
(Sarania,47,Manager,Goa,1-3-1990,Prabhaben,3000)
(Sarania,47,Manager,Goa,1-3-1990,Naranbhai,5500)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,,)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Sakuntala,1900)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Chintaman,2900)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Sarania,3300)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Prabhaben,3000)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Naranbhai,5500)
(Naranbhai,55,Director,Gujarat,1-1-1990,,)
(Naranbhai,55,Director,Gujarat,1-1-1990,Sakuntala,1900)
(Naranbhai,55,Director,Gujarat,1-1-1990,Chintaman,2900)
(Naranbhai,55,Director,Gujarat,1-1-1990,Sarania,3300)
(Naranbhai,55,Director,Gujarat,1-1-1990,Prabhaben,3000)
(Naranbhai,55,Director,Gujarat,1-1-1990,Naranbhai,5500)

Union

Let’s create another list of employees.

$ cat employee_retired
Manoj						 70					 Asst Engineer		 Jammu Kashmir						 1-9-1983

Copy employee_retired to HDFS and load it with the same schema as employee.

$ hadoop fs -put employee_retired /user/cloudera/pig

grunt> employee_retired = load '/user/cloudera/pig/employee_retired' using PigStorage('\t') as (name: chararray,age: int,designation: chararray,state: chararray,doj: chararray);

grunt> illustrate employee_retired;
--------------------------------------------------------------------------------------------------------------------------------------
| employee_retired     | name:chararray      | age:int      | designation:chararray      | state:chararray      | doj:chararray      |
--------------------------------------------------------------------------------------------------------------------------------------
|                      | Manoj               | 70           | Asst Engineer              | Jammu Kashmir        | 1-9-1983           |
--------------------------------------------------------------------------------------------------------------------------------------

grunt> allEmployee = union employee, employee_retired;

grunt> illustrate allEmployee;
------------------------------------------------------------------------------------------------------------------------------
| employee     | name:chararray      | age:int      | designation:chararray      | state:chararray      | doj:chararray      |
------------------------------------------------------------------------------------------------------------------------------
|              | Chintaman           | 29           | Asst Engineer              | Karnataka            | 1-4-1990           |
------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------
| employee_retired     | name:chararray      | age:int      | designation:chararray      | state:chararray      | doj:chararray      |
--------------------------------------------------------------------------------------------------------------------------------------
|                      | Manoj               | 70           | Asst Engineer              | Jammu Kashmir        | 1-9-1983           |
--------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------------
| allEmployee     | name:chararray      | age:int      | designation:chararray      | state:chararray      | doj:chararray      |
---------------------------------------------------------------------------------------------------------------------------------
|                 | Chintaman           | 29           | Asst Engineer              | Karnataka            | 1-4-1990           |
|                 | Manoj               | 70           | Asst Engineer              | Jammu Kashmir        | 1-9-1983           |
---------------------------------------------------------------------------------------------------------------------------------

grunt> dump allEmployee;
HadoopVersion		 PigVersion					 UserId	 StartedAt						 FinishedAt					 Features
2.6.0-cdh5.8.0	 0.12.0-cdh5.8.0 cloudera							 2017-01-16 14:12:36				 2017-01-16 14:13:29				 UNION

Success!

Job Stats (time in seconds):
JobId		 Maps			 Reduces MaxMapTime					 MinMapTIme					 AvgMapTime					 MedianMapTime		 MaxReduceTime		 MinReduceTime		 AvgReduceTime		 MedianReducetime							 Alias	 Feature	 Outputs
job_1484398440913_0020	 2						 0						 29					 27					 28					 28					 n/a				 n/a				 n/a				 n/a				 allEmployee,employee,employee_retired		 MAP_ONLY							 hdfs://quickstart.cloudera:8020/tmp/temp1173368828/tmp409375138,

Input(s):
Successfully read 6 records from: "/user/cloudera/pig/employee"
Successfully read 1 records from: "/user/cloudera/pig/employee_retired"

Output(s):
Successfully stored 7 records (325 bytes) in: "hdfs://quickstart.cloudera:8020/tmp/temp1173368828/tmp409375138"

Counters:
Total records written : 7
Total bytes written : 325
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1484398440913_0020

(Naranbhai,55,Director,Gujarat,1-1-1990)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990)
(Sarania,47,Manager,Goa,1-3-1990)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)
(,,,,)
(Manoj,70,Asst Engineer,Jammu Kashmir,1-9-1983)

Split

SPLIT is a powerful operator that ETL specialists will appreciate. It splits one relation into several relations based on conditions.

Let’s split employee based on the age of the employees.


grunt> split employee into employee40less if age<40, employee40plus if (age>=40);

grunt> dump employee40less;
(Prabhaben,34,Engineer,Maharashtra,1-2-1990)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)

grunt> dump employee40plus;
(Naranbhai,55,Director,Gujarat,1-1-1990)
(Sarania,47,Manager,Goa,1-3-1990)
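
The empty record lands in neither relation, because a null age satisfies neither condition. A minimal plain-Python sketch of this split (not Pig), with a hypothetical helper `split_by_age` and records shortened to (name, age):

```python
def split_by_age(records, threshold=40):
    """Return (younger, older); records with a null age go to neither list."""
    younger, older = [], []
    for record in records:
        age = record[1]
        if age is None:
            continue  # null < 40 and null >= 40 are both not true
        if age < threshold:
            younger.append(record)
        else:
            older.append(record)
    return younger, older

employees = [
    ("Naranbhai", 55), ("Prabhaben", 34), ("Sarania", 47),
    ("Chintaman", 29), ("Sakuntala", 19), (None, None),
]

employee40less, employee40plus = split_by_age(employees)
print(employee40less)
print(employee40plus)
```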

FILTER keeps only the records that satisfy a condition. Here we pick the employees from Goa.

grunt> goaEmployees = filter employee by state == 'Goa';

grunt> dump goaEmployees;
(Sarania,47,Manager,Goa,1-3-1990)

Distinct

As the name implies, DISTINCT fetches the distinct records from the given relation.

grunt> distinctEmployee = distinct employee;

grunt> dump distinctEmployee;
(Sarania,47,Manager,Goa,1-3-1990)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)
(Naranbhai,55,Director,Gujarat,1-1-1990)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)
(,,,,)

ForEach

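Here is another powerful operator for your ETL process. FOREACH iterates through the records of a relation and generates new ones. Let’s iterate through the student records and keep only the name and age.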

grunt> studentRecord = foreach student generate name, age;
grunt> dump studentRecord;
(Vikram,13)
(Dalpat,15)
(Sarania,10)
(Vasava,8)
(Chavan,9)

Order By

Let’s order the records in descending order of student names.

grunt> orderedStudent = order student by name desc;
grunt> dump orderedStudent;
(Vikram,13,Grade VII,Gujarat,1-1-2003)
(Vasava,8,Grade III,Karnataka,1-5-2009)
(Sarania,10,Grade V,Goa,1-3-2007)
(Dalpat,15,Grade X,Maharashtra,1-2-2005)
(Chavan,9,Grade IV,Tamilnadu,1-4-2012)

Limit records

Select top 2 students

grunt> twoStudent = limit orderedStudent 2;

grunt> dump twoStudent;
(Vikram,13,Grade VII,Gujarat,1-1-2003)
(Vasava,8,Grade III,Karnataka,1-5-2009)

Store the output as a flat file

grunt> store employeeSalary into '/user/cloudera/pig/employeeSalary' using PigStorage('\t');

HadoopVersion		 PigVersion					 UserId	 StartedAt						 FinishedAt					 Features
2.6.0-cdh5.8.0	 0.12.0-cdh5.8.0 cloudera							 2017-01-15 13:08:57				 2017-01-15 13:09:38				 HASH_JOIN

Success!

Job Stats (time in seconds):
JobId		 Maps			 Reduces MaxMapTime					 MinMapTIme					 AvgMapTime					 MedianMapTime		 MaxReduceTime		 MinReduceTime		 AvgReduceTime		 MedianReducetime							 Alias	 Feature	 Outputs
job_1484398440913_0010	 2						 1						 8						 7						 8						 8						 4						 4						 4						 4						 employee,employeeSalary,salary	 HASH_JOIN						 /user/cloudera/pig/employeeSalary,

Input(s):
Successfully read 6 records from: "/user/cloudera/pig/salary"
Successfully read 6 records from: "/user/cloudera/pig/employee"

Output(s):
Successfully stored 5 records (273 bytes) in: "/user/cloudera/pig/employeeSalary"

Counters:
Total records written : 5
Total bytes written : 273
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1484398440913_0010

Average – AVG()

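Let’s see how to calculate an average using the AVG() function. We first put all employee records into a single group with GROUP ... ALL, then iterate over that group with FOREACH. Note that this reuses the name allEmployee, which until now referred to the union created earlier.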

grunt> allEmployee = group employee all;

grunt> dump allEmployee;
(all,{(,,,,),(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990),(Chintaman,29,Asst Engineer,Karnataka,1-4-1990),(Sarania,47,Manager,Goa,1-3-1990),(Prabhaben,34,Engineer,Maharashtra,1-2-1990),(Naranbhai,55,Director,Gujarat,1-1-1990)})

grunt> avgEmployeeAge = foreach allEmployee generate (employee.name), AVG(employee.age);

grunt> dump avgEmployeeAge;
({(),(Sakuntala),(Chintaman),(Sarania),(Prabhaben),(Naranbhai)},36.8)
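
The result is 36.8 rather than a smaller number because AVG() skips the null age of the empty record: 184 / 5 = 36.8. The same arithmetic in a plain-Python sketch (not Pig), where `avg` is a hypothetical helper and `None` stands for the null age:

```python
ages = [55, 34, 47, 29, 19, None]  # None models the empty record

def avg(values):
    """Average of the non-null values, or None if there are none."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None

average_age = avg(ages)
print(average_age)
```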

Concatenation of string – CONCAT()

As the title implies, we’ll concatenate two fields of each record using CONCAT().

grunt> employeeState = foreach employee generate CONCAT(name, state);

grunt> dump employeeState;
(NaranbhaiGujarat)
(PrabhabenMaharashtra)
(SaraniaGoa)
(ChintamanKarnataka)
(SakuntalaTamilnadu)
()

grunt> employeeState = foreach employee generate CONCAT(name, ' - ', state);
grunt> dump employeeState;
(Naranbhai - Gujarat)
(Prabhaben - Maharashtra)
(Sarania - Goa)
(Chintaman - Karnataka)
(Sakuntala - Tamilnadu)
()

Count of records – COUNT()

We need to perform a group-all before we can use COUNT(). We reuse the allEmployee relation created for AVG() above.

grunt> employeeCount = foreach allEmployee generate COUNT(employee.name);

grunt> dump employeeCount;
(5)

COUNT() ignores tuples whose first field is null, which is why the result is 5 and not 6. To count those records as well, use COUNT_STAR().
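
A minimal plain-Python sketch (not Pig) of the difference between the two functions. `count` and `count_star` are hypothetical helper names, and `None` stands for the null name of the empty record.

```python
names = ["Naranbhai", "Prabhaben", "Sarania", "Chintaman", "Sakuntala", None]

def count(bag):
    """Like COUNT(): skip null values."""
    return sum(1 for value in bag if value is not None)

def count_star(bag):
    """Like COUNT_STAR(): count every element, nulls included."""
    return len(bag)

print(count(names), count_star(names))
```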

Is Empty – IsEmpty()

This is another important function in ETL. It tells you whether a bag is empty. I have given multiple examples below to show the capability of Pig. We start again from the group-all relation allEmployee.

(all,{(,,,,),(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990),(Chintaman,29,Asst Engineer,Karnataka,1-4-1990),(Sarania,47,Manager,Goa,1-3-1990),(Prabhaben,34,Engineer,Maharashtra,1-2-1990),(Naranbhai,55,Director,Gujarat,1-1-1990)})

grunt> isEmpty = foreach allEmployee generate IsEmpty(employee.name);

grunt> dump isEmpty;
(false)

grunt> cogroupByAge = cogroup student by age, employee by age;

grunt> dump cogroupByAge;
(8,{(Vasava,8,Grade III,Karnataka,1-5-2009)},{})
(9,{(Chavan,9,Grade IV,Tamilnadu,1-4-2012)},{})
(10,{(Sarania,10,Grade V,Goa,1-3-2007)},{})
(13,{(Vikram,13,Grade VII,Gujarat,1-1-2003)},{})
(15,{(Dalpat,15,Grade X,Maharashtra,1-2-2005)},{})
(19,{},{(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)})
(29,{},{(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)})
(34,{},{(Prabhaben,34,Engineer,Maharashtra,1-2-1990)})
(47,{},{(Sarania,47,Manager,Goa,1-3-1990)})
(55,{},{(Naranbhai,55,Director,Gujarat,1-1-1990)})
(,{},{(,,,,)})

grunt> isEmpty = filter cogroupByAge by IsEmpty(employee.name);

grunt> dump isEmpty;
(8,{(Vasava,8,Grade III,Karnataka,1-5-2009)},{})
(9,{(Chavan,9,Grade IV,Tamilnadu,1-4-2012)},{})
(10,{(Sarania,10,Grade V,Goa,1-3-2007)},{})
(13,{(Vikram,13,Grade VII,Gujarat,1-1-2003)},{})
(15,{(Dalpat,15,Grade X,Maharashtra,1-2-2005)},{})

grunt> isEmpty = filter cogroupByAge by IsEmpty(student.name);

grunt> dump isEmpty;
(19,{},{(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)})
(29,{},{(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)})
(34,{},{(Prabhaben,34,Engineer,Maharashtra,1-2-1990)})
(47,{},{(Sarania,47,Manager,Goa,1-3-1990)})
(55,{},{(Naranbhai,55,Director,Gujarat,1-1-1990)})
(,{},{(,,,,)})

grunt> isEmpty = filter cogroupByAge by IsEmpty(employee);

grunt> dump isEmpty;
(8,{(Vasava,8,Grade III,Karnataka,1-5-2009)},{})
(9,{(Chavan,9,Grade IV,Tamilnadu,1-4-2012)},{})
(10,{(Sarania,10,Grade V,Goa,1-3-2007)},{})
(13,{(Vikram,13,Grade VII,Gujarat,1-1-2003)},{})
(15,{(Dalpat,15,Grade X,Maharashtra,1-2-2005)},{})

Find the Minimum or Maximum – MAX() MIN()

To perform min/max operations, we again need the group-all data, as we did above for IsEmpty().

(all,{(,,,,),(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990),(Chintaman,29,Asst Engineer,Karnataka,1-4-1990),(Sarania,47,Manager,Goa,1-3-1990),(Prabhaben,34,Engineer,Maharashtra,1-2-1990),(Naranbhai,55,Director,Gujarat,1-1-1990)})

grunt> maxAge = foreach allEmployee generate (employee.name, employee.age), MAX(employee.age);

grunt> dump maxAge;
(({(),(Sakuntala),(Chintaman),(Sarania),(Prabhaben),(Naranbhai)},{(),(19),(29),(47),(34),(55)}),55)

grunt> minAge = foreach allEmployee generate (employee.name, employee.age), MIN(employee.age);

grunt> dump minAge;
(({(),(Sakuntala),(Chintaman),(Sarania),(Prabhaben),(Naranbhai)},{(),(19),(29),(47),(34),(55)}),19)

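PluckTuple

PluckTuple keeps only the fields whose names begin with a given prefix, which is handy after a join. Note that the prefix 'a::' used below matches none of the fields of employeeSalary, whose names start with employee:: or salary::, so every tuple comes back empty. A prefix such as 'salary::' would select the salary fields instead.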

grunt> salary = load '/user/cloudera/pig/salary' USING PigStorage('\t') as (name:chararray,salary:int);

grunt> employeeSalary = join employee by name, salary by name;

grunt> dump employeeSalary;
(Sarania,47,Manager,Goa,1-3-1990,Sarania,3300)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990,Chintaman,2900)
(Naranbhai,55,Director,Gujarat,1-1-1990,Naranbhai,5500)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990,Prabhaben,3000)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990,Sakuntala,1900)

grunt> define pluck PluckTuple('a::');

grunt> pluckedData = foreach employeeSalary generate FLATTEN(pluck(*));

grunt> describe pluckedData
Schema for pluckedData unknown.

grunt> dump pluckedData;
()
()
()
()
()
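
The empty tuples follow directly from the prefix test. Here is a plain-Python sketch of prefix-based field selection (not Pig), in which `pluck` is a hypothetical helper and the record is shortened to four fields:

```python
def pluck(record, prefix):
    """Keep only the fields whose name starts with the given prefix."""
    return {name: value for name, value in record.items()
            if name.startswith(prefix)}

row = {
    "employee::name": "Sarania",
    "employee::age": 47,
    "salary::name": "Sarania",
    "salary::salary": 3300,
}

print(pluck(row, "a::"))       # no field name starts with a::
print(pluck(row, "salary::"))  # only the salary fields remain
```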

String length – SIZE()

Another interesting function, SIZE(), computes the number of characters in a string. To get the length of each employee name, we issue the following commands.

grunt> sizeData = foreach employee generate name, SIZE(name);

grunt> dump sizeData;
(Naranbhai,9)
(Prabhaben,9)
(Sarania,7)
(Chintaman,9)
(Sakuntala,9)
(,)

Subtract /remove all – SUBTRACT()

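SUBTRACT(bag1, bag2) returns the tuples of the first bag that are not in the second. Let’s start from the co-grouped data shown below. Note that the statement that follows only packs the two bags into a tuple; to get the actual difference you would generate SUBTRACT(student, employee) instead.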

(8,{(Vasava,8,Grade III,Karnataka,1-5-2009)},{})
(9,{(Chavan,9,Grade IV,Tamilnadu,1-4-2012)},{})
(10,{(Sarania,10,Grade V,Goa,1-3-2007)},{})
(13,{(Vikram,13,Grade VII,Gujarat,1-1-2003)},{})
(15,{(Dalpat,15,Grade X,Maharashtra,1-2-2005)},{})
(19,{},{(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)})
(29,{},{(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)})
(34,{},{(Prabhaben,34,Engineer,Maharashtra,1-2-1990)})
(47,{},{(Sarania,47,Manager,Goa,1-3-1990)})
(55,{},{(Naranbhai,55,Director,Gujarat,1-1-1990)})
(,{},{(,,,,)})

grunt> subtractData = foreach cogroupByAge generate (student, employee);

grunt> dump subtractData;
(({(Vasava,8,Grade III,Karnataka,1-5-2009)},{}))
(({(Chavan,9,Grade IV,Tamilnadu,1-4-2012)},{}))
(({(Sarania,10,Grade V,Goa,1-3-2007)},{}))
(({(Vikram,13,Grade VII,Gujarat,1-1-2003)},{}))
(({(Dalpat,15,Grade X,Maharashtra,1-2-2005)},{}))
(({},{(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)}))
(({},{(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)}))
(({},{(Prabhaben,34,Engineer,Maharashtra,1-2-1990)}))
(({},{(Sarania,47,Manager,Goa,1-3-1990)}))
(({},{(Naranbhai,55,Director,Gujarat,1-1-1990)}))
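
For comparison, here is what a bag difference in the spirit of SUBTRACT() would return for two of these rows, as a plain-Python sketch (not Pig). `subtract` is a hypothetical helper and the tuples are shortened to (name, age).

```python
def subtract(bag1, bag2):
    """Tuples of bag1 that do not appear in bag2."""
    return [t for t in bag1 if t not in bag2]

# Row for age 8: one student, no employee.
students_8, employees_8 = [("Vasava", 8)], []
# Row for age 19: no student, one employee.
students_19, employees_19 = [], [("Sakuntala", 19)]

diff_8 = subtract(students_8, employees_8)
diff_19 = subtract(students_19, employees_19)
print(diff_8)
print(diff_19)
```

The first difference keeps the student, while the second is empty because there was nothing in the student bag to begin with.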

Sum of numbers – SUM()

As the name implies, let’s sum up the ages of all employees.

grunt> sumAge = foreach allEmployee generate (employee.name, employee.age), SUM(employee.age);

grunt> dump sumAge;
(({(),(Sakuntala),(Chintaman),(Sarania),(Prabhaben),(Naranbhai)},{(),(19),(29),(47),(34),(55)}),184)

So the sum of all ages is 184.

Split the string – TOKENIZE() STRSPLIT()

Let’s tokenize the fields.

grunt> fs -cat /user/cloudera/pig/employee
Naranbhai     55     Director     Gujarat    1-1-1990
Prabhaben     34     Engineer     Maharashtra     1-2-1990
Sarania 47     Manager Goa     1-3-1990
Chintaman     29     Asst Engineer     Karnataka     1-4-1990
Sakuntala     19     Jr Engineer     Tamilnadu     1-5-1990

grunt> describe employee;
employee: {name: chararray,age: int,designation: chararray,state: chararray,doj: chararray}

grunt> tokenizeEmployee = foreach employee generate TOKENIZE(doj);

grunt> dump tokenizeEmployee;
({(1-1-1990)})
({(1-2-1990)})
({(1-3-1990)})
({(1-4-1990)})
({(1-5-1990)})
()

TOKENIZE() uses the following symbols as delimiters: space, double quote ("), comma (,), parentheses (()) and star (*). In the above example the field doesn’t contain any of these symbols, so we see only one token.
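
A rough plain-Python sketch of this default tokenizing rule (not Pig). The `tokenize` helper is hypothetical and only models the delimiter list given above.

```python
import re

# Space, double quote, comma, parentheses and star, as listed above.
DELIMITERS = r'[ ",()*]+'

def tokenize(text):
    """Split text on the default delimiters and drop empty tokens."""
    return [token for token in re.split(DELIMITERS, text) if token]

print(tokenize("1-1-1990"))       # no delimiter inside, so a single token
print(tokenize("Asst Engineer"))  # the space separates two tokens
```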

Let’s now use another function, STRSPLIT(), to split the field with - as the delimiter. The third argument, 5, is the maximum number of fields to return.

grunt> tokenizeEmployee = foreach employee generate flatten(STRSPLIT(doj, '-', 5));

grunt> dump tokenizeEmployee;
(1,1,1990)
(1,2,1990)
(1,3,1990)
(1,4,1990)
(1,5,1990)
()
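
The effect of the limit argument is easier to see with a smaller value. The plain-Python sketch below (not Pig) uses a hypothetical `strsplit` helper and assumes a positive limit, read as "at most this many pieces".

```python
import re

def strsplit(text, regex, limit):
    """Split text on regex into at most `limit` pieces (limit >= 1 assumed)."""
    if limit == 1:
        return (text,)  # re.split treats maxsplit=0 as "no limit", so handle it here
    return tuple(re.split(regex, text, maxsplit=limit - 1))

print(strsplit("1-1-1990", "-", 5))  # the limit is not reached: three fields
print(strsplit("1-1-1990", "-", 2))  # the rest of the string stays in the last field
```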

STARTSWITH() ENDSWITH()

This is one of the slowest functions I have executed this week. Even for 5 records, it took 2 minutes to complete the job. Here you go.

grunt> employeeEndsWith = foreach employee generate (name), ENDSWITH (name, 'n');

grunt> dump employeeEndsWith;
(Naranbhai,false)
(Prabhaben,true)
(Sarania,false)
(Chintaman,true)
(Sakuntala,false)
(,)

grunt> employeeStartsWith = foreach employee generate (name), STARTSWITH (name , 'N');

grunt> dump employeeStartsWith;
(Naranbhai,true)
(Prabhaben,false)
(Sarania,false)
(Chintaman,false)
(Sakuntala,false)
(,)
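For comparison, the same prefix and suffix checks in plain Python give the same answers. This is a sketch with the names copied by hand from the employee file. The list names are my own.

```python
names = ["Naranbhai", "Prabhaben", "Sarania", "Chintaman", "Sakuntala"]

# Both checks are case sensitive: 'N' matches, 'n' at the start would not.
ends_with_n = [(name, name.endswith("n")) for name in names]
starts_with_N = [(name, name.startswith("N")) for name in names]

print(ends_with_n)
print(starts_with_N)
```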

SUBSTRING()

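SUBSTRING() returns part of a string, given a start index (inclusive) and a stop index (exclusive). Indexes start at 0, so (name, 0, 2) returns the first two characters.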

grunt> employeeSubstring = foreach employee generate (name), SUBSTRING (name, 0, 2);
grunt> dump employeeSubstring;
(Naranbhai,Na)
(Prabhaben,Pr)
(Sarania,Sa)
(Chintaman,Ch)
(Sakuntala,Sa)
(,)
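Python slices follow the same convention (start included, stop excluded), so they make a handy mental model. The snippet below is only an analogue, with the names typed in by hand.

```python
names = ["Naranbhai", "Prabhaben", "Sarania", "Chintaman", "Sakuntala"]

# name[0:2] keeps the characters at index 0 and 1; index 2 is excluded.
prefixes = [(name, name[0:2]) for name in names]
print(prefixes)
```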

EqualsIgnoreCase()

As the name implies, EqualsIgnoreCase() compares two strings while ignoring case. Here we’ll iterate over the employee data and check whether each name equals Sarania.

grunt> equals_data = FOREACH employee GENERATE (name,age), EqualsIgnoreCase(name,'Sarania');
grunt> dump equals_data;
((Naranbhai,55),false)
((Prabhaben,34),false)
((Sarania,47),true)
((Chintaman,29),false)
((Sakuntala,19),false)
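The run above passes 'Sarania' in its original case, so it does not show the case-insensitive part. A plain-Python sketch of the idea, using a hypothetical `equals_ignore_case` helper and assuming simple ASCII names like ours:

```python
def equals_ignore_case(left, right):
    """Compare two strings after folding both to a common case."""
    return left.casefold() == right.casefold()

print(equals_ignore_case("Sarania", "SARANIA"))    # True
print(equals_ignore_case("Sarania", "sarania"))    # True
print(equals_ignore_case("Naranbhai", "Sarania"))  # False
```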

INDEXOF()/LAST_INDEX_OF()

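INDEXOF() returns the zero-based index of the first occurrence of a character in a string, searching forward from the given start index (0 here). LAST_INDEX_OF() returns the index of the last occurrence.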

grunt> indexOfData = FOREACH employee GENERATE
>> (name), INDEXOF (name, 'a', 0);
grunt> dump indexOfData;
(Naranbhai,1)
(Prabhaben,2)
(Sarania,1)
(Chintaman,5)
(Sakuntala,1)

Similarly for last index –

grunt> indexOfData = FOREACH employee GENERATE (name, age), LAST_INDEX_OF(name, 'a');
grunt> dump indexOfData;
((Naranbhai,55),7)
((Prabhaben,34),5)
((Sarania,47),6)
((Chintaman,29),7)
((Sakuntala,19),8)
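If the zero-based positions are hard to picture, the two lookups can be mirrored in plain Python with `str.find` and `str.rfind`. This is just an analogue for checking the numbers above, with the names copied by hand.

```python
names = ["Naranbhai", "Prabhaben", "Sarania", "Chintaman", "Sakuntala"]

# find() scans forward from position 0; rfind() reports the last match.
first_a = [name.find("a", 0) for name in names]
last_a = [name.rfind("a") for name in names]

print(first_a)  # [1, 2, 1, 5, 1]
print(last_a)   # [7, 5, 6, 7, 8]
```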

LCFIRST()

Converts the first character of a string to lower case.

grunt> lcFirstData = FOREACH employee GENERATE (name, age), LCFIRST(name);
grunt> dump lcFirstData;
((Naranbhai,55),naranbhai)
((Prabhaben,34),prabhaben)
((Sarania,47),sarania)
((Chintaman,29),chintaman)
((Sakuntala,19),sakuntala)
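Our names have only one capital letter, so the output looks the same as a full lower-casing would. The plain-Python sketch below uses a made-up all-caps value to show the difference. `lc_first` is a hypothetical helper, not a Pig function.

```python
def lc_first(text):
    """Lower-case only the first character, leave the rest untouched."""
    return text[:1].lower() + text[1:]

print(lc_first("Naranbhai"))  # naranbhai
print(lc_first("NARANBHAI"))  # nARANBHAI
print("NARANBHAI".lower())    # naranbhai
```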

UPPER()

Converts every character of a string to upper case.

grunt> upper_data = foreach employee generate (name, age), UPPER(name);
grunt> dump upper_data;
((Naranbhai,55),NARANBHAI)
((Prabhaben,34),PRABHABEN)
((Sarania,47),SARANIA)
((Chintaman,29),CHINTAMAN)
((Sakuntala,19),SAKUNTALA)

LOWER()

Converting to lower case

grunt> lower_data = foreach employee generate (name), LOWER(name);
grunt> dump lower_data;
(Naranbhai,naranbhai)
(Prabhaben,prabhaben)
(Sarania,sarania)
(Chintaman,chintaman)
(Sakuntala,sakuntala)

REPLACE()

Replaces every match of a pattern in a string with a new value.

grunt> replaceData = foreach employee generate (name), REPLACE (name, 'Sa', 'SA');
grunt> dump replaceData;
(Naranbhai,Naranbhai)
(Prabhaben,Prabhaben)
(Sarania,SArania)
(Chintaman,Chintaman)
(Sakuntala,SAkuntala)
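As far as I understand the Pig documentation, the second argument of REPLACE() is a regular expression and every match is replaced, not just the first one. A plain-Python sketch of that behaviour with `re.sub`, using hand-copied names:

```python
import re

names = ["Naranbhai", "Prabhaben", "Sarania", "Chintaman", "Sakuntala"]

# Same replacement as above: only names containing 'Sa' change.
replaced = [re.sub("Sa", "SA", name) for name in names]
print(replaced)

# Every match is replaced, so all three 'a' characters are affected.
print(re.sub("a", "A", "Sarania"))  # SArAniA
```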

STRSPLIT()

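Splits a string around matches of a regular expression. The third argument caps the number of fields returned, as the two runs below show with limits of 2 and 3.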

grunt> strsplitData = foreach employee generate (name), STRSPLIT (name, 'a', 2);
grunt> dump strsplitData;
(Naranbhai,(N,ranbhai))
(Prabhaben,(Pr,bhaben))
(Sarania,(S,rania))
(Chintaman,(Chint,man))
(Sakuntala,(S,kuntala))

grunt> strsplitData = foreach employee generate (name), STRSPLIT (name, 'a', 3);
grunt> dump strsplitData;
(Naranbhai,(N,r,nbhai))
(Prabhaben,(Pr,bh,ben))
(Sarania,(S,r,nia))
(Chintaman,(Chint,m,n))
(Sakuntala,(S,kunt,la))
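The effect of the limit is easier to see side by side. Below is a plain-Python sketch that mimics a positive limit by allowing at most limit - 1 splits. `strsplit` is a hypothetical helper I wrote for illustration, and it does not cover zero or negative limits.

```python
import re

def strsplit(text, regex, limit):
    """Mimic a positive STRSPLIT limit: return at most `limit` fields."""
    if limit < 1:
        raise ValueError("this sketch only covers a positive limit")
    if limit == 1:
        return (text,)  # re.split treats maxsplit=0 as "no limit"
    return tuple(re.split(regex, text, maxsplit=limit - 1))

print(strsplit("Naranbhai", "a", 2))  # ('N', 'ranbhai')
print(strsplit("Naranbhai", "a", 3))  # ('N', 'r', 'nbhai')
print(strsplit("1-1-1990", "-", 5))   # ('1', '1', '1990')
```

The last call shows that a limit larger than the number of parts is harmless: the date has only three parts, so only three fields come back.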

TRIM()

TRIM() removes the leading and trailing spaces of a field. To remove only the leading or only the trailing spaces, use LTRIM() or RTRIM() respectively.

grunt> trimData = FOREACH employee GENERATE (name), TRIM(name);
grunt> dump trimData;
(Naranbhai,Naranbhai)
(Prabhaben,Prabhaben)
(Sarania,Sarania)
(Chintaman,Chintaman)
(Sakuntala,Sakuntala)
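Our names carry no extra spaces, so the output above is identical to the input. The plain-Python sketch below uses a made-up padded value to show what the three variants would do. `strip`, `lstrip` and `rstrip` stand in for TRIM(), LTRIM() and RTRIM() here.

```python
# Hypothetical padded value; the real employee names have no padding.
padded = "  Sarania  "

trimmed = padded.strip()         # both sides, like TRIM()
left_trimmed = padded.lstrip()   # leading spaces only, like LTRIM()
right_trimmed = padded.rstrip()  # trailing spaces only, like RTRIM()

print(repr(trimmed))        # 'Sarania'
print(repr(left_trimmed))   # 'Sarania  '
print(repr(right_trimmed))  # '  Sarania'
```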

CurrentTime()

Get the current time of Hadoop system

grunt> dojData = FOREACH employee GENERATE (name), CurrentTime();
grunt> dump dojData;
(Naranbhai,2017-01-28T00:18:59.868-08:00)
(Prabhaben,2017-01-28T00:18:59.868-08:00)
(Sarania,2017-01-28T00:18:59.868-08:00)
(Chintaman,2017-01-28T00:18:59.868-08:00)
(Sakuntala,2017-01-28T00:18:59.868-08:00)

ToDate()

Convert text to DateTime. For reference, here are the employee records again; doj is still a chararray at this point.

(Naranbhai,55,Director,Gujarat,1-1-1990)
(Prabhaben,34,Engineer,Maharashtra,1-2-1990)
(Sarania,47,Manager,Goa,1-3-1990)
(Chintaman,29,Asst Engineer,Karnataka,1-4-1990)
(Sakuntala,19,Jr Engineer,Tamilnadu,1-5-1990)

grunt> describe employee;

employee: {name: chararray,age: int,designation: chararray,state: chararray,doj: chararray}

grunt> dojData = foreach employee generate (name), ToDate(doj, 'dd-MM-yyyy');

grunt> dump dojData;
(Naranbhai,1990-01-01T00:00:00.000-08:00)
(Prabhaben,1990-02-01T00:00:00.000-08:00)
(Sarania,1990-03-01T00:00:00.000-08:00)
(Chintaman,1990-04-01T00:00:00.000-08:00)
(Sakuntala,1990-05-01T00:00:00.000-07:00)

grunt> dojData = foreach employee generate (name), ToDate(doj, 'dd-MM-yyyy') as (doj:DateTime);

grunt> describe dojData;
dojData: {name: chararray,doj: datetime}

grunt> dump dojData;
(Naranbhai,1990-01-01T00:00:00.000-08:00)
(Prabhaben,1990-02-01T00:00:00.000-08:00)
(Sarania,1990-03-01T00:00:00.000-08:00)
(Chintaman,1990-04-01T00:00:00.000-08:00)
(Sakuntala,1990-05-01T00:00:00.000-07:00)

grunt> dayData = foreach dojData generate (name), GetDay(doj);

grunt> dump dayData;
(Naranbhai,1)
(Prabhaben,1)
(Sarania,1)
(Chintaman,1)
(Sakuntala,1)

grunt> hourData = foreach dojData generate (name), GetHour(doj);

grunt> dump hourData;
(Naranbhai,0)
(Prabhaben,0)
(Sarania,0)
(Chintaman,0)
(Sakuntala,0)

grunt> monthData = foreach dojData generate (name), GetMonth(doj);

grunt> dump monthData;
(Naranbhai,1)
(Prabhaben,2)
(Sarania,3)
(Chintaman,4)
(Sakuntala,5)
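To double-check that 'dd-MM-yyyy' reads the first number as the day and the second as the month, here is a plain-Python analogue using `datetime.strptime`. It is a sketch only: the values are typed in by hand, and it ignores the time zone offset that Pig prints.

```python
from datetime import datetime

doj_values = ["1-1-1990", "1-2-1990", "1-3-1990", "1-4-1990", "1-5-1990"]

# '%d-%m-%Y' plays the role of the 'dd-MM-yyyy' pattern: day, month, year.
parsed = [datetime.strptime(value, "%d-%m-%Y") for value in doj_values]

days = [p.day for p in parsed]      # counterpart of GetDay()
months = [p.month for p in parsed]  # counterpart of GetMonth()
hours = [p.hour for p in parsed]    # counterpart of GetHour()

print(days)    # [1, 1, 1, 1, 1]
print(months)  # [1, 2, 3, 4, 5]
print(hours)   # [0, 0, 0, 0, 0]
```

The hour is 0 everywhere because the input text carries no time of day.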