百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

Kotlin协程之一文看懂Channel管道

yuyutoo 2025-04-07 20:59 14 浏览 0 评论

概述

Channel 类似于 Java 的 BlockingQueue 阻塞队列,不同之处在于 Channel 提供了挂起的 send() 和 receive() 方法。另外,通道 Channel 可以被关闭表明不再有数据会进入 Channel, 而接收端可以通过 for 循环取出数据。

Channel 也是生产-消费者模式,这个设计模式在协程中很常见。

基本使用

val channel = Channel()

// 发送
launch {
    repeat(10) {
        channel.send(it)
        delay(200)
    }
    // 关闭
    channel.close()
}

// 接收
launch {
    for (i in channel) {
        println("receive: $i")
    }
    // 关闭后
    println("closed")
}

produce 和 actor

produce 和 actor 是 Kotlin 提供的构造生产者与消费者的便捷方法。

其中 produce 方法用来启动一个生产者协程,并返回一个 ReceiveChannel 在其他协程中接收数据:

// produce 生产协程
val receiveChannel = CoroutineScope(Dispatchers.IO).produce {
    repeat(10) {
        send(it)
        delay(200)
    }
}

// 接收者 1
launch {
    for (i in receiveChannel) {
        println("receive-1: $i")
    }
}

// 接收者 2
launch {
    for (i in receiveChannel) {
        println("receive-2: $i")
    }
}

输出:

2022-11-29 10:48:03.045 I/System.out: receive-1: 0
2022-11-29 10:48:03.250 I/System.out: receive-1: 1
2022-11-29 10:48:03.451 I/System.out: receive-2: 2
2022-11-29 10:48:03.654 I/System.out: receive-1: 3
2022-11-29 10:48:03.856 I/System.out: receive-2: 4
2022-11-29 10:48:04.059 I/System.out: receive-1: 5
2022-11-29 10:48:04.262 I/System.out: receive-2: 6
2022-11-29 10:48:04.466 I/System.out: receive-1: 7
2022-11-29 10:48:04.669 I/System.out: receive-2: 8
2022-11-29 10:48:04.871 I/System.out: receive-1: 9

反之也可以用 actor 来启动一个消费协程:

// actor 消费协程
val sendChannel = CoroutineScope(Dispatchers.IO).actor {
    while (true) {
        println("receive: ${receive()}")
    }
}

// 发送者 1
launch {
    repeat(10) {
        sendChannel.send(it)
        delay(200)
    }
}

// 发送者 2
launch {
    repeat(10) {
        sendChannel.send(it * it)
        delay(200)
    }
}

可以看出 produce 创建的是一个单生产者——多消费者的模型,而 actor 创建的是一个单消费者--多生产者的模型

不过这些相关的 API 要不就是 ExperimentalCoroutinesApi 实验性标记的,要不就是 ObsoleteCoroutinesApi 废弃标记的,个人感觉暂时没必要使用它们。

Channel 是公平的

发送和接收操作是公平的,它们遵守先进先出原则。官方也给了一个例子:

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel() // 一个共享的 table(桌子)
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // 率先打出第一个球
    delay(1000) // 延迟 1 秒钟
    coroutineContext.cancelChildren() // 游戏结束,取消它们
}

suspend fun player(name: String, table: Channel) {
    for (ball in table) { // 在循环中接收球
        ball.hits++
        println("$name $ball")
        delay(300) // 等待一段时间
        table.send(ball) // 将球发送回去
    }
}

由于 ping 协程首先被启动,所以它首先接收到了球,接着即使 ping 协程在将球发送后会立即开始接收,但是球还是被 pong 协程接收了,因为它一直在等待着接收球:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

带缓冲的 Channel

前面已经说过 Channel 实际上是一个队列,那它当然也存在一个缓存区以及缓存满后的策略(处理背压之类的问题),在创建 Channel 时可以指定两个相关的参数:

public fun  Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel

这里的 Channel() 其实并不是构造函数,而是一个顶层函数,它内部会根据不同的入参来创建不同类型的 Channel 实例。其参数含义如下:

  • capacity: Channel 缓存区的容量,默认为 RENDEZVOUS = 0
  • onBufferOverflow: 缓冲区满后发送端的处理策略,默认挂起。当消费者处理数据比生产者生产数据慢时,新生产的数据会存入缓存区,当缓存区满后,生产者再调用 send() 方法会挂起,等待消费者处理数据。

看个小栗子:

// 创建缓存区大小为 4 的 Channel
val channel = Channel(4)

// 发送
launch {
    repeat(10) {
        channel.send(it)
        println("send: $it")
        delay(200)
    }
}

// 接收
launch {
    val channel = viewModel.channel
    for (i in channel) {
        println("receive: $i")
        delay(1000)
    }
}

输出结果:

2022-11-28 17:16:47.905 I/System.out: send: 0
2022-11-28 17:16:47.907 I/System.out: receive: 0
2022-11-28 17:16:48.107 I/System.out: send: 1
2022-11-28 17:16:48.310 I/System.out: send: 2
2022-11-28 17:16:48.512 I/System.out: send: 3
2022-11-28 17:16:48.715 I/System.out: send: 4
2022-11-28 17:16:48.910 I/System.out: receive: 1
2022-11-28 17:16:48.916 I/System.out: send: 5 // 缓存区满了, receive 后才能继续发送
2022-11-28 17:16:49.913 I/System.out: receive: 2
2022-11-28 17:16:49.914 I/System.out: send: 6
2022-11-28 17:16:50.917 I/System.out: receive: 3
2022-11-28 17:16:50.917 I/System.out: send: 7
2022-11-28 17:16:51.920 I/System.out: receive: 4
2022-11-28 17:16:51.920 I/System.out: send: 8
2022-11-28 17:16:52.923 I/System.out: receive: 5
2022-11-28 17:16:52.923 I/System.out: send: 9
2022-11-28 17:16:53.925 I/System.out: receive: 6
2022-11-28 17:16:54.928 I/System.out: receive: 7
2022-11-28 17:16:55.932 I/System.out: receive: 8
2022-11-28 17:16:56.935 I/System.out: receive: 9

Channel 构造类型

这一节来简单看看 Channel 构造的几种类型,为防止内容过于枯燥,就不深入剖析一些源码细节了。

Channel 构造

public fun  Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel =
    when (capacity) {
        RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                RendezvousChannel(onUndeliveredElement)
            else
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement)
        }
        CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedChannel(onUndeliveredElement)
        }
        UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
        BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
            onBufferOverflow, onUndeliveredElement
        )
        else -> {
            if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
                ConflatedChannel(onUndeliveredElement)
            else
                ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }

前面我们说了 Channel() 并不是构造函数,而是一个顶层函数,它内部会根据不同的入参来创建不同类型的 Channel 实例。我们看看入参可取的值:

public const val UNLIMITED: Int = Int.MAX_VALUE
public const val RENDEZVOUS: Int = 0
public const val CONFLATED: Int = -1
public const val BUFFERED: Int = -2

public enum class BufferOverflow {
    SUSPEND, DROP_OLDEST, DROP_LATEST
}

其实光看这个构造的过程,以及两个入参的取值,我们基本上就能知道生成的这个 Channel 实例的表现了。

比如说 UNLIMITED 表示缓存区无限大的管道,它所创建的 Channel 叫 LinkedListChannel; 而 BUFFERED 或指定 capacity 大小的入参,创建的则是 ArrayChannel 实例,这也正是命名为 LinkedList(链表) 和 Array(数组) 的数据结构一个区别,前者可以视为无限大,后者有固定的容量大小。

比如说 SUSPEND 表示缓存区满后挂起, DROP_OLDEST 表示缓存区满后会删除缓存区里最旧的那个元素且把当前 send 的数据存入缓存区, DROP_LATEST 表示缓存区满后会删除缓存区里最新的那个元素且把当前 send 的数据存入缓存区。

Channel 类型

上面创建的这四种 Channel 都有一个共同的基类——AbstractChannel,简单看看他们的继承关系:

在 AbstractSendChannel 中有个重要的成员变量:

protected val queue = LockFreeLinkedListHead()

它是一个循环双向链表,形成了一个队列 queue 结构,send() 数据时存入链表尾部,receive() 数据时就从链表头第一个节点取。至于具体的挂起,恢复等流程,感兴趣的可以自己看看源码。

值得一提的是, queue 中的节点类型可以大体分为三种:

  • Send
  • Receive
  • Closed: 当调用 Channel.close() 方法时,会往 queue 队列中加入 Closed 节点,这样当 send or receive 时就知道 Channel 已经关闭了。

另外,对于 ArrayChannel 管道,它有一个成员变量:

private var buffer: Array = arrayOfNulls(min(capacity, 8)).apply { fill(EMPTY) }

这是一个数组类型,用来实现指定 capacity 的缓存区。但是它的初始大小不是 capacity, 主要是用来防止一些不必要的内存分配。

总结

Channel 类似于 BlockingQueue 阻塞队列,其不同之处是默认把阻塞行为换成了挂起,这也是协程的一大特性。它的思想是生产-消费模式(观察者模式)。

简单比较一下四种 Channel 类型:

  • RendezvousChannel: 翻译成约会类型,缓存区大小为0,且指定为 SUSPEND 挂起策略。发送者和接收者一对一出现,接收者没出现,则发送者 send 会被挂起;发送者没出现,则接收者 receive 会被挂起。
  • ConflatedChannel: 混合类型。发送者不会挂起,它只有一个 value 值,会被新的值覆盖掉;如果没有数据,则接收者会被挂起。
  • LinkedListChannel: 不限缓存区大小的类型。发送者不会挂起,能一直往队列里存数据;队列无数据时接收者会被挂起。
  • ArrayChannel: 指定缓存区大小的类型。当缓存区满时,发送者根据 BufferOverflow 策略来处理(是否挂起);当缓存区空时,接收者会被挂起。

