百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

Java 22 的流收集器,自定义流处理中间操作

yuyutoo 2024-10-23 16:41 8 浏览 0 评论

Java 中的流终于可以自定义中间处理操作了。这是 Java 22 中新增的预览功能。

自从 Java 8 引入了 Stream 之后,Stream 在 Java 应用中的使用频率非常高。

Stream 处理时的 pipeline 由 3 个部分组成,分别是源、中间操作和终结操作:

  • 源是 Stream 中元素的来源,可以通过 Stream.of 方法创建,或者从集合类得到。
  • 中间操作对流中的元素进行处理,定义在 Stream 类中,包括 map、filter、flatMap、distinct 等。
  • 终结操作会最终产生一个结果或者副作用,包括 collect、reduce、count、forEach 等

在使用 Stream 时,一个很大的痛点是中间操作不能自定义,只能用 Stream 类提供的那些。标准库中的 Stream 类也不会随意更改来添加更多的中间操作。这就导致在某些情况下,Stream 的使用很别扭。

举个例子,Stream 里面有中间操作 distinct,可以去除流中的重复元素,但是只能按照对象相等性来去重。在实际开发中,一个很常见的需求是按照对象的属性来去重。比如,Stream 中包含的是表示学生的 Student 对象。我们希望根据 Student 对象的 id 属性来去重。这个是 distinct 无法实现的。

Java 22 引入了Gatherer 接口,可以提供自定义的 Stream 中间操作。

static <T, A, R> Gatherer<T, A, R> of(
            Supplier<A> initializer,
            Integrator<A, T, R> integrator,
            BinaryOperator<A> combiner,
            BiConsumer<A, Downstream<? super R>> finisher) {
        return new Gatherers.GathererImpl<>(
                Objects.requireNonNull(initializer),
                Objects.requireNonNull(integrator),
                Objects.requireNonNull(combiner),
                Objects.requireNonNull(finisher)
        );
    }

一个 Gatherer 接口由4个方法组成, 分别是 initializer, integrator,combiner 和 finisher。只有 integrator 是必须的,其他都是可选的。

  • initializer 创建一个新的可变状态,无状态的 Gatherer不需要提供。
  • integrator 提供传递给下游的元素。
  • combiner 在并行处理时使用,用来合并中间的状态,顺序处理时不需要提供。
  • finisher 对最终的结果执行额外的处理。

使用 Gatherer 的 of 方法来创建 Gatherer,并由 Stream 的 gather 方法来使用。

下面展示一下如何用 Gatherer来实现基于对象属性的 distinct 操作。这个 Gatherer 实现采用顺序处理,因此不需要用到 combiner。

  • State 表示中间状态,里面用一个 Map 来记录已经出现过的值。initializer 返回一个新的 State 对象。
  • integrator 对于每个元素,调用提供的 extrator 函数,从元素对象中提取属性值。属性值作为 Map 的 key, value 则是元素本身。
  • finisher 查看 State 中 Map 的全部值,并把这些值对象,通过 Downstream 的 push 方法添加进去,作为传递给下游的值。
/**
 * Distinct by an object's property, keep last duplicated element
 */
public class DistinctBy {

  /**
   * Create a new {@linkplain DistinctBy} gatherer
   *
   * @param extractor Extract the property from an element
   * @param <T>       Type of the element
   * @param <R>       Type of the element's property
   * @return A new {@linkplain DistinctBy} gatherer
   */
  static <T, R> Gatherer<T, ?, T> of(Function<? super T, ? extends R> extractor) {
    class State {

      final Map<R, T> seen = new HashMap<>();
    }

    return Gatherer.ofSequential(State::new, Integrator.ofGreedy(
            ((state, element, _) -> {
              state.seen.put(extractor.apply(element), element);
              return true;
            })),
        (state, downstream) -> state.seen.values().forEach(downstream::push)
    );
  }
}


下面的代码展示了如何使用这个 Gatherer。调用 Stream 的 gather 方法,并传入 Gatherer 对象,整体的代码清晰易懂。

public class CustomGatherers {

  record Student(String id, String name) {

  }

  void distinctBy() {
    var result = Stream.of(
            new Student("001", "Alex"),
            new Student("002", "Bob"),
            new Student("001", "Alex")
        ).gather(DistinctBy.of(Student::id))
        .toList();
    // [Student[id=001, name=Alex], Student[id=002, name=Bob]]
    System.out.println(result);
  }

