问题描述
请注意,该演示代码会生成一些GB数据。
一段时间以来,我一直在使用以下代码的版本进行多处理。 当池中每个进程的运行时间相似时,它会很好地工作,但是如果一个进程花费的时间长得多,我最终将有许多阻塞的进程在等待一个进程,因此,我试图使其异步运行-仅针对一个函数一个时间。
例如,如果我有70个内核,并且需要运行2000次函数,那么我希望该函数异步运行,那么请等待上一个进程,然后再调用下一个函数。 目前,它只是按我提供的核心数量分批提交流程,每批必须等待最长的流程。
如您所见,我尝试使用map_async,但这显然是错误的语法。 谁能帮我吗?
import os
p='PATH/test/'
def f1(tup):
x,y=tup
to_write = x*(y**5)
with open(p+x+str(y)+'.txt','w') as fout:
fout.write(to_write)
def f2(tup):
x,y=tup
print (os.path.exists(p+x+str(y)+'.txt'))
def call_func(f,nos,threads,call):
print (call)
for i in range(0, len(nos), threads):
print (i)
chunk = nos[i:i + threads]
tmp = [('args', no) for no in chunk]
pool.map(f, tmp)
#pool.map_async(f, tmp)
nos=[i for i in range(55)]
threads=8
if __name__ == '__main__':
with Pool(processes=threads) as pool:
call_func(f1,nos,threads,'f1')
call_func(f2,nos,threads,'f2')
1楼
map
将仅返回,而map_async
仅在当前块的所有任务完成后才调用回调。
因此,您只能一次将所有任务分配给map
/ map_async
,也可以在callback
为下一个任务调用apply_async
的情况下使用apply_async
(最初称为threads
时间)。
如果调用的实际返回值无关紧要(或至少它们的顺序imap_unordered
),则一次给所有任务(或由迭代器/生成器按需生成任务)时, imap_unordered
可能是另一种有效的解决方案