Thursday, December 21, 2023

How to Read And Write SequenceFile in Hadoop

In this post we’ll see how you can read and write a sequence file using Java API in Hadoop. We’ll also see how to read and write sequence file using MapReduce.


Java program to write a sequence file

Using the createWriter() method of the SeqeunceFile you can get a writer that can then be used to write a SequenceFile. In this Java program a file from local file system is written as a SequenceFile into HDFS.

    
import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;

public class FileWriter {

 public static void main(String[] args) {
  Configuration conf = new Configuration();
  int i =0;
  try {
   FileSystem fs = FileSystem.get(conf);
   // path to input file – in local file system
   File file = new File("netjs/Hadoop/Data/log.txt");
   // Path for output file
   Path outFile = new Path(args[0]);
   IntWritable key = new IntWritable();
   Text value = new Text();
   SequenceFile.Writer writer = null;
   try {
      // creating writer
      writer = SequenceFile.createWriter(conf, Writer.file(outFile), 
      Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()), 
      Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()));
    for (String line : FileUtils.readLines(file)) {     
     key.set(i++);
     value.set(line);
     writer.append(key, value);
    }
   }finally {
    if(writer != null) {
     writer.close();
    }
   }
  
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }

}
To run this Java program in Hadoop environment export the class path where your .class file for the Java program resides.
export HADOOP_CLASSPATH=/home/netjs/eclipse-workspace/bin  
Then you can run the Java program using the following command.
hadoop org.netjs.FileWriter /user/out/data.seq

Java program to read a sequence file

To read a SequenceFile using Java API in Hadoop create an instance of SequenceFile.Reader. Using that reader instance you can iterate the (key, value) pairs in the SequenceFile using the next() method.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
 
public class FileReader {
 public static void main(String[] args) {
  
  Configuration conf = new Configuration();
  try {
   Path inFile = new Path(args[0]);
   SequenceFile.Reader reader = null;
   try {
    IntWritable key = new IntWritable();
    Text value = new Text();
    reader = new SequenceFile.Reader(conf, Reader.file(inFile), Reader.bufferSize(4096));
    while(reader.next(key, value)) {
     System.out.println("Key " + key + "Value " + value);
    }
 
   }finally {
    if(reader != null) {
     reader.close();
    }
   }
  } catch (IOException e) {
   // TODO Auto-generated catch bloc
   e.printStackTrace();
  }
 }
}
Then you can read the previously written SequenceFile using the following command.
hadoop org.netjs.FileReader /user/out/data.seq

MapReduce job to write a SequenceFile

If you have a very big file and you want to take advantage of parallel processing then you can also use MapReduce to write a sequence file. Only change that is required is to set Output format class as SequenceFileOutputFormat. Also set number of reducers as 0 since you need Mapper only job.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class FileWriter extends Configured implements Tool{
    // Map function
    public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
         public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
                 context.write(key, value);
         }
    }
    public static void main(String[] args)  throws Exception{
         int flag = ToolRunner.run(new FileWriter(), args);
         System.exit(flag);
        
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "seqfilewrite");
        job.setJarByClass(FileWriter.class);
        job.setMapperClass(SFMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // If you want to compress
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        int returnFlag = job.waitForCompletion(true) ? 0 : 1;
        return returnFlag;
    }    
}

MapReduce job to read a SequenceFile

In this case set Input format class as SequenceFileInputFormat.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
public class FileReader extends Configured implements Tool{
    // Map function
    public static class SFMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
         public void map(LongWritable key, Text value, Context context) 
                 throws IOException, InterruptedException {
                 context.write(key, value);
         }
    }
    public static void main(String[] args)  throws Exception{
         int flag = ToolRunner.run(new FileReader(), args);
         System.exit(flag);
        
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "seqfileread");
        job.setJarByClass(FileReader.class);
        job.setMapperClass(SFMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        int returnFlag = job.waitForCompletion(true) ? 0 : 1;
        return returnFlag;
    }
}

That's all for this topic How to Read And Write SequenceFile in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Compressing File in snappy Format in Hadoop - Java Program
  2. Uber Mode in Hadoop
  3. Java Program to Read File in HDFS
  4. What is SafeMode in Hadoop
  5. Data Locality in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. How to Compress MapReduce Job Output in Hadoop
  3. Capacity Scheduler in YARN
  4. MapReduce Flow in YARN
  5. Java Collections Interview Questions
  6. How to Create Password Protected Zip File in Java
  7. Creating Tar File And GZipping Multiple Files - Java Program
  8. Ternary Operator in Java

Wednesday, December 20, 2023

Shuffle And Sort Phases in Hadoop MapReduce

When you run a MapReduce job and mappers start producing output internally lots of processing is done by the Hadoop framework before the reducers get their input. Hadoop framework also guarantees that the map output is sorted by keys. This whole internal processing of sorting map output and transfering it to reducers is known as shuffle phase in Hadoop framework.

The tasks done internally by Hadoop framework with in the shuffle phase are as follows-

  1. Data from mappers is partitioned as per the number of reducers.
  2. Data is also sorted by keys with in a partition.
  3. Output from Maps is written to disk as many temporary files.
  4. Once the map task is finished all the files written to the disk are merged to create a single file.
  5. Data from a particular partition (from all mappers) is transferred to a reducer that is suppose to process that particular partition.
  6. If data transferred to a reducer exceeded the memory limit then it is copied to a disk.
  7. Once reducer has got its portion of data from all the mappers data is again merged while still maintaining the sort order of keys to create reduce task input.

As you can see some of the shuffle phase tasks happen at the nodes where mappers are running and some of them at the nodes where reducers are running.

Shuffle phase process at mappers side

When the map task starts producing output it is not directly written to disk instead there is a memory buffer (size 100 MB by default) where map output is kept. This size is configurable and parameter that is used is – mapreduce.task.io.sort.mb

When that data from memory is spilled to disk is controlled by the configuration parameter mapreduce.map.sort.spill.percent (default is 80% of the memory buffer). Once this threshold of 80% is reached, a thread will begin to spill the contents to disk in the background.

Before writing to the disk the Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. For example let's say there are 4 mappers and 2 reducers for a MapReduce job. Then output of all of these mappers will be divided into 2 partitions one for each reducer.

shuffle phase in Hadoop

If there is a Combiner that is also executed in order to reduce the size of data written to the disk.

This process of keeping data into memory until threshold is reached, partitioning and sorting, creating a new spill file every time threshold is reached and writing data to the disk is repeated until all the records for the particular map tasks are processed. Before the Map task is finished all these spill files are merged, keeping the data partitioned and sorted by keys with in each partition, to create a single merged file.

Following image illustrates the shuffle phase process at the Map end.

shuffle phase map side

Shuffle phase process at Reducer side

By this time you have the Map output ready and stored on a local disk of the node where Map task was executed. Now the relevant partition of the output of all the mappers has to be transferred to the nodes where reducers are running.

Reducers don’t wait for all the map tasks to finish to start copying the data, as soon as a Map task is finished data transfer from that node is started. For example if there are 10 mappers running, framework won’t wait for all the 10 mappers to finish to start map output transfer. As soon as a map task finishes transfer of data starts.

Data copied from mappers is kept is memory buffer at the reducer side too. The size of the buffer is configured using the following parameter.

mapreduce.reduce.shuffle.input.buffer.percent- The percentage of memory- relative to the maximum heapsize as typically specified in mapreduce.reduce.java.opts- that can be allocated to storing map outputs during the shuffle. Default is 70%.

When the buffer reaches a certain threshold map output data is merged and written to disk.

This merging of Map outputs is known as sort phase. During this phase the framework groups Reducer inputs by keys since different mappers may have produced the same key as output.

The threshold for triggering the merge to disk is configured using the following parameter.

mapreduce.reduce.merge.inmem.thresholds- The number of sorted map outputs fetched into memory before being merged to disk. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk.

The merged file, which is the combination of data written to the disk as well as the data still kept in memory constitutes the input for Reduce task.

shuffle and sort phase in MapReduce

Points to note-

  1. The Mapper outputs are sorted and then partitioned per Reducer.
  2. The total number of partitions is the same as the number of reduce tasks for the job.
  3. Reducer has 3 primary phases: shuffle, sort and reduce.
  4. Input to the Reducer is the sorted output of the mappers.
  5. In shuffle phase the framework fetches the relevant partition of the output of all the mappers, via HTTP.
  6. In sort phase the framework groups Reducer inputs by keys from different map outputs.
  7. The shuffle and sort phases occur simultaneously; while map-outputs are being fetched they are merged.

That's all for this topic Shuffle And Sort Phases in Hadoop MapReduce. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. MapReduce Flow in YARN
  2. Predefined Mapper And Reducer Classes in Hadoop
  3. Speculative Execution in Hadoop
  4. Uber Mode in Hadoop
  5. Data Compression in Hadoop

You may also like-

  1. HDFS Commands Reference List
  2. Converting Text File to Parquet File Using Hadoop MapReduce
  3. Compressing File in snappy Format in Hadoop - Java Program
  4. Capacity Scheduler in YARN
  5. Java Multi-Threading Interview Questions
  6. ArrayBlockingQueue in Java Concurrency
  7. String Vs StringBuffer Vs StringBuilder in Java
  8. How to Convert a File to Byte Array

Tuesday, December 19, 2023

Using Combiner in Hadoop MapReduce to Improve Performance

In this post we’ll see what is combiner in Hadoop and how combiner helps in speeding up the shuffle and sort phase in Hadoop MapReduce.


What is combiner in Hadoop

Generally in a MapReduce job, data is collated in the Map phase and later aggregated in reduce phase. By specifying a combiner function in MapReduce you can aggregate data at the Map phase also.

You can specify a combiner in your MapReduce driver using the following statement -

job.setCombinerClass(COMBINER_CLASS.class); 

Note that specifying combiner in your MapReduce job is optional.

How combiner helps in improving MapReduce performance

Once the Map tasks start producing output that data has to be stored in memory, partitioned as per the number of reducers, sorted on keys and then spilled to the disk.

Once the Map task is done the data partitions have to be sent to the reducers (on different nodes) working on specific partitions. As you can see this whole shuffle and sort process involves consuming memory, I/O and data transfer across network.

If you specify a combiner function in MapReduce, when the map output stored in memory is written to disk, combiner function is run on the data so that there is less data to be written to the disk (reducing I/O) which also results in less data being transferred to reducer nodes (reducing bandwidth).

For example– Suppose you have sales data of several items and you are trying to find the maximum sales number per item. For Item1 if following (key,value) pair are the output of Map-1 and Map-2.

Map-1
(Item1, 300)
(Item1, 250)
(Item1, 340)
(Item1, 450)
Map-2
(Item1, 120)
(Item1, 540)
(Item1, 290)
(Item1, 300)
Then the reduce function which gets data for this key (Item1) will receive all these (key, value) pairs as input after the shuffle phase.
    
[Item1,(300,250,340,450,120,540,290,300)]
 

Resulting in final output - (Item1, 540)

If you are using a combiner in MapReduce job and the reducer class itself is used as the combiner class  then combiner will be called for each map output.

Map-1 Combiner output

      (Item1, 450) 
    

Map-2 Combiner output

      (Item1, 540)
         

Input to Reducer - [Item1, (450, 540)]

Resulting in final output - (Item1, 540)

So you can see by using a combiner map output is reduced which means less data is written to disk and less data is transferred to reducer nodes.

How to write a Combiner function

For writing Combiner class you need to extend Reducer and implement the reduce method just like you do for writing the reducer. In fact in many cases reducer itself can be used as the Combiner.

The output key value types of combiner must be same as the output key value type of the mapper.

Combiner in Hadoop

Though it is not always possible to use the reducer as the combiner class, classic example of this constraint is calculation of average.

For example- If there are two maps with (key, value) pair as following

Map-1 (1,4,7) and Map-2 (8,9)

