破解 Kotlin 协程(10):Select 篇

Select 并不是什么新鲜概念,我们在 IO 多路复用的时候就见过它,在 Java NIO 里面也见过它。接下来给各位介绍的是 Kotlin 协程的 Select。

复用多个 await

我们前面已经接触了很多挂起函数,那么如果我有这样一个场景,两个 API 分别从网络和本地缓存获取数据,期望哪个先返回就先用哪个做展示:

1
2
3
4
5
6
7
fun CoroutineScope.getUserFromApi(login: String) = async(Dispatchers.IO){
gitHubServiceApi.getUserSuspend(login)
}

fun CoroutineScope.getUserFromLocal(login:String) = async(Dispatchers.IO){
File(localDir, login).takeIf { it.exists() }?.readText()?.let { gson.fromJson(it, User::class.java) }
}

不管先调用哪个 API 返回的 Deferredawait,都会被挂起,如果想要实现这一需求就要启动两个协程来调用 await,这样反而将问题复杂化了。

接下来我们用 select 来解决这个问题:

1
2
3
4
5
6
7
8
9
10
GlobalScope.launch {
val localDeferred = getUserFromLocal(login)
val remoteDeferred = getUserFromApi(login)

val userResponse = select<Response<User?>> {
localDeferred.onAwait { Response(it, true) }
remoteDeferred.onAwait { Response(it, false) }
}
...
}.join()

大家可以看到,我们没有直接调用 await,而是调用了 onAwaitselect 当中注册了个回调,不管哪个先回调,select 立即返回对应回调中的结果。假设 localDeferred.onAwait 先返回,那么 userResponse 的值就是 Response(it, true),当然由于我们的本地缓存可能不存在,因此 select 的结果类型是 Response<User?>

对于这个案例本身,如果先返回的是本地缓存,那么我们还需要获取网络结果来展示最新结果:

1
2
3
4
5
6
7
8
9
GlobalScope.launch {
...
userResponse.value?.let { log(it) }
userResponse.isLocal.takeIf { it }?.let {
val userFromApi = remoteDeferred.await()
cacheUser(login, userFromApi)
log(userFromApi)
}
}.join()

复用多个 Channel

对于多个 Channel 的情况,也比较类似:

1
2
3
4
5
6
7
8
9
val channels = List(10) { Channel<Int>() }

select<Int?> {
channels.forEach { channel ->
channel.onReceive { it }
// OR
channel.onReceiveOrNull { it }
}
}

对于 onReceive,如果 Channel 被关闭,select 会直接抛出异常;而对于 onReceiveOrNull 如果遇到 Channel 被关闭的情况,it 的值就是 null

SelectClause

我们怎么知道哪些事件可以被 select 呢?其实所有能够被 select 的事件都是 SelectClauseN 类型,包括:

  • SelectClause0:对应事件没有返回值,例如 join 没有返回值,对应的 onJoin 就是这个类型,使用时 onJoin 的参数是一个无参函数:

    1
    2
    3
    select<Unit> {
    job.onJoin { log("Join resumed!") }
    }
  • SelectClause1:对应事件有返回值,前面的 onAwaitonReceive 都是此类情况。

  • SelectClause2:对应事件有返回值,此外还需要额外的一个参数,例如 Channel.onSend 有两个参数,第一个就是一个 Channel 数据类型的值,表示即将发送的值,第二个是发送成功时的回调:

    1
    2
    3
    4
    5
    6
    7
    List(100) { element ->
    select<Unit> {
    channels.forEach { channel ->
    channel.onSend(element) { sentChannel -> log("sent on $sentChannel") }
    }
    }
    }

    在消费者的消费效率较低时,数据能发给哪个就发给哪个进行处理,onSend 的第二个参数的参数是数据成功发送到的 Channel 对象。

因此如果大家想要确认挂起函数是否支持 select,只需要查看其是否存在对应的 SelectClauseN 即可。

小结

在协程当中,Select 的语义与 Java NIO 或者 Unix 的 IO 多路复用类似,它的存在使得我们可以轻松实现 1 拖 N,实现哪个先来就处理哪个。尽管 Select 和 Channel 比起标准库的协程 API 已经更接近业务开发了,不过个人认为它们仍属于相对底层的 API 封装,在实践当中多数情况下也可以使用 Flow API 来解决。

而这个 Flow API,完全就是响应式编程的协程版 API,我们简直可以照着 RxJava 来学习它,所以我们下一篇再见吧~~~


关于作者

霍丙乾 bennyhuo,Google 开发者专家(Kotlin 方向);《深入理解 Kotlin 协程》 作者(机械工业出版社,2020.6);《深入实践 Kotlin 元编程》 作者(机械工业出版社,2023.8);前腾讯高级工程师,现就职于猿辅导