当前位置: 代码迷 >> 综合 >> kotlin--协程的启动和取消
  详细解决方案

kotlin--协程的启动和取消

热度:92   发布时间:2023-09-14 05:42:48.0

一、协程的启动

1.launch与async构建器都用来启动新协程

launch:我们之前已经使用过了GlobalScope的launch来启动协程,它返回一个Job
async:返回一个Deferred,它也是一个Job,但是可以使用await函数获得运行的结果
除了之前结构化并发中介绍的几种指定CoroutineScope的API外,我们还可以使用runBlocking函数来指定CoroutineScope,他会使用主线程来转换成协程
launch和async内如果有子协程,那么该协程会等待子协程执行结束

fun `test coroutine build`() = runBlocking {val job1 = launch {delay(200)println("job1 finished")}val job2 = async {delay(200)println("job2 finished")"job2 result"}println(job2.await())
}fun main() {`test coroutine build`()
}

runBlocking 作用域内就是main函数的主线程,会等到所有子协程运行完毕后才结束

2.join

Job的join函数,会让后面的协程等待当前的协程执行完毕后再执行

fun `test join build`() = runBlocking {val job1 = launch {delay(200)println("job1 finished :${System.currentTimeMillis()}")delay(2000)}job1.join()println("job1 finished")val job2 = launch {delay(200)println("job2 finished:${System.currentTimeMillis()}")}val job3 = launch {delay(200)println("job2 finished:${System.currentTimeMillis()}")}}fun main() {`test join build`()
}
3.Deferred的await函数,也能达到同样的效果
fun `test async build`() = runBlocking {val job1 = async {delay(200)println("job1 finished :${System.currentTimeMillis()}")delay(2000)}job1.await()println("job1 finished")val job2 = async {delay(200)println("job2 finished:${System.currentTimeMillis()}")}val job3 = async {delay(200)println("job2 finished:${System.currentTimeMillis()}")}
}fun main() {`test async build`()
}
4.async组合并发

首先来看下下面代码:

suspend fun doOne(): Int {delay(1000)return 10
}suspend fun doTwo(): Int {delay(1000)return 20
}fun `test sync`() = runBlocking {val time = measureTimeMillis {//统计时间函数val doOne = doOne()val doTwo = doTwo()println("sum : ${doOne + doTwo}")}println("time : $time")
}

结果:
sum : 30
time : 2032

doOne执行完才会执行doTwo,如果想要它们可以同时执行,又能获取返回结果,可以使用async:

fun `test combine async`() = runBlocking {val time = measureTimeMillis {//统计时间函数val doOne = async { doOne() }val doTwo = async { doTwo() }println("sum : ${doOne.await() + doTwo.await()}")}println("time : $time")
}fun main() {`test combine async`()
}

结果:
sum : 30
time : 1085

5.协程启动模式

kotlin的协程有4中启动模式
1.DEFAULT:协程创建后,立即开始调度,在调度前如果被取消,直接进入取消响应状态
2.ATOMIC:协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消
3.LAZY:只有协程被需要时,包括主动调用协程的start、join或await等函数时才会开始调度,如果调度前被取消,那么该协程将直接进入异常结束状态
4.UNDISPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点

1-3是需要调度器的,还记得我们前面学习过的调度器么,如果调度器中线程池满了,那么不会马上执行,4则不依赖调度器

1.DEFAULT:取消后,将不会再执行

fun `test start mode`() = runBlocking {val job = launch(start = CoroutineStart.DEFAULT) {delay(10000)println("job finished")}delay(1000)job.cancel()println("finish")
}

结果:
finish

2.ATOMIC:取消了,会先执行第一个挂起函数之前的代码

fun `test start mode`() = runBlocking {val job = launch(start = CoroutineStart.ATOMIC) {println("第一个挂起函数之前")//delay是一个挂起函数delay(10000)println("job finished")}delay(1000)job.cancel()println("finish")
}

结果:
第一个挂起函数之前
finish

3.LAZY:可以先定义好,然后在先要执行的地方调用相应函数执行

fun `test start mode`() = runBlocking {val job = launch(start = CoroutineStart.LAZY) {println("第一个挂起函数之前")//delay是一个挂起函数delay(10000)println("job finished")}delay(1000)
//    job.cancel()job.start()println("finish")
}

4.UNDISPTATCHED:挂起函数之前为主线程

fun `test start mode`() = runBlocking {val job1 = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {println("Thread ${Thread.currentThread()}")}
}

结果:
Thread Thread[main,5,main]

6.协程的作用域构建器

1.runBlocking与coroutineScope
runBlocking是常规函数,它会阻塞主线程,而coroutineScope是挂起函数,它们都会等待其协程体和子协助执行结束,作用域构建器使用的是父协程的上下文,协程的上下文会在下一篇中重点介绍

fun `test coroutineScope`() = runBlocking {coroutineScope {val job1 = launch {delay(2000)println("job finished")}val job2 = launch {delay(1000)println("job2 finished")throw IllegalArgumentException()}}
}

coroutineScope作用域中,如果一个子协程出错了,那么其所有子协程都会退出并结束

2.coroutineScope与supervisorScope
supervisorScope中,一个子协程出错了,不会影响其他子协程

fun `test supervisorScope`() = runBlocking {supervisorScope {val job1 = launch {delay(2000)println("job finished")}val job2 = launch {delay(1000)println("job2 finished")throw IllegalArgumentException()}}println("finished")
}

结果:

job2 finished
Exception in thread "main" java.lang.IllegalArgumentExceptionat com.aruba.mykotlinapplication.coroutine.RunCoroutineKt$test supervisorScope$1$1$job2$1.invokeSuspend(runCoroutine.kt:148)at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)at kotlinx.coroutines.ResumeModeKt.resumeMode(ResumeMode.kt:67)at kotlinx.coroutines.DispatchedKt.resume(Dispatched.kt:309)at kotlinx.coroutines.DispatchedKt.dispatch(Dispatched.kt:298)at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:250)at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:260)at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:332)at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.kt:298)at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.kt:116)at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:80)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:54)at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:36)at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)at com.aruba.mykotlinapplication.coroutine.RunCoroutineKt.test supervisorScope(runCoroutine.kt:138)at com.aruba.mykotlinapplication.coroutine.RunCoroutineKt.main(runCoroutine.kt:156)at com.aruba.mykotlinapplication.coroutine.RunCoroutineKt.main(runCoroutine.kt)
job finished
finishedProcess finished with exit code 0
7.Job对象

对于每一个创建的协程(通过launch或async),会返回一个Job对象,它是协程的唯一标识,并且复制管理协程的生命周期
一个任务可以包含一系列状态:新创建(New)、活跃(Active)、完成中(Completing)、已完成(Completed)、取消中(Canceling)和已取消(Canceled)。虽然无法直接访问这些状态,但我们可以通过访问Job的属性:isActive、isCanceled和isCompleted

8.协程的生命周期

如果协程处于活跃状态,协程运行出错或取消都会将该协程置为取消中状态(isActive=false,isCanceled=true)。当所有子协程都完成后,协程会进入已取消状态,此时isCompleted=true。

kotlin--协程的启动和取消

二、协程取消

1.协程的取消

1.取消作用域会取消它的子协程,CoroutineScope是创建一个全新的协程上下文,和coroutineScope作用域是不同的,作用域构建器使用的是父协程的上下文

fun `test cancel`() = runBlocking {//自定义一个作用域val scope = CoroutineScope(Dispatchers.Default)scope.launch {delay(1000)println("job finished")}scope.launch {delay(1000)println("job2 finished")}delay(100)scope.cancel()//主线程等待1200ms,因为runBlocking不会等自定义作用域执行完毕delay(1200)scope
}

2.被取消的子协程,不影响其他兄弟协程

fun `test cancel`() = runBlocking {//自定义一个作用域val scope = CoroutineScope(Dispatchers.Default)val job = scope.launch {delay(1000)println("job finished")}val job2 =scope.launch {delay(1000)println("job2 finished")}delay(100)job.cancel()//主线程等待1200ms,因为runBlocking不会等自定义作用域执行完毕delay(1200)scope
}

结果:
job2 finished

3.协程是通过抛出一个特殊的异常:CancellationException 来处理取消操作

fun `test cancellationException`() = runBlocking {val job = GlobalScope.launch {try {delay(1000)println("finished")} catch (e: Exception) {e.printStackTrace()}}delay(100)job.cancel()
//    可以自定义
//    job.cancel(CancellationException("取消了"))job.join()
}fun main() {`test cancellationException`()
}

结果:
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=StandaloneCoroutine{Cancelling}@759ebb3d

4.所有kotlinx.coroutines中的挂起函数都是可以取消的

2.CPU密集型任务取消

1.isActive是一个可以被使用在CorountineScope中的扩展属性,检查Job是否处于活跃状态

fun `test cancel cpu`() = runBlocking {//指定别的调度器,不然会阻塞runBlockingval job = launch (Dispatchers.Default){var count = 0var time = System.currentTimeMillis()while (count < 10 && isActive) {//调用cancel后,isActive会变成falseif (System.currentTimeMillis() >= time) {//500ms打印一次println("sleep:${count++}")time += 500}}}delay(1000)println("start cancel")job.cancelAndJoin()println("finished")
}

结果:
sleep:0
sleep:1
sleep:2
start cancel
finished

2.ensureActive函数,如果job处于非活跃状态,那么它会抛出异常

fun `test cancel cpu ensure`() = runBlocking {//指定别的调度器,不然会阻塞runBlockingval job = launch(Dispatchers.Default) {var count = 0var time = System.currentTimeMillis()while (count < 10) {//调用ensureActive,非活跃会抛出取消的异常ensureActive()if (System.currentTimeMillis() >= time) {//500ms打印一次println("sleep:${count++}")time += 500}}}delay(1000)println("start cancel")job.cancelAndJoin()println("finished")
}

结果和1.相同

3.yield函数,它会检查所在协程状态,如果已取消,则抛出取消的异常。此外,它还会尝试让出线程的执行权,给其他协程提供执行机会

fun `test cancel cpu yield`() = runBlocking {//指定别的调度器,不然会阻塞runBlockingval job = launch(Dispatchers.Default) {var count = 0var time = System.currentTimeMillis()while (count < 10) {//调用yield,取消的协程会抛出取消的异常yield()if (System.currentTimeMillis() >= time) {//500ms打印一次println("sleep:${count++}")time += 500}}}delay(1000)println("start cancel")job.cancelAndJoin()println("finished")
}

结果也是相同的

3.协程取消的资源释放

1.可以捕获取消异常,然后在finally中释放
2.use函数,该函数只能被实现了closeable的对象使用,程序结束时,会自动调用close方法,适合文件对象

4.不能取消的任务

处于取消中状态的协程不能被挂起,如果想要协程取消后,还能调用挂起函数,我们需要将清理的代码放入NoCancellable CoroutineContext中
这样会挂起运行中的代码,并保持协程的取消中状态,直到任务处理完成

fun `test cancel cpu ensure finally`() = runBlocking {//指定别的调度器,不然会阻塞runBlockingval job = launch(Dispatchers.Default) {var count = 0var time = System.currentTimeMillis()try {while (count < 10) {//调用ensureActive,非活跃会抛出取消的异常ensureActive()if (System.currentTimeMillis() >= time) {//500ms打印一次println("sleep:${count++}")time += 500}}} finally {withContext(NonCancellable) {println("isCanceled")delay(1000)println("delay 1000ms finished")}}}delay(1000)println("start cancel")job.cancelAndJoin()println("finished")
}fun main() {`test cancel cpu ensure finally`()
}

结果:
sleep:0
sleep:1
sleep:2
start cancel
isCanceled
delay 1000ms finished
finished

5.超时任务

很多情况下,取消一个协程的原因可能是超时
1.withTimeout:指定时间,如果规定时间内没有完成,就抛出一个异常

fun `test cancel withtimeout`() = runBlocking { withTimeout(2000){repeat(100){println("delay:$it")delay(1000)}}
}fun main() {`test cancel withtimeout`()
}

结果:

delay:0
delay:1
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2000 msat kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:128)at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:94)at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.kt:307)at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.kt:116)at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:68)at java.lang.Thread.run(Thread.java:745)

2.withTimeoutOrNull:如果超时则返回null,否则返回lambda结果

fun `test cancel withTimeoutOrNull`() = runBlocking {val withTimeoutOrNull = withTimeoutOrNull(2000) {repeat(100) {println("delay:$it")delay(1000)}"Done"}println(withTimeoutOrNull)
}