当前位置: 代码迷 >> 综合 >> [python]asyncio task取消
  详细解决方案

[python]asyncio task取消

热度:72   发布时间:2023-12-05 12:54:18.0
#1. run_until_complete
# import asyncio
# loop = asyncio.get_event_loop()
# loop.run_forever()            # 这个会一直运行
# loop.run_until_complete()    # 运行完tasks,之后会停止掉#1. loop会被放到future中
#2. 取消future(task)import asyncio
import timeasync def get_html(sleep_times):print("waiting")await asyncio.sleep(sleep_times)print("done after {}s".format(sleep_times))if __name__ == "__main__":task1 = get_html(2)task2 = get_html(3)task3 = get_html(3)tasks = [task1, task2, task3]loop = asyncio.get_event_loop()try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt as e:all_tasks = asyncio.Task.all_tasks()    # 获取所有的tasksfor task in all_tasks:print("cancel task")print(task.cancel())    # 取消成功返回True,取消失败返回Falseloop.stop()         # stop之后一定要run_foreverloop.run_forever()  # 如果不执行run_forever,是会抛异常的finally:loop.close()