Spark源码阅读:DataFrame.collect 作业提交流程思维导图
yuyutoo 2025-05-03 17:34 5 浏览 0 评论
本文分为两个部分:
- 作业提交流程思维导图
- 关键函数列表
作业提交流程思维导图
点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。
关键函数列表
Dataset.collect
def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)
Dataset.withAction
Dataset.collectFromPlan
触发物理计划的执行,其中 plan 的类型是 SparkPlan
private def collectFromPlan(plan: SparkPlan): Array[T] = {
val fromRow = resolvedEnc.createDeserializer()
plan.executeCollect().map(fromRow)
}
Spark 有很多action 函数,比如:
- collect
- count
- show
最终都是通过 collectFromPlan 去创建 Job
SparkPlan.executeCollect
这个函数分为三部:
- getByteArrayRdd函数 将UnsafeRow RDD 转化为 byte array RDD,加速序列化
- 然后调用了 RDD.collect
- 解析 collect 结果,并返回
RDD.collect
Resilient Distributed Dataset (RDD), 是一种不可变、支持分区的数据集合。由于支持分区,该数据集支持并行访问。
class RDD是一个基类,它有很多子类:
- ShuffledRDD:存储shuffle结果数据,parent RDD 是 Java key-value 对
- ShuffledRowRDD:存储shuffle结果数据,parent RDD 是 InternalRow,SparkSQL使用
- MapPartitionsRDD:算子会被应用到 parent RDD 的所有分区
- UnionRDD:存储 union 的结果数据
- 其他 RDD 子类
collect 方法的主要职能是提交 Spark 作业,该功能代理给了 SparkContext 去支持:
SparkContext.runJob
runJob 方法有很多重载,我们只关心最复杂的一个:
从功能上来说,它实现了
- 准备 callSite,以便出问题知道是哪一行代码出错了
- 通过 DAGScheduler.runJob提交作业
- progressBar: 命令行里 stage的进度条显示
- doCheckpoint 将 RDD的中间和最后结果缓存下来
从代码上来说,方法声明如下:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit
它有两个泛型类型参数:
- T: ClassTag 输入RDD的类型
- U: ClassTag 输出数据的类型
参数列表:
- rdd: RDD[T] 指输入RDD类型,比如 RDD[(Long, Array[Byte])]
- func: (TaskContext, Iterator[T]) => U。func会被作用到 rdd的每个分区,返回U
- partitions: Seq[Int]。分区下标列表
- resultHandler: (Int, U)。这是一个回调函数。处理func执行完返回的数据,第一个参数是分区index,第二个是func的返回值
返回值: Unit 表示没有任何返回值
DAGScheduler.runJob
对于 DAGScheduler 而言,Stage是最小的调度单元。它会
- 给Job生成以Stage为调度单位的DAG图
- 追踪RDD和Stage的输出状态,比如哪些已经被物化,并基于这些信息提供一个最优的调度方案
- 提交Stage,以TaskSet的形式提交给 TasksetManager
DAGScheduler 对Job的调度是围绕
DAGSchedulerEventProcessLoop 展开的。这是一个经典的EventLoop使用场景。runJob 方法的执行流程如下:
- 提交任务本质上是向 EventLoop 发送一个 JobSubmitted 事件
- 通过一个JobWaiter对象等待结果
在 EventLoop 的另一端,onReceive 接收到 JobSubmitted事件,交给成员函数 handleJobSubmitted 处理该事件。
JobWaiter 内部有一个 Promise 对象,它会不停接收到 taskSucceeded,增加计数,知道成功task的数量等于task的总数量,将promise置为成功。
DAGSchedulerEventProcessLoop.onReceive
onReceive 负责接收各类事件,并分发给特定的 handler 函数处理,具体可以看思维导图或spark代码。
这里我们只看 handleJobSubmitted,它做了五件事情:
- 创建Stage:递归式地创建,先创建parent stage
- 注册Stage
- 创建Job
- 注册Job
- 提交Stage
由于 stage 是一个有向无环图,所以创建和执行都遵循 topological order。
DAGScheduler.createResultStage
在 SparkPlan 对象调用 execute 时,会递归地生成 RDD,从而构成了 RDD Lineage Graph,它是一个有向无环图。那么在 RDD Lineage 上如何切分 stage 呢?
RDD依赖分为宽依赖和窄依赖,代码体现为两个类ShuffleDependency和NarrowDependency。在构建 RDD Lineage时,相邻的两个RDD必须有其中一种依赖关系。Spark通过这种依赖关系划分 Stage。根节点的RDD必须分配到 ResultStage里,而之前所有的Stage,不管有多少级依赖,都是 ShuffleMapStage。
在
DAGScheduler.getShuffleDependenciesAndResourceProfiles
方法中,通过一个栈来记录分配到当前stage中的 RDD(窄依赖中的rdd都会被push到栈里),碰到宽依赖,则加到 shuffleDeps 中。
相关推荐
- 自卑的人容易患抑郁症吗?(自卑会导致抑郁吗)
-
Filephoto[Photo/IC]Lowself-esteemmakesusfeelbadaboutourselves.Butdidyouknowthatovert...
- 中考典型同(近)义词组(同义词考题)
-
中考典型同(近)义词组...
- BroadcastReceiver的原理和使用(broadcast-suppression)
-
一、使用中注意的几点1.动态注册、静态注册的优先级在AndroidManifest.xml中静态注册的receiver比在代码中用registerReceiver动态注册的优先级要低。发送方在send...
- Arduino通过串口透传ESP 13板与java程序交互
-
ESP13---是一个无线板子,配置通过热点通信Arduino通过串口透传ESP13板与java程序交互...
- zookeeper的Leader选举源码解析(zookeeper角色选举角色包括)
-
作者:京东物流梁吉超zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致...
- 接待外国人英文口语(接待外国友人的英语口语对话)
-
接待外国人英文口语询问访客身份: MayIhaveyourname,please? 请问您贵姓? Whatcompanyareyoufrom? 您是哪个公司的? Could...
- 一文深入理解AP架构Nacos注册原理
-
Nacos简介Nacos是一款阿里巴巴开源用于管理分布式微服务的中间件,能够帮助开发人员快速实现动态服务发现、服务配置、服务元数据及流量管理等。这篇文章主要剖析一下Nacos作为注册中心时其服务注册与...
- Android面试宝典之终极大招(android面试及答案)
-
以下内容来自兆隆IT云学院就业部,根据多年成功就业服务经验,以及职业素养课程部分内容,归纳总结:18.请描述一下Intent和IntentFilter。Android中通过Intent...
- 除了Crontab,Swoole Timer也可以实现定时任务的
-
一般的定时器是怎么实现的呢?我总结如下:1.使用Crontab工具,写一个shell脚本,在脚本中调用PHP文件,然后定期执行该脚本;2.ignore_user_abort()和set_time_li...
- Spark源码阅读:DataFrame.collect 作业提交流程思维导图
-
本文分为两个部分:作业提交流程思维导图关键函数列表作业提交流程思维导图...
- 使用Xamarin和Visual Studio开发Android可穿戴设备应用
-
搭建开发环境我们需要做的第一件事情是安装必要的工具。因此,你需要首先安装VisualStudio。如果您使用的是VisualStudio2010,2012或2013,那么请确保它是一个专业版本或...
- Android开发者必知的5个开源库(android 开发相关源码精编解析)
-
过去的时间里,Android开发逐步走向成熟,一个个与Android相关的开发工具也层出不穷。不过,在面对各种新鲜事物时,不要忘了那些我们每天使用的大量开源库。在这里,向大家介绍的就是,在这个任劳任怨...
- Android事件总线还能怎么玩?(android实现事件处理的步骤)
-
顾名思义,AndroidEventBus是一个Android平台的事件总线框架,它简化了Activity、Fragment、Service等组件之间的交互,很大程度上降低了它们之间的耦合,使我们的代码...
- Android 开发中文引导-应用小部件
-
应用小部件是可以嵌入其它应用(例如主屏幕)并收到定期更新的微型应用视图。这些视图在用户界面中被叫做小部件,并可以用应用小部件提供者发布。可以容纳其他应用部件的应用组件叫做应用部件的宿主(1)。下面的截...
你 发表评论:
欢迎- 一周热门
-
-
前端面试:iframe 的优缺点? iframe有那些缺点
-
带斜线的表头制作好了,如何填充内容?这几种方法你更喜欢哪个?
-
漫学笔记之PHP.ini常用的配置信息
-
推荐7个模板代码和其他游戏源码下载的网址
-
其实模版网站在开发工作中很重要,推荐几个参考站给大家
-
[干货] JAVA - JVM - 2 内存两分 [干货]+java+-+jvm+-+2+内存两分吗
-
正在学习使用python搭建自动化测试框架?这个系统包你可能会用到
-
织梦(Dedecms)建站教程 织梦建站详细步骤
-
【开源分享】2024PHP在线客服系统源码(搭建教程+终身使用)
-
2024PHP在线客服系统源码+完全开源 带详细搭建教程
-
- 最近发表
-
- 自卑的人容易患抑郁症吗?(自卑会导致抑郁吗)
- 中考典型同(近)义词组(同义词考题)
- WPF 消息传递简明教程(wpf messagebox.show)
- BroadcastReceiver的原理和使用(broadcast-suppression)
- Arduino通过串口透传ESP 13板与java程序交互
- zookeeper的Leader选举源码解析(zookeeper角色选举角色包括)
- 接待外国人英文口语(接待外国友人的英语口语对话)
- 一文深入理解AP架构Nacos注册原理
- Android面试宝典之终极大招(android面试及答案)
- 除了Crontab,Swoole Timer也可以实现定时任务的
- 标签列表
-
- mybatis plus (70)
- scheduledtask (71)
- css滚动条 (60)
- java学生成绩管理系统 (59)
- 结构体数组 (69)
- databasemetadata (64)
- javastatic (68)
- jsp实用教程 (53)
- fontawesome (57)
- widget开发 (57)
- vb net教程 (62)
- hibernate 教程 (63)
- case语句 (57)
- svn连接 (74)
- directoryindex (69)
- session timeout (58)
- textbox换行 (67)
- extension_dir (64)
- linearlayout (58)
- vba高级教程 (75)
- iframe用法 (58)
- sqlparameter (59)
- trim函数 (59)
- flex布局 (63)
- contextloaderlistener (56)