当前位置: 代码迷 >> 综合 >> asyncio(异步io)
  详细解决方案

asyncio(异步io)

热度:41   发布时间:2024-02-27 13:25:29.0

基本概念


引入异步
  1. 同步、异步(线程间)
    所谓同步,就是发出一个功能的调用时,在没有得到结果之前,该调用就不返回或继续执行后续操作。简单来说,同步就是必须一件一件事做,等前一件做完了才能做下一件事
    同步与异步相对,当一个异步调用发出后,调用者在没有得到结果之前,就可以继续执行后续操作。当这个调用完成后,一般通过状态、通知和回调来通知调用者。对于异步调用,调用的返回并不受调用者控制。通知调用者的三种方式,具体如下:

    方式 描述
    状态 即监听被调用者的状态(轮询),调用者每个一定时间检查一次,效率很低
    通知 当被调用者执行完成后,发出通知告知被调用者,无需消耗太多性能
    回调 与通知类似,当被调用者执行完成后,会调用调用者提供的回调函数
  2. 阻塞、非阻塞(线程内)
    阻塞和非阻塞两个概念仅仅与等待消息通知的状态相关。跟同步、异步没有太大关系,也就是说阻塞与非阻塞主要是程序等待通知时的状态来讲的
    阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果后才会返回
    非阻塞是指在不能立刻得到结果之前,该调用不会阻塞当前线程

  3. 组合

    描述
    同步阻塞 发送方请求之后一直等待响应。接收方处理请求时进行的IO操作如果不能等到返回结果,就一直等到返回结果,才响应发送方
    同步非阻塞 发送方发送请求之后一直等待。接收方处理请求时进行的IO操作如果不能得到结果,就立即返回,去做其他事。但是由于没有得到请求结果,不响应发送方,发送方一直等待。当IO操作完成以后,将完成状态和结果通知接收方,接收方响应发送方,发送方进入下一次请求过程
    异步阻塞 发送方向接收方请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能马上得到结果,就一直等到返回结果后才响应发送方
    异步非阻塞 发送方向接收方发送请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能得到结果,也不等待,而是马上返回去做其他的事。当IO操作完成后,将完成状态和结果通知接收方,接收方在响应发送方
引入协程
  1. 协程
    协程,又称微线程,英文名Coroutine。
  2. 子进程
    在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。
  3. 多线程
    避免顺序执行的方式之一是多线程,但是考虑到python语言的特性(GIL锁),再执行计算密集型的任务时,多线程的执行效果反而变慢,再执行IO密集型的任务时候虽然有不错的性能提升,但是依然会有线程管理与切换、同步的开销等等(具体原因这里不详细说明,请参见相关的GIL说明)
  4. 协程优势
  • 最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显
  • 就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多
asyncio概念与框架
  1. 什么是task任务
    Task用来并发调度的协程,即对协程函数的进一步包装?那为什么还需要包装呢?因为单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是可以包含各种状态的,异步编程最重要的就是对异步操作状态的把控了。
  • 创建任务

    task = asyncio.create_task(coro())
    ## task = asyncio.ensure_future(coro()) 
    ## loop.create_future(coro()) # loop为循环对象
    ## loop.create_task(coro()) 
    
  • 获取某一任务

    task=asyncio.current_task(loop=None) ## 返回正在执行的任务
    asyncio.all_tasks(loop=None) ## 返回还没有结束的任务
    
  1. future对象
    Future是一个较低层的可等待(awaitable)对象,他表示的是异步操作的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。
    Future是Task的父类,一般情况下,已不用去管它们两者的详细区别,也没有必要去用Future,用Task就可以了,返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor().

  2. 异步函数获取结果

  • 直接通过result获取

    import asyncio
    import time
    async def hello1(a,b):print("hello world 01 begin")await asyncio.sleep(3)print("hello again 01 end")return a+bcoroutine = hello1(10,5)
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(coroutine)
    loop.run_until_complete(task)
    print('--------------------------')
    print(task.result())
    loop.close()
    
  • 通过调用回调函数

    import asyncio
    import time
    async def hello1(a,b):print("hello world 01 begin")await asyncio.sleep(3)print("hello again 01 end")return a+b
    def callback(future):print(future.result())
    coroutine = hello1(10,5)
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(coroutine)
    task.add_done_callback(callback)
    loop.run_until_complete(task)
    loop.close()
    
  1. 模板(3.7之前)
  • 无参无返回值

    import asyncio
    async def hello1():print("hello world 01 begin")await asyncio.sleep(3)print("hello again 01 end")async def hello2():print("hello world 02 begin")await asyncio.sleep(2)print("hello again 02 end")async def hello3():print("hello world 03 begin")await asyncio.sleep(1)print("hello again 03 end")
    def callback(future):print(future.result())loop = asyncio.get_event_loop() ## 创建事件循环
    tasks = [hello1(),hello2(),hello3()] ## 将多个协程函数包装成任务列表
    loop.run_until_complete(asyncio.wait(tasks))## 通过事件循环运行
    loop.close()## 取消事件循环
    
  • 有参有返回值

    import asyncio
    async def hello1(a,b):print("hello world 01 begin")await asyncio.sleep(3)print("hello again 01 end")return a+basync def hello2(a,b):print("hello world 02 begin")await asyncio.sleep(2)print("hello again 02 end")return a-basync def hello3(a,b):print("hello world 03 begin")await asyncio.sleep(1)print("hello again 03 end")return a*bloop = asyncio.get_event_loop() ## 创建事件循环
    task1 = asyncio.ensure_future(hello1(10,2))
    task2 = asyncio.ensure_future(hello2(19,2))
    task3= asyncio.ensure_future(hello3(9,2))
    tasks = [task1,task2,task3]
    loop.run_until_complete(asyncio.wait(tasks))## 通过事件循环运行
    print(task1.result())
    print(task2.result())
    print(task3.result())
    loop.close()## 取消事件循环
    

