8 RDD 常用算子(3)(rdd的操作算子包括哪两种?)
yuyutoo 2025-07-06 17:44 3 浏览 0 评论
双值类型
两个数据源之间的关联操作。
intersection
函数签名
def intersection(other: RDD[T]): RDD[T]
函数说明
对源RDD和参数RDD求交集后返回一个新的RDD。
union()
函数签名
def union(other: RDD[T]): RDD[T]
函数说明
对源RDD和参数RDD求并集后返回一个新的RDD。
subtract()
函数签名
def subtract(other: RDD[T]): RDD[T]
函数说明
以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集。
intersection() 、union()、subtract() 不支持两个数据类型不一致的 RDD 进行求交集操作,参数要求与原 RDD 的元素类型一致。
zip()
函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
函数说明
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
拉链操作,两个 RDD 的数据类型可以不一致。
分区数不相等时,不能进行拉链操作。
val rdd1 = sc.makeRDD(List(1, 2, 3, 4), numSlices = 2)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), numSlices = 4)
// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd3 = rdd1.zip(rdd2)
println("Zip:" + rdd3.collect().mkString(","))
会报如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 4)
两个数据源中,分区中的数据数量也要保持一致。
val rdd4 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), numSlices = 2)
val rdd5 = sc.makeRDD(List(3, 4, 5, 6), numSlices = 2)
// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd6 = rdd4.zip(rdd5)
println("Zip:" + rdd6.collect().mkString(","))
否则报如下错误:
org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
综合示例:
object RDD_Transform_MultipleValue {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 双Value 类型
val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
// 交集 期望【3,4】
val rdd3 = rdd1.intersection(rdd2)
println("Intersection:" + rdd3.collect().mkString(","))
// 并集 期望【1,2,3,4,3,4,5,6】
val rdd4 = rdd1.union(rdd2)
println("Union:" + rdd4.collect().mkString(","))
// 差集 期望【1,2】
val rdd5 = rdd1.subtract(rdd2)
println("Subtract:" + rdd5.collect().mkString(","))
// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd6 = rdd1.zip(rdd2)
println("Zip:" + rdd6.collect().mkString(","))
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
Intersection:3,4
Union:1,2,3,4,3,4,5,6
Subtract:1,2
Zip:(1,3),(2,4),(3,5),(4,6)
key-value类型
partitionBy()
函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
函数说明
将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner。
partitionBy() 算子并不是属于 RDD 的方法,而是属于 PairRDDFunctions 类的方法,要求 RDD 必须是 Key-Value 类型的 RDD。Scala 隐式转换,程序在编译错误时,会尝试在作用域范围内查找转换规则,将类型转换成特定类型后编译通过。隐式转换,相当于一种二次编译。
RDD 中有 rddToPairRDDFunctions() 隐式函数,尝试将 [K,V] 类型的 RDD 转换成 PairRDDFunctions 类型。
partitionBy() 要与 coalesce()、repartition() 算子区分开,前者是将其数据进行重分区;后者是调整分区的数量。
object RDD_Transform_partitionBy {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 partitionBy
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD:RDD[(Int, Int)] = rdd.map((_, 1))
mapRDD.partitionBy(new HashPartitioner(partitions = 2)).saveAsTextFile("output")
// Step3: 关闭环境
sc.stop()
}
}
最后的输出:
最终实现【1,3】【2,4】分组。
【说明1】HashPartitioner 继承自 Partitioner,有两个主要方法:
- numPartitions:获取分区数量;
- getPartition:获取分区号,返回一个整数值,其实就是 key 值对于分区号进行取模计算,获得分区值。
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
nonNegativeMod() 方法源码:
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
【说明2】如果 RDD 连续进行两次 partitionBy() 算子操作,partitionBy() 算子会对自身的 partitioner 对象进行比较。Scala 的 == 会进行类型和非空比较。
partithoner 自身又有 equals() 方法,比较两个分区器是否同一。比较逻辑,要看:类型、分区数量。如果同一,分区器将不做任何处理,保留self 的分区器。
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
【问题3】其他类型分区器 Partitioner 有 3 个子类。其中 PythonPartitioner (包含小锁头)说明其是在特定包下运行的分区器。RangePartitioner 多用于排序操作。
reduceByKey()
函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
函数说明
可以将数据按照相同的 Key 对 Value 进行聚合。
- Scala 语音中一般聚合操作都是两两聚合,spark 基于 Scala 开发,所以它的聚合也是两两聚合。
- reduceByKey() 中如果 Key 的数据只有一个,则该 Key 不参与运算。
object RDD_Transform_reduceByKey {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 reduceByKey:相同的 Key 的数据进行 value 数据的聚合操作
// Scala 语音中一般聚合操作都是两两聚合,spark 基于 Scala 开发,所以它的聚合也是两两聚合
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
val reduceRDD = rdd.reduceByKey(
(x: Int, y: Int) => {
println(s"x=${x}, y=${y}")
(x + y)
})
reduceRDD.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
x=1, y=2
x=3, y=3
(a,6)
(b,4)
groupByKey()
函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
函数说明
将数据源的数据根据 key 对 value 进行分组。
object RDD_Transform_groupByKey {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 groupByKey
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupRDD.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))
groupByKey() 与 groupBy()的区别:
- groupByKey():1)通过 Key 进行分组;2)返回 RDD[(String, Iterable[Int])],聚合后的结果。
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
- groupBy():1)可以使用元组中任意元素进行分组;2)返回的是分组后的元组,元组仍然是原始的元组。
val groupRDD1: RDD[(String, Iterable[(String, Int)])]=rdd.groupBy(_._1)
groupByKey() 与 reduceByKey() 的区别:
从性能上看:reduceByKey 在分区内进行预聚合(分区内做聚合),在本地将数据量进行压缩,可以使 shuffle 落盘时数据量减少,同时在 reduce 时从文件读取的数据量的大小也进行压缩,从而提高 shuffle 的效果。如果进行聚合,reduceByKey() 性能较高
从功能上看:如果只分组,那只能使用 groupByKey()。
reduceByKey() 分区内和分区间计算规则相同。
aggregateByKey()
函数签名
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
aggregateByKey() 存在参数柯里化,接受两个参数列表。
- 第一个参数列表, 需要传递一个参数:表示初始值。主要用于当我们遇到第一个 key 时,和 value 进行分区计算
- 第二个参数列表:
- 第一个参数表示分区内计算规则;
- 第二个参数表示分区件计算规则。
函数说明
将数据根据不同的规则进行分区内计算和分区间计算。
object RDD_Transform_aggregateByKey {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 aggregateByKey
// 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
// aggregateByKey() 存在参数柯里化
// 第一个参数列表, 需要传递一个参数:表示初始值
// 主要用于当我们遇到第一个 key 时,和 value 进行分区计算
// 第二个参数列表:
// 第一个参数表示分区内计算规则
// 第二个参数表示分区件计算规则
rdd.aggregateByKey(zeroValue = 0)(
(x, y) => math.max(x,y),
(x, y) => x + y
).collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
(b,8)
(a,8)
修改 zeroValue 值后的效果:
object RDD_Transform_aggregateByKey_zeroValue {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 aggregateByKey
// 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
// aggregateByKey() 存在参数柯里化
// 第一个参数列表, 需要传递一个参数:表示初始值
// 主要用于当我们遇到第一个 key 时,和 value 进行分区计算
// 第二个参数列表:
// 第一个参数表示分区内计算规则
// 第二个参数表示分区件计算规则
rdd.aggregateByKey(zeroValue = 5)(
(x, y) => math.max(x,y),
(x, y) => x + y
).collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果,如预期:
(b,8)
(a,8)
aggregateByKey() 方法,如果分区内、分区间使用相同的聚合函数,则效果与 reduceByKey() 相同。
rdd.aggregateByKey(zeroValue = 0)(
(x, y) => x + y,
(x, y) => x + y
)
// 简化写法:匿名函数的字典原则
rdd.aggregateByKey(zeroValue = 0)( _+_,_+_)
应用场景,按 Key 值取平均数。
object RDD_Transform_aggregateByKey_average {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 aggregateByKey
// 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
// 初始值 (0,0),tuple 的第一个元素为求和,第二个值为 key 出现的次数
val averageRDD: RDD[(String,(Int, Int))] = rdd.aggregateByKey(zeroValue = (0, 0))(
(t, v) => {
// 分区内,tuple 第一个元素为值相加,第二个元素为次数相加
(t._1 + v, t._2 + 1)
},
// 分区间,tuple 中的元素,第一个元素值相加,第二个元素数量相加
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
// RDD 中,如果 Key 保持不变,只对 Value 进行处理,可以使用 mapValues() 算子
averageRDD.mapValues{
case (num, cnt) => {
num / cnt
}
}.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
(b,4)
(a,3)
相关推荐
- 全局和隐式 using 指令详解(全局命令)
-
1.什么是全局和隐式using?在.NET6及更高版本中,Microsoft引入了...
- 请停止微服务,做好单体的模块化才是王道:Spring Modulith介绍
-
1、介绍模块化单体是一种架构风格,代码是根据模块的概念构成的。对于许多组织而言,模块化单体可能是一个很好的选择。它有助于保持一定程度的独立性,这有助于我们在需要的时候轻松过渡到微服务架构。Spri...
- ASP.NET程序集引用之痛:版本冲突、依赖地狱等解析与实战
-
我是一位多年后端经验的工程师,其中前几年用ASP.NET...
- .NET AOT 详解(.net 6 aot)
-
简介AOT(Ahead-Of-TimeCompilation)是一种将代码直接编译为机器码的技术,与传统的...
- 一款基于Yii2开发的免费商城系统(一款基于yii2开发的免费商城系统是什么)
-
哈喽,我是老鱼,一名致力于在技术道路上的终身学习者、实践者、分享者!...
- asar归档解包(游戏arc文件解包)
-
要学习Electron逆向,首先要有一个Electron开发的程序的发布的包,这里就以其官方的electron-quick-start作为例子来进行一下逆向的过程。...
- 在PyCharm 中免费集成Amazon CodeWhisperer
-
CodeWhisperer是Amazon发布的一款免费的AI编程辅助小工具,可在你的集成开发环境(IDE)中生成实时单行或全函数代码建议,帮助你快速构建软件。简单来说,AmazonCodeWhi...
- 2014年最优秀JavaScript编辑器大盘点
-
1.WebstormWebStorm是一种轻量级的、功能强大的IDE,为Node.js复杂的客户端开发和服务器端开发提供完美的解决方案。WebStorm的智能代码编辑器支持JavaScript,...
- 基于springboot、tio、oauth2.0前端vuede 超轻量级聊天软件分享
-
项目简介:基于JS的超轻量级聊天软件。前端:vue、iview、electron实现的PC桌面版聊天程序,主要适用于私有云项目内部聊天,企业内部管理通讯等功能,主要通讯协议websocket。支持...
- JetBrains Toolbox推出全新产品订阅授权模式
-
捷克知名软件开发公司JetBrains最为人所熟知的产品是Java编程语言开发撰写时所用的集成开发环境IntelliJIDEA,相信很多开发者都有所了解。而近期自2015年11月2日起,JetBr...
- idea最新激活jetbrains-agent.jar包,亲测有效
-
这里分享一个2019.3.3版本的jetbrains-agent.jar,亲测有效,在网上找了很多都不能使用,终于找到一个可以使用的了,这里分享一下具体激活步骤,此方法适用于Jebrains家所有产品...
- CountDownTimer的理解(countdowntomars)
-
CountDownTimer是android开发常用的计时类,按照注释中的说明使用方法如下:kotlin:object:CountDownTimer(30000,1000){...
- 反射为什么性能会很慢?(反射时为什么会越来越长)
-
1.背景前段时间维护一个5、6年前的项目,项目总是在某些功能使用上不尽人意,性能上总是差一些,仔细过了一下代码发现使用了不少封装好的工具类,工具类里面用了好多的反射,反射会影响到执行效率吗?盲猜了一...
- btrace 开源!基于 Systrace 高性能 Trace 工具
-
介绍btrace(又名RheaTrace)是抖音基础技术团队自研的一款高性能AndroidTrace工具,它基于Systrace实现,并针对Systrace不足之处加以改进,核心改进...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- .NET 奇葩问题调试经历之3——使用了grpc通讯类库后,内存一直增长......
- 全局和隐式 using 指令详解(全局命令)
- 请停止微服务,做好单体的模块化才是王道:Spring Modulith介绍
- ASP.NET程序集引用之痛:版本冲突、依赖地狱等解析与实战
- .NET AOT 详解(.net 6 aot)
- 一款基于Yii2开发的免费商城系统(一款基于yii2开发的免费商城系统是什么)
- asar归档解包(游戏arc文件解包)
- 在PyCharm 中免费集成Amazon CodeWhisperer
- 2014年最优秀JavaScript编辑器大盘点
- 基于springboot、tio、oauth2.0前端vuede 超轻量级聊天软件分享
- 标签列表
-
- mybatis plus (70)
- scheduledtask (71)
- css滚动条 (60)
- java学生成绩管理系统 (59)
- 结构体数组 (69)
- databasemetadata (64)
- javastatic (68)
- jsp实用教程 (53)
- fontawesome (57)
- widget开发 (57)
- vb net教程 (62)
- hibernate 教程 (63)
- case语句 (57)
- svn连接 (74)
- directoryindex (69)
- session timeout (58)
- textbox换行 (67)
- extension_dir (64)
- linearlayout (58)
- vba高级教程 (75)
- iframe用法 (58)
- sqlparameter (59)
- trim函数 (59)
- flex布局 (63)
- contextloaderlistener (56)