1. 收集阶段
在Mapper
中,调用context.write(key,value)
实际是调用代理NewOutPutCollector
的wirte
方法
public void write(KEYOUT key, VALUEOUT value ) throws IOException, InterruptedException { output.write(key, value); }
实际调用的是MapOutPutBuffer
的collect()
,在进行收集前,调用partitioner来计算每个key-value的分区号
@Override public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions)); }
2. NewOutPutCollector对象的创建
@SuppressWarnings("unchecked") NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { // 创建实际用来收集key-value的缓存区对象 collector = createSortingCollector(job, reporter); // 获取总的分区个数 partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { // 默认情况,直接创建一个匿名内部类,所有的key-value都分配到0号分区 partitioner = new org.apache.hadoop.mapreduce.Partitioner () { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; } }
3. 创建环形缓冲区对象
@SuppressWarnings("unchecked") privateMapOutputCollector createSortingCollector(JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException { MapOutputCollector.Context context = new MapOutputCollector.Context(this, job, reporter); // 从当前Job的配置中,获取mapreduce.job.map.output.collector.class,如果没有设置,使用MapOutputBuffer.class Class<?>[] collectorClasses = job.getClasses( JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class); int remainingCollectors = collectorClasses.length; Exception lastException = null; for (Class clazz : collectorClasses) { try { if (!MapOutputCollector.class.isAssignableFrom(clazz)) { throw new IOException("Invalid output collector class: " + clazz.getName() + " (does not implement MapOutputCollector)"); } Class<? extends MapOutputCollector> subclazz = clazz.asSubclass(MapOutputCollector.class); LOG.debug("Trying map output collector class: " + subclazz.getName()); // 创建缓冲区对象 MapOutputCollector collector = ReflectionUtils.newInstance(subclazz, job); // 创建完缓冲区对象后,执行初始化 collector.init(context); LOG.info("Map output collector class = " + collector.getClass().getName()); return collector; } catch (Exception e) { String msg = "Unable to initialize MapOutputCollector " + clazz.getName(); if (--remainingCollectors > 0) { msg += " (" + remainingCollectors + " more collector(s) to try)"; } lastException = e; LOG.warn(msg, e); } } throw new IOException("Initialization of all the collectors failed. " + "Error in last collector was :" + lastException.getMessage(), lastException); }
3. MapOutPutBuffer的初始化 环形缓冲区对象
@SuppressWarnings("unchecked") public void init(MapOutputCollector.Context context ) throws IOException, ClassNotFoundException { job = context.getJobConf(); reporter = context.getReporter(); mapTask = context.getMapTask(); mapOutputFile = mapTask.getMapOutputFile(); sortPhase = mapTask.getSortPhase(); spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS); // 获取分区总个数,取决于ReduceTask的数量 partitions = job.getNumReduceTasks(); rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw(); //sanity checks // 从当前配置中,获取mapreduce.map.sort.spill.percent,如果没有设置,就是0.8 final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); // 获取mapreduce.task.io.sort.mb,如果没设置,就是100MB final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, INDEX_CACHE_MEMORY_LIMIT_DEFAULT); if (spillper > (float)1.0 || spillper <= (float)0.0) { throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT + "\": " + spillper); } if ((sortmb & 0x7FF) != sortmb) { throw new IOException( "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb); } // 在溢写前,对key-value排序,采用的排序器,使用快速排序,只排索引 sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job); // buffers and accounting int maxMemUsage = sortmb << 20; maxMemUsage -= maxMemUsage % METASIZE; // 存放key-value kvbuffer = new byte[maxMemUsage]; bufvoid = kvbuffer.length; // 存储key-value的属性信息,分区号,索引等 kvmeta = ByteBuffer.wrap(kvbuffer) .order(ByteOrder.nativeOrder()) .asIntBuffer(); setEquator(0); bufstart = bufend = bufindex = equator; kvstart = kvend = kvindex; maxRec = kvmeta.capacity() / NMETA; softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit; LOG.info(JobContext.IO_SORT_MB + ": " + sortmb); LOG.info("soft limit at " + softLimit); LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid); LOG.info("kvstart = " + kvstart + "; length = " + maxRec); // k/v serialization // 获取快速排序的Key的比较器,排序只按照key进行排序! comparator = job.getOutputKeyComparator(); // 获取key-value的序列化器 keyClass = (Class)job.getMapOutputKeyClass(); valClass = (Class )job.getMapOutputValueClass(); serializationFactory = new SerializationFactory(job); keySerializer = serializationFactory.getSerializer(keyClass); keySerializer.open(bb); valSerializer = serializationFactory.getSerializer(valClass); valSerializer.open(bb); // output counters mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES); mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); fileOutputByteCounter = reporter .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES); // 溢写到磁盘,可以使用一个压缩格式! 获取指定的压缩编解码器 // compression if (job.getCompressMapOutput()) { Class<? extends CompressionCodec> codecClass = job.getMapOutputCompressorClass(DefaultCodec.class); codec = ReflectionUtils.newInstance(codecClass, job); } else { codec = null; } // 获取Combiner组件 // combiner final Counters.Counter combineInputCounter = reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS); combinerRunner = CombinerRunner.create(job, getTaskID(), combineInputCounter, reporter, null); if (combinerRunner != null) { final Counters.Counter combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); combineCollector= new CombineOutputCollector (combineOutputCounter, reporter, job); } else { combineCollector = null; } spillInProgress = false; minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3); // 设置溢写线程在后台运行,溢写是在后台运行另外一个溢写线程!和收集是两个线程! spillThread.setDaemon(true); spillThread.setName("SpillThread"); spillLock.lock(); try { // 启动线程 spillThread.start(); while (!spillThreadRunning) { spillDone.await(); } } catch (InterruptedException e) { throw new IOException("Spill thread failed to initialize", e); } finally { spillLock.unlock(); } if (sortSpillException != null) { throw new IOException("Spill thread failed to initialize", sortSpillException); } }
4. Paritionner的获取
从配置中读取mapreduce.job.partitioner.class
,如果没有指定,采用HashPartitioner.class
如果reduceTask > 1, 还没有设置分区组件,使用HashPartitioner
@SuppressWarnings("unchecked") public Class<? extends Partitioner<?,?>> getPartitionerClass() throws ClassNotFoundException { return (Class<? extends Partitioner<?,?>>) conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class); }
public class HashPartitionerextends Partitioner { /** Use {@link Object#hashCode()} to partition. **/ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
分区号的限制:0 <= 分区号 < 总的分区数(reduceTask的个数)
if (partition < 0 || partition >= partitions) { throw new IOException("Illegal partition for " + key + " (" + partition + ")"); }
5.MapTask shuffle的流程
①在map()调用context.write()
②调用MapoutPutBuffer的collect()
③将当前key-value收集到MapOutPutBuffer中
④溢写前,先根据分区号,将相同分区号的key-value,采用快速排序算法,进行排序!
⑤ 开始溢写,按照排序后有序的索引,将文件写入到一个临时的溢写文件中
⑥多次溢写后,每次溢写都会产生一个临时文件
⑦最后,执行一次flush(),将剩余的key-value进行溢写
⑧MergeParts: 将多次溢写的结果,保存为一个总的文件!
⑨最终保证生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待ReduceTask来拷贝相应分区的数据
6. Combiner
combiner其实就是Reducer类型:
Class<? extends Reducer> cls = (Class<? extends Reducer >) job.getCombinerClass();
Combiner的运行时机:
MapTask:
ReduceTask:
③reduceTask在运行时,需要启动shuffle进程拷贝MapTask产生的数据!
①一定会运行,②,③需要条件!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。如果你想了解更多相关内容请查看下面相关链接