Press "Enter" to skip to content

Python应用程序 | 利用多进程提高速度和效率

介绍

利用现代多核处理器的全部功能,多进程是计算机科学中的一个基本概念,使程序能够同时运行多个任务或进程。通过将任务分为几个进程,每个进程都有自己的内存空间,多进程能够克服性能限制,与传统的单线程技术相比具有优势。由于进程是隔离的,所以能够保证稳定性和安全性,避免内存冲突。特别是对于需要进行大量计算操作的 CPU 绑定作业,多进程优化代码执行的能力至关重要。它是 Python 应用程序的一项重大改革,其中速度和效果至关重要,例如数据处理、科学模拟、图像和视频处理以及机器学习。

学习目标

  • 全面了解多进程及其在利用现代多核处理器提高 Python 应用程序性能方面的重要性。
  • 学习如何使用 Python 的 ‘multiprocessing’ 模块创建、管理和同步多个进程,实现任务的并行执行,同时确保稳定性和数据完整性。
  • 探索优化多进程性能的策略,包括考虑任务性质、资源利用和解决通信开销等因素,以开发高效和响应迅速的 Python 应用程序。
  • 多进程

利用现代多核处理器的能力,多进程是计算机编程中的一种强大方法,使程序能够同时执行多个任务或进程。多进程生成多个具有自己内存空间的进程,而不是多线程,多线程是在单个进程内运行多个线程。这种隔离可以防止进程之间干扰彼此的内存,从而提高稳定性和安全性。

Python应用程序 | 利用多进程提高速度和效率 四海 第1张

本文是数据科学博文的一部分。

多进程在优化代码执行中的重要性

软件开发的一个重要目标是优化代码执行。单个核心的处理能力对于传统的顺序编程来说可能是一个限制。通过允许将任务分配到多个核心上,多进程克服了这个限制,并充分利用了现代处理器的能力。因此,需要大量处理的作业运行速度更快,性能显著提高。

多进程有益的场景

  • CPU 绑定任务:多进程可以显著加快需要进行大量计算操作的应用程序的速度,例如复杂的数学计算或模拟。每个进程可以并行执行一部分计算,以最大化 CPU 的利用。
  • 并行处理:多进程使得可以同时处理多个独立的子任务,将许多现实世界的问题分解为更容易处理的部分。这减少了完成任务所需的总时间。
  • 图像和视频处理:对媒体的不同部分应用滤镜、修改和分析是处理照片和视频的常见任务。通过多进程将这些操作分散到不同的进程中,提高效率。
  • 科学模拟:对于蛋白质折叠或天气模拟等复杂模拟,多进程是有优势的。模拟可以在独立的进程中运行,得到更快的结果。
  • 网络爬虫和抓取:多进程可以通过同时从多个来源获取数据,减少获取信息所需的时间,帮助从多个网站提取信息。
  • 并发服务器:创建并发服务器时,多进程非常有帮助,每个进程处理一个不同的客户端请求。这样可以防止较慢的请求阻塞较快的请求。
  • 批处理:在需要按批次完成任务的情况下,通过多进程加快处理速度。

进程和线程的理解

并发和并行的实现很大程度上取决于使用进程和线程,它们是计算机程序中的基本执行单元。

Python应用程序 | 利用多进程提高速度和效率 四海 第2张

进程:

进程是一个独立的用户程序实例。每个进程都有自己的执行环境、内存空间和资源。由于进程是隔离的,它们不直接共享内存。进程间通信(IPC)是一种最复杂的机制之一,用于促进进程之间的通信。由于其体积和固有的隔离性,进程非常适合处理繁重的任务,例如执行许多独立的程序。

线程:

线程是进程内的较小执行单元。在单个进程内可以存在多个具有相同资源和内存的线程。由于它们共享相同的内存环境,因此运行在同一进程中的线程可以通过共享变量进行通信。与进程相比,线程更轻量级,更适合处理涉及大量共享数据和轻微分离的活动。

全局解释器锁(GIL)的限制及其对多线程的影响

在最流行的Python实现CPython中,使用一个名为全局解释器锁(GIL)的互斥锁来同步对Python对象的访问,阻止多个线程在同一进程中并发运行Python字节码。这意味着即使在具有多个核心的系统上,每个进程内只能同时运行一个线程的Python代码。

GIL的影响

面向I/O的任务:在频繁等待外部资源(如文件I/O或网络响应)的I/O绑定操作中,GIL的影响较小。在这种情况下,GIL的锁定和释放操作对性能的影响相对较小。

何时在Python中使用线程和进程?

线程:在处理面向I/O的活动时,当软件必须长时间等待外部资源时,线程具有优势。它们可以在后台运行而不干扰主线程,适用于需要响应性用户界面的应用程序。

进程:对于CPU绑定操作或者当你希望充分利用多个CPU核心时,进程更合适。多进程可以在没有GIL限制的情况下跨多个核心并行执行,因为每个进程都有自己的GIL。

“Multiprocessing”模块

Python的multiprocessing模块是一个强大的工具,通过创建和管理多个进程来实现并发和并行。它提供了一个高级接口,用于启动和管理进程,使程序员能够在多核机器上运行并行活动。

通过多个进程实现并发执行:

通过建立多个独立的进程,每个进程都有自己的Python解释器和内存空间,multiprocessing模块使得同时运行多个程序成为可能。因此,通过绕过默认的线程模块的全局解释器锁(GIL)限制,可以在多核平台上实现真正的并行执行。

主要类和函数概述

Process类:

Process类是multiprocessing模块的核心。使用这个类可以构建和管理一个独立的进程,它代表一个进程。主要的技术和特性包括:

start():启动进程,使目标函数在一个新的进程中运行。

terminate():强制终止进程。

Queue类:Queue类通过一个同步的队列提供了一种安全的进程间通信的方法。它支持使用put()和get()等方法向队列中添加和删除项。

Pool类:通过Pool类,可以将一个函数在多个输入值上并行执行,控制一个工作进程池。主要的技术包括:

Pool(processes):用指定数量的工作进程创建一个进程池的构造函数。

Lock类:当多个进程使用相同的共享资源时,可以使用Lock类来实现互斥,避免竞争情况。

Value和Array类:这些类允许您创建其他进程可以使用的共享对象。用于在进程之间安全地传输数据。

Manager类:使用Manager类可以使多个进程访问共享对象和数据结构。它提供了更复杂的抽象,如命名空间、字典和列表。

Pipe函数:

Pipe()函数构建一对连接对象,用于进程之间的双向通信。

您可以使用此函数返回的当前对象来标识正在运行的进程。

返回可用的CPU核心数量,这对于确定要同时运行多少个任务很有用。

使用Process类创建进程

您可以使用multiprocessing包中的Process类在Python中构建和控制不同的进程。以下是使用Process类建立进程以及如何使用目标参数在新进程中运行函数的逐步解释:

import multiprocessing

# 在新进程中运行的示例函数
def worker_function(number):
    print(f"工作进程 {number} 正在运行")

if __name__ == "__main__":
    # 创建进程列表
    processes = []

    num_processes = 4

    for i in range(num_processes):
        # 创建新进程,指定目标函数及其参数
        process = multiprocessing.Process(target=worker_function, args=(i,))
        processes.append(process)
        process.start()  # 启动进程

    # 等待所有进程完成
    for process in processes:
        process.join()

    print("所有进程已完成")

工作进程 0 正在运行。

工作进程 1 正在运行。

工作进程 2 正在运行。

工作进程 3 正在运行。

所有进程已完成。

进程间通信

您可以使用multiprocessing包中的Process类在Python中构建和控制不同的进程。以下是使用Process类建立进程以及如何使用目标参数在新进程中运行函数的逐步解释。

在多进程环境中,进程可以使用各种技术和过程进行操作同步和共享数据,这被称为进程间通信 (IPC)。在多进程环境中,通信是至关重要的,因为许多进程同时运行。这使得进程能够合作、共享信息并计划它们的操作。

IPC的方法

Python应用程序 | 利用多进程提高速度和效率 四海 第3张

管道:

数据通过被称为管道的基本IPC结构在两个进程之间传递。当另一个进程从管道读取时,第一个进程写入数据。管道可以是命名的或匿名的。然而,管道只能用于两个不同的进程之间进行通信。

队列:

multiprocessing模块的队列提供了一种更灵活的IPC方法。通过在队列之间发送消息,它们使多个进程之间可以进行通信。发送进程将消息添加到队列中,接收进程检索它们。通过队列,数据的完整性和同步性会自动处理。

共享内存:

多个进程可以访问相同的内存区域,从而实现有效的数据共享和通信。控制共享内存需要精确的同步,以避免竞态条件并确保数据一致性。

使用队列进行通信

由于其简单性和内置的同步功能,队列是Python的multiprocessing模块中常用的IPC技术。以下是使用队列进行进程间通信的示例:

import multiprocessing

# 将数据放入队列的工作函数
def producer(queue):
    for i in range(5):
        queue.put(i)
        print(f"生产:{i}")

# 从队列中检索数据的工作函数
def consumer(queue):
    while True:
        data = queue.get()
        if data is None:  # 停止循环的标志值
            break
        print(f"消费:{data}")

if __name__ == "__main__":
    # 创建一个用于通信的队列
    queue = multiprocessing.Queue()

    # 创建生产者和消费者进程
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

    # 启动进程
    producer_process.start()
    consumer_process.start()

    # 等待生产者完成
    producer_process.join()

    # 通过向队列添加一个停止标志值来通知消费者停止
    queue.put(None)

    # 等待消费者完成
    consumer_process.join()

    print("所有进程已完成")

在这个示例中,生产者进程使用put()方法将数据添加到队列中。消费者进程使用get()方法从队列中检索数据。一旦生产者完成,通过使用停止标志值(None)告知消费者停止。使用join()函数等待两个进程完成。这个示例演示了队列如何为进程提供一种实用且安全的方法来交换数据,无需显式的同步技术。

使用池化进行并行处理

您可以使用多进程模块中的Pool类来将函数在不同的输入值上并行执行,这是一个管理工作进程池的有用工具。它使任务的分配和结果的收集更加简单。常用的并行执行方法包括Pool类的map()和apply()操作。

在Pool类中使用map()和apply()

map()函数:

map()方法将提供的函数应用于可迭代对象的每个成员,并将负载分配给可用的进程。返回的结果列表与输入值的顺序相同。下面是一个示例:

import multiprocessing

def square(number):
    return number ** 2

if __name__ == "__main__":
    input_data = [1, 2, 3, 4, 5]

    with multiprocessing.Pool() as pool:
        results = pool.map(square, input_data)

    print("平方结果:", results)

apply()函数:

当您需要将函数应用于一组参数时,可以使用apply()函数在进程池中执行。它返回在输入上使用函数的结果。下面是一个示例:

import multiprocessing

def cube(number):
    return number ** 3

if __name__ == "__main__":
    number = 4

    with multiprocessing.Pool() as pool:
        result = pool.apply(cube, (number,))

    print(f"{number}的立方是:", result)

池化提升性能的场景

CPU密集型任务:Pool类可以执行需要大量CPU计算能力的任务的并行版本,例如模拟或计算。通过将负载分布到活动任务中,可以有效地使用多个CPU核心。

数据处理:当处理数据转换、过滤或分析等数据处理任务时,Pool类可以同时处理多个数据集组件。这样可以大大缩短处理时间。

网络爬虫:Pool类可以同时从多个URL请求数据,同时从多个网站中抓取信息。这加快了数据收集过程。

同步和锁定:在多进程系统中,当两个或多个进程同时访问相同的共享资源或变量时,会发生竞争情况,导致不可预测或不准确的行为。使用锁定等同步技术可以避免数据损坏、崩溃和不准确的程序输出。通过要求进程按顺序访问资源,锁定创建了一种避免竞争情况的合作形式。

使用锁定来防止竞争条件

被称为“锁定”(互斥锁)的同步原语确保只有一个进程可以在任何给定时刻访问关键代码段或共享资源。一旦进程获得锁定,它就独自访问受保护的区域,直到释放锁定为止,其他进程无法访问。

通过要求进程按顺序访问资源,锁定创建了一种避免竞争情况的合作形式。

用于保护数据完整性的锁定示例

import multiprocessing

def increment(counter, lock):
    for _ in range(100000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)
    lock = multiprocessing.Lock()

    processes = []

    for _ in range(4):
        process = multiprocessing.Process(target=increment, args=(counter, lock))
        processes.append(process)
        process.start()

    for process in processes:
        process.join()

    print("最终计数器的值:", counter.value)

区分CPU密集型和I/O密集型任务

CPU密集型任务: CPU密集型任务广泛利用CPU的处理能力。这些任务需要大量的CPU资源,包括复杂的计算、数学运算、模拟和数据处理。CPU密集型任务很少与文件和网络等外部资源进行交互,大部分时间都在执行代码。

I/O密集型任务: I/O密集型任务包括读写文件、在网络上发送请求和与数据库通信等操作,所有这些操作都需要大量的等待时间来完成I/O操作。这些任务花费更多时间“等待”I/O操作完成,而不是主动使用CPU。

使用进程池管理CPU密集型任务

进程池对于控制CPU密集型工作负载非常有益。进程池将CPU密集型任务分配到多个进程中,使它们可以在各个CPU核心上并发运行,因为大多数情况下,它们涉及可以并行计算的计算任务。这大大缩短了执行时间并有效利用了可用的CPU资源。

使用进程池,您可以确保多核处理器充分利用以更快地完成CPU密集型任务。多进程模块的Pool类使得创建和管理这些工作进程更加容易。

异步编程处理I/O密集型任务

异步编程是处理I/O密集型任务的合适策略,其中主要瓶颈是等待I/O操作(例如读写文件或进行网络请求)。通过在等待I/O时有效地切换活动,异步编程使得单个线程能够同时管理多个任务,而不需要使用多个进程。

使用异步编程时,不需要设置独立的进程,例如进程池。相反,它采用协作式多任务处理策略,在等待I/O发生时,活动将控制权交给事件循环,以便其他任务可以继续工作。这可以显著提高I/O密集型应用程序的响应性。

影响多进程性能的因素

多进程解决方案的性能受到多个因素的影响:

  • 任务性质:多进程的潜在性能优势取决于工作是CPU密集型还是I/O密集型。由于等待外部资源,I/O密集型操作可能只会看到适度的性能提升,但CPU密集型任务会获得更多好处,因为它们可以利用多个核心。
  • 核心数量:多进程实现的潜在加速取决于可用CPU核心的数量。更多的核心可以实现更大的并行执行。进程必须协调和通信,这增加了开销。队列和其他有效的通信技术有助于减少这种开销。
  • 任务粒度:将任务分解为较小的片段可以增加并行性和负载平衡。将通信开销引入到非常细粒度的活动中。

比较不同实现的基准测试

下面是使用简单的CPU密集型任务(计算阶乘)进行不同实现的对比示例:

import time
import multiprocessing
import threading
import math

def factorial(n):
    return math.factorial(n)

def single_thread():
    for _ in range(4):
        factorial(5000)

def multi_thread():
    threads = []
    for _ in range(4):
        thread = threading.Thread(target=factorial, args=(5000,))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

def multi_process():
    processes = []
    for _ in range(4):
        process = multiprocessing.Process(target=factorial, args=(5000,))
        processes.append(process)
        process.start()
    for process in processes:
        process.join()

if __name__ == "__main__":
    start_time = time.time()
    single_thread()
    print("单线程:", time.time() - start_time)

    start_time = time.time()
    multi_thread()
    print("多线程:", time.time() - start_time)

    start_time = time.time()
    multi_process()
    print("多进程:", time.time() - start_time)

解决开销和权衡问题

即使多进程可以显著提升CPU密集型任务的性能,它也存在一些缺点:

  • 通信开销:在开发和运行进程时,可能存在显著的通信开销,特别是对于简单操作。在开销和处理时间之间取得平衡非常重要。
  • 内存使用:因为每个进程有自己的内存区域,内存使用可能会增加。需要谨慎处理内存。
  • 可扩展性:虽然多进程可以提高多核系统的性能,但过度强调并行性可能不会导致相应的加速,因为通信开销过大。
  • 任务分配:为了实现平衡的执行,有效地划分任务并管理进程之间的工作负载至关重要。

使用Matplotlib进行可视化

可视化是理解多进程行为和效果的一种有效技术。您可以跟踪进程的进展,评估不同场景下的数据,并通过制作图表和图形直观展示并行处理的性能提升。

使用Matplotlib进行可视化的示例

这里有两个示例,展示了如何使用Matplotlib来可视化多进程执行和加速:

示例1:可视化进程执行

假设您正在使用多个进程处理一批图像。您可以使用条形图可视化每个进程的进度:

import multiprocessing
import time
import matplotlib.pyplot as plt

def process_image(image):
    time.sleep(2)  # 模拟图像处理
    return f"处理完毕:{image}"

if __name__ == "__main__":
    images = ["image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg"]
    num_processes = 4

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(process_image, images)

    plt.bar(range(len(images)), [1] * len(images), align="center", color="blue", 
    label="正在处理")
    plt.bar(range(len(results)), [1] * len(results), align="center", color="green", 
    label="已处理")

    plt.xticks(range(len(results)), images)
    plt.ylabel("进度")
    plt.title("图像处理进度")
    plt.legend()

    plt.show()

示例2:加速比较

import time
import threading
import multiprocessing
import matplotlib.pyplot as plt

def task():
    time.sleep(1)  # 模拟工作

def run_single_thread():
    for _ in range(4):
        task()

def run_multi_thread():
    threads = []
    for _ in range(4):
        thread = threading.Thread(target=task)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

def run_multi_process():
    processes = []
    for _ in range(4):
        process = multiprocessing.Process(target=task)
        processes.append(process)
        process.start()
    for process in processes:
        process.join()

if __name__ == "__main__":
    times = []

    start_time = time.time()
    run_single_thread()
    times.append(time.time() - start_time)

    start_time = time.time()
    run_multi_thread()
    times.append(time.time() - start_time)

    start_time = time.time()
    run_multi_process()
    times.append(time.time() - start_time)

    labels = ["单线程", "多线程", "多进程"]
    plt.bar(labels, times)
    plt.ylabel("执行时间 (秒)")
    plt.title("加速比较")

    plt.show()

应用

在许多领域中,任务可以被拆分成较小的工作单元并可以并行完成,多进程是至关重要的。以下是一些多进程至关重要的真实场景:

  • 数据处理:进程保持隔离,防止直接内存共享。进程间通信(IPC)是一种最复杂的机制,用于实现进程间的通信。通过其庞大的大小和固有的隔离性,进程在管理资源密集型任务(例如执行多个独立程序)方面表现出色。
  • 图像和视频处理:多进程可以帮助应用滤镜、缩放和对象检测等图像和视频处理。并行处理每个图片或帧以加快操作速度,并在视频应用中实现实时处理。

多进程可以加速网络爬取和抓取过程,从多个网站收集数据。使用多个进程来从不同来源检索数据进行数据收集和分析。

深度学习和机器学习:使用大规模数据集进行训练机器学习模型通常需要计算密集的任务。通过使用多个核心或GPU进行数据和训练操作,可以减少训练时间并提高模型收敛性。

  • 并行计算和数值分析:多进程对于大规模数学计算、复杂问题求解和数值模拟非常有帮助。并行矩阵计算和蒙特卡罗模拟是两个示例。

许多应用程序(如渲染动画帧或处理报告的业务程序)需要批量处理。通过多进程进行高效的并行执行这些任务。

金融建模

复杂的金融模拟、风险分析和情景建模可能涉及大量计算。多进程可以加速这些计算,实现更快的决策和分析。

结论

探索Python的多进程能力可以改变代码的性能,加速应用程序。这次旅程揭示了线程、进程和多进程模块之间复杂的相互作用。通过多进程提供的效率和优化,为代码注入新的生命。请记住,多进程是您创新、速度和效率的关键。您新获得的技能将为您准备处理困难的项目,包括复杂的模拟和数据密集型任务。让这些信息激发您对编程的热情,推动您的应用程序实现更高的效果和影响力。旅程还在继续,现在您可以使用多进程,您的代码的可能性是无限的。

要点

  • 多进程是同时运行多个进程,允许程序充分利用现代多核处理器以实现最佳性能。
  • 进程:独立的执行单元,具有自己的内存空间,而线程在进程内共享内存。了解这些区别有助于选择正确的并发方法。
  • Python的全局解释器锁(GIL)限制了多线程场景下的真正并行执行,使得多进程更适合于需要密集计算的CPU绑定任务。
  • 像管道、队列和共享内存这样的进程间通信(IPC)机制可以安全地进行进程间通信和数据交换。
  • 任务的性质、核心数量、GIL的影响、通信开销、内存使用和任务粒度都会影响多进程的性能。需要仔细考虑平衡资源使用并实现最佳可伸缩性。

常见问题

本文中显示的媒体不归Analytics Vidhya所有,仅根据作者的自由裁量使用。

Leave a Reply

Your email address will not be published. Required fields are marked *