分析spark初版本的源码,代码行数1w+,比较简单,但是基本架构变换不大,有必要阅读。先提出阅读Spark论文的一些疑问,再阅读源码。

阅读论文的问题

RDD

lazy问题

cache问题:

操作问题

架构问题

start

sc.parallelize(data, numSlices).count 这个最简单的例子为例,学习流程

阅读sparkContext

构建&&初始化一些重要的变量

runJob函数

该函数是运行任务的入口,触发的Action都会调用该函数触发计算:stage划分,task生成,提交task到集群

读取输入数据函数

形成输入数据的RDD

object SparkContext内定义隐式转换

contains a number of implicit conversions and parameters for use with various Spark features.

SparkEnv内重要的组件

Cache设计要点

每个exector上已有一个Cache实例,

|414x114

key的设计--KeySpace
原因:每个exector上已有一个cache实例,所有模块共享,需要区分命名空间
实现:

cache对象大小评估--SizeEstimator

jvm内存分配 -- Runtime.getRuntime.maxMemory
https://stackoverflow.com/questions/23701207/why-do-xmx-and-runtime-maxmemory-not-agree

内部表示-LinkedHashMap(存储顺序与读取一致):
原因:实现LRU order
注意:

理解LRU设计逻辑
Cannot make space without removing part of the same dataset, or a more recently used one
|314x314
https://blog.csdn.net/luanlouis/article/details/43017071

总结

  1. cache的基本架构,driver的Actor服务 + 每个exector想他汇报信息
  2. 如何实现缓存,LRU,所以我们不需要手动uncache,具体实现在Cache极其子类中
  3. 添加缓存的过程:

iterator函数何时调用?参考下面的schedule

Serializer

线程安全的序列化封装,返回一个序列化工具的实例SerializerInstance,该接口是序列化相关的方法,one instance used by one thread at a time.
|295x84
|730x281

使用时val ser = SparkEnv.get.serializer.newInstance()

疑问:
效率低吗?每次都是构建了一个新的实例,不过一个线程一个实例其实还好

CacheTracker

wait&&notify并发编程

rdd.compute(split).

MapOutputTracker
ConcurrentHashMap
什么是generation?

SimpleShuffleFetcher

重点逻辑

Schedule

action代码运行过程:
action(如rdd.count()) --> sc.runjob(业务逻辑函数作为参数) --> schedule.runjob(DAGScheduler) --> schedule.submitTask(LocalScheduler)

Scheduler:

总结:核心逻辑都在DAGScheduler的runJob函数中

  1. 根据RDD生成Stage:参考下面Stage与Dependency/RDD的关系
  2. 根据Stage生成Task:参考下面Task与Stage
  3. 按顺序调度Task:参考下面Task与Stage

生成DAG的Stage图 -- newStage-getParentStages函数,递归调用,使用了rdd.dependencies判断ShuffleDependency/Narrow

查找表
shuffleToMapStage:更具shuffleid
idToStage

理清Dependency与RDD的关系:

Stage与Dependency/RDD的关系(重点!!):
Stage是对上述的图的一种合并&&划分,代码逻辑如下:

Task与Stage:

xxxTask中的rdd.iterator调用会引发rdd的链式调用,注意一个特点,这个链式调用/合并的头一般是一个窄依赖的开始RDD,一般是Shuffle后的一个RDD,如果ShuffledRDD/CoGroupedRDD。他们具有如下特点:
* 他们的compute函数包含了fetcher逻辑,从上游读取数据
* 他们没有parent的RDD,rdd链已经到头了

窄依赖的合并原理:在RDD的compute接口实现中(子类实现)对prev的RDD进行调用,因此形如rdd.map().filter()会先生成MappedRDD,在生成FilteredRDD,在生成这些rdd时的构造参数会传入前一个RDD,即prevFilteredRDDcompute()的实现类似prev.iterator(split).filter(f)。后期shedule调用时,只要引用并调用最后一个RDDcompute即可合并所有计算过程。

shuffle的原理:参考Task与Stage

Scheduler的同步方法使用wait(waitForEvent函数)与notify(taskEnded函数)

DAG->stage图->转化为n个ShuffleMapTask和m个ResultTask(n=ShuffledDependency的数目,m=最后一个rdd的partition数目)

RDD

建议阅读RDD的源码的头部注释,定义了RDD最关键的几个函数

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
Each RDD is characterized by five main properties:

This class also contains transformation methods available on all RDDs (e.g. map and filter).
In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles.

创建RDD

输出RDD-存储数据

转换rdd

单个RDD间转换

多个RDD合并

转化为PairRDDFunctions

转化为OrderedRDDFunctions

RDD的action

collect take first toArray
count
reduce fold aggregate
foreach
采样:
takeSample

总结
不涉及shuffle的情况下,mapPartitions是一个最通用的函数

Mesos调度

调度的过程由DAGScheduler调用submitTasks方法触发,流程如下:
MesosScheduler -> SimpleJob -> Task(Mesos) -> ...setData发送序列化的task(Spark)... -> Executor

代码逻辑可以简单总结为:Scheduler管理Job,Job管理Task

MesosScheduler

功能包括

Mesos-job的概念

spark在使用mesos运行时有个job的概念,对应的类是SimpleJob,会被MesosScheduler调用。我们知道调度的过程由DAGScheduler调用submitTasks方法触发,一次submitTasks的所有tasks形成一个job(参考MesosScheduler的submitTasks函数)。由于,在DAGSchedule中一次submitTasks就是提交一个Stage的所有未完成的Tasks,因此,在初始情况下,Job可以理解成一个Stage。

SimpleJob:

思考:这个调度框架如何实现任务失败重新获取资源运行?

spark的容错基于一个基本假设:某个机器失败后,大概率不会把失败的机器又分配过来。具体而言:
任务失败后会重新加入等待任务队列allPendingTasks。下次slaveOffer时会重新运行失败的任务。这里只能说下次提供的新资源大概率不是失败的节点。如果需要优化可以添加一个资源黑名单,offer中由经常失败的slaver直接拒绝。(DAGScheduler,108行代码的todo中也说了)

spark-shell

入口类:SparkILoop
入口方法:process函数
基本原理:

accumulator

broadcast

几个重要的问题

shuffle fetcher错误处理(分布式):

spark.default.parallelism

表示程序的并行度,

设置由Scheduler决定:

影响范围:

不影响 :

资源问题

mesos集群的Executor的资源由两部分组成:

preferredLocations问题

这里讨论一下spark中选择任务执行位置时的策略,我们知道选择发生在Scheduler中,具体而言,对于Mesos来说发送在MesosScheduler对资源offer的选择时,从上面的分析可以知道,就近调度的策略只使用了一个关键数据来判断位置,就是Task中的preferredLocations函数,该函数中只是简单的返回了参数传入的位置locs,这个值是在DAGSchedule中决定的。
我们知道一个Task,无论是ResultTask还是ShuffleMapTask都包含了某个Stage中N个RDD的逻辑(或者说一个窄依赖内PipeLine形成的逻辑,在Stage中可以由最后一个RDD表示),因此,这个Task的preferredLocations值是由这N个RDD共同决定的。决策关键代码在DAGSchedule中getPreferredLocs,他的输入是finalRDD或者Stage的最后一个RDDstage.rdd会把Stage内的RDD从后到前寻找满足如下条件的Location:

RDD的preferredLocations函数根据当前的分片返回期望调度(preferred)的位置列表。随着我们不断的运行算子,一个RDD不断的Transform为新的RDD,此时也会发生改变。RDD的分类和preferredLocationsd的实现情况如下:

Shuffle相关的任务直接preferredLocations赋值为Null,强调会失去了本地运行

总结可以发现:

如何结合hdfs的多副本就近访问?

在Spark中参考HadoopRDD、NewHadoopRDD类,都是用了hdfs的API来操作,由几个关键点如下

Split是逻辑概念,Block才是HDFS真实的位置
参考这篇文章讲解了HDFS文件读取的本地化机制。

可能的优化点