Then reduce function will calculate average as – (1+4+7+8+9)/5 = 29/5 = 5.8

where as with combiner where average will also be calculated per map output

Map-1 – (1+4+7)/3 = 12/3 = 4

Map-2 – (8+9)/2 = 17/2 = 8.5

So the average calculated at reduce side will be – (4+8.5)/2 = 12.5/2 = 6.25

Combiner with MapReduce example

Here is a example where combiner is specified while calculating maximum sales figure per item.

  
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MaxSales extends Configured implements Tool{
  // Mapper
  public static class MaxSalesMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text item = new Text();
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      item.set(stringArr[0]);
      Integer sales = Integer.parseInt(stringArr[1]);
      context.write(item, new IntWritable(sales));
    }
  }
    
  // Reducer
  public static class MaxSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int maxSalesValue = Integer.MIN_VALUE;
      for(IntWritable val : values) {
        maxSalesValue = Math.max(maxSalesValue, val.get());
      }  
      result.set(maxSalesValue);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new MaxSales(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "MaxSales");
    job.setJarByClass(getClass());
    job.setMapperClass(MaxSalesMapper.class); 
    // Specifying combiner class
    job.setCombinerClass(MaxSalesReducer.class);
    job.setReducerClass(MaxSalesReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

In the displayed counters for the MapReduce job you can see the reduction in number of records passed to reducer.

   
Map input records=21
  Map output records=21
  Map output bytes=225
  Map output materialized bytes=57
  Input split bytes=103
  Combine input records=21
  Combine output records=4
  Reduce input groups=4
  Reduce shuffle bytes=57
  Reduce input records=4
  Reduce output records=4

For comparison here are the counters when the same MapReduce job is run without a Combiner class.

   
  Map input records=21
  Map output records=21
  Map output bytes=225
  Map output materialized bytes=273
  Input split bytes=103
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=273
  Reduce input records=21
  Reduce output records=4
  Spilled Records=42
    

That's all for this topic Using Combiner in Hadoop MapReduce to Improve Performance. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Chaining MapReduce Job in Hadoop
  2. What Are Counters in Hadoop MapReduce
  3. MapReduce Flow in YARN
  4. How to Check Hadoop MapReduce Logs
  5. Speculative Execution in Hadoop

You may also like-

  1. Replica Placement Policy in Hadoop Framework
  2. File Write in HDFS - Hadoop Framework Internal Steps
  3. How to Read And Write Parquet File in Hadoop
  4. Using Avro File With Hadoop MapReduce
  5. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  6. Installing Ubuntu Along With Windows
  7. CopyOnWriteArrayList in Java
  8. Creating a Maven Project in Eclipse

Monday, December 18, 2023

How to Check Hadoop MapReduce Logs

In your Hadoop MapReduce job if you are wondering how to put logs or where to check MapReduce logs or even System.out statements then this post shows the same. Note that here accessing logs is shown for MapReuduce 2.

Location of logs in Hadoop MapReduce

An application ID is created for every MapReduce job. You can get that application ID from the console itself after starting your MapReduce job. It will be similar to as shown below.

18/07/11 14:39:23 INFO impl.YarnClientImpl: Submitted application application_1531299441901_0001

A folder with the same application ID will be created in the logs/userlogs of your Hadoop installation directory. For example I can see following directory for the application ID mentioned above.
HADOOP_INSTALLATION_DIR/logs/userlogs/application_1531299441901_0001

With in this directory you will find separate folders created for mappers and reducers and there you will have following files for logs and sysouts.

syslog- Contains the log messages.

sysout- Contains the System.out messages.

MapReduce example with logs

Here is a simple word count MapReduce program with logs and sysouts added.

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
  public static final Log log = LogFactory.getLog(WordCount.class);
  // Map function
  public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text word = new Text();
    public void map(LongWritable key, Text value, Context context) 
           throws IOException, InterruptedException {
      log.info("In map method");
      // Splitting the line on spaces
      String[] stringArr = value.toString().split("\\s+");
      System.out.println("Array length- " + stringArr.length);
      for (String str : stringArr) {
        word.set(str);
        context.write(word, new IntWritable(1));
      }       
    }
  }
    
  // Reduce function
  public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
          throws IOException, InterruptedException {
      log.info("In reduce method with key " + key);
      int sum = 0;
      for (IntWritable val : values) {
          sum += val.get();
      }
      System.out.println("Key - " + key + " sum - " + sum);
      result.set(sum);
      context.write(key, result);
    }
  }
    
  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new WordCount(), args);
    System.exit(exitFlag);
  }
    
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "WordCount");
    job.setJarByClass(getClass());
    job.setMapperClass(MyMapper.class);    
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
 
Once you run this MapReduce job, using the application ID you can go to the location as already explained above and check the log and sysout messages.

That's all for this topic How to Check Hadoop MapReduce Logs. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. How to Handle Missing And Under Replicated Blocks in HDFS
  2. How to Compress Intermediate Map Output in Hadoop
  3. How to Write a Map Only Job in Hadoop MapReduce
  4. Predefined Mapper And Reducer Classes in Hadoop
  5. How to Configure And Use LZO Compression in Hadoop

You may also like-

  1. NameNode, DataNode And Secondary NameNode in HDFS
  2. HDFS Commands Reference List
  3. Parquet File Format in Hadoop
  4. Sequence File in Hadoop
  5. Compressing File in snappy Format in Hadoop - Java Program
  6. Spliterator in Java
  7. PermGen Space Removal in Java 8
  8. Converting double to String - Java Program

Sunday, December 17, 2023

Chaining MapReduce Job in Hadoop

While processing data using MapReduce you may want to break the requirement into a series of task and do them as a chain of MapReduce jobs rather than doing everything with in one MapReduce job and making it more complex. Hadoop provides two predefined classes ChainMapper and ChainReducer for the purpose of chaining MapReduce job in Hadoop.


ChainMapper class in Hadoop

Using ChainMapper class you can use multiple Mapper classes within a single Map task. The Mapper classes are invoked in a chained fashion, the output of the first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.

For adding map tasks to the ChainedMapper addMapper() method is used.

ChainReducer class in Hadoop

Using the predefined ChainReducer class in Hadoop you can chain multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion. The output of the reducer becomes the input of the first mapper and output of first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task's output.

For setting the Reducer class to the chain job setReducer() method is used.

For adding a Mapper class to the chain reducer addMapper() method is used.

How to chain MapReduce jobs

Using the ChainMapper and the ChainReducer classes it is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*].

In the chain of MapReduce job you can have-

  • A chain of map tasks executed using ChainMapper
  • A reducer set using ChainReducer.
  • A chain of map tasks added using ChainReducer (This step is optional).

Special care has to be taken when creating chains that the key/values output by a Mapper are valid for the following Mapper in the chain.

Benefits of using a chained MapReduce job

  • When MapReduce jobs are chained data from immediate mappers is kept in memory rather than storing to disk so that another mapper in chain doesn't have to read data from disk. Immediate benefit of this pattern is a dramatic reduction in disk IO.
  • Gives you a chance to break the problem into simpler tasks and execute them as a chain.

Chained MapReduce job example

Let’s take a simple example to show chained MapReduce job in action. Here input file has item, sales and zone columns in the below format (tab separated) and you have to get the total sales per item for zone-1.

Item1 345 zone-1
Item1 234 zone-2
Item3 654 zone-2
Item2 231 zone-3
    

For the sake of example let’s say in first mapper you get all the records, in the second mapper you filter them to get only the records for zone-1. In the reducer you get the total for each item and then you flip the records so that key become value and value becomes key. For that Inverse Mapper is used which is a predefined mapper in Hadoop.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Sales extends Configured implements Tool{
  // First Mapper
  public static class CollectionMapper extends Mapper<LongWritable, Text, Text, Text>{
    private Text item = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      //splitting record
      String[] salesArr = value.toString().split("\t");
      item.set(salesArr[0]);
      // Writing (sales,zone) as value
      context.write(item, new Text(salesArr[1] + "," + salesArr[2]));
    }
  }
    
  // Mapper 2
  public static class FilterMapper extends Mapper<Text, Text, Text, IntWritable>{
    public void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException {
    
      String[] recordArr = value.toString().split(",");
      // Filtering on zone
      if(recordArr[1].equals("zone-1")) {
          Integer sales = Integer.parseInt(recordArr[0]);
          context.write(key, new IntWritable(sales));
      }
    }
  }
    
  // Reduce function
  public static class TotalSalesReducer extends Reducer<Text, IntWritable, Text, IntWritable>{      
    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new Sales(), args);
    System.exit(exitFlag);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Sales");
    job.setJarByClass(getClass());
        
    // MapReduce chaining
    Configuration mapConf1 = new Configuration(false);
    ChainMapper.addMapper(job, CollectionMapper.class, LongWritable.class, Text.class,
        Text.class, Text.class,  mapConf1);
        
    Configuration mapConf2 = new Configuration(false);
    ChainMapper.addMapper(job, FilterMapper.class, Text.class, Text.class,
        Text.class, IntWritable.class, mapConf2);
        
    Configuration reduceConf = new Configuration(false);        
    ChainReducer.setReducer(job, TotalSalesReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, reduceConf);

    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, null);
         
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

That's all for this topic Chaining MapReduce Job in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Converting Text File to Parquet File Using Hadoop MapReduce
  2. How to Write a Map Only Job in Hadoop MapReduce
  3. Data Locality in Hadoop
  4. How to Compress Intermediate Map Output in Hadoop
  5. Java Program to Write File in HDFS

You may also like-

  1. HDFS Commands Reference List
  2. How to Handle Missing And Under Replicated Blocks in HDFS
  3. What is SafeMode in Hadoop
  4. Parquet File Format in Hadoop
  5. Compressing File in snappy Format in Hadoop - Java Program
  6. How to Run a Shell Script From Java Program
  7. Java Collections Interview Questions
  8. Best Practices For Exception Handling in Java

Saturday, December 16, 2023

MapReduce Flow in YARN

How a MapReduce job runs in YARN is different from how it used to run in MRv1. This post shows how MapReduce v2 runs internally in YARN Hadoop.

MapReduce flow - Submitting MapReduce job

First step is of course submitting the job in order to kick start the process.

For submitting the job you can use one of the following methods of the org.apache.hadoop.mapreduce.Job class-

  • void submit()- Submit the job to the cluster and return immediately.
  • boolean waitForCompletion(boolean)- Submit the job to the cluster and wait for it to finish.

When a job is submitted using one of the above mentioned methods, Job class creates an instance of JobSubmitter instance and submitJobInternal() method is called on that instance and following steps are taken.

  1. Getting a new application ID from the resource manager for the MapReduce job.
  2. Checking the input and output specifications of the job.
  3. Computing the InputSplits for the job.
  4. Setup the requisite accounting information for the org.apache.hadoop.mapreduce.filecache.DistributedCache of the job, if necessary.
  5. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
  6. Submit the job using the SubmitApplication method in YARNClient. SubmitApplication method submits a new application to YARN. It is a blocking call, it will not return ApplicationId until the submitted application is submitted successfully and accepted by the ResourceManager.

Important YARN components for running MapReduce job

Main components when running a MapReduce job in YARN are–

  1. Client- An application submission client that submits an application.
  2. ResourceManager- Daemon that manages the cluster resources.
  3. ApplicationMaster- communicates with the ResourceManager to negotiate and allocate resources for future containers to run the map and reduce tasks for the submitted job.
  4. NodeManager- Launches and monitor the resources used by the containers that run the mappers and reducers for the job. NodeManager daemon runs on each node in the cluster.

Interaction among these components is shown here-

Client<-->ResourceManager - By using YarnClient objects.

ApplicationMaster<-->ResourceManager - By using AMRMClientAsync objects, handling events asynchronously by AMRMClientAsync.CallbackHandler

ApplicationMaster<-->NodeManager - Launch containers. Communicate with NodeManagers by using NMClientAsync objects, handling container events by NMClientAsync.CallbackHandler.

Running tasks for the submitted MapReduce job

Once the job is submitted to the ResourceManager, initially a single container is negotiated for executing the application specific ApplicationMaster (Which is MRAppMaster in case of MapReduce applications). The YARN ResourceManager will then launch the ApplicationMaster on the allocated container.

Once launched ApplicationMaster performs the following tasks-

  1. Communicate with the ResourceManager to negotiate and allocate resources for containers required to run mappers and reducers for the submitted MapReduce job.
  2. After the containers are allocated, communicate with YARN NodeManagers to launch application containers on the nodes where the containers are allocated.
  3. Track the progress of the tasks running on the containers.

How ApplicationMaster runs the Map and Reduce tasks

ApplicationMaster retrieves the number of input splits calculated for the job at the time of submission. While running a MapReduce job as many map tasks are created as the count of input splits and the number of reducers is calculated using the mapreduce.job.reduces property which sets the default number of reduce tasks per job.

After knowing the number of mappers and reducers required for the job ApplicationMaster has to decide should it run the tasks sequentially in the same JVM where ApplicationMaster itself is running. If it does that, it is known as running the tasks in uber mode in Hadoop.

If the job is not run as an uber task then ApplicationMaster has to negotiate with ResourceManager to get resource containers to run those map and reduce tasks.

In the resource requests to ResourceManager memory requirements and CPUs for tasks are also specified. Values used for determining memory and CPU requirements for the map and reduce tasks are in mapred-site.xml configuration file.

  • mapreduce.map.memory.mb- The amount of memory to request from the scheduler for each map task. Default value is 1024 MB.
  • mapreduce.map.cpu.vcores– The number of virtual cores to request from the scheduler for each map task. Default value is 1.
  • mapreduce.reduce.memory.mb– The amount of memory to request from the scheduler for each reduce task. Default value is 1024 MB.
  • mapreduce.reduce.cpu.vcores– The number of virtual cores to request from the scheduler for each reduce task. Default value is 1.

ResourceManager’s scheduler will allocate these containers on different nodes in the Hadoop cluster. Reduce tasks can be assigned containers on any node with no locality constraint. For map tasks scheduler tries to allocate containers on the nodes where the split data resides for data locality optimization.

Once the containers are allocated, ApplicationMaster launch those containers on the nodes by contacting the NodeManagers of those nodes. The ApplicationMaster executes the mappers and reducers in a separate jvm on the launched containers.

MapReduce flow - Task progress and completion

The running map and reduce tasks report their progress every three seconds to ApplicationMaster which can create overall job progress from the updates from these separate tasks.

Client also receives the current status from the ApplicationMaster.

ApplicationMaster also emit heartbeats to the ResourceManager to keep it informed that it is alive and still running.

If a task fails to complete, the ApplicationMaster will reschedule that task on another node in the cluster.

When all the map and reduce tasks for the job are completed ApplicationMaster changes the job status to successful. After that ApplicationMaster unregisters itself and then stops the client.

MapReduce flow in YARN Hadoop
MapReduce flow in YARN

Reference- https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html

That's all for this topic MapReduce Flow in YARN. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. YARN in Hadoop
  2. Fair Scheduler in YARN
  3. Replica Placement Policy in Hadoop Framework
  4. How MapReduce Works in Hadoop
  5. Speculative Execution in Hadoop

You may also like-

  1. Word Count MapReduce Program in Hadoop
  2. Java Program to Read File in HDFS
  3. What is SafeMode in Hadoop
  4. HDFS High Availability
  5. Data Compression in Hadoop
  6. Multi-Catch Statement in Java Exception Handling
  7. How HashSet Works Internally in Java
  8. Converting double to String - Java Program

Friday, December 15, 2023

How MapReduce Works in Hadoop

In the post Word Count MapReduce Program in Hadoop a word count MapReduce program is already written in Java. In this post, using that program as reference we’ll see how MapReduce works in Hadoop framework and how data is processed in Map and Reduce tasks respectively.


