问题描述
我想使用Python将本地文件并行复制到多个远程主机。
我正在尝试使用asyncio
和asyncio
,因为我已经在我的程序中将这些库用于其他目的。
我正在使用和默认的ThreadPoolExecutor
,它实际上是旧threading
库的新接口,以及Paramiko的SFTP功能来进行复制。
这是一个如何简化的例子。
import sys
import asyncio
import paramiko
import functools
def copy_file_node(
*,
user: str,
host: str,
identity_file: str,
local_path: str,
remote_path: str):
ssh_client = paramiko.client.SSHClient()
ssh_client.load_system_host_keys()
ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
ssh_client.connect(
username=user,
hostname=host,
key_filename=identity_file,
timeout=3)
with ssh_client:
with ssh_client.open_sftp() as sftp:
print("[{h}] Copying file...".format(h=host))
sftp.put(localpath=local_path, remotepath=remote_path)
print("[{h}] Copy complete.".format(h=host))
loop = asyncio.get_event_loop()
tasks = []
# NOTE: You'll have to update the values being passed in to
# `functools.partial(copy_file_node, ...)`
# to get this working on on your machine.
for host in ['10.0.0.1', '10.0.0.2']:
task = loop.run_in_executor(
None,
functools.partial(
copy_file_node,
user='user',
host=host,
identity_file='/path/to/identity_file',
local_path='/path/to/local/file',
remote_path='/path/to/remote/file'))
tasks.append(task)
try:
loop.run_until_complete(asyncio.gather(*tasks))
except Exception as e:
print("At least one node raised an error:", e, file=sys.stderr)
sys.exit(1)
loop.close()
我看到的问题是文件被串行复制到主机而不是并行复制。 因此,如果单个主机的副本需要5秒钟,则两个主机需要10秒钟,依此类推。
我已经试过各种其他方法,包括开沟SFTP和管道文件以dd
通过在每个远程主机的但始终复制连续发生。
我可能在这里误解了一些基本想法。 什么阻止不同的线程并行复制文件?
从我的测试来看,似乎持久性发生在远程写入,而不是读取本地文件。 但是为什么会这样,因为我们正在尝试针对独立远程主机的网络I / O?
1楼
我不确定这是接近它的最好方法,但它对我有用
#start
from multiprocessing import Process
#omitted
tasks = []
for host in hosts:
p = Process(
None,
functools.partial(
copy_file_node,
user=user,
host=host,
identity_file=identity_file,
local_path=local_path,
remote_path=remote_path))
tasks.append(p)
[t.start() for t in tasks]
[t.join() for t in tasks]
基于注释,添加了一个日期戳并捕获了多处理的输出并获得了:
2015-10-24 03:06:08.749683[vagrant1] Copying file...
2015-10-24 03:06:08.751826[basement] Copying file...
2015-10-24 03:06:08.757040[upstairs] Copying file...
2015-10-24 03:06:16.222416[vagrant1] Copy complete.
2015-10-24 03:06:18.094373[upstairs] Copy complete.
2015-10-24 03:06:22.478711[basement] Copy complete.
2楼
您使用asyncio没有任何问题。
为了证明这一点,让我们试试你的脚本的简化版本 - 没有 ,只是纯Python。
import asyncio, functools, sys, time
START_TIME = time.monotonic()
def log(msg):
print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg))
def dummy(thread_id):
log('Thread {} started'.format(thread_id))
time.sleep(1)
log('Thread {} finished'.format(thread_id))
loop = asyncio.get_event_loop()
tasks = []
for i in range(0, int(sys.argv[1])):
task = loop.run_in_executor(None, functools.partial(dummy, thread_id=i))
tasks.append(task)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
有两个线程,这将打印:
$ python3 async.py 2
0.001 Thread 0 started
0.002 Thread 1 started <-- 2 tasks are executed concurrently
1.003 Thread 0 finished
1.003 Thread 1 finished <-- Total time is 1 second
这种并发最多可扩展到5个线程:
$ python3 async.py 5
0.001 Thread 0 started
...
0.003 Thread 4 started <-- 5 tasks are executed concurrently
1.002 Thread 0 finished
...
1.005 Thread 4 finished <-- Total time is still 1 second
如果我们再添加一个线程,我们就会达到线程池限制:
$ python3 async.py 6
0.001 Thread 0 started
0.001 Thread 1 started
0.002 Thread 2 started
0.003 Thread 3 started
0.003 Thread 4 started <-- 5 tasks are executed concurrently
1.002 Thread 0 finished
1.003 Thread 5 started <-- 6th task is executed after 1 second
1.003 Thread 1 finished
1.004 Thread 2 finished
1.004 Thread 3 finished
1.004 Thread 4 finished <-- 5 task are completed after 1 second
2.005 Thread 5 finished <-- 6th task is completed after 2 seconds
一切都按预期进行,每5件物品的总时间增加1秒。 文档中记录了幻数5:
在版本3.5中更改 :如果max_workers为
None
或未给出,则默认为机器上的处理器数量,乘以5
,假设ThreadPoolExecutor经常用于重叠I / O而不是CPU工作,并且工作者数量应该高于ProcessPoolExecutor的工作者数量。
第三方库如何阻止我的ThreadPoolExecutor?
库使用某种全局锁。 这意味着库不支持多线程。 尝试使用ProcessPoolExecutor,但要小心:库可能包含其他反模式,例如使用相同的硬编码临时文件名。
函数执行很长时间并且不释放GIL。 它可能表示C扩展代码中存在错误,但持有GIL的最常见原因是进行一些CPU密集型计算。 同样,您可以尝试ProcessPoolExecutor,因为它不受GIL的影响。
预计这些都不会像paramiko这样的库发生。
第三方库如何阻止我的ProcessPoolExecutor?
它通常不能。 您的任务在不同的进程中执行。 如果您发现ProcessPoolExecutor中的两个任务花费了两倍的时间,则怀疑资源瓶颈(例如占用100%的网络带宽)。