当前位置: 代码迷 >> python >> 一个函数的多处理池map_async然后在下一个函数之前阻塞(python 3)
  详细解决方案

一个函数的多处理池map_async然后在下一个函数之前阻塞(python 3)

热度:99   发布时间:2023-07-14 08:46:15.0

请注意,该演示代码会生成一些GB数据。

一段时间以来,我一直在使用以下代码的版本进行多处理。 当池中每个进程的运行时间相似时,它会很好地工作,但是如果一个进程花费的时间长得多,我最终将有许多阻塞的进程在等待一个进程,因此,我试图使其异步运行-仅针对一个函数一个时间。

例如,如果我有70个内核,并且需要运行2000次函数,那么我希望该函数异步运行,那么请等待上一个进程,然后再调用下一个函数。 目前,它只是按我提供的核心数量分批提交流程,每批必须等待最长的流程。

如您所见,我尝试使用map_async,但这显然是错误的语法。 谁能帮我吗?

import os
p='PATH/test/'

def f1(tup):
    x,y=tup
    to_write = x*(y**5)
    with open(p+x+str(y)+'.txt','w') as fout:
        fout.write(to_write)

def f2(tup):
    x,y=tup
    print (os.path.exists(p+x+str(y)+'.txt'))

def call_func(f,nos,threads,call):
    print (call)
    for i in range(0, len(nos), threads):
        print (i)
        chunk = nos[i:i + threads]
        tmp = [('args', no) for no in chunk]
        pool.map(f, tmp)
        #pool.map_async(f, tmp)

nos=[i for i in range(55)]
threads=8
if __name__ == '__main__':
with Pool(processes=threads) as pool:
    call_func(f1,nos,threads,'f1')
    call_func(f2,nos,threads,'f2')

map将仅返回,而map_async仅在当前块的所有任务完成后才调用回调。

因此,您只能一次将所有任务分配给map / map_async ,也可以在callback为下一个任务调用apply_async的情况下使用apply_async (最初称为threads时间)。

如果调用的实际返回值无关紧要(或至少它们的顺序imap_unordered ),则一次给所有任务(或由迭代器/生成器按需生成任务)时, imap_unordered可能是另一种有效的解决方案

  相关解决方案