Press "Enter" to skip to content

通过Aiomultiprocess大幅提升Python的异步编程能力:全面指南

PYTHON TOOLBOX

利用asyncio和multiprocessing加速您的应用程序

照片来源:作者创作,Canva

在本文中,我将带您进入aiomultiprocess这个库的世界,该库结合了Python的asynciomultiprocessing的强大功能。

本文将通过丰富的代码示例和最佳实践进行解释。

阅读完本文后,您将了解如何利用aiomultiprocess的强大功能来改进您的Python应用程序,就像一位主厨领导着一群厨师创造一道美味的盛宴。

引言

想象一下,您想在周末邀请同事们来共进一顿大餐。您会如何做呢?

作为一位经验丰富的厨师,您肯定不会一次只做一道菜;那样太慢了。您会高效利用时间,让多个任务同时进行。

例如,在等待水烧开的时候,您可以离开一会儿去洗菜。这样,当水开了,您就可以把菜放进锅里了。这就是并发的魅力。

然而,食谱往往会很繁琐:做汤的时候需要不停地搅拌;蔬菜需要洗净和切割;您还需要烤面包、煎牛排等等。

当有很多菜需要准备时,您可能会不知所措。

幸运的是,您的同事们不会坐在那里等着吃饭。他们会进入厨房帮助您,每个额外的人就像是一个额外的工作进程。这就是multiprocessing和并发的强大组合。

对于代码也是如此。即使使用了asyncio,您的Python应用程序是否仍然遇到瓶颈?您是否正在寻找进一步提高并发代码性能的方法?如果是的话,aiomultiprocess就是您一直在寻找的答案。

安装和基本用法

安装

如果您使用pip,可以这样安装:

python -m pip install aiomultiprocess

如果您使用Anaconda,可以从conda-forge安装:

conda install -c conda-forge aiomultiprocess

基本用法

aiomultiprocess由三个主要类组成:

Process是其他两个类的基类,用于启动一个进程并执行协程函数。通常情况下,您不需要使用这个类。

Worker用于启动一个进程,执行协程函数,并返回结果。我们也不会使用这个类。

Pool是我们将重点关注的核心类。与multiprocessing.Pool类似,它启动一个进程池,但它的上下文需要使用async with进行管理。我们将使用Pool的两个方法:mapapply

map方法接受一个协程函数和一个可迭代对象。Pool将迭代可迭代对象,并将协程函数分配给不同的进程运行。map方法的结果可以使用async for进行异步迭代:

import asyncioimport randomimport aiomultiprocessasync def coro_func(value: int) -> int:    await asyncio.sleep(random.randint(1, 3))    return value * 2async def main():    results = []    async with aiomultiprocess.Pool() as pool:        async for result in pool.map(coro_func, [1, 2, 3]):            results.append(result)        print(results)if __name__ == "__main__":    asyncio.run(main())

apply方法接受一个协程函数和函数所需的参数元组。根据调度程序的规则,Pool将协程函数分配给适当的进程进行执行。

import asyncio
import random
import aiomultiprocess

async def coro_func(value: int) -> int:
    await asyncio.sleep(random.randint(1, 3))
    return value * 2

async def main():
    tasks = []
    async with aiomultiprocess.Pool() as pool:
        tasks.append(pool.apply(coro_func, (1,)))
        tasks.append(pool.apply(coro_func, (2,)))
        tasks.append(pool.apply(coro_func, (3,)))
        results = await asyncio.gather(*tasks)
        print(results)  # 输出: [2, 4, 6]

if __name__ == "__main__":
    asyncio.run(main())

实现原理和实际示例

aiomultiprocess.Pool的实现原理

在之前的一篇文章中,我解释了如何将asyncio任务分配到多个CPU核心上。

一般的做法是在主进程中使用loop.run_in_executor启动一个进程池。然后,在进程池中的每个进程中创建一个asyncio事件循环,将协程函数在各自的事件循环中执行。流程图如下:

这个图示展示了如何集成asyncio和multiprocessing。图像由作者提供

aiomultiprocess.Pool的实现类似。它包含调度程序(scheduler)、队列(queue)和进程(process)这三个组件。

  • 调度程序可以理解为总厨,负责将任务以适当的方式分配给每个厨师。当然,你可以雇佣(实现)一个适合你需求的总厨。
  • 队列就像是厨房的流水线,严格来说,它包括一个订单通道和一个交付通道。总厨通过订单通道将菜单传给厨师,厨师通过交付通道将完成的菜肴送回。
  • 进程就像是餐厅中的厨师,它们根据分配同时处理若干道菜肴。每当一道菜准备好时,它会按照分配的顺序交给服务员。

整个流程图如下:

Aiomultiprocess由三个组件组成:调度程序、队列和进程。图像由作者提供

实际示例

根据之前的介绍,你现在应该知道如何使用aiomultiprocess了。让我们通过一个实际示例来体验其强大之处。

首先,我们将使用一个远程调用和循环计算来模拟现实生活中的数据检索和处理过程。这个方法说明了IO密集型和CPU密集型任务经常混合在一起,它们之间的边界并不那么清晰。

import asyncio
import random
import time
from aiohttp import ClientSession
from aiomultiprocess import Pool

def cpu_bound(n: int) -> int:
    result = 0
    for i in range(n*100_000):
        result += 1
    return result

async def invoke_remote(url: str) -> int:
    await asyncio.sleep(random.uniform(0.2, 0.7))
    async with ClientSession() as session:
        async with session.get(url) as response:
            status = response.status
            result = cpu_bound(status)
            return result

async def main():
    start = time.monotonic()
    tasks = [asyncio.create_task(invoke_remote("https://www.example.com"))
             for _ in range(30)]
    await asyncio.gather(*tasks)
    print(f"All jobs done in {time.monotonic() - start} seconds")

if __name__ == "__main__":
    asyncio.run(main())
使用传统的asyncio方法运行代码。作者提供的截图

代码的执行结果如图所示,大约需要21秒。现在让我们看看aiomultiprocess能够改进多少。

使用aiomultiprocess很简单。原始的并发代码不需要修改。你只需要调整主方法中的代码以在Pool内运行:

async def main():    start = time.monotonic()    async with Pool() as pool:        tasks = [pool.apply(invoke_remote, ("https://www.example.com",))                  for _ in range(30)]        await asyncio.gather(*tasks)    print(f"所有任务在 {time.monotonic() - start} 秒内完成")if __name__ == "__main__":    asyncio.run(main())
只需使用修改过的aiomultiprocess版本。作者提供的截图

如你所见,使用aiomultiprocess的代码只需要14秒就能在我的笔记本电脑上完成。在一台性能更强的计算机上,性能提升会更大。

详细最佳实践

最后,根据我的经验,让我分享一些更实用的最佳实践。

仅使用池

尽管aiomultiprocess还为我们提供了ProcessWorker类供选择,但我们应该始终使用Pool类以确保最大效率,因为创建进程需要消耗大量资源。

如何使用队列

在之前的文章中,我解释了如何使用asyncio.Queue实现生产者-消费者模式以平衡资源和性能。在aiomultiprocess中,我们也可以使用队列。然而,由于我们处于进程池中,我们不能使用asyncio.Queue。同时,我们也不能直接在进程池中使用multiprocessing.Queue。在这种情况下,你应该使用multiprocessing.Manager().Queue()来创建一个队列,代码如下:

import randomimport asynciofrom multiprocessing import Managerfrom multiprocessing.queues import Queuefrom aiomultiprocess import Poolasync def worker(name: str, queue: Queue):    while True:        item = queue.get()        if not item:            print(f"worker: {name}收到结束信号,将停止运行。")            queue.put(item)            break        await asyncio.sleep(random.uniform(0.2, 0.7))        print(f"worker: {name}开始处理值 {item}", flush=True)async def producer(queue: Queue):    for i in range(20):        await asyncio.sleep(random.uniform(0.2, 0.7))        queue.put(random.randint(1, 3))    queue.put(None)async def main():    queue: Queue = Manager().Queue()    producer_task = asyncio.create_task(producer(queue))    async with Pool() as pool:        c_tasks = [pool.apply(worker, args=(f"worker-{i}", queue))                    for i in range(5)]        await asyncio.gather(*c_tasks)        await producer_taskif __name__ == "__main__":    asyncio.run(main())

使用initializer来初始化资源

假设你需要在一个协程方法中使用会话或数据库连接池,但我们无法在主进程中创建任务时传递参数,因为这些对象无法被序列化。

一个替代方法是定义一个全局对象和一个初始化方法。在这个初始化方法中,访问全局对象并进行初始化。

就像multiprocessing.Pool一样,aiomultiprocess.Pool在初始化时可以接受一个初始化方法和相应的初始化参数。当每个进程启动时,将调用此方法完成初始化:

import asynciofrom aiomultiprocess import Poolimport aiohttpfrom aiohttp import ClientSession, ClientTimeoutsession: ClientSession | None = Nonedef init_session(timeout: ClientTimeout = None):    global session    session = aiohttp.ClientSession(timeout=timeout)async def get_status(url: str) -> int:    global session    async with session.get(url) as response:        status_code = response.status        return status_codeasync def main():    url = "https://httpbin.org/get"    timeout = ClientTimeout(2)    async with Pool(initializer=init_session, initargs=(timeout,)) as pool:        tasks = [asyncio.create_task(pool.apply(get_status, (url,)))                  for i in range(3)]        status = await asyncio.gather(*tasks)    print(status)if __name__ == "__main__":    asyncio.run(main())

异常处理和重试

尽管aiomultiprocess.Pool提供了exception_handler参数来帮助处理异常,但如果需要更灵活性,则需要将其与asyncio.wait结合使用。关于asyncio.wait的用法,可以参考我的上一篇文章。

使用asyncio.wait,可以获取遇到异常的任务。在提取任务之后,可以进行一些调整,然后重新执行任务,代码示例如下:

import asyncioimport randomfrom aiomultiprocess import Poolasync def worker():    await asyncio.sleep(0.2)    result = random.random()    if result > 0.5:        print("将引发异常")        raise Exception("出现错误")    return resultasync def main():    pending, results = set(), []    async with Pool() as pool:        for i in range(7):            pending.add(asyncio.create_task(pool.apply(worker)))        while len(pending) > 0:            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)            print(f"现在完成的任务数和待处理任务数分别是{len(done)}, {len(pending)}")            for result in done:                if result.exception():                    pending.add(asyncio.create_task(pool.apply(worker)))                else:                    results.append(await result)        print(results)if __name__ == "__main__":    asyncio.run(main())

使用Tenacity进行重试

当然,我们对异常处理和重试有更灵活和强大的选择,例如使用Tenacity库,我在这篇文章中解释了它。

使用Tenacity,上面的代码可以大大简化。您只需向协程方法添加一个装饰器,当抛出异常时,该方法将自动重试。

import asynciofrom random import randomfrom aiomultiprocess import Poolfrom tenacity import *@retry()async def worker(name: str):    await asyncio.sleep(0.3)    result = random()    if result > 0.6:        print(f"{name}将引发异常")        raise Exception("出现问题")    return resultasync def main():    async with Pool() as pool:        tasks = pool.map(worker, [f"worker-{i}" for i in range(5)])        results = await tasks        print(results)if __name__ == "__main__":    asyncio.run(main())

使用tqdm指示进度

我喜欢tqdm,因为它总是可以告诉我代码运行到了哪里,当我在屏幕前等待时。本文还解释了如何使用它。

由于aiomultiprocess使用asyncio的API等待任务完成,因此它也与tqdm兼容:

import asynciofrom random import uniformfrom aiomultiprocess import Poolfrom tqdm.asyncio import tqdm_asyncioasync def worker():    delay = uniform(0.5, 5)    await asyncio.sleep(delay)    return delay * 10async def main():    async with Pool() as pool:        tasks = [asyncio.create_task(pool.apply(worker)) for _ in range(1000)]        results = await tqdm_asyncio.gather(*tasks)        print(results[:10])if __name__ == "__main__":    asyncio.run(main())

结论

运行asyncio代码就像一位厨师烹饪一顿饭。即使您可以通过同时运行不同的任务来提高效率,最终您仍然会遇到瓶颈。

在这一点上,最简单的解决方案是增加更多的厨师,以增加烹饪过程的并行性。

Aiomultiprocess是一个强大的Python库。通过允许并发任务在多个进程上运行,它完美地突破了asyncio单线程性质引起的性能瓶颈。

本文中关于aiomultiprocess的使用和最佳实践基于我的工作经验。如果您对任何方面感兴趣,请随时评论并参与讨论。

除了提高代码执行速度和性能外,使用各种工具提高工作效率也是一种性能增强:

通过Aiomultiprocess大幅提升Python的异步编程能力:全面指南 四海 第6张

Peng Qian

Python工具箱

查看列表4个故事 通过Aiomultiprocess大幅提升Python的异步编程能力:全面指南 四海 第7张 通过Aiomultiprocess大幅提升Python的异步编程能力:全面指南 四海 第8张 通过Aiomultiprocess大幅提升Python的异步编程能力:全面指南 四海 第9张

作为VoAGI会员,您的会费的一部分将用于支付您阅读的作者,并且您将获得完全访问每个故事的权限…

qtalen.medium.com

Leave a Reply

Your email address will not be published. Required fields are marked *