百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

8 RDD 常用算子(3)(rdd的操作算子包括哪两种?)

yuyutoo 2025-07-06 17:44 3 浏览 0 评论

双值类型

两个数据源之间的关联操作。

intersection

函数签名

def intersection(other: RDD[T]): RDD[T]

函数说明

对源RDD和参数RDD求交集后返回一个新的RDD。

union()

函数签名

def union(other: RDD[T]): RDD[T]

函数说明

对源RDD和参数RDD求并集后返回一个新的RDD。

subtract()

函数签名

def subtract(other: RDD[T]): RDD[T]

函数说明

以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集。

intersection() 、union()、subtract() 不支持两个数据类型不一致的 RDD 进行求交集操作,参数要求与原 RDD 的元素类型一致。

zip()

函数签名

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

函数说明

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

拉链操作,两个 RDD 的数据类型可以不一致。

分区数不相等时,不能进行拉链操作。

val rdd1 = sc.makeRDD(List(1, 2, 3, 4), numSlices = 2)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), numSlices = 4)

// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd3 = rdd1.zip(rdd2)
println("Zip:" + rdd3.collect().mkString(","))

会报如下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 4)

两个数据源中,分区中的数据数量也要保持一致。

val rdd4 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), numSlices = 2)
val rdd5 = sc.makeRDD(List(3, 4, 5, 6), numSlices = 2)

// 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
val rdd6 = rdd4.zip(rdd5)
println("Zip:" + rdd6.collect().mkString(","))

否则报如下错误:

org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

综合示例:

object RDD_Transform_MultipleValue {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 双Value 类型
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

    // 交集 期望【3,4】
    val rdd3 = rdd1.intersection(rdd2)
    println("Intersection:" + rdd3.collect().mkString(","))

    // 并集 期望【1,2,3,4,3,4,5,6】
    val rdd4 = rdd1.union(rdd2)
    println("Union:" + rdd4.collect().mkString(","))

    // 差集 期望【1,2】
    val rdd5 = rdd1.subtract(rdd2)
    println("Subtract:" + rdd5.collect().mkString(","))

    // 拉链 期望【(1,3),(2,4),(3,5),(4,6)】
    val rdd6 = rdd1.zip(rdd2)
    println("Zip:" + rdd6.collect().mkString(","))


    // Step3: 关闭环境
    sc.stop()
  }

}

输出结果:

Intersection:3,4
Union:1,2,3,4,3,4,5,6
Subtract:1,2
Zip:(1,3),(2,4),(3,5),(4,6)

key-value类型

partitionBy()

函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

函数说明

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner。

partitionBy() 算子并不是属于 RDD 的方法,而是属于 PairRDDFunctions 类的方法,要求 RDD 必须是 Key-Value 类型的 RDD。Scala 隐式转换,程序在编译错误时,会尝试在作用域范围内查找转换规则,将类型转换成特定类型后编译通过。隐式转换,相当于一种二次编译。

RDD 中有 rddToPairRDDFunctions() 隐式函数,尝试将 [K,V] 类型的 RDD 转换成 PairRDDFunctions 类型。

partitionBy() 要与 coalesce()、repartition() 算子区分开,前者是将其数据进行重分区;后者是调整分区的数量。

object RDD_Transform_partitionBy {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 partitionBy
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    val mapRDD:RDD[(Int, Int)] = rdd.map((_, 1))
    mapRDD.partitionBy(new HashPartitioner(partitions = 2)).saveAsTextFile("output")
    // Step3: 关闭环境
    sc.stop()
  }
}

最后的输出:

最终实现【1,3】【2,4】分组。

【说明1】HashPartitioner 继承自 Partitioner,有两个主要方法:

  • numPartitions:获取分区数量;
  • getPartition:获取分区号,返回一个整数值,其实就是 key 值对于分区号进行取模计算,获得分区值。
def getPartition(key: Any): Int = key match {
  case null => 0
  case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}

nonNegativeMod() 方法源码:

def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

【说明2】如果 RDD 连续进行两次 partitionBy() 算子操作,partitionBy() 算子会对自身的 partitioner 对象进行比较。Scala 的 == 会进行类型和非空比较。

partithoner 自身又有 equals() 方法,比较两个分区器是否同一。比较逻辑,要看:类型、分区数量。如果同一,分区器将不做任何处理,保留self 的分区器。

override def equals(other: Any): Boolean = other match {
  case h: HashPartitioner =>
    h.numPartitions == numPartitions
  case _ =>
    false
}

