基本概念
引入异步
-
同步、异步(线程间)
所谓同步,就是发出一个功能的调用时,在没有得到结果之前,该调用就不返回或继续执行后续操作。简单来说,同步就是必须一件一件事做,等前一件做完了才能做下一件事
同步与异步相对,当一个异步调用发出后,调用者在没有得到结果之前,就可以继续执行后续操作。当这个调用完成后,一般通过状态、通知和回调来通知调用者。对于异步调用,调用的返回并不受调用者控制。通知调用者的三种方式,具体如下:方式 描述 状态 即监听被调用者的状态(轮询),调用者每个一定时间检查一次,效率很低 通知 当被调用者执行完成后,发出通知告知被调用者,无需消耗太多性能 回调 与通知类似,当被调用者执行完成后,会调用调用者提供的回调函数 -
阻塞、非阻塞(线程内)
阻塞和非阻塞两个概念仅仅与等待消息通知的状态相关。跟同步、异步没有太大关系,也就是说阻塞与非阻塞主要是程序等待通知时的状态来讲的
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果后才会返回
非阻塞是指在不能立刻得到结果之前,该调用不会阻塞当前线程 -
组合
式 描述 同步阻塞 发送方请求之后一直等待响应。接收方处理请求时进行的IO操作如果不能等到返回结果,就一直等到返回结果,才响应发送方 同步非阻塞 发送方发送请求之后一直等待。接收方处理请求时进行的IO操作如果不能得到结果,就立即返回,去做其他事。但是由于没有得到请求结果,不响应发送方,发送方一直等待。当IO操作完成以后,将完成状态和结果通知接收方,接收方响应发送方,发送方进入下一次请求过程 异步阻塞 发送方向接收方请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能马上得到结果,就一直等到返回结果后才响应发送方 异步非阻塞 发送方向接收方发送请求后,不等待响应,可以继续其他工作。接收方处理请求时进行IO操作如果不能得到结果,也不等待,而是马上返回去做其他的事。当IO操作完成后,将完成状态和结果通知接收方,接收方在响应发送方
引入协程
- 协程
协程,又称微线程,英文名Coroutine。 - 子进程
在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。 - 多线程
避免顺序执行的方式之一是多线程,但是考虑到python语言的特性(GIL锁),再执行计算密集型的任务时,多线程的执行效果反而变慢,再执行IO密集型的任务时候虽然有不错的性能提升,但是依然会有线程管理与切换、同步的开销等等(具体原因这里不详细说明,请参见相关的GIL说明) - 协程优势
- 最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显
- 就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多
asyncio概念与框架
- 什么是task任务
Task用来并发调度的协程,即对协程函数的进一步包装?那为什么还需要包装呢?因为单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是可以包含各种状态的,异步编程最重要的就是对异步操作状态的把控了。
-
创建任务
task = asyncio.create_task(coro()) ## task = asyncio.ensure_future(coro()) ## loop.create_future(coro()) # loop为循环对象 ## loop.create_task(coro())
-
获取某一任务
task=asyncio.current_task(loop=None) ## 返回正在执行的任务 asyncio.all_tasks(loop=None) ## 返回还没有结束的任务
-
future对象
Future是一个较低层的可等待(awaitable)对象,他表示的是异步操作的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。
Future是Task的父类,一般情况下,已不用去管它们两者的详细区别,也没有必要去用Future,用Task就可以了,返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor(). -
异步函数获取结果
-
直接通过result获取
import asyncio import time async def hello1(a,b):print("hello world 01 begin")await asyncio.sleep(3)print("hello again 01 end")return a+bcoroutine = hello1(10,5) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) loop.run_until_complete(task) print('--------------------------') print(task.result()) loop.close()
-
通过调用回调函数
import asyncio import time async def hello1(a,b):print("hello world 01 begin")await asyncio.sleep(3)print("hello again 01 end")return a+b def callback(future):print(future.result()) coroutine = hello1(10,5) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) loop.run_until_complete(task) loop.close()
- 模板(3.7之前)
-
无参无返回值
import asyncio async def hello1():print("hello world 01 begin")await asyncio.sleep(3)print("hello again 01 end")async def hello2():print("hello world 02 begin")await asyncio.sleep(2)print("hello again 02 end")async def hello3():print("hello world 03 begin")await asyncio.sleep(1)print("hello again 03 end") def callback(future):print(future.result())loop = asyncio.get_event_loop() ## 创建事件循环 tasks = [hello1(),hello2(),hello3()] ## 将多个协程函数包装成任务列表 loop.run_until_complete(asyncio.wait(tasks))## 通过事件循环运行 loop.close()## 取消事件循环
-
有参有返回值
import asyncio async def hello1(a,b):print("hello world 01 begin")await asyncio.sleep(3)print("hello again 01 end")return a+basync def hello2(a,b):print("hello world 02 begin")await asyncio.sleep(2)print("hello again 02 end")return a-basync def hello3(a,b):print("hello world 03 begin")await asyncio.sleep(1)print("hello again 03 end")return a*bloop = asyncio.get_event_loop() ## 创建事件循环 task1 = asyncio.ensure_future(hello1(10,2)) task2 = asyncio.ensure_future(hello2(19,2)) task3= asyncio.ensure_future(hello3(9,2)) tasks = [task1,task2,task3] loop.run_until_complete(asyncio.wait(tasks))## 通过事件循环运行 print(task1.result()) print(task2.result()) print(task3.result()) loop.close()## 取消事件循环
4.1. 流程
-
构造事件循环
loop=asyncio.get_running_loop() #返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误;它是python3.7中新添加的loop=asyncio.get_event_loop() #获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop;loop=asyncio.set_event_loop(loop) #设置一个事件循环为当前线程的事件循环;loop=asyncio.new_event_loop() #创建一个新的事件循环
-
包装task
task = asyncio.create_task(coro(参数列表)) # 这是3.7版本新添加的 task = asyncio.ensure_future(coro(参数列表))
-
运行
loop.run_until_complete(asyncio.wait(tasks)) #通过asyncio.wait()整合多个task loop.run_until_complete(asyncio.gather(tasks)) #通过asyncio.gather()整合多个task loop.run_until_complete(task_1) #单个任务则不需要整合 loop.run_forever() #但是这个方法在新版本已经取消,不再推荐使用,因为使用起来不简洁使用gather或者wait可以同时注册多个任务,实现并发,但他们的设计是完全不一样的,主要区别如下: (1)参数形式不一样 gather的参数为 *coroutines_or_futures,即如这种形式tasks = asyncio.gather(*[task1,task2,task3])或者tasks = asyncio.gather(task1,task2,task3)loop.run_until_complete(tasks) wait的参数为列表或者集合的形式,如下tasks = asyncio.wait([task1,task2,task3])loop.run_until_complete(tasks) (2)返回的值不一样 gather的定义如下,gather返回的是每一个任务运行的结果,results = await asyncio.gather(*tasks) wait的定义如下,返回dones是已经完成的任务,pending是未完成的任务,都是集合类型done, pending = yield from asyncio.wait(fs) (3)后面还会讲到他们的进一步使用
-
关闭
loop.close()
- 模板(3.7之后)
-
无参无返回值
import asyncio import timeasync def hello1():print("Hello world 01 begin")await asyncio.sleep(3) #模拟耗时任务3秒print("Hello again 01 end")async def hello2():print("Hello world 02 begin")await asyncio.sleep(2) #模拟耗时任务2秒print("Hello again 02 end")async def hello3():print("Hello world 03 begin")await asyncio.sleep(4) #模拟耗时任务4秒print("Hello again 03 end")async def main():results=await asyncio.gather(hello1(),hello2(),hello3())for result in results:print(result) #因为没返回值,故而返回Noneasyncio.run(main())
-
有参有返回值
import asyncio import timeasync def hello1(a,b):print("Hello world 01 begin")await asyncio.sleep(3) #模拟耗时任务3秒print("Hello again 01 end")return a+basync def hello2(a,b):print("Hello world 02 begin")await asyncio.sleep(2) #模拟耗时任务2秒print("Hello again 02 end")return a-basync def hello3(a,b):print("Hello world 03 begin")await asyncio.sleep(4) #模拟耗时任务4秒print("Hello again 03 end")return a*basync def main():results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))for result in results:print(result)asyncio.run(main())
- gather 与 wait区别
-
gather
import asyncio from pprint import pprintimport randomasync def coro(tag):print(">", tag)await asyncio.sleep(random.uniform(1, 3))print("<", tag)return tagloop = asyncio.get_event_loop()group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)]) group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)]) group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])all_groups = asyncio.gather(group1, group2, group3)results = loop.run_until_complete(all_groups)loop.close()pprint(results)
组中的所有任务都可以通过调用group2.cancel()甚至all_groups.cancel()…。
-
wait
import asyncio import randomasync def coro(tag):print(">", tag)await asyncio.sleep(random.uniform(0.5, 5))print("<", tag)return tagloop = asyncio.get_event_loop()tasks = [coro(i) for i in range(1, 11)]print("Get first result:") finished, unfinished = loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))for task in finished:print(task.result()) print("unfinished:", len(unfinished))print("Get more results in 2 seconds:") finished2, unfinished2 = loop.run_until_complete(asyncio.wait(unfinished, timeout=2))for task in finished2:print(task.result()) print("unfinished2:", len(unfinished2))print("Get all other results:") finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))for task in finished3:print(task.result())loop.close()
支持在完成第一个任务后或在指定的超时之后等待停止,从而允许操作的精度降低: