来源:腾讯技术工程微信号
作者:calvinrzluo丨腾讯 IEG 后台开发工程师本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正。为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容。
Spark Core
RDD
RDD(Resilient Distributed Dataset),即弹性数据集是 Spark 中的基础结构。RDD 是 distributive 的、immutable 的,可以被 persist 到磁盘或者内存中。
对 RDD 具有转换操作和行动操作两种截然不同的操作。转换(Transform)操作从一个 RDD 生成另一个 RDD,但行动(Action)操作会去掉 RDD 的 Context。例如take是行动操作,返回的是一个数组而不是 RDD 了,如下所示
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21scala> rdd1.take(1)res0: Array[Int] = Array(10)scala> rdd1.take(2)res1: Array[Int] = Array(10, 4)转换操作是 Lazy 的,直到遇到一个 Eager 的 Action 操作,Spark 才会生成关于整条链的执行计划并执行。这些 Action 操作将一个 Spark Application 分为了多个 Job。
常见的Action 操作包括:reduce、collect、count、take(n)、first、takeSample(withReplacement, num, [seed])、takeOrdered(n, [ordering])、saveAsTextFile(path)、saveAsSequenceFile(path)、saveAsObjectFile(path)、countByKey()、foreach(func)。
常见 RDD
RDD 是一个抽象类abstract class RDD[T] extends Serializable with Logging,在 Spark 中有诸如ShuffledRDD、HadoopRDD等实现。每个 RDD 都有对应的compute方法,用来描述这个 RDD 的计算方法。需要注意的是,这些 RDD 可能被作为某些 RDD 计算的中间结果,例如CoGroupedRDD,对应的,例如MapPartitionsRDD也可能是经过多个 RDD 变换得到的,其决定权在于所使用的算子。
我们来具体查看一些 RDD。
ParallelCollectionRDD
这个 RDD 由parallelize得到scala> val arr = sc.parallelize(0 to 1000)arr: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24HadoopRDDclass HadoopRDD[K, V] extends RDD[(K, V)] with LoggingFileScanRDD
这个 RDD 一般从spark.read.text(...)语句中产生,所以实现在sql 模块中。class FileScanRDD( @transient private val sparkSession: SparkSession, readFunction: (PartitionedFile) => Iterator[InternalRow], @transient val filePartitions: Seq[FilePartition]) extends RDD[InternalRow](sparkSession.sparkContext, Nil "InternalRow") {MapPartitionsRDDclass MapPartitionsRDD[U, T] extends RDD[U]这个 RDD 是
map、mapPartitions、mapPartitionsWithIndex操作的结果。注意,在较早期的版本中,
map会得到一个MappedRDD,filter会得到一个FilteredRDD、flatMap会得到一个FlatMappedRDD,不过目前已经找不到了,统一变成MapPartitionsRDDscala> val a3 = arr.map(i => (i+1, i))a3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:25scala> val a3 = arr.filter(i => i > 3)a3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at filter at <console>:25scala> val a3 = arr.flatMap(i => Array(i))a3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at flatMap at <console>:25join操作的结果也是MapPartitionsRDD,这是因为其执行过程的最后一步flatMapValues会创建一个MapPartitionsRDDscala> val rdd1 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)))rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:24scala> val rdd2 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)))rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24scala> val rddj = rdd1.join(rdd2)rddj: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:27ShuffledRDD
ShuffledRDD用来存储所有 Shuffle 操作的结果,其中K、V很好理解,C是 Combiner Class。class ShuffledRDD[K, V, C] extends RDD[(K, C)]以
groupByKey为例scala> val a2 = arr.map(i => (i+1, i))a2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:25scala> a2.groupByKeyres1: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[3] at groupByKey at <console>:26注意,
groupByKey需要 K 是 Hashable 的,否则会报错。scala> val a2 = arr.map(i => (Array.fill(10)(i), i))a2: org.apache.spark.rdd.RDD[(Array[Int], Int)] = MapPartitionsRDD[2] at map at <console>:25scala> a2.groupByKeyorg.apache.spark.SparkException: HashPartitioner cannot partition array keys. at org.apache.spark.rdd.PairRDDFunctions不能识别此Latex公式:anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:84) at org.apache.spark.rdd.PairRDDFunctionsanonfun<span class="katex-html" aria-hidden="true" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;"><span class="strut" style="height:0.8888799999999999em;vertical-align:-0.19444em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;"><span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">c<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">o<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">m<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">b<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="margin-right:0.05017em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">B<span class="mord mathit" style="margin-right:0.03588em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">y<span class="mord mathit" style="margin-right:0.07153em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">K<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="margin-right:0.03588em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">y<span class="mord mathit" style="margin-right:0.13889em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">W<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">h<span class="mord mathit" style="margin-right:0.07153em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">C<span class="mord mathit" style="margin-right:0.01968em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">l<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="margin-right:0.13889em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">T<span class="mord mathit" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="margin-right:0.03588em;" style="font-size: inherit;color: inherit;line-height: 1.75;overflow-wrap: inherit !important;word-break: inherit !important;">g1.apply(PairRDDFunctions.scala:77)</span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.13889em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.01968em;"></span class="mord mathit" style="margin-right:0.07153em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.13889em;"></span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.07153em;"></span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit" style="margin-right:0.05017em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="strut" style="height:0.8888799999999999em;vertical-align:-0.19444em;"></span class="katex-html" aria-hidden="true">CoGroupedRDDclass CoGroupedRDD[K] extends RDD[(K, Array[Iterable[_]])]首先,我们需要了解一下什么是
cogroup操作,这个方法有多个重载版本。如下所示的版本,对this或other1或other2的所有的 key,生成一个RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2])),表示对于这个 key,这三个 RDD 中所有值的集合。容易看到,这个算子能够被用来实现 Join 和 Union(不过后者有点大材小用了)def cogroup[W1, W2](other1: RDD[(K, W1 "W1, W2")], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]这里的
Partitioner是一个abstract class,具有numPartitions: Int和getPartition(key: Any): Int两个方法。通过继承Partitioner可自定义分区的实现方式,目前官方提供有RangePartitioner和HashPartitioner等。UnionRDDclass UnionRDD[T] extends RDD[T]UnionRDD一般通过union算子得到scala> val a5 = arr.union(arr2)a5: org.apache.spark.rdd.RDD[Int] = UnionRDD[7] at union at <console>:27CoalescedRDD
常见 RDD 外部函数
Spark 在 RDD 之外提供了一些外部函数,它们可以通过隐式转换的方式变成 RDD。
PairRDDFunctions
这个 RDD 被用来处理 KV 对,相比RDD,它提供了groupByKey、join等方法。以combineByKey为例,他有三个模板参数,从 RDD 过来的K和V以及自己的C。相比 reduce 和 fold 系列的(V, V)=> V,这多出来的C使combineByKey更灵活,通过combineByKey能够将V变换为C。
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] = { //实现略 }OrderedRDDFunctions
这个用来提供sortByKey、filterByRange等方法。
Spark 的架构概览
Spark 在设计上的一个特点是它和下层的集群管理是分开的,一个 Spark Application 可以看做是由集群上的若干进程组成的。因此,我们需要区分 Spark 中的概念和下层集群中的概念,例如我们常见的 Master 和 Worker 是集群中的概念,表示节点;而 Driver 和 Executor 是 Spark 中的概念,表示进程。根据爆栈网,Driver 可能位于某个 Worker 节点中,或者位于 Master 节点上,这取决于部署的方式
在官网上给了这样一幅图,详细阐明了 Spark 集群下的基础架构。SparkContext是整个 Application 的管理核心,由 Driver 来负责管理。SparkContext负责管理所有的 Executor,并且和下层的集群管理进行交互,以请求资源。
在 Stage 层次及以上接受DAGScheduler的调度,而TaskScheduler则调度一个 Taskset。在 Spark on Yarn 模式下,CoarseGrainedExecutorBackend 和 Executor 一一对应,它是一个独立于 Worker 主进程之外的一个进程,我们可以 jps 查看到。而 Task 是作为一个 Executor 启动的一个线程来跑的,一个 Executor 中可以跑多个 Task。
在实现上,CoarseGrainedExecutorBackend继承了ExecutorBackend这个 trait,作为一个IsolatedRpcEndpoint,维护Executor对象实例,并通过创建的DriverEndpoint实例的与 Driver 进行交互。
在进程启动时,CoarseGrainedExecutorBackend调用onStart()方法向 Driver 注册自己,并产生一条"Connecting to driver的 INFO。CoarseGrainedExecutorBackend通过DriverEndpoint.receive方法来处理来自 Driver 的命令,包括LaunchTask、KillTask等。这里注意一下,在 scheduler 中有一个CoarseGrainedSchedulerBackend,里面实现相似,在看代码时要注意区分开。
有关 Executor 和 Driver 的关系,下面这张图更加直观,需要说明的是,一个 Worker 上面也可能跑有多个 Executor,每个 Task 也可以在多个 CPU 核心上面运行
Spark 上下文
在代码里我们操作一个 Spark 任务有两种方式,通过 SparkContext,或者通过 SparkSession
SparkContext 方式
SparkContext 是 Spark 自创建来一直存在的类。我们通过 SparkConf 直接创建 SparkContextval sparkConf = new SparkConf().setAppName("AppName").setMaster("local")val sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")- SparkSession 方式
SparkSession 是在 Spark2.0 之后提供的 API,相比 SparkContext,他提供了对 SparkSQL 的支持(持有SQLContext),例如createDataFrame等方法就可以通过 SparkSession 来访问。
在builder.getOrCreate()的过程中,虽然最终得到的是一个 SparkSession,但实际上内部已经创建了一个 SparkContext,并由这个 SparkSession 持有。
val spark: SparkSession = SparkSession.builder() // 得到一个Builder .master("local").appName("AppName").config("spark.some.config.option", "some-value").getOrCreate() // 得到一个SparkSession // SparkSession.scala val sparkContext = userSuppliedContext.getOrElse { val sparkConf = new SparkConf() options.foreach { case (k, v) => sparkConf.set(k, v) } // set a random app name if not given. if (!sparkConf.contains("spark.app.name")) { sparkConf.setAppName(java.util.UUID.randomUUID().toString) } SparkContext.getOrCreate(sparkConf) // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.}applyExtensions(sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),extensions) session = new SparkSession(sparkContext, None, None, extensions)SparkEnv
SparkEnv持有一个 Spark 实例在运行时所需要的所有对象,包括 Serializer、RpcEndpoint(在早期用的是 Akka actor)、BlockManager、MemoryManager、BroadcastManager、SecurityManager、MapOutputTrackerMaster/Worker 等等。
SparkEnv 由 SparkContext 创建,并在之后通过伴生对象SparkEnv的get方法来访问。
在创建时,Driver 端的 SparkEnv 是 SparkContext 创建的时候调用SparkEnv.createDriverEnv创建的。Executor 端的是其守护进程CoarseGrainedExecutorBackend创建的时候调用SparkEnv.createExecutorEnv方法创建的。这两个方法最后都会调用create方法
// Driver端private[spark] def createSparkEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = { SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))}_env = createSparkEnv(_conf, isLocal, listenerBus)SparkEnv.set(_env)// Executor端// CoarseGrainedExecutorBackend.scalaval env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress, arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))arguments.workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))}env.rpcEnv.awaitTermination()// SparkEnv.scala// create函数val blockManager = new BlockManager(...)Spark 的任务调度
Spark 的操作可以分为两种,Transform 操作是 Lazy 的,而 Action 操作是 Eager 的。每一个 Action 会产生一个 Job。
Spark 的 Transform 操作可以分为宽依赖(ShuffleDependency)和窄依赖(NarrowDependency)操作两种,其中窄依赖还有两个子类OneToOneDependency和RangeDependency。窄依赖操作表示父 RDD 的每个分区只被子 RDD 的一个分区所使用,例如union、map、filter等的操作;而宽依赖恰恰相反。宽依赖需要 shuffle 操作,因为需要将父 RDD 的结果需要复制给不同节点用来生成子 RDD,有关ShuffleDependency将在下面的 Shuffle 源码分析中详细说明。当 DAG 的执行中出现宽依赖操作时,Spark 会将其前后划分为不同的 Stage,在下一章节中将具体分析相关代码。
在 Stage 之下,就是若干个 Task 了。这些 Task 也就是 Spark 的并行单元,通常来说,按照当前 Stage 的最后一个 RDD 的分区数来计算,每一个分区都会启动一个 Task 来进行计算。我们可以通过rdd.partitions.size来获取一个 RDD 有多少个分区。
Task 具有两种类型,ShuffleMapTask和ResultTask。其中ResultTask是ResultStage的 Task,也就是最后一个 Stage 的 Task。
Spark 的存储管理
为了实现与底层细节的解耦,Spark 的存储基于 BlockManager 给计算部分提供服务。类似于 Driver 和 Executor,BlockManager 机制也分为 BlockManagerMaster 和 BlockManager。Driver 上的 BlockManagerMaster 对于存在与 Executor 上的 BlockManager 统一管理。BlockManager 只是负责管理所在 Executor 上的 Block。
BlockManagerMaster 和 BlockManager 都是在 SparkEnv 中创建的,
// Mapping from block manager id to the block manager's information.val blockManagerInfo = new concurrent.TrieMap[BlockManagerId, BlockManagerInfo]( "BlockManagerId, BlockManagerInfo")val blockManagerMaster = new BlockManagerMaster( registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint( rpcEnv, isLocal, conf, listenerBus, // 是否使用ExternalShuffleService读取持久化在磁盘上的数据 if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) { externalShuffleClient } else { None }, blockManagerInfo)), registerOrLookupEndpoint( BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME, new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)), conf, isDriver)val blockTransferService = new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)// NB: blockManager is not valid until initialize() is called later.val blockManager = new BlockManager( executorId, rpcEnv, blockManagerMaster, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, externalShuffleClient)Driver 节点和 Executor 节点的 BlockManager 之间的交互可以使用下图来描述,在此就不详细说明。
BlockId 和 BlockInfo
抽象类BlockId被用来唯一标识一个 Block,具有全局唯一的名字,通常和一个文件相对应。BlockId有着确定的命名规则,并且和它实际的类型有关。
如果它是用来 Shuffle 的ShuffleBlockId,那么他的命名就是
String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId抑或它是用来 Broadcast 的BroadcastBlockId,他的命名就是
"broadcast_" + broadcastId + (if (field == "") "" else "_" + field)或者它是一个 RDD,它的命名就是
"rdd_" + rddId + "_" + splitIndex通过在 Spark.log 里面跟踪这些 block 名字,我们可以了解到当前 Spark 任务的执行和存储情况。
BlockInfo中的level项表示这个 block 的存储级别。
// BlockInfoManager.scalaprivate[storage] class BlockInfo( val level: StorageLevel, val classTag: ClassTag[_], val tellMaster: Boolean) {持久化
Spark 提供了如下的持久化级别,其中选项为useDisk、useMemory、useOffHeap、deserialized、replication,分别表示是否采用磁盘、内存、堆外内存、反序列化以及持久化维护的副本数。其中反序列化为 false 时(好绕啊),会对对象进行序列化存储,能够节省一定空间,但同时会消耗计算资源。需要注意的是,cache操作是persist的一个特例,等于MEMORY_ONLY的 persist。所有的广播对象都是MEMORY_AND_DISK的存储级别
object StorageLevel extends scala.AnyRef with scala.Serializable { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) // 默认存储类别 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(true, true, true, false, 1)}想在 Spark 任务完成之后检查每一个 RDD 的缓存状况是比较困难的,虽然在 Spark EventLog 中,我们也能看到在每一个 RDD 的 RDD Info 中有一个 StorageLevel 的条目。RDDInfo的源码建议我们可以通过(Use Disk||Use Memory)&&NumberofCachedPartitions这样的条件来判断一个 RDD 到底有没有被 cache。但实际上,似乎 EventLog 里面的NumberofCachedPartitions、Memory Size、Disk Size永远是 0,这可能是只能在执行过程中才能看到这些字段的值,毕竟 WebUI 的 Storage 标签就只在执行时能看到。不过(Use Disk||Use Memory)在 cache 调用的 RDD 上是 true 的,所以可以以这个 RDD 为根做一个 BFS,将所有不需要计算的 RDD 找出来。
BlockInfoManager
BlockInfoManager用来管理 Block 的元信息,例如它维护了所有 BlockId 的 BlockInfo 信息infos: mutable.HashMap[BlockId, BlockInfo]。不过它最主要的功能还是为读写 Block 提供锁服务
本地读 Block
本地读方法位于 BlockManager.scala 中,从前叫getBlockData,现在叫getLocalBlockData,名字更易懂了。getLocalBlockData的主要内容就对 Block 的性质进行讨论,如果是 Shuffle 的,那么就借助于ShuffleBlockResolver。
ShuffleBlockResolver是一个 trait,它有两个子类IndexShuffleBlockResolver和ExternalShuffleBlockResolver,它们定义如何从一个 logical shuffle block identifier(例如 map、reduce 或 shuffle)中取回 Block。这个类维护 Block 和文件的映射关系,维护 index 文件,向BlockStore提供抽象。
// BlockManager.scalaoverride def getLocalBlockData(blockId: BlockId): ManagedBuffer = { if (blockId.isShuffle) { // 如果这个BlockId是Shuffle的,那么就通过shuffleManager的shuffleBlockResolver来获取BlockData shuffleManager.shuffleBlockResolver.getBlockData(blockId) } else { // 否则使用getLocalBytes getLocalBytes(blockId) match { case Some(blockData) => new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true) case None => // If this block manager receives a request for a block that it doesn't have then it's // likely that the master has outdated block statuses for this block. Therefore, we send // an RPC so that this block is marked as being unavailable from this block manager. reportBlockStatus(blockId, BlockStatus.empty) throw new BlockNotFoundException(blockId.toString) } }}我们看getLocalBytes函数,它带锁地调用doGetLocalBytes
def getLocalBytes(blockId: BlockId): Option[BlockData] = { logDebug(s"Getting local block $blockId as bytes") assert(!blockId.isShuffle, s"Unexpected ShuffleBlockId $blockId") blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }}上面的这一段代码会在 spark.log 中产生类似下面的 Log,我们由此可以对 Block 的用途,存储级别等进行分析。
19/11/26 17:24:52 DEBUG BlockManager: Getting local block broadcast_3_piece0 as bytes19/11/26 17:24:52 TRACE BlockInfoManager: Task -1024 trying to acquire read lock for broadcast_3_piece019/11/26 17:24:52 TRACE BlockInfoManager: Task -1024 acquired read lock for broadcast_3_piece019/11/26 17:24:52 DEBUG BlockManager: Level for block broadcast_3_piece0 is StorageLevel(disk, memory, 1 replicas)doGetLocalBytes负责根据 Block 的存储级别,以最小的代价取到序列化后的数据。从下面的代码中可以看到,Spark 认为序列化一个对象的开销是高于从磁盘中读取一个已经序列化之后的对象的开销的,因为它宁可从磁盘里面取也不愿意直接从内存序列化。
private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): BlockData = { val level = info.level logDebug(s"Level for block $blockId is $level") // 如果内容是序列化的,先尝试读序列化的到内存和磁盘。 // 如果内容是非序列化的,尝试序列化内存中的对象,最后抛出异常表示不存在 if (level.deserialized) { // 因为内存中是非序列化的,尝试能不能先从磁盘中读到非序列化的。 if (level.useDisk && diskStore.contains(blockId)) { // Note: Spark在这里故意不将block放到内存里面,因为这个if分支是处理非序列化块的, // 这个块可能被按照非序列化对象的形式存在内存里面,因此没必要在在内存里面存一份序列化了的。 diskStore.getBytes(blockId) } else if (level.useMemory && memoryStore.contains(blockId)) { // 不在硬盘上,就序列化内存中的对象 new ByteBufferBlockData(serializerManager.dataSerializeWithExplicitClassTag( blockId, memoryStore.getValues(blockId).get, info.classTag), true) } else { handleLocalReadFailure(blockId) } } else { // 如果存在已经序列化的对象 if (level.useMemory && memoryStore.contains(blockId)) { // 先找内存 new ByteBufferBlockData(memoryStore.getBytes(blockId).get, false) } else if (level.useDisk && diskStore.contains(blockId)) { // 再找磁盘 val diskData = diskStore.getBytes(blockId) maybeCacheDiskBytesInMemory(info, blockId, level, diskData) .map(new ByteBufferBlockData(_, false)) .getOrElse(diskData) } else { handleLocalReadFailure(blockId) } }}Spark 的内存管理
在 Spark 1.6 之后,内存管理模式发生了大变化,从前版本的内存管理需要通过指定spark.memory.useLegacyMode来手动启用,因此在这里只对之后的进行论述。
Spark 内存布局
如下图所示,Spark 的堆内存空间可以分为 Spark 托管区、用户区和保留区三块。
其中保留区占 300MB,是固定的。托管区的大小由spark.memory.fraction节制,而1 - spark.memory.fraction的部分用户区。这个值越小,就越容易 Spill 或者 Cache evict。这个设置的用途是将 internal metadata、user data structures 区分开来。从而减少对_稀疏的或者不常出现的大对象的大小_的不准确估计造成的影响(限定词有点多,是翻译的注释、、、)。默认spark.memory.fraction是 0.6。
// package.scalaprivate[spark] val MEMORY_FRACTION = ConfigBuilder("spark.memory.fraction") .doc("...").doubleConf.createWithDefault(0.6)Spark 的托管区又分为 Execution 和 Storage 两个部分。其中 Storage 主要用来缓存 RDD、Broadcast 之类的对象,Execution 被用来存 Mapside 的 Shuffle 数据。
Storage 和 Execution 共享的内存,spark.storage.storageFraction(现在应该已经改成了spark.memory.storageFraction)表示对 eviction 免疫的 Storage 部分的大小,它的值越大,Execution 内存就越小,Task 就越容易 Spill。反之,Cache 就越容易被 evict。默认spark.memory.storageFraction是 0.5。
// package.scalaprivate[spark] val MEMORY_STORAGE_FRACTION = ConfigBuilder("spark.memory.storageFraction") .doc("...").doubleConf.checkValue(v => v >= 0.0 && v < 1.0, "Storage fraction must be in [0,1)").createWithDefault(0.5)Storage 可以借用任意多的 Execution 内存,直到 Execution 重新要回。此时被 Cache 的块会被从内存中 evict 掉(具体如何 evict,根据每个 Block 的存储级别)。Execution 也可以借用任意多的 Storage 的,但是 Execution 的借用不能被 Storage 驱逐,原因是因为实现起来很复杂。我们在稍后将看到,Spark 没有一个统一的资源分配的入口。
除了堆内内存,Spark 还可以使用堆外内存。
MemoryManager
Spark 中负责文件管理的类是MemoryManager,它是一个抽象类,被SparkEnv持有。在 1.6 版本后引入的UnifiedMemoryManager是它的一个实现。
// SparkEnv.scalaval memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)UnifiedMemoryManager实现了诸如acquireExecutionMemory等方法来分配内存。通过在acquireExecutionMemory时传入一个MemoryMode可以告知是从堆内请求还是从堆外请求。需要注意的是,这类的函数并不像malloc一样直接去请求一段内存,并返回内存的地址,而是全局去维护每个 Task 所使用的内存大小。每一个 Task 在申请内存(new 对象)之前都会去检查一下自己有没有超标,否则就去 Spill。也就是说MemoryManager实际上是一个外挂式的内存管理系统,它不实际上托管内存,整个内存还是由 JVM 管理的。
对 Task 的 Execution 内存使用进行跟踪的这个机制被实现ExecutionMemoryPool中,如下面的代码所示。
// ExecutionMemoryPool.scala// 保存每一个Task所占用的内存大小private val memoryForTask = new mutable.HashMap[Long, Long]( "Long, Long")当然,有ExecutionMemoryPool就也有StorageMemoryPool,他们都不出所料继承了MemoryPool。而以上这些 Pool 最后都被MemoryManager所持有。
// MemoryManager.scala@GuardedBy("this")protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)@GuardedBy("this")protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)@GuardedBy("this")protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)@GuardedBy("this")protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)请求内存的流程
我们知道,在 Shuffle 操作中有两个内存使用大户ExecutorSorter和ExternalAppendOnlyMap,都继承了Spillable,从而实现了在内存不足时进行 Spill。我们查看对应的maybeSpill方法,它调用了自己父类MemoryConsumer中的acquireExecutionMemory方法。
由于从代码注释上看似乎MemoryConsumer包括它引用到的TaskMemoryManager类都与 Tungsten 有关,所以我们将在稍后进行研究。目前只是列明调用过程,因为如果其中涉及要向 Spark 托管内存请求分配,最终调用的还是UnifiedMemoryManager中的对应方法。
// Spillable.scala// 在maybeSpill方法中val granted = acquireMemory(amountToRequest)// MemoryConsumer.scalapublic long acquireMemory(long size) { long granted = taskMemoryManager.acquireExecutionMemory(size, this); used += granted; return granted;}// TaskMemoryManager.javapublic long acquireExecutionMemory(long required, MemoryConsumer consumer) { assert(required >= 0); assert(consumer != null); MemoryMode mode = consumer.getMode(); synchronized (this) { long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode); ...// Executor.scala// TaskMemoryManager中的memoryManager,其实就是一个UnifiedMemoryManagerval taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)下面,我们来看acquireExecutionMemory的详细实现。它前面会首先根据memoryMode选择使用的MemoryPool,是堆内的,还是堆外的。然后它会有个函数maybeGrowExecutionPool,用来处理在需要的情况下从 Storage 部分挤占一些内存回来。我们可以在稍后详看这个方法。现在,我们发现acquireExecutionMemory会往对应的MemoryPool发一个调用acquireMemory。
// UnifiedMemoryManager.scalaoverride private[memory] def acquireExecutionMemory( ... // 实际上是一个ExecutionMemoryPool executionPool.acquireMemory( numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)}// MemoryManager.scala@GuardedBy("this")protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)由于我们讨论的场景就是请求堆内的执行内存,所以就进入ExecutionMemoryPool.scala查看相关代码。在 Spark 中,会尝试保证每个 Task 能够得到合理份额的内存,而不是让某些 Task 的内存持续增大到一定的数量,然后导致其他人持续地 Spill 到 Disk。
如果有 N 个任务,那么保证每个 Task 在 Spill 前可以获得至少1 / 2N的内存,并且最多只能获得1 / N。因为N是持续变化的,所以我们需要跟踪活跃 Task 集合,并且持续在等待 Task 集合中更新1 / 2N和1 / N的值。这个是借助于同步机制实现的,在 1.6 之前,是由ShuffleMemoryManager来仲裁的。
// ExecutionMemoryPool.scala// 保存每一个Task所占用的内存大小private val memoryForTask = new mutable.HashMap[Long, Long]( "Long, Long")private[memory] def acquireMemory( numBytes: Long, taskAttemptId: Long, maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit, computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized { assert(numBytes > 0, s"invalid number of bytes requested: $numBytes") // TODO: clean up this clunky method signature // 如果我们没有Track到这个Task,那么就加到memoryForTask if (!memoryForTask.contains(taskAttemptId)) { memoryForTask(taskAttemptId) = 0L // 通知wait集合中的Task更新自己的numTasks lock.notifyAll() } // TODO: simplify this to limit each task to its own slot // 尝试寻找,直到要么我们确定我们不愿意给它内存(因为超过1/N)了, // 或者我们有足够的内存提供。注意我们保证每个Task的1/2N的底线 while (true) { val numActiveTasks = memoryForTask.keys.size val curMem = memoryForTask(taskAttemptId) // 在每一次迭代中,首先尝试从Storage借用的内存中拿回部分内存。 // 这是必要的,否则可能发生竞态,此时新的Storage Block会再把这个Task需要的执行内存拿回来。 maybeGrowPool(numBytes - memoryFree) // maxPoolSize是内存池扩容之后可能的最大大小。 // 通过这个值,可以计算所谓的1/N和1/2N具体有多大。在计算时必须考虑可能被释放的内存(例如evicting cached blocks),否则就会导致SPARK-12155的问题 val maxPoolSize = computeMaxPoolSize() val maxMemoryPerTask = maxPoolSize / numActiveTasks val minMemoryPerTask = poolSize / (2 * numActiveTasks) // 最多再给这么多内存 val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem)) // 实际上能给这么多内存 val toGrant = math.min(maxToGrant, memoryFree) // 虽然我们尝试让每个Task尽可能得到1/2N的内存, // 但由于Task数量是动态变化的,可能在N增长前,老的Task就把内存吃完了 // 所以如果我们给不了这么多内存的话,就让它睡在wait上面 if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) { logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free") lock.wait() } else { memoryForTask(taskAttemptId) += toGrant return toGrant } } 0L // Never reached}Tungsten 内存管理机制
Tungsten 不依赖于 Java 对象,所以堆内和堆外的内存分配都可以支持。序列化时间相比原生的要加快很多。其优化主要包含三点:
- Memory Management and Binary Processing
- Cache-aware computation
- Code generation
这个是为了解决在 Spark 2.0 之前 SparkSQL 使用的Volcano中大量的链式next()导致的性能(虚函数等)问题。
在内存管理部分,能看到诸如TaskMemoryManager.java的文件;在稍后的 Shuffle 部分,能看到诸如UnsafeWriter.java的文件。这些 Java 文件在实现上就有对 Tungsten 的使用,因为用到了 sun.misc.Unsafe 的 API,所以使用 Tungsten 的 shuffle 又叫 Unsafe shuffle。
在MemoryManager中持有了 Tungsten 内存管理机制的核心类tungstenMemoryAllocator: MemoryAllocator。并设置了tungstenMemoryMode指示其分配内存的默认位置,如果MEMORY_OFFHEAP_ENABLED是打开的且MEMORY_OFFHEAP_SIZE是大于 0 的,那么默认使用堆外内存。
TaskMemoryManager
TaskMemoryManager这个对象被用来管理一个 Task 的堆内和对外内存分配,因此它能够调度一个 Task 中各个组件的内存使用情况。当组件需要使用TaskMemoryManager提供的内存时,他们需要继承一个MemoryConsumer类,以便向TaskMemoryManager请求内存。TaskMemoryManager中集成了普通的内存分配机制和 Tungsten 内存分配机制。
普通分配 acquireExecutionMemory
我们跟踪TaskMemoryManager.acquireExecutionMemory相关代码,它先尝试从MemoryManager直接请求内存:
// TaskMemoryManager.scalapublic long acquireExecutionMemory(long required, MemoryConsumer consumer) { assert(required >= 0); assert(consumer != null); MemoryMode mode = consumer.getMode(); // 如果我们在分配堆外内存的页,并且受到一个对堆内内存的请求, // 那么没必要去Spill,因为怎么说也只是Spill的堆外内存。 // 不过现在改这个风险很大。。。。 synchronized (this) { long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);如果请求不到,那么先尝试让同一个TaskMemoryManager上的其他的 Consumer Spill,以减少 Spill 频率,从而减少 Spill 出来的小文件数量。主要是根据每个 Consumer 的内存使用排个序,从而避免重复对同一个 Consumer 进行 Spill,导致产生很多小文件。
... if (got < required) { TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>(); for (MemoryConsumer c: consumers) { if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) { long key = c.getUsed(); List<MemoryConsumer> list = sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1)); list.add(c); } }...现在,我们对排序得到的一系列sortedConsumers进行 spill,一旦成功释放出内存,就立刻向 MemoryManager 去请求这些内存,相关代码没啥可看的,故省略。如果内存还是不够,就 Spill 自己,如果成功了,就向 MemoryManager 请求内存。
... // call spill() on itself if (got < required) { try { long released = consumer.spill(required - got, consumer); if (released > 0) { logger.debug("Task {} released {} from itself ({})", taskAttemptId, Utils.bytesToString(released), consumer); got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); } } catch (ClosedByInterruptException e) { ... } } consumers.add(consumer); logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer); return got; }}Tungsten 分配 allocatePage
TaskMemoryManager还有个allocatePage方法,用来获得MemoryBlock,这个是通过 Tungsten 机制分配的。TaskMemoryManager使用了类似操作系统中分页的机制来操控内存。每个“页”,也就是MemoryBlock对象,维护了一段堆内或者堆外的内存。页的总数由PAGE_NUMBER_BITS来决定,即对于一个 64 位的地址,高PAGE_NUMBER_BITS(默认 13)位表示一个页,而后面的位表示在页内的偏移。当然,如果是堆外内存,那么这个 64 位就直接是内存地址了。有关使用分页机制的原因在TaskMemoryManager.java有介绍,我暂时没看懂。
需要注意的是,即使使用 Tungsten 分配,仍然不能绕开UnifiedMemoryManager机制的管理,所以我们看到在allocatePage方法中先要通过acquireExecutionMemory方法注册,请求到逻辑内存之后,再通过下面的方法请求物理内存
// TaskMemoryManager.scalalong acquired = acquireExecutionMemory(size, consumer);if (acquired <= 0) { return null;}page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
推荐阅读:
更多腾讯AI相关技术干货,请关注专栏腾讯技术工程