Map parallelism optimization and source code analysis in MapReduce

The decision mechanism of mapTask parallelism

  The parallelism of the map stage of a job is determined by the client when the job is submitted. The client's basic logic for planning map-stage parallelism is: logically slice the data to be processed (that is, divide the input into multiple logical splits according to a specific slice size), and then assign each split to one mapTask instance so the splits are processed in parallel.

FileInputFormat slicing mechanism

1. The default slicing logic is defined by the getSplits() method declared in the InputFormat class

2. The default slicing mechanism in FileInputFormat:

a) Simply slice according to the content length of the file

b) The slice size equals the HDFS block size by default

c) When slicing, the data set as a whole is not considered; each file is sliced separately, one by one

For example, there are two files for the data to be processed:

file1.txt 260M
file2.txt 10M

After FileInputFormat's slicing mechanism runs, the slice information is as follows (a worked check of the 1.1 rule appears after the listing):

file1.txt.split1-- 0~128M
file1.txt.split2-- 128M~260M //if remaining length / slice size <= 1.1, the remainder is not split further and becomes the last slice
file2.txt.split1-- 0~10M
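
The 0~128M / 128M~260M boundaries follow from the 1.1 slop rule that getSplits() applies (the source is analyzed later in this article). Below is a small standalone arithmetic check of that rule, not Hadoop code; the class name and the MB-based printing are made up for illustration.

// Standalone check of the 1.1 slop rule for a 260M file and a 128M slice size.
public class SplitSlopDemo {
  private static final double SPLIT_SLOP = 1.1;

  public static void main(String[] args) {
    long splitSize = 128L * 1024 * 1024;   // 128 MB, the default block size
    long length = 260L * 1024 * 1024;      // file1.txt
    long bytesRemaining = length;
    int splitNo = 1;
    // cut full-size slices while the remainder is more than 1.1x the slice size
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.printf("split%d: offset=%dMB length=%dMB%n",
          splitNo++, (length - bytesRemaining) >> 20, splitSize >> 20);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      // 132MB / 128MB = 1.03 <= 1.1, so the whole remainder becomes the last slice
      System.out.printf("split%d: offset=%dMB length=%dMB%n",
          splitNo, (length - bytesRemaining) >> 20, bytesRemaining >> 20);
    }
  }
}

Running it prints split1 (0MB, 128MB long) and split2 (starting at 128MB, 132MB long), matching the listing above.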

3. The parameter configuration of the size of the slice in FileInputFormat

Analysis of the source code shows that FileInputFormat computes the slice size as Math.max(minSize, Math.min(maxSize, blockSize)); the slice size is therefore determined mainly by these three values.

minsize: Default value: 1  
   Configuration parameters: mapreduce.input.fileinputformat.split.minsize    

maxsize: Default value: Long.MAX_VALUE  
    Configuration parameter: mapreduce.input.fileinputformat.split.maxsize

blocksize: the HDFS block size of the file being processed

The number of threads used to list the files in the input directory is configured by: public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";

Therefore, by default, Math.max(minSize, Math.min(maxSize, blockSize)) yields slice size = blocksize.

maxsize (slice maximum): if this parameter is set smaller than blocksize, the slices become smaller than a block.

minsize (slice minimum): if this parameter is set larger than blockSize, the slices can be made larger than a block, as the sketch below shows.
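
As a quick illustration of these two knobs, here is a minimal driver-side sketch; the class name and the 64 MB / 256 MB values are assumptions, and in practice you would set only whichever of the two parameters you need.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical driver fragment for adjusting the slice size.
public class SplitSizeConfigDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());

    // Option 1: make slices smaller than the block size (e.g. 64 MB);
    // this writes the key mapreduce.input.fileinputformat.split.maxsize
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

    // Option 2: make slices larger than the block size (e.g. 256 MB);
    // this writes the key mapreduce.input.fileinputformat.split.minsize
    FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
  }
}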

Factors that affect the choice of the degree of parallelism:

1. The hardware configuration of the computing node

2. The type of computing task: CPU-intensive or IO-intensive

3. The amount of data for computing tasks

Source code analysis of Hadoop 2.6.4

org.apache.hadoop.mapreduce.JobSubmitter class

  //Compute the input splits; the return value is the number of map tasks of the job
   private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
  
  @SuppressWarnings("unchecked")
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
     ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
   
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

   //sort the splits into order based on size, so that the biggest
   //go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

For the slice calculation logic, focus on the getSplits() method of FileInputFormat, shown below.

public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();

    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

   //generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
  //Traverse the files and process each one: get the file's blocksize and length, and build the slice information (split file path, slice number, offset range)
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else {//not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
       //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
   //Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: "+ splits.size()
          + ", TimeTaken: "+ sw.elapsedMillis());
    }
    return splits;
  }
 public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";
  
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";
    
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    
 //The format-level lower bound: a split is at least 1 byte long
  protected long getFormatMinSplitSize() {
    return 1;
  }
  
 //If the SPLIT_MINSIZE parameter is not set in conf, the default value is 1 byte.
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }
  
 //Get the maximum length of the slice file
  long maxSize = getMaxSplitSize(job);
  
 //If the SPLIT_MAXSIZE parameter is not set in conf, the default value is Long.MAX_VALUE bytes.
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
  
  //Read the information of all files in the specified directory
   List<FileStatus> files = listStatus(job);
  //By default a single thread lists the file information; when the input directory contains a very large number of files, multiple threads can be used to speed up the listing (see the sketch after this fragment).
   int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
   public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
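
A minimal sketch of raising the listing parallelism through this parameter; the class name and the thread count 8 are purely illustrative.

import org.apache.hadoop.conf.Configuration;

// Hypothetical snippet for driver-side configuration of the listing threads.
public class ListStatusThreadsDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // list the input files with 8 threads instead of the default single thread
    conf.setInt("mapreduce.input.fileinputformat.list-status.num-threads", 8);
  }
}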
  
 //Calculate the logical size of the slice file
  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
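
To make the effect of the formula concrete, here is a tiny standalone program (a re-implementation of the formula for illustration, not Hadoop code) that prints the resulting slice size, in MB, for the default parameters, a lowered maxsize, and a raised minsize, assuming a 128 MB block.

// Standalone arithmetic check of Math.max(minSize, Math.min(maxSize, blockSize)).
public class SplitSizeDemo {
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;   // 128 MB HDFS block
    // defaults: minSize = 1, maxSize = Long.MAX_VALUE -> slice size = block size
    System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE) >> 20);               // 128
    // maxsize lowered below the block size -> smaller slices
    System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024) >> 20);            // 64
    // minsize raised above the block size -> larger slices
    System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE) >> 20); // 256
  }
}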
  
  private static final double SPLIT_SLOP = 1.1;//10% slop
 //Keep cutting full-size slices while the remaining bytes are more than 1.1 times the slice size; otherwise the remainder becomes the last slice.
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
    }

Map parallelism

  If each map or reduce task of a job runs for only 30-40 seconds (ideally each map should run for at least one minute), consider reducing the number of map or reduce tasks of the job. Every task has to be started and handed to the scheduler, and this intermediate step may take several seconds; if each task then finishes very quickly, a disproportionate amount of time is wasted at the start and end of every task.

  Configuring JVM reuse for tasks can alleviate this problem:

  (mapred.job.reuse.jvm.num.tasks; the default is 1, which means that at most one task (belonging to the same job) can run sequentially in a JVM, i.e. each task starts its own JVM).
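
A minimal sketch of turning on JVM reuse, assuming the classic (MRv1) runtime where this property is honored; the class name and the value 10 are illustrative.

import org.apache.hadoop.mapred.JobConf;

// Hypothetical driver fragment enabling JVM reuse.
public class JvmReuseDemo {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // let up to 10 tasks of the same job run sequentially in one JVM;
    // this is the typed setter for mapred.job.reuse.jvm.num.tasks
    conf.setNumTasksToExecutePerJvm(10);
    // a value of -1 would mean "no limit on reuse"
  }
}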

In the small-file scenario, the default slicing mechanism creates a large number of mapTasks that each process only a small amount of data, which is inefficient:

Solutions:

  Recommendation: preprocess the small files before they reach HDFS, merging them into large files before uploading.

  Compromise: write a program that merges the small files already on HDFS, then run the job on the merged output.

  Remedy: if a large number of small files already exist in HDFS, use the CombineFileInputFormat family of components (e.g. CombineTextInputFormat); they can logically pack many small files into one slice, so that multiple small files are handled by a single mapTask (see the sketch below).
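
A minimal driver-side sketch of this remedy; the class name and the 4 MB cap are assumptions for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// Hypothetical driver fragment for the small-file scenario.
public class SmallFileCombineDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // pack many small files into a few combined slices instead of one slice per file
    job.setInputFormatClass(CombineTextInputFormat.class);
    // cap each combined slice at roughly 4 MB (tune to the data and block size)
    CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024);
  }
}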