【大数据技术专题篇】Spark shuffle过程详解

报价        2019-07-20   来源:笔上有魂

一、文章概述

互联网大数据仓库一般基于开源软件Hadoop和Hive进行构建,其中计算引擎主要是偏离线计算的MapReduce和偏实时计算的Spark,在执行过程中两者都包含了一个重要过程--shuffle。通过对shuffle过程的分析,进一步了解如何优化shuffle过程,提高系统执行效率。本文重点介绍Spark的Shuffle过程。


二、Spark的Shuffle过程介绍


  • Shuffle Writer

Spark丰富了任务的类型,有些任务之间的数据流转不再需要Shuffle过程,但是宽依赖(wide dependency)是需要shuffle传递数据的。比如:reducebykey、groupByKey等等。


例如我们有个job的依赖关系如下:


通过上面的依赖关系,可以得到如下的执行过程。

这其中的Shuffle过程是,前一个stage的shufflemaptask进行Shuffle Write,把数据存储在blockManager上面,并把数据位置的元信息读取到driver的mapOutTrack中,下一个stage先获取数据位置元信息,进行Shuffle Read操作,然后拉去上个stage的数据。


spark shuffle 演进的历史

  • Spark 0.8及以前 Hash Based Shuffle

  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

  • Spark 0.9 引入ExternalAppendOnlyMap

  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

  • Spark 1.4 引入Tungsten-Sort Based Shuffle

  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle

  • Spark 2.0 Hash Based Shuffle退出历史舞台


总结:Spark中需要shuffle输出的Map任务会为每个reduce创建对应的bucket,Map产生的结果会根据设置的partition得到对应的bucketId,然后把数据放到bucket中去。每个Map的数据输出结果可能会包含所有的Reduce所需要的数据,所有每个Map需要创建R个bucket(R是reduce的个数),M个Map需要创建M*R个文件。


例如:spark0.8版本(含)以前,3个 map task, 3个 reducer, 会产生 9个小文件;看着有点恐怖。


看着是不是很恐怖,好在spark0.8.1版本之后,进行了改造,4个map task,4个reduce task,如果不使用Consolidation机制,会产生16个文件,好在现在4个map task分两批运行在2个core上,这样会产生8个文件。

在同一个core上先后运行两个map task输出,对应同一个文件的不同segment上,称为一个fileSegment,形成一个ShuffleBlockFile;

在Spark 1.1版本以后引入了Sort Based Shuffle,map端的任务会先按照partition_id排序,然后再按照key排序,同时生成索引文件;

在Spark 1.4版本引入了Tungsten-Sort Based Shuffle,这个是直接使用堆外内存和新的内存管理模型。

现在的Spark 2.1有三种writer:BypassMergeSortShuffleWriter、SortShuffleWriter、UnsafeShuffleWriter,由于Hash Based Shuffle在Spark2.0之后已经退出历史舞台,我们不做介绍。后续章节会详细介绍这三种writer的实现细节。

  • Shuffle Fetcher

Reduce去拖Map的输出数据,Spark提供了socket和netty两种框架方案去拉取数据。每个节点的Executor会创建一个BlockManager,其中一个创建一个BlockManagerWorker用于响应请求。


并不是所有的数据都是通过网络读取,对应在本地节点上的数据,reduce会直接在磁盘上读取数据,而不是通过网络读取数据。


Spark 的Map输出数据和Spark Shuffle过来的数据并没有进行排序,Spark认为不是所有类型的reduce需要的数据都需要进行排序。Reduce拉去过来的数据会临时放在HashMap中,Spark将shuffle取过来的每一条<key,value>数据插入或者更新到HashMap中。


Shuffle取过来的数据会优先放在内存中,但是对于groupByKey这样的操作,当内存中存储的<key, value>数据条数超过1000,并且内存使用超过70%时,如果内存足够,则把内存缓冲区翻倍,否则把内存中的<key,value>排序然后写到磁盘上。最后再把这些内存缓冲区中的数据和磁盘上的小文件组成一个堆,每次从最小堆中读取最小的数据,这个过程和MapReduce的merge过程很相似。