作者:苍耳叔叔
链接:
https://juejin.cn/post/7171272840426029063

来源:稀土掘金

相关推荐

VBA中利用Instr函数(vba int函数)

【分享成果,随喜正能量】每一个在你的生命里出现的人,都有原因,喜欢你的人给了你温暖和勇气,你喜欢的人让你学会了爱和自持,你不喜欢的人教会你宽容与尊重,不喜欢你的人让你自省与成长。。...

Insta360 Link体验:支持4K画质,一款使用场景丰富的AI云台摄像头

记者|王公逸伴随直播、线上会议需求的兴起,网络直播的需求愈发增大,8月2日,影石Insta360正式推出全新产品:Insta360Link,这是一款AI智能云台摄像头。从产品形态来说,Insta3...

VBA技术资料MF299:利用Instr进行文本查找

我给VBA的定义:VBA是个人小型自动化处理的有效工具。利用好了,可以大大提高自己的工作效率,而且可以提高数据的准确度。“VBA语言専攻”提供的教程一共九套,分为初级、中级、高级三大部分,教程是对VB...

Fabric.js 拖放元素进画布 - 掘金

本文简介点赞+关注+收藏=学会了学习Fabric.js,我的建议是看文档不如看demo。本文实现的功能:将元素拖进到画布中并生成对应的图形或图片。效果如下图所示:...

Vue3为什么推荐使用ref而不是reactive

为什么推荐使用ref而不是reactivereactive本身具有很大局限性导致使用过程需要额外注意,如果忽视这些问题将对开发造成不小的麻烦;ref更像是vue2时代optionapi的data的替...

Fabric.js 样式不更新怎么办?(js更改样式)

本文简介带尬猴,我嗨德育处主任不知道你有没有遇到过在使用Fabric.js时无意中一些骚操作修改了元素的样式,但刷新画布却没更新元素样式?如果你也遇到同样的问题的话,可以尝试使用本文的方法。...

Fabric.js 修改画布交互方式到底有什么用?

本文简介点赞+关注+收藏=学会了fabric.js为我们提供了很多厉害的方法。今天要搞明白的一个东西是canvas.interactive。官方文档对canvas.interact...

Rust Web编程:第五章 在浏览器上显示内容

我们现在正处于可以构建一个Web应用程序的阶段,该应用程序可以使用不同的方法和数据管理一系列HTTP请求。这很有用,特别是当我们为微服务构建服务器时。然而,我们也希望非程序员能够与我们的应...

Fabric.js 自由绘制椭圆 - 掘金(canvas画椭圆)

本文简介点赞+关注+收藏=学会了本文讲解在Fabric.js中如何自由绘制椭圆形,如果你还不了解Fabric.js,可以查阅《Fabric.js从入门到精通》。效果如下图所示...

手把手教你实现JS手搓"防抖"优化代码——专业的事用专业的方法!

前言在我们前端编程中,假如我们要给后端发送请求,万一手抖多点了几次,多发送了几遍怎么办?解决方案:防抖!这种事就要交给我们专业的“防抖”先生来处理!今天,我们就来教大家手搓“防抖”...

详解虚拟DOM与Diff算法(虚拟dom一定比实际dom快吗)

vue的虚拟DOM,Diff算法,其中一些关键的地方从别处搬运了一些图进行说明(感谢制图的大佬),也包含比较详细的源码解读。...

走进 React Fiber 的世界(我走进你的世界手势舞视频)

文/阿里淘系F(x)Team-冷卉Fiber设计思想Fiber是对React核心算法的重构,facebook团队使用两年多的时间去重构React的核心算法,在React16以上...

前端新一代框架 Svelte 火了!十个场景带你简单认识它!

近几年听到的主流框架都是Vue、React、Angular,但其实有一个框架在国外非常火,用起来也是很方便,那就是...

借助DeepSeek实现了一个PDF阅读器

1、简介使用pdf.js库加载和显示PDF文件。实现了翻页、缩放功能。提供了基本的错误处理。功能特点:支持选择本地PDF文件。可以逐页查看PDF内容。支持放大缩小功能。界面简洁,易于使...

DeepSeek代码之旅1:卫星地图标记方法之——html语言的实现

最近遇到一个任务,具体功能如下:1、调用高德地图API,图层为卫星图层,根据需要标记兴趣点;2、标记完成后可以保存兴趣点,便于下次加载历史兴趣点。...

取消回复欢迎 发表评论: