Today we look at the 7th circle, which is the job cleanup.
An MR job writes many intermediate results and junk files while it runs. Once the job is completed, these files keep occupying space on HDFS without being of any use. Hence the cleanup task is launched.
- The Job Tracker informs all the Task Trackers to perform the cleanup.
- Each Task Tracker cleans up its work folders.
- They clean up the temporary directory.
- Once the cleanup task is successful, the Task Tracker ends the job by writing the _SUCCESS file.
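The cleanup steps above can be sketched in plain Python. This is only a local-filesystem illustration, not Hadoop code: the function name `cleanup_job`, the `_temporary` folder name and the directory layout are assumptions made for the example.

```python
import shutil
import tempfile
from pathlib import Path
from typing import List


def cleanup_job(output_dir: Path, work_dirs: List[Path]) -> None:
    """Remove the work folders and the temporary directory, then mark success."""
    for work_dir in work_dirs:
        shutil.rmtree(work_dir, ignore_errors=True)
    # Assumed name for the temporary directory under the job output.
    shutil.rmtree(output_dir / "_temporary", ignore_errors=True)
    # The empty marker file tells downstream readers the job finished cleanly.
    (output_dir / "_SUCCESS").touch()


def demo() -> List[str]:
    """Build a throwaway job layout, clean it up, and list what is left."""
    with tempfile.TemporaryDirectory() as root:
        output_dir = Path(root) / "output"
        (output_dir / "_temporary").mkdir(parents=True)
        (output_dir / "part-r-00000").write_text("hadoop\t2\n")
        work_dir = Path(root) / "work-tt1"
        work_dir.mkdir()
        cleanup_job(output_dir, [work_dir])
        return sorted(p.name for p in output_dir.iterdir())
```

After `demo()` runs, only the reducer output and the `_SUCCESS` marker remain in the output folder.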
We are in the 6th circle today, which is the reducer function. The user submitted a job, it was initialized in the 2nd circle, and its setup was completed in the 3rd circle.
The Map Task was executed in the 4th circle, and sort & shuffle was completed in the 5th circle.
The reducer collects the output from all the mappers and applies the user-defined reduce function.
- The Task Tracker launches the reduce task.
- The reduce task (not the reduce function) reads the jar and XML of the job.
- It executes the shuffle. By the time the reduce task starts, not all the mappers may have completed their work, so it goes to the individual mapper machines to collect the output and shuffles it.
- Once all the mapping activity is finished, it invokes the user reducer function (one or more reducers).
- Each reducer completes its job and writes the output records to HDFS.
- That output is stored in a temporary output file first.
- Once all the reducers have completed their job, the final output is written to the reducer partition file.
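To make the difference between the reduce task and the reduce function concrete, here is a minimal in-memory sketch in Python. It is not the Hadoop API: `run_reduce_task` and `user_reduce` are hypothetical names, and the word-count style summing is just an assumed example of a user-defined reduce function.

```python
from itertools import groupby
from operator import itemgetter
from typing import Iterable, List, Tuple


def user_reduce(key: str, values: Iterable[int]) -> Tuple[str, int]:
    """Hypothetical user-defined reduce function: sum the values of one key."""
    return key, sum(values)


def run_reduce_task(mapper_outputs: List[List[Tuple[str, int]]]) -> List[Tuple[str, int]]:
    """Collect the pairs from every mapper, sort by key, and reduce each group."""
    collected = [pair for output in mapper_outputs for pair in output]
    collected.sort(key=itemgetter(0))
    return [
        user_reduce(key, (value for _, value in group))
        for key, group in groupby(collected, key=itemgetter(0))
    ]
```

The task does the collecting and grouping; the user function only ever sees one key with its values.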
Mappers run on individual machines and prepare intermediate results. They accept splits as inputs, while a reducer accepts a partition of the data as input. Preparing the partitions from the intermediate mapper results is the responsibility of this sort & spill phase, which is the 5th circle given below.
During this phase, we find the keys from all the mappers, then sort and shuffle them before sending them to the one machine where the reducer will run.
- Task Tracker 1 (TT1) initiates a reducer task.
- TT1 updates the Job Tracker about the completion status.
- Similarly, TT2 and any other Task Trackers also update the Job Tracker about their completion status.
- The reducer task goes to TT1, where the mapping task is finished, to collect the interim results.
- TT2 reads the mapper output and streams it to the reducer.
- Steps 4 and 5 are repeated for all the other Task Trackers involved in mapping tasks.
- Once the reducer has received all the mapping results, it performs sorting and spilling.
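The idea of turning mapper outputs into reducer partitions can be sketched as below. This is a simplified in-memory model under my own assumptions: `partition_for` uses `zlib.crc32` as a stand-in for whatever partitioner the job really uses, and `shuffle` is a hypothetical helper name.

```python
import heapq
import zlib
from typing import Dict, List, Tuple

Pair = Tuple[str, int]


def partition_for(key: str, num_reducers: int) -> int:
    """Pick the reducer for a key (crc32 is only a stand-in partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(mapper_outputs: List[List[Pair]], num_reducers: int) -> Dict[int, List[Pair]]:
    """Split each sorted mapper output by partition, then merge the runs per reducer."""
    runs: Dict[int, List[List[Pair]]] = {r: [] for r in range(num_reducers)}
    for output in mapper_outputs:
        per_reducer: Dict[int, List[Pair]] = {r: [] for r in range(num_reducers)}
        for key, value in sorted(output):
            per_reducer[partition_for(key, num_reducers)].append((key, value))
        for reducer, run in per_reducer.items():
            runs[reducer].append(run)
    # Each run is already sorted, so a k-way merge keeps every partition sorted.
    return {reducer: list(heapq.merge(*parts)) for reducer, parts in runs.items()}
```

All records with the same key land in the same partition, which is what lets a single reducer see every value for that key.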
The user has submitted the job and has the permissions. We have slots in the cluster. Job setup is completed. Now we look at the 4th circle given below: the Map Task execution.
The diagram given below depicts the Map Task execution.
- The Task Tracker launches the Map task.
- The Map task reads the jar file given by the user. This is what we write in Eclipse. In the entire framework, this is our contribution 🙂
The Map task also reads the job config (input path, output path, etc.). It gets everything from HDFS, as all of these were uploaded to HDFS initially.
- The Map task reads the input splits from HDFS.
- From the input splits, the Map task creates the records.
- The Map task invokes the user Mapper with each record.
- The mapper writes the intermediate output.
- The task sorts it by key and flushes it to disk.
- The Map task informs the Task Tracker about its completion.
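Here is a rough Python sketch of what the Map task does with one split, assuming plain text input and a word-count style mapper. The names `read_records`, `user_mapper` and `run_map_task` are hypothetical, and the (byte offset, line) record shape is my assumption for text input.

```python
from typing import Iterator, List, Tuple


def read_records(split: str) -> Iterator[Tuple[int, str]]:
    """Turn one input split into (byte offset, line) records."""
    offset = 0
    for line in split.splitlines(keepends=True):
        yield offset, line.rstrip("\r\n")
        offset += len(line.encode("utf-8"))


def user_mapper(offset: int, line: str) -> Iterator[Tuple[str, int]]:
    """Hypothetical user Mapper: emit (word, 1) for each word in the line."""
    for word in line.split():
        yield word.lower(), 1


def run_map_task(split: str) -> List[Tuple[str, int]]:
    """Feed every record to the mapper, then sort the intermediate output by key."""
    intermediate = [
        pair
        for offset, line in read_records(split)
        for pair in user_mapper(offset, line)
    ]
    intermediate.sort(key=lambda pair: pair[0])
    return intermediate
```

In the real framework the sorted output is flushed to local disk; the sketch simply returns it.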
We shall talk about the 3rd circle today, as we have already talked about Job submission and Job initialization.
Scheduling the jobs is an interesting concept. I’m really excited to see the communication between the Scheduler, Job Tracker and Task Tracker.
- The Task Tracker keeps sending heartbeats to the Job Tracker about the status of its tasks. So it tells the Job Tracker that a task is completed and that it wants more.
- The Job Tracker updates the task status and makes a note of the Task Tracker’s message.
- The Job Tracker goes to the Scheduler asking for tasks.
- The Scheduler updates its task scheduling record. Based on the job scheduling policy (execution policy, priority, etc.), it either makes the job client wait or processes the job.
- The Job Tracker gets the task.
- It submits the task to the Task Tracker.
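The heartbeat conversation above can be modelled with two small Python classes. This is a toy simulation under my own assumptions, not Hadoop's scheduler: the class and method names are hypothetical, and the policy (lower priority number first, then first come first served) is just one possible scheduling policy.

```python
import heapq
from typing import List, Optional, Tuple


class Scheduler:
    """Hands out pending tasks, lowest priority number first, then FIFO."""

    def __init__(self) -> None:
        self._pending: List[Tuple[int, int, str]] = []
        self._counter = 0  # keeps arrival order for tasks of equal priority

    def add_task(self, task_id: str, priority: int) -> None:
        heapq.heappush(self._pending, (priority, self._counter, task_id))
        self._counter += 1

    def next_task(self) -> Optional[str]:
        if not self._pending:
            return None
        return heapq.heappop(self._pending)[2]


class JobTracker:
    """Notes what each tracker finished and fetches its next task."""

    def __init__(self, scheduler: Scheduler) -> None:
        self.scheduler = scheduler
        self.completed: List[Tuple[str, str]] = []

    def heartbeat(self, tracker: str, finished: Optional[str]) -> Optional[str]:
        """Record the finished task, if any, then ask the scheduler for more work."""
        if finished is not None:
            self.completed.append((tracker, finished))
        return self.scheduler.next_task()
```

A tracker that gets `None` back simply has nothing to do until its next heartbeat.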
I wrote about the first step of MR job execution, Job Submission, in my earlier post.
In this post, we talk about the 2nd circle, which is Job initialization.
"I got the job. How will I execute it?" This is what the Hadoop elephant is thinking, with a yarn in its trunk!
- Once the job is submitted, it becomes the Job Tracker’s responsibility to initialize it.
- The job XML was uploaded to the staging directory, created as described in my earlier post. The Job Tracker reads it and performs the validation.
- Once the XML validation is completed, it goes to the Scheduler for job validation. The Scheduler checks whether the user is authorized for this job, whether the content is allowed, etc.
- If the job validation is also successful, the job is added by the Scheduler and the schedule information is updated.
- The Job Scheduler initializes the job.
- It reads the number of splits needed for the job to be executed.
- Tasks are created to execute the job. If we have many splits, that many map tasks are spawned.
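The last step, one map task per split, is easy to picture in code. A tiny sketch, assuming splits are given as (start, length) pairs; `create_map_tasks` and the task naming pattern are made up for illustration and are not Hadoop's real task ID format.

```python
from typing import List, Tuple


def create_map_tasks(job_id: str, splits: List[Tuple[int, int]]) -> List[str]:
    """Create one map task name per input split (naming is illustrative only)."""
    if not splits:
        raise ValueError("job has no input splits")
    return [f"{job_id}_m_{index:06d}" for index in range(len(splits))]
```

So a job whose input yields two splits ends up with exactly two map tasks.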
After publishing many posts about MapReduce code, we’ll now see the MR internals, such as how an MR job is submitted and executed.
This post talks about the first circle: Job Submission.
We compiled the MR code and the jar is ready. We execute the job with hadoop jar xxxxxx. First the job is submitted to Hadoop. There are schedulers which run the job based on cluster capacity and availability.
I want to scribble down quick notes on Job Submission using the Gantt diagram given below.
- The user submits the job to the Job Client.
- The Job Client talks to the Job Tracker to get the job ID.
- The Job Client creates a staging directory in HDFS. This is where all the files related to the job get uploaded.
- The MR code and configurations, with 10 replicas of their blocks, are uploaded to the staging directory: the jar file of the job, the job splits, the split metadata, and job.xml, which holds the job description.
- Splits are computed automatically and the input is read.
- The metadata of the splits is uploaded to HDFS.
- The job is submitted and is ready to execute.
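To get a feel for how splits are computed, here is a simplified Python sketch that cuts a file into fixed-size pieces. It is an assumption-laden illustration: `compute_splits` is a hypothetical helper, the sizes are plain byte counts, and real Hadoop input formats apply further rules that this sketch ignores.

```python
from typing import List, Tuple


def compute_splits(file_size: int, split_size: int) -> List[Tuple[int, int]]:
    """Return (start offset, length) pairs that cover a file of file_size bytes."""
    if split_size <= 0:
        raise ValueError("split_size must be positive")
    return [
        (start, min(split_size, file_size - start))
        for start in range(0, file_size, split_size)
    ]
```

For a 300-byte file and a split size of 128, this gives two full splits and a last one of 44 bytes. The (start, length) pairs are the kind of information the split metadata carries.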
Yesterday I wrote my note on MR v1. After its problems were recognized, YARN, which is MR v2, was released.
It has a lot of advantages over MR v1, whose problems are given below.
- MR v1 has two major daemons: Job Tracker and Task Tracker. As all the tasks are aggregated under these daemons, they may hang when we process a large set of data.
- It does not provide HA.
YARN (MR2) process flow
- JOB SUBMISSION: The user executes his YARN code to launch a job. A new JVM is launched.
- JOB SUBMISSION: The job is submitted to the App Manager of the Resource Manager.
- JOB SUBMISSION: In turn, the Resource Manager gives back a job identifier. Job resources are copied to HDFS. The Name node has the metadata of the resources. (It is not the file or content. It is the job detail.)
- RESOURCE MGMT: The Resource Manager talks to one of the machines where a Node Manager is running, Machine A. It asks the Node Manager to start an Application Master on that machine.
- RESOURCE MGMT: The Node Manager accepts the call and spawns the Application Master.
- RESOURCE MGMT: The Application Master goes to the Name node to get the metadata of the job details. It calculates the resource usage, split information, etc. needed to execute the task.
- RESOURCE MGMT: It requests the RM for the Application Master instance ID.
- RESOURCE MGMT: The Application Master identifies machines and calculates their resource information. It informs the Resource Manager which Node Manager it has identified to execute the task.
- RESOURCE MGMT: The Resource Manager gives an acknowledgement to the Application Master to allocate a container from the identified Node Manager.
- RESOURCE MGMT: The Application Master talks to the Node Manager of Machine B, the box where the data lies, to inform it that the containers are approved.
- TASK EXECUTION: It creates the container. The task will be executed inside it.
- TASK EXECUTION: The task container gets the job resources from HDFS, which we copied in Step #3.
- TASK EXECUTION: The task starts.
- TASK EXECUTION: It is the Application Master’s duty to monitor the container. Hence the container sends the MR status (task-related information) to it.
- TASK EXECUTION: The Node Manager of Machine B updates the Resource Manager about the resource consumption (CPU/MEM, etc.). Based on this data, the Resource Manager decides the machines for another job execution.
- TASK COMPLETION: The container informs the Application Master about the job completion. The Resource Manager is updated. The job queue is updated. The Application Master de-registers the container and the container is terminated.
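The resource side of this flow can be pictured with a toy Python model. This is not the YARN API: the `ResourceManager` class here, its `allocate` and `release` methods, and the memory-only bookkeeping are all assumptions made to illustrate the idea of preferring the machine where the data lies.

```python
from typing import Dict, Optional


class ResourceManager:
    """Toy model: tracks free memory per node, as reported by the node managers."""

    def __init__(self, free_mb: Dict[str, int]) -> None:
        self.free_mb = dict(free_mb)

    def allocate(self, preferred_node: str, memory_mb: int) -> Optional[str]:
        """Grant a container on the preferred node if it fits, else on any node that fits."""
        others = sorted(node for node in self.free_mb if node != preferred_node)
        for node in [preferred_node] + others:
            if self.free_mb.get(node, 0) >= memory_mb:
                self.free_mb[node] -= memory_mb
                return node
        return None  # nothing fits right now; the request has to wait

    def release(self, node: str, memory_mb: int) -> None:
        """Give the memory back when a container terminates."""
        self.free_mb[node] += memory_mb
```

Once a container finishes and its memory is released, the same node becomes eligible again for the next request.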
Happy Weekend, Hadoopers!