Map and Reduce tasks in Hadoop

With in a MapReduce job there are two separate tasks map task and reduce task.

Map task- A MapReduce job splits the input dataset into independent chunks known as input splits in Hadoop which are processed by the map tasks in a completely parallel manner. Hadoop framework creates separate map task for each input split.

Reduce task- The output of the maps is sorted by the Hadoop framework which then becomes input to the reduce tasks.

Map and Reduce inputs and outputs

Hadoop MapReduce framework operates exclusively on <key, value> pairs. In a MapReduce job, the input to the Map function is a set of <key, value> pairs and output is also a set of <key, value> pairs. The output <key, value> pair may have different type from the input <key, value> pair.

<K1, V1> -> map -> (K2, V2)

The output from the map tasks is sorted by the Hadoop framework. MapReduce guarantees that the input to every reducer is sorted by key. Input and output of the reduce task can be represented as follows.

<K2, list(V2)> -> reduce -> <K3, V3>

How map task works in Hadoop

Now let’s see how map task works using the word count program as an example which can be seen here.

As already stated both input and output of the map function are <key, value> pairs. For the word count program input file will be read line by line and every line will be passed to the map function as <key, value> pair.

For the word count program TextInputFormat which is the default InputFormat is used. In this format key is the byte offset within the file of the beginning of the line. Whereas value is the content of the line.

Let’s say you have a file wordcount.txt with the following content.

Hello wordcount MapReduce Hadoop program.
This is my first MapReduce program.

Each line will be passed to the map function in the following format.

<0, Hello wordcount MapReduce Hadoop program.>
<41, This is my first MapReduce program.>
In the map function the line is split on space and each word is written to the context along with the value as 1.
public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
    // Splitting the line on spaces
    String[] stringArr = value.toString().split("\\s+");
    for (String str : stringArr) {
        word.set(str);
        context.write(word, new IntWritable(1));
    } 
}

So the output from the map function for the two lines will be as follows.

Line 1 <key, value> output

(Hello, 1) 
(wordcount, 1) 
(MapReduce, 1)
(Hadoop, 1)
(program., 1)

Line 2 <key, value> output

(This, 1) 
(is, 1) 
(my, 1) 
(first, 1) 
(MapReduce, 1) 
(program., 1)

Shuffling and sorting by Hadoop Framework

The output of map function doesn’t become input of the reduce function directly. It goes through shuffling and sorting by Hadoop framework. In this processing the data is sorted and grouped by keys.

After the internal processing the data will be in the following format. This is the input to reduce function.

<Hadoop, (1)>
<Hello, (1)>
<MapReduce, (1, 1)>
<This, (1)>
<first, (1)>
<is, (1)>
<my, (1)>
<program., (1, 1)>
<wordcount, (1)>

How reduce task works in Hadoop

As we just saw the input to the reduce task is in the format (key, list<values>). In the reduce function, for each input <key, value> pair, just iterate the list of values for each key and add the values that will give the count for each key.

public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}

Write that key and sum of values to context, that <key, value> pair is the output of the reduce function.

Hadoop     1
Hello    1
MapReduce   2
This       1
first       1
is         1
my         1
program.      2
wordcount   1  

That's all for this topic How MapReduce Works in Hadoop. If you have any doubt or any suggestions to make please drop a comment. Thanks!

>>>Return to Hadoop Framework Tutorial Page


Related Topics

  1. Introduction to Hadoop Framework
  2. How to Compress MapReduce Job Output in Hadoop
  3. MapReduce Flow in YARN
  4. Data Locality in Hadoop
  5. YARN in Hadoop

You may also like-

  1. What is Hadoop Distributed File System (HDFS)
  2. What is Big Data
  3. Installing Hadoop on a Single Node Cluster in Pseudo-Distributed Mode
  4. Uber Mode in Hadoop
  5. Compressing File in snappy Format in Hadoop - Java Program
  6. What is SafeMode in Hadoop
  7. Comparing Enum to String in Java
  8. Array in Java With Examples