  public static void main(String[] args) {
    new CustomGatherers().distinctBy();
  }
}

与 Gatherer 接口同时发布的还有几个内置的 Gatherer 实现,在 Gatherers 这个类里面。

这些内置的实现包括:

  • windowFixed 把元素收集到指定大小的窗口中。
  • windowSliding 把元素收集到指定大小的滑动窗口中。
  • fold 类似 reduce,产生一个值给下游。
  • scan 从一个初始值开始,后续的每个值根据当前值和流中的输入元素,计算而来。所有这些值都会被提供给下游。
  • mapConcurrent 执行并发的 map 操作,可以指定同时运行的最大任务数量

流收集器目前还是预览功能,我们大概率会在 Java的 下一个 LTS Java 25 中使用它。

相关推荐

ETCD 故障恢复(etc常见故障)

概述Kubernetes集群外部ETCD节点故障,导致kube-apiserver无法启动。...

在Ubuntu 16.04 LTS服务器上安装FreeRADIUS和Daloradius的方法

FreeRADIUS为AAARadiusLinux下开源解决方案,DaloRadius为图形化web管理工具。...

如何排查服务器被黑客入侵的迹象(黑客 抓取服务器数据)

---排查服务器是否被黑客入侵需要系统性地检查多个关键点,以下是一份详细的排查指南,包含具体命令、工具和应对策略:---###**一、快速初步检查**####1.**检查异常登录记录**...

使用 Fail Ban 日志分析 SSH 攻击行为

通过分析`fail2ban`日志可以识别和应对SSH暴力破解等攻击行为。以下是详细的操作流程和关键分析方法:---###**一、Fail2ban日志位置**Fail2ban的日志路径因系统配置...

《5 个实用技巧,提升你的服务器安全性,避免被黑客盯上!》

服务器的安全性至关重要,特别是在如今网络攻击频繁的情况下。如果你的服务器存在漏洞,黑客可能会利用这些漏洞进行攻击,甚至窃取数据。今天我们就来聊聊5个实用技巧,帮助你提升服务器的安全性,让你的系统更...

聊聊Spring AI Alibaba的YuQueDocumentReader

序本文主要研究一下SpringAIAlibaba的YuQueDocumentReaderYuQueDocumentReader...

Mac Docker环境,利用Canal实现MySQL同步ES

Canal的使用使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中,并在springboo...

RustDesk:开源远程控制工具的技术架构与全场景部署实战

一、开源远程控制领域的革新者1.1行业痛点与解决方案...

长安汽车一代CS75Plus2020款安装高德地图7.5

不用破解原车机,一代CS75Plus2020款,安装车机版高德地图7.5,有红绿灯读秒!废话不多讲,安装步骤如下:一、在拨号状态输入:在电话拨号界面,输入:*#518200#*(进入安卓设置界面,...

Zookeeper使用详解之常见操作篇(zookeeper ui)

一、Zookeeper的数据结构对于ZooKeeper而言,其存储结构类似于文件系统,也是一个树形目录服务,并通过Key-Value键值对的形式进行数据存储。其中,Key由斜线间隔的路径元素构成。对...

zk源码—4.会话的实现原理一(会话层的基本功能是什么)

大纲1.创建会话...

Zookeeper 可观测性最佳实践(zookeeper能够确保)

Zookeeper介绍ZooKeeper是一个开源的分布式协调服务,用于管理和协调分布式系统中的节点。它提供了一种高效、可靠的方式来解决分布式系统中的常见问题,如数据同步、配置管理、命名服务和集群...

服务器密码错误被锁定怎么解决(服务器密码错几次锁)

#服务器密码错误被锁定解决方案当服务器因多次密码错误导致账户被锁定时,可以按照以下步骤进行排查和解决:##一、确认锁定状态###1.检查账户锁定状态(Linux)```bash#查看账户锁定...

zk基础—4.zk实现分布式功能(分布式zk的使用)

大纲1.zk实现数据发布订阅...

《死神魂魄觉醒》卡死问题终极解决方案:从原理到实战的深度解析

在《死神魂魄觉醒》的斩魄刀交锋中,游戏卡死犹如突现的虚圈屏障,阻断玩家与尸魂界的连接。本文将从技术架构、解决方案、预防策略三个维度,深度剖析卡死问题的成因与应对之策,助力玩家突破次元壁障,畅享灵魂共鸣...

取消回复欢迎 发表评论: