Python 开发结合多处理和异步来提高性能
简介
感谢 GIL,使用多个线程来执行 CPU 密集型任务从来都不是一种选择。随着多核处理器的普及,Python 提供了用于执行 CPU 密集型任务的多处理解决方案。但目前来说,直接使用多进程API仍然存在一些问题。
在开始本文[1]之前,我们有一个小代码片段来说明:
import time
from multiprocessing import Process
def sum_to_num(final_num: int) -> int:
start = time.monotonic()
result = 0
for i in range(0, final_num+1, 1):
result += i
print(f"The method with {final_num} completed in {time.monotonic() - start:.2f} second(s).")
return result
此方法接受一个参数并从零开始计数到该参数。打印方法的执行时间并返回结果。
多进程中的问题
def main():
# We initialize the two processes with two parameters, from largest to smallest
process_a = Process(target=sum_to_num, args=(200_000_000,))
process_b = Process(target=sum_to_num, args=(50_000_000,))
# And then let them start executing
process_a.start()
process_b.start()
# Note that the join method is blocking and gets results sequentially
start_a = time.monotonic()
process_a.join()
print(f"Process_a completed in {time.monotonic() - start_a:.2f} seconds")
# Because when we wait process_a for join. The process_b has joined already.
# so the time counter is 0 seconds.
start_b = time.monotonic()
process_b.join()
print(f"Process_b completed in {time.monotonic() - start_b:.2f} seconds")
如代码所示,我们直接创建并启动多个进程,调用每个进程的start和join方法。不过,这里存在一些问题:
- join 的方法无法返回执行任务的结果。
- join的方法是阻塞主进程并顺序执行。
虽然后面的任务会比前面的任务运行得更快,如下图所示:
使用池的问题
如果我们使用multiprocessing.Pool,也会出现一些问题:
def main():
with Pool() as pool:
result_a = pool.apply(sum_to_num, args=(200_000_000,))
result_b = pool.apply(sum_to_num, args=(50_000_000,))
print(f"sum_to_num with 200_000_000 got a result of {result_a}.")
print(f"sum_to_num with 50_000_000 got a result of {result_b}.")
如图所示。代码中可以看到,Pool的apply方法是同步的,这意味着你必须等待上一个应用程序任务完成后才能开始下一个应用程序任务。
当然,我们可以使用application_async方法来异步创建任务。但同样,您需要使用 get 方法抢先获取结果。这让我们回到join方法的问题:
def main():
with Pool() as pool:
result_a = pool.apply_async(sum_to_num, args=(200_000_000,))
result_b = pool.apply_async(sum_to_num, args=(50_000_000,))
print(f"sum_to_num with 200_000_000 got a result of {result_a.get()}.")
print(f"sum_to_num with 50_000_000 got a result of {result_b.get()}.")

直接使用 ProcessPoolExecutor 的问题
如果我们使用并发.futures.ProcesssPoolExecutor 来运行 CPU 密集型任务会怎样?
def main():
with ProcessPoolExecutor() as executor:
numbers = [200_000_000, 50_000_000]
for result in executor.map(sum_to_num, numbers):
print(f"sum_to_num got a result which is {result}.")
从代码中可以看到,一切看起来都很好,并且就像 asyncio.as_completed 一样被调用。但看看结果;它们仍然按启动顺序获取。这与 asyncio.as_completed 完全不同,后者按执行顺序获取结果:
用 asyncio 的 run_in_executor 修复
幸运的是,我们可以使用 asyncio 来处理 IO 密集型任务,并且可以使用其 run_in_executor 方法,例如asyncio 调用多进程任务。它不仅结合了并发和并行API,还解决了上面遇到的各种问题:
async def main():
loop = asyncio.get_running_loop()
tasks = []
with ProcessPoolExecutor() as executor:
for number in [200_000_000, 50_000_000]:
tasks.append(loop.run_in_executor(executor, sum_to_num, number))
# Or we can just use the method asyncio.gather(*tasks)
for done in asyncio.as_completed(tasks):
result = await done
print(f"sum_to_num got a result which is {result}")

由于上一篇文章中的示例代码都是模拟我们应该调用的并发过程的方法,所以很多读者在实际编码中可能仍然需要帮助来理解其用途。因此,在了解了为什么我们需要在 asyncio 中运行 CPU 密集型并行任务后,今天我们将通过一个真实的示例来解释如何使用 asyncio 同时处理 IO 密集型和 CPU 密集型任务并欣赏其效率。 asyncio 我们的代码。
版权声明
本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。
发表评论:
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。