【问题3】其他类型分区器 Partitioner 有 3 个子类。其中 PythonPartitioner (包含小锁头)说明其是在特定包下运行的分区器。RangePartitioner 多用于排序操作。

reduceByKey()

函数签名

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

函数说明

可以将数据按照相同的 Key 对 Value 进行聚合。

  1. Scala 语音中一般聚合操作都是两两聚合,spark 基于 Scala 开发,所以它的聚合也是两两聚合。
  2. reduceByKey() 中如果 Key 的数据只有一个,则该 Key 不参与运算。
object RDD_Transform_reduceByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 reduceByKey:相同的 Key 的数据进行 value 数据的聚合操作
    // Scala 语音中一般聚合操作都是两两聚合,spark 基于 Scala 开发,所以它的聚合也是两两聚合
    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
    val reduceRDD = rdd.reduceByKey(
      (x: Int, y: Int) => {
        println(s"x=${x}, y=${y}")
        (x + y)
      })
    reduceRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

x=1, y=2
x=3, y=3
(a,6)
(b,4)

groupByKey()

函数签名

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

函数说明

将数据源的数据根据 key 对 value 进行分组。

object RDD_Transform_groupByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 groupByKey
    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
    val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    groupRDD.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))

groupByKey() 与 groupBy()的区别:

  • groupByKey():1)通过 Key 进行分组;2)返回 RDD[(String, Iterable[Int])],聚合后的结果。
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
  • groupBy():1)可以使用元组中任意元素进行分组;2)返回的是分组后的元组,元组仍然是原始的元组。
val groupRDD1: RDD[(String, Iterable[(String, Int)])]=rdd.groupBy(_._1)

groupByKey() 与 reduceByKey() 的区别:

从性能上看:reduceByKey 在分区内进行预聚合(分区内做聚合),在本地将数据量进行压缩,可以使 shuffle 落盘时数据量减少,同时在 reduce 时从文件读取的数据量的大小也进行压缩,从而提高 shuffle 的效果。如果进行聚合,reduceByKey() 性能较高

从功能上看:如果只分组,那只能使用 groupByKey()。

reduceByKey() 分区内和分区间计算规则相同。

aggregateByKey()

函数签名

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
     combOp: (U, U) => U): RDD[(K, U)]

aggregateByKey() 存在参数柯里化,接受两个参数列表。

  • 第一个参数列表, 需要传递一个参数:表示初始值。主要用于当我们遇到第一个 key 时,和 value 进行分区计算
  • 第二个参数列表:
    • 第一个参数表示分区内计算规则;
    • 第二个参数表示分区件计算规则。

函数说明

将数据根据不同的规则进行分区内计算和分区间计算。

object RDD_Transform_aggregateByKey {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 aggregateByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)

    // aggregateByKey() 存在参数柯里化
    // 第一个参数列表, 需要传递一个参数:表示初始值
    // 主要用于当我们遇到第一个 key 时,和 value 进行分区计算
    // 第二个参数列表:
    //    第一个参数表示分区内计算规则
    //    第二个参数表示分区件计算规则
    rdd.aggregateByKey(zeroValue = 0)(
      (x, y) => math.max(x,y),
      (x, y) => x + y
    ).collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(b,8)
(a,8)

修改 zeroValue 值后的效果:

object RDD_Transform_aggregateByKey_zeroValue {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 aggregateByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)

    // aggregateByKey() 存在参数柯里化
    // 第一个参数列表, 需要传递一个参数:表示初始值
    // 主要用于当我们遇到第一个 key 时,和 value 进行分区计算
    // 第二个参数列表:
    //    第一个参数表示分区内计算规则
    //    第二个参数表示分区件计算规则
    rdd.aggregateByKey(zeroValue = 5)(
      (x, y) => math.max(x,y),
      (x, y) => x + y
    ).collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果,如预期:

(b,8)
(a,8)

aggregateByKey() 方法,如果分区内、分区间使用相同的聚合函数,则效果与 reduceByKey() 相同。

    rdd.aggregateByKey(zeroValue = 0)(
      (x, y) => x + y,
      (x, y) => x + y
    )
    // 简化写法:匿名函数的字典原则
    rdd.aggregateByKey(zeroValue = 0)( _+_,_+_)

应用场景,按 Key 值取平均数。

object RDD_Transform_aggregateByKey_average {

  def main(args: Array[String]): Unit = {
    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 aggregateByKey
    // 数据分区【("a",1),("a",2)】【("a",3),("a",4)】
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),numSlices = 2)
    // 初始值 (0,0),tuple 的第一个元素为求和,第二个值为 key 出现的次数
    val averageRDD: RDD[(String,(Int, Int))] = rdd.aggregateByKey(zeroValue = (0, 0))(
      (t, v) => {
        // 分区内,tuple 第一个元素为值相加,第二个元素为次数相加
        (t._1 + v, t._2 + 1)
      },
      // 分区间,tuple 中的元素,第一个元素值相加,第二个元素数量相加
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )
    // RDD 中,如果 Key 保持不变,只对 Value 进行处理,可以使用 mapValues() 算子
    averageRDD.mapValues{
      case (num, cnt) => {
        num / cnt
      }
    }.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

(b,4)
(a,3)

相关推荐

.NET 奇葩问题调试经历之3——使用了grpc通讯类库后,内存一直增长......

...

全局和隐式 using 指令详解(全局命令)

1.什么是全局和隐式using?在.NET6及更高版本中,Microsoft引入了...

请停止微服务,做好单体的模块化才是王道:Spring Modulith介绍

1、介绍模块化单体是一种架构风格,代码是根据模块的概念构成的。对于许多组织而言,模块化单体可能是一个很好的选择。它有助于保持一定程度的独立性,这有助于我们在需要的时候轻松过渡到微服务架构。Spri...

ASP.NET程序集引用之痛:版本冲突、依赖地狱等解析与实战

我是一位多年后端经验的工程师,其中前几年用ASP.NET...

.NET AOT 详解(.net 6 aot)

简介AOT(Ahead-Of-TimeCompilation)是一种将代码直接编译为机器码的技术,与传统的...

一款基于Yii2开发的免费商城系统(一款基于yii2开发的免费商城系统是什么)

哈喽,我是老鱼,一名致力于在技术道路上的终身学习者、实践者、分享者!...

asar归档解包(游戏arc文件解包)

要学习Electron逆向,首先要有一个Electron开发的程序的发布的包,这里就以其官方的electron-quick-start作为例子来进行一下逆向的过程。...

在PyCharm 中免费集成Amazon CodeWhisperer

CodeWhisperer是Amazon发布的一款免费的AI编程辅助小工具,可在你的集成开发环境(IDE)中生成实时单行或全函数代码建议,帮助你快速构建软件。简单来说,AmazonCodeWhi...

2014年最优秀JavaScript编辑器大盘点

1.WebstormWebStorm是一种轻量级的、功能强大的IDE,为Node.js复杂的客户端开发和服务器端开发提供完美的解决方案。WebStorm的智能代码编辑器支持JavaScript,...

基于springboot、tio、oauth2.0前端vuede 超轻量级聊天软件分享

项目简介:基于JS的超轻量级聊天软件。前端:vue、iview、electron实现的PC桌面版聊天程序,主要适用于私有云项目内部聊天,企业内部管理通讯等功能,主要通讯协议websocket。支持...

JetBrains Toolbox推出全新产品订阅授权模式

捷克知名软件开发公司JetBrains最为人所熟知的产品是Java编程语言开发撰写时所用的集成开发环境IntelliJIDEA,相信很多开发者都有所了解。而近期自2015年11月2日起,JetBr...

idea最新激活jetbrains-agent.jar包,亲测有效

这里分享一个2019.3.3版本的jetbrains-agent.jar,亲测有效,在网上找了很多都不能使用,终于找到一个可以使用的了,这里分享一下具体激活步骤,此方法适用于Jebrains家所有产品...

CountDownTimer的理解(countdowntomars)

CountDownTimer是android开发常用的计时类,按照注释中的说明使用方法如下:kotlin:object:CountDownTimer(30000,1000){...

反射为什么性能会很慢?(反射时为什么会越来越长)

1.背景前段时间维护一个5、6年前的项目,项目总是在某些功能使用上不尽人意,性能上总是差一些,仔细过了一下代码发现使用了不少封装好的工具类,工具类里面用了好多的反射,反射会影响到执行效率吗?盲猜了一...

btrace 开源!基于 Systrace 高性能 Trace 工具

介绍btrace(又名RheaTrace)是抖音基础技术团队自研的一款高性能AndroidTrace工具,它基于Systrace实现,并针对Systrace不足之处加以改进,核心改进...

取消回复欢迎 发表评论: