Hadoop study notes 06 - MapReduce MapTask source code analysis

Preface: this article walks through the workflow of a `MapTask`, namely input - map - output.

MapTask input process

Source code analysis

Start with the entry point of the input phase: the `run` method of `MapTask`.

```java
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map
    // phase will govern the entire attempt's progress.
    if (conf.getNumReduceTasks() == 0) {
      // No reducer, so no sort phase: all progress goes to the map phase.
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be
      // split between the map phase (67%) and the sort phase (33%).
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase = getProgress().addPhase("sort", 0.333f);
    }
  }
  TaskReporter reporter = startReporter(umbilical);

  boolean useNewApi = job.getUseNewMapper();
  // Initialize the task context and determine the OutputFormat class for the map output.
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) {
    // Hadoop 2.x uses the new API (runNewMapper) by default.
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
```

The `run` method first checks whether a `reduce` phase follows. If not, all of the attempt's progress is given to the `map` phase; if so, 66.7% goes to the `map` phase and 33.3% to the sort phase, which handles partitioning and sorting by `key`. The `initialize` method then sets up the context and determines the `OutputFormat` class for the map output. Finally, `runNewMapper` is called.

```java
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);
  // make a mapper
  // Use taskContext and reflection to instantiate the Mapper class configured in the job.
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  // Use taskContext to instantiate the InputFormat class configured in the job.
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
          ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  // Create the split object for this MapTask. Internally, getSplitDetails opens the file
  // on HDFS based on the block location of the split assigned to this MapTask, then
  // seeks to the offset of the block this split corresponds to.
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  // Construct the RecordReader, which reads one record at a time from the split
  // and hands it to the map method.
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;

  // get an output object
  if (job.getNumReduceTasks() == 0) {
    output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    // NewOutputCollector determines the sorter and the partitioner.
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
          job, getTaskID(), input, output, committer, reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext =
      new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);

  try {
    // The core calls are in the try block.
    input.initialize(split, mapperContext);
    // mapper is our own Mapper class.
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}
```

The method first constructs the `TaskAttemptContext` object `taskContext`, and then uses it to instantiate, via reflection, the input-format class configured in the job for the `map` input. Next, the `getSplitDetails` method creates the `split` object: based on the block location of the split assigned to this `MapTask`, it opens the file on `HDFS` and calls `inFile.seek(offset)` to jump to the offset of the block this split corresponds to.

It then constructs the `RecordReader`, which reads one record at a time from the split, formats it, and hands it to `map` for processing. In the constructor of `NewTrackingRecordReader` you can see `this.real = inputFormat.createRecordReader(split, taskContext);`: this `real` `RecordReader` is the object the map task actually uses to read the split and feed records to the `map` method.
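
As a point of reference, the default text input format essentially just hands back a `LineRecordReader`. The sketch below is a simplified stand-in, not Hadoop's own class (the real `TextInputFormat.createRecordReader` additionally reads a configurable record delimiter from the job configuration; the class name here is made up):

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Simplified stand-in for TextInputFormat: keys are byte offsets, values are whole lines.
public class SimpleTextInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                                                             TaskAttemptContext context) {
    // NewTrackingRecordReader.real ends up holding the reader returned here.
    return new LineRecordReader();
  }
}
```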

Because `inputFormat` defaults to the `TextInputFormat` class, the reader returned is a `LineRecordReader`. Step into its `initialize` method:

```java
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();

  // open the file and seek to the start of the split
  final FutureDataInputStreamBuilder builder =
      file.getFileSystem(job).openFile(file);
  FutureIOSupport.propagateOptions(builder, job,
      MRJobConfig.INPUT_FILE_OPTION_PREFIX,
      MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
  fileIn = FutureIOSupport.awaitFuture(builder.build()); // file-oriented input stream

  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null != codec) {
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
              fileIn, decompressor, start, end,
              SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn;
    } else {
      if (start != 0) {
        // So we have a split that is only part of a file stored using
        // a Compression codec that cannot be split.
        throw new IOException("Cannot seek in " +
            codec.getClass().getSimpleName() + " compressed stream");
      }
      in = new SplitLineReader(codec.createInputStream(fileIn, decompressor),
          job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
  } else {
    // A map never reads the whole file; it must start from this split's offset.
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(
        fileIn, job, this.recordDelimiterBytes, split.getLength());
    filePosition = fileIn;
  }
  // If this is not the first split, we always throw away first record
  // because we always (except the last split) read one extra line.
  // This decides where the split actually starts reading.
  if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}
```

At the start, `start` is assigned the offset at which this split begins, i.e. where its first row of data sits inside the corresponding block, and `fileIn`, the file-oriented input stream, uses the `seek` method to start reading from `start`. Then comes the most interesting part of the method:

```java
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
```

Because `HDFS` splits files into blocks by byte count, a single line of data may end up spread across two blocks, yet the `map` computation must always see complete lines. So the code checks whether `start` is non-zero, i.e. whether this split is the first one. If it is not the first, the first (possibly partial) line is discarded and the starting offset `start` is advanced to the beginning of the split's second line. In short: every split except the first discards its first line, and every split except the last reads one extra line past its end. Together these two rules compensate for `HDFS` cutting records at block boundaries.
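
To make that rule concrete, here is a small self-contained simulation in plain Java. It does not use any Hadoop classes; the sample text and split size are invented for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of how LineRecordReader divides lines among byte-range splits.
public class SplitLineDemo {

  // Read the lines that "belong" to the byte range [start, start + length).
  static List<String> readSplit(byte[] file, int start, int length) {
    List<String> lines = new ArrayList<>();
    int end = start + length;
    int pos = start;
    // Every split except the first throws away its first line, because the
    // previous split always reads one extra line past its own end.
    if (start != 0) {
      while (pos < file.length && file[pos] != '\n') pos++;
      pos++; // step past the '\n' of the discarded line
    }
    // Keep reading whole lines as long as the line starts at or before 'end',
    // so the last line of a split may extend into the next split's bytes.
    while (pos < file.length && pos <= end) {
      int lineStart = pos;
      while (pos < file.length && file[pos] != '\n') pos++;
      lines.add(new String(file, lineStart, pos - lineStart, StandardCharsets.UTF_8));
      pos++; // step past the '\n'
    }
    return lines;
  }

  public static void main(String[] args) {
    byte[] file = "first line\nsecond line\nthird line\n".getBytes(StandardCharsets.UTF_8);
    int splitSize = 16; // deliberately cuts "second line" in half
    for (int start = 0; start < file.length; start += splitSize) {
      int length = Math.min(splitSize, file.length - start);
      System.out.println("split @" + start + " -> " + readSplit(file, start, length));
    }
  }
}
```

Running this prints the first split with two full lines (it reads past its 16-byte boundary), the second split with only the third line (it skipped the half line it started in), and an empty last split. Every line is read exactly once.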

Once the `LineRecordReader` has been initialized, `mapper.run(mapperContext)` is executed, which is the `run` method of the `Mapper` class we wrote ourselves.

```java
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    // context.nextKeyValue() ultimately calls LineRecordReader.nextKeyValue(), which
    // 1) reads one record and assigns the key and value, and
    // 2) returns a boolean telling the caller whether there was data to read.
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}
```

Here `context` is a `MapContextImpl`, so `context.nextKeyValue()` ends up calling `LineRecordReader`'s `nextKeyValue` method. That method reads one record from the data, assigns the `key` and `value`, and returns whether there was any data left. `getCurrentKey` and `getCurrentValue` then return the `key` and the `value` respectively.
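
To see the contract that `nextKeyValue`/`getCurrentKey`/`getCurrentValue` implement, here is a self-contained mimic in plain Java. It is not Hadoop's `LineRecordReader`, just the same idea: the key is the line's byte offset, the value is the line text, and the boolean tells the caller whether a record was actually read. It reuses the skip-the-first-line trick shown earlier:

```java
import java.io.IOException;
import java.io.RandomAccessFile;

// Self-contained mimic of the nextKeyValue contract over a byte-range "split".
public class SimpleLineRecordReader implements AutoCloseable {
  private final RandomAccessFile in;
  private final long end;     // first byte after this split
  private long pos;           // current byte offset
  private Long key;           // byte offset of the current line
  private String value;       // text of the current line

  public SimpleLineRecordReader(String path, long start, long length) throws IOException {
    this.in = new RandomAccessFile(path, "r");
    this.end = start + length;
    in.seek(start);
    if (start != 0) {
      in.readLine();          // discard the (possibly partial) first line
    }
    this.pos = in.getFilePointer();
  }

  public boolean nextKeyValue() throws IOException {
    if (pos > end) {          // past the split end: the extra line was already read
      key = null;
      value = null;
      return false;
    }
    String line = in.readLine();
    if (line == null) {       // end of file
      key = null;
      value = null;
      return false;
    }
    key = pos;                // key = byte offset of the line
    value = line;             // value = the line itself
    pos = in.getFilePointer();
    return true;
  }

  public Long getCurrentKey() { return key; }
  public String getCurrentValue() { return value; }
  @Override public void close() throws IOException { in.close(); }
}
```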

MapTask output process

Source code analysis

Continue with `MapTask`, back in the `runNewMapper` method:

```java
if (job.getNumReduceTasks() == 0) {
  output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
  // NewOutputCollector determines the sorter and the partitioner.
  output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
```

Here we look at the case where partitioning and sorting are needed, and step into the `NewOutputCollector` constructor:

```java
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                  ) throws IOException, ClassNotFoundException {
  // Buffer-related logic.
  collector = createSortingCollector(job, reporter);
  // There are as many partitions as there are reduce tasks.
  partitions = jobContext.getNumReduceTasks();
  // If there is more than one reduce task, the partitioner is obtained via reflection,
  // so a custom partitioner can be plugged in. If there is only one reduce task,
  // always return partition 0, because only that one ReduceTask will pull the data.
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
        ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // partitions == 1
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1; // always 0
      }
    };
  }
}
```

`NewOutputCollector` determines both the sorting collector `collector` and the partitioner `partitioner`. Look first at `createSortingCollector`, the method that creates the collector: by default it uses `MapOutputBuffer` as the sorting collector. The key step is its initialization, `collector.init(context)`:

```java
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  // sanity checks
  // When map output records fill this fraction of the sort buffer, sorting and
  // spilling to disk start. The default is 80% rather than 100% so that the map
  // is not blocked from writing while the spill is in progress.
  final float spillper =
      job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // Size of the sort buffer.
  final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
      MRJobConfig.DEFAULT_IO_SORT_MB);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
      INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }

  // Sorter, QuickSort by default.
  sorter = ReflectionUtils.newInstance(job.getClass(
      MRJobConfig.MAP_SORT_CLASS, QuickSort.class, IndexedSorter.class), job);

  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  kvmeta = ByteBuffer.wrap(kvbuffer)
      .order(ByteOrder.nativeOrder())
      .asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;

  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  // Comparator used when sorting.
  comparator = job.getOutputKeyComparator();
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
      reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // compression
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
        job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // combiner
  // If the map output contains many repeated keys, an optional combiner can run a
  // "mini reduce" on them and shrink the data before it is written out.
  final Counters.Counter combineInputCounter =
      reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(),
      combineInputCounter, reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
        reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}
```

There are several key points in this method (a configuration sketch follows the list):

  • `spillper`: when the `map` output occupies this fraction of the buffer, partitioning, sorting and spilling to disk kick in; 80% by default.
  • `sortmb`: the size of the in-memory sort buffer.
  • `sorter`: the sorter used when a spill happens; quick sort by default.
  • `comparator`: the comparator the sorter needs; a user-defined sort comparator takes precedence, otherwise the `key` class's own comparator is used.
  • `combinerRunner`: no combining happens by default; it has to be enabled explicitly. It runs a `reduce`-style operation on repeated `key`s on the map side, and during the final merge it only runs again if there are at least 3 spill files (the default of `minSpillsForCombine`).
  • `SpillThread`: the thread whose `sortAndSpill` method performs the sorting and the spill to disk.
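
All of these knobs can be set from the driver. A minimal sketch, assuming the standard Hadoop 2.x configuration keys exposed through `MRJobConfig`, with the stock `IntSumReducer` and `Text.Comparator` classes standing in for a combiner and comparator you would normally write yourself:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class SortAndSpillConfigSketch {
  public static Job newJob(Configuration conf) throws Exception {
    // Bigger sort buffer than the default.
    conf.setInt(MRJobConfig.IO_SORT_MB, 200);
    // Start spilling when the buffer is 70% full instead of the default 80%.
    conf.setFloat(MRJobConfig.MAP_SORT_SPILL_PERCENT, 0.70f);

    Job job = Job.getInstance(conf, "sort-and-spill-demo");
    // Turn on a map-side combiner (off by default); here a simple count summer.
    job.setCombinerClass(IntSumReducer.class);
    // Supply an explicit raw comparator for the map output keys
    // (otherwise the key class's own comparator is used).
    job.setSortComparatorClass(Text.Comparator.class);
    return job;
  }
}
```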

Back in our `Mapper`, the `run` method calls the `map` method:

```java
protected void map(KEYIN key, VALUEIN value,
                   Context context) throws IOException, InterruptedException {
  context.write((KEYOUT) key, (VALUEOUT) value);
}
```

Here `context` is a `MapContextImpl`. Stepping into `write`, we find that `MapContextImpl` does not define the method itself, so we look in its parent class and find the implementation in `TaskInputOutputContextImpl`:

```java
public void write(KEYOUT key, VALUEOUT value)
    throws IOException, InterruptedException {
  output.write(key, value);
}
```

Here `output` is the `NewOutputCollector` mentioned above, and its `write` method looks like this:

```java
public void write(K key, V value) throws IOException, InterruptedException {
  // What goes into the buffer is the (key, value, partition) triple.
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
```

The partition number is obtained through the `getPartition` method:

```java
public int getPartition(K key, V value, int numReduceTasks) {
  // key.hashCode() & Integer.MAX_VALUE yields a non-negative integer;
  // % numReduceTasks sends identical keys (same modulo result) to the same partition.
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
```

In the end, the `key`, `value` and partition number produced by `map` are stored into the `buffer`.
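
Because the partitioner is loaded by reflection, the default hash partitioner can be replaced with your own. A minimal sketch; the routing rule (keys that start with a digit always go to partition 0) is invented purely for illustration:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Route digit-keyed records to partition 0, everything else by hash.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    if (numReduceTasks == 1) {
      return 0; // mirrors the partitions == 1 branch of NewOutputCollector
    }
    if (Character.isDigit(key.charAt(0))) {
      return 0;
    }
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
// Registered in the driver with: job.setPartitionerClass(FirstCharPartitioner.class);
```

Note from the constructor above that a custom partitioner is only consulted when the job has more than one reduce task.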

Graphical MapOutputBuffer process

The `buffer` is essentially a linear byte array. Once a `key`/`value` pair has been stored, a corresponding index entry is needed so the record can be found again later. Each index entry is a fixed 16 bytes and contains the following:

  • `P`: the partition number.
  • `KS`: the starting position of the `key` in the `buffer`.
  • `VS`: the starting position of the `value` in the `buffer`, which is also the end position of the `key`.
  • `VL`: the length of the `value`, from which the end position of the `value` can be computed.

In Hadoop 1.x the index entries were stored in a separate byte array. Imagine `key`/`value` pairs that are very small but very numerous: they occupy little space in the `buffer`, yet the index array fills up first, forcing a spill and wasting `buffer` space.

Since Hadoop 2.x, the `key`/`value` data and the index entries are stored in the same byte array: `key`/`value` pairs are written from left to right, while index entries are written from right to left as needed. This solves the wasted-space problem (a toy sketch of the layout follows).
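
Here is a toy sketch of that shared-array idea, written as plain Java rather than the real `MapOutputBuffer` code. Each index entry is modelled as four `int`s (16 bytes) written from the right end of the array, while the serialized records grow from the left:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.nio.charset.StandardCharsets;

// Toy model: records grow from the left of one byte array,
// fixed-size 16-byte index entries grow from the right.
public class SharedBufferDemo {
  public static void main(String[] args) {
    byte[] kvbuffer = new byte[256];
    IntBuffer kvmeta = ByteBuffer.wrap(kvbuffer)
        .order(ByteOrder.nativeOrder())
        .asIntBuffer();                    // int view over the same bytes
    int bufindex = 0;                      // next free byte on the data (left) side
    int kvindex = kvmeta.capacity() - 4;   // next free 4-int meta slot on the right side

    String[][] records = { {"hadoop", "1"}, {"mapreduce", "1"}, {"hdfs", "1"} };
    for (String[] kv : records) {
      byte[] k = kv[0].getBytes(StandardCharsets.UTF_8);
      byte[] v = kv[1].getBytes(StandardCharsets.UTF_8);
      int partition = (kv[0].hashCode() & Integer.MAX_VALUE) % 2;

      int keystart = bufindex;
      System.arraycopy(k, 0, kvbuffer, bufindex, k.length);
      bufindex += k.length;
      int valstart = bufindex;
      System.arraycopy(v, 0, kvbuffer, bufindex, v.length);
      bufindex += v.length;

      kvmeta.put(kvindex,     partition);  // P
      kvmeta.put(kvindex + 1, keystart);   // KS
      kvmeta.put(kvindex + 2, valstart);   // VS (also the end of the key)
      kvmeta.put(kvindex + 3, v.length);   // VL
      kvindex -= 4;                        // the index side moves right to left
    }
    // The real MapOutputBuffer additionally checks that the two sides never collide
    // and triggers a spill when the soft limit (spillper) is reached.
    System.out.println("data bytes used: " + bufindex
        + ", meta bytes used: " + (kvbuffer.length - (kvindex + 4) * 4));
  }
}
```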

Now assume the default settings. Once the `key`/`value` data plus their indexes occupy 80% of the `buffer`, that region is locked and the `SpillThread` quick-sorts it, while `map` keeps writing new data into the remaining space. The sort is a secondary sort: first by the partition `P` stored in the index, then, within the same `P`, by `key`. The result is data ordered by partition, and ordered by `key` within each partition.

Note that sorting normally implies moving data around in memory, and because `key`/`value` records have different sizes, moving them would be complicated and expensive. So only the index entries are moved here, since they have a fixed size. At spill time the records only need to be written out in index order, and the file that reaches the disk is already sorted (see the sketch below).
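
A small sketch of that idea: the records stay where they are, and only an array of index entries is sorted, first by partition and then by the key it points at (toy types, not the real `IndexedSorter`/`QuickSort` machinery):

```java
import java.util.Arrays;
import java.util.Comparator;

// Sort index entries by (partition, key) without ever moving the records they point to.
public class IndexSortDemo {
  static final class IndexEntry {
    final int partition;
    final int keyStart; // position of the key in the (unmoved) data buffer
    IndexEntry(int partition, int keyStart) {
      this.partition = partition;
      this.keyStart = keyStart;
    }
  }

  public static void main(String[] args) {
    String[] keys = {"pear", "apple", "kiwi", "banana"}; // stand-in for the data buffer
    IndexEntry[] index = {
        new IndexEntry(1, 0), new IndexEntry(0, 1),
        new IndexEntry(1, 2), new IndexEntry(0, 3)
    };
    // Secondary sort: partition first, then key, reading the key through the index.
    Arrays.sort(index, Comparator
        .comparingInt((IndexEntry e) -> e.partition)
        .thenComparing(e -> keys[e.keyStart]));
    for (IndexEntry e : index) {
      System.out.println("partition " + e.partition + " -> " + keys[e.keyStart]);
    }
    // Writing records in this index order produces a spill file that is ordered by
    // partition and by key inside each partition, even though keys[] never moved.
  }
}
```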

Next, pay attention to how `map` keeps writing data into the `buffer` while a spill is in progress.

A dividing line is drawn in the free memory: index entries are written to the left of it and `key`/`value` pairs are appended to the right of it, which lets the `buffer` be used as a ring buffer.

combiner (TODO: to be improved)

The `combiner` performs a `reduce`-style operation on the data ahead of time; it runs after the `buffer` contents have been sorted and before they are spilled (a sketch follows).
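
For a word-count-shaped job, the reducer logic itself usually works as the combiner, collapsing many `(word, 1)` pairs into one partial sum before the spill. A minimal sketch (the class name is illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Map-side "mini reduce": sums the counts of repeated keys before they hit the disk.
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
// Enabled in the driver with: job.setCombinerClass(WordCountCombiner.class);
```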

Finally, `map` merges its spill files into one large file, avoiding a pile of small fragmented files that would force `reduce` into random reads when it pulls the data.