4.1. 流程

  • 构造事件循环

    loop=asyncio.get_running_loop() #返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的loop=asyncio.get_event_loop() #获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop;loop=asyncio.set_event_loop(loop) #设置一个事件循环为当前线程的事件循环;loop=asyncio.new_event_loop()  #创建一个新的事件循环
  • 包装task

    
    task = asyncio.create_task(coro(参数列表))   # 这是3.7版本新添加的
    task = asyncio.ensure_future(coro(参数列表)) 
    
  • 运行

    loop.run_until_complete(asyncio.wait(tasks))  #通过asyncio.wait()整合多个task
    loop.run_until_complete(asyncio.gather(tasks))  #通过asyncio.gather()整合多个task
    loop.run_until_complete(task_1)  #单个任务则不需要整合
    loop.run_forever()  #但是这个方法在新版本已经取消,不再推荐使用,因为使用起来不简洁使用gather或者wait可以同时注册多个任务,实现并发,但他们的设计是完全不一样的,主要区别如下:
    (1)参数形式不一样
    gather的参数为 *coroutines_or_futures,即如这种形式tasks = asyncio.gather(*[task1,task2,task3])或者tasks = asyncio.gather(task1,task2,task3)loop.run_until_complete(tasks)
    wait的参数为列表或者集合的形式,如下tasks = asyncio.wait([task1,task2,task3])loop.run_until_complete(tasks)2)返回的值不一样
    gather的定义如下,gather返回的是每一个任务运行的结果,results = await asyncio.gather(*tasks) 
    wait的定义如下,返回dones是已经完成的任务,pending是未完成的任务,都是集合类型done, pending = yield from asyncio.wait(fs)3)后面还会讲到他们的进一步使用
    
  • 关闭

    loop.close()
    
  1. 模板(3.7之后)
  • 无参无返回值

    import asyncio
    import timeasync def hello1():print("Hello world 01 begin")await asyncio.sleep(3)  #模拟耗时任务3秒print("Hello again 01 end")async def hello2():print("Hello world 02 begin")await asyncio.sleep(2)   #模拟耗时任务2秒print("Hello again 02 end")async def hello3():print("Hello world 03 begin")await asyncio.sleep(4)   #模拟耗时任务4秒print("Hello again 03 end")async def main():results=await asyncio.gather(hello1(),hello2(),hello3())for result in results:print(result)     #因为没返回值,故而返回Noneasyncio.run(main())
  • 有参有返回值

    import asyncio
    import timeasync def hello1(a,b):print("Hello world 01 begin")await asyncio.sleep(3)  #模拟耗时任务3print("Hello again 01 end")return a+basync def hello2(a,b):print("Hello world 02 begin")await asyncio.sleep(2)   #模拟耗时任务2print("Hello again 02 end")return a-basync def hello3(a,b):print("Hello world 03 begin")await asyncio.sleep(4)   #模拟耗时任务4print("Hello again 03 end")return a*basync def main():results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))for result in results:print(result)asyncio.run(main())
    
  1. gather 与 wait区别
  • gather

    import asyncio
    from pprint import pprintimport randomasync def coro(tag):print(">", tag)await asyncio.sleep(random.uniform(1, 3))print("<", tag)return tagloop = asyncio.get_event_loop()group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
    group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
    group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])all_groups = asyncio.gather(group1, group2, group3)results = loop.run_until_complete(all_groups)loop.close()pprint(results)
    

    组中的所有任务都可以通过调用group2.cancel()甚至all_groups.cancel()…。

  • wait

    import asyncio
    import randomasync def coro(tag):print(">", tag)await asyncio.sleep(random.uniform(0.5, 5))print("<", tag)return tagloop = asyncio.get_event_loop()tasks = [coro(i) for i in range(1, 11)]print("Get first result:")
    finished, unfinished = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))for task in finished:print(task.result())
    print("unfinished:", len(unfinished))print("Get more results in 2 seconds:")
    finished2, unfinished2 = loop.run_until_complete(asyncio.wait(unfinished, timeout=2))for task in finished2:print(task.result())
    print("unfinished2:", len(unfinished2))print("Get all other results:")
    finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))for task in finished3:print(task.result())loop.close()
    

    支持在完成第一个任务后或在指定的超时之后等待停止,从而允许操作的精度降低: