Vanson's Eternal Blog

Python中的多进程编程

Python MultiProcess.png
Published on
/12 mins read/---

定义

多进程是指同时运行多个进程(Process),每个进程有独立的内存空间。Python中使用multiprocessing模块来实现多进程。

核心组件:

  • Process:创建进程对象
  • Queue/Pipe:进程间通信 (IPC)
  • Lock/Event:同步机制
  • Pool:进程池管理
  • Value/Array:共享内存

创建多进程

import multiprocessing
import time
 
def worker(name):
    print(f"Process {name} started")
    time.sleep(2)  # 模拟耗时任务
    print(f"Process {name} finished")
 
if __name__ == "__main__":
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()  # 启动进程
 
    for p in processes:
        p.join()  # 等待所有进程完成
 
    print("All processes finished")
 

进程间通信

多进程之间可以通过Queue或Pipe进行通信。

使用Queue

import multiprocessing
 
def producer(queue):
    for i in range(5):
        queue.put(f"Message {i}")  # 向队列中放入数据
    queue.put(None)  # 发送结束信号
 
def consumer(queue):
    while True:
        msg = queue.get()  # 从队列中获取数据
        if msg is None:
            break
        print(f"Consumed: {msg}")
 
if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))
 
    p1.start()
    p2.start()
 
    p1.join()
    p2.join()
 
    print("All done")

使用Pipe

import multiprocessing
 
def sender(conn):
    conn.send("Hello from sender")  # 发送数据
    conn.close()
 
def receiver(conn):
    print(f"Received: {conn.recv()}")  # 接收数据
 
if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))
 
    p1.start()
    p2.start()
 
    p1.join()
    p2.join()
 
    print("All done")
 

进程池(Pool)

multiprocessing.Pool提供了更高级的接口,可以方便地管理多个进程。

使用Pool

 
import multiprocessing
 
def compute_square(n):
    return n * n
 
if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:  # 4个worker进程
        # 方法1: map
        results = pool.map(compute_square, range(10))
        print("Map results:", results)
        
        # 方法2: apply_async(异步)
        async_result = pool.apply_async(compute_square, (5,))
        print("Async result:", async_result.get())
 
 
 

动态分配任务

 
import multiprocessing
import time
 
def task(x):
    time.sleep(1)  # 模拟任务耗时
    return x * x
 
if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        results = []
        for i in range(10):
            result = pool.apply_async(task, args=(i,))  # 异步提交任务
            results.append(result)
 
        for result in results:
            print(result.get())  # 获取任务结果
 
 

进程池的回调函数

可以为任务设置回调函数,任务完成后自动调用。

import multiprocessing
import time
 
def task(x):
    time.sleep(1)
    return x * x
 
def callback(result):
    print(f"Callback received: {result}")
 
if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        for i in range(10):
            pool.apply_async(task, args=(i,), callback=callback)
        pool.close()
        pool.join()
 

守护进程(Daemon Process)

守护进程会在主进程结束时自动终止。

import multiprocessing
import time
 
def daemon_process():
    while True:
        print("Daemon process is running...")
        time.sleep(1)
 
if __name__ == "__main__":
    p = multiprocessing.Process(target=daemon_process)
    p.daemon = True  # 设置为守护进程
    p.start()
 
    time.sleep(5)  # 主进程运行5秒后结束
    print("Main process finished")
 

共享内存

from multiprocessing import Value, Array
 
def modify_shared_data(shared_value, shared_array):
    with shared_value.get_lock():  # 需要加锁保证原子性
        shared_value.value += 1
    
    for i in range(len(shared_array)):
        shared_array[i] *= 2
 
if __name__ == '__main__':
    counter = multiprocessing.Value('i', 0)
    arr = multiprocessing.Array('d', [1.0, 2.0, 3.0])
    
    processes = []
    for _ in range(3):
        p = multiprocessing.Process(target=modify_shared_data, args=(counter, arr))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    print("Final counter:", counter.value)
    print("Final array:", list(arr))
 

同步机制:Lock

from multiprocessing import Lock
 
def printer(item, lock):
    with lock:  # 确保同一时刻只有一个进程打印
        print(f"Process {os.getpid()} prints: {item}")
 
if __name__ == '__main__':
    lock = multiprocessing.Lock()
    items = ['A', 'B', 'C', 'D']
    
    processes = []
    for item in items:
        p = multiprocessing.Process(target=printer, args=(item, lock))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
 

应用

圆周率

import multiprocessing
import numpy as np
import math
from multiprocessing import Pool
 
def monte_carlo_simulation(samples):
    """
    执行蒙特卡洛模拟
    :param samples: 需要生成的随机点数量
    :return: 落在1/4圆内的点数
    """
    # 使用numpy生成随机坐标
    x = np.random.random(samples)
    y = np.random.random(samples)
    
    # 向量化计算点是否在单位圆内
    inside = np.sum(x**2 + y**2 <= 1.0)
    return inside
 
def calculate_pi(total_samples, num_processes=4):
    """
    使用多进程并行计算圆周率
    :param total_samples: 总样本数
    :param num_processes: 使用的进程数
    :return: 估算的圆周率值
    """
    # 计算每个进程需要处理的样本数
    samples_per_process = total_samples // num_processes
    
    # 创建进程池
    with Pool(processes=num_processes) as pool:
        # 生成任务参数列表(每个进程分配等量样本)
        tasks = [samples_per_process] * num_processes
        
        # 使用map并行执行任务
        results = pool.map(monte_carlo_simulation, tasks)
        
        # 汇总所有落在圆内的点数
        total_inside = sum(results)
    
    # 计算圆周率 (4 * 圆内点数 / 总点数)
    pi_estimate = 4 * total_inside / total_samples
    return pi_estimate
 
if __name__ == '__main__':
    # 参数配置
    total_samples = 10_000_000  # 总样本数
    num_processes = multiprocessing.cpu_count()  # 使用全部CPU核心
    
    # 执行计算
    pi_estimate = calculate_pi(total_samples, num_processes)
    
    # 输出结果
    print(f"真实圆周率: {math.pi:.8f}")
    print(f"使用 {total_samples} 个样本的估算值: {pi_estimate:.8f}")
    print(f"绝对误差: {abs(math.pi - pi_estimate):.8f}")
 
  • 使用 numpy 生成随机数:numpy.random.random 可以一次性生成大量随机数,效率远高于逐次调用 random.random()。
  • 向量化操作:使用 numpy 的数组操作,可以一次性计算所有点是否在单位圆内,避免逐次判断,显著提高性能。
  • 多进程并行:使用 multiprocessing.Pool 创建进程池,将任务分配到多个进程,充分利用多核CPU资源。

并行数值积分

科学计算中的并行数值积分,数值积分是一种常见的科学计算任务,可以使用多进程来加速计算。

import multiprocessing
import numpy as np
from scipy.integrate import quad
 
def integrate_function(func, a, b, args=()):
    return quad(func, a, b, args=args)[0]
 
def parallel_integration(func, a, b, num_processes=4, intervals=1000):
    interval_size = (b - a) / intervals
    tasks = [(func, a + i * interval_size, a + (i + 1) * interval_size, args) for i in range(intervals)]
    
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.starmap(integrate_function, tasks)
    
    return sum(results)
 
if __name__ == '__main__':
    # 定义一个简单的积分函数
    def f(x):
        return x**2
    
    a, b = 0, 10
    num_processes = multiprocessing.cpu_count()
    
    result = parallel_integration(f, a, b, num_processes=num_processes)
    print(f"Integral result: {result}")
 

多进程滤波

图像处理中的多进程滤波,图像处理任务通常可以并行化,例如对图像应用滤波器。

from multiprocessing import Pool
from PIL import Image, ImageFilter
import os
 
def process_image(image_path):
    img = Image.open(image_path)
    filtered = img.filter(ImageFilter.BLUR)
    save_path = f"processed_{os.path.basename(image_path)}"
    filtered.save(save_path)
    return save_path
 
if __name__ == '__main__':
    image_paths = ["image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg"]
    
    with Pool(processes=4) as pool:
        results = pool.map(process_image, image_paths)
    
    print(f"Processed images: {results}")
 

多进程数据预处理

深度学习中的多进程数据预处理,在深度学习中,数据预处理是一个耗时的过程,可以利用多进程来加速。

import torch
import torch.multiprocessing as mp
from PIL import Image
import torchvision.transforms as transforms
 
def process_image(rank, image_path, output_tensor):
    image = Image.open(image_path)
    transform = transforms.Compose([transforms.Resize((128, 128)), transforms.ToTensor()])
    output_tensor[rank] = transform(image)
    print(f"Process {rank} processed image")
 
if __name__ == "__main__":
    image_paths = ["image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg"]
    output_tensor = torch.zeros((4, 3, 128, 128)).share_memory_()
    
    processes = []
    for rank, image_path in enumerate(image_paths):
        p = mp.Process(target=process_image, args=(rank, image_path, output_tensor))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    print("Processed images stored in tensor:", output_tensor)
 

并行特征提取

在机器学习中,特征提取是一个常见的任务,可以使用多进程来加速。

 
from multiprocessing import Pool
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
 
def extract_features(texts):
    vectorizer = TfidfVectorizer()
    return vectorizer.fit_transform(texts).toarray()
 
def parallel_feature_extraction(texts, num_processes=4):
    chunk_size = len(texts) // num_processes
    chunks = [texts[i*chunk_size : (i+1)*chunk_size] for i in range(num_processes)]
    
    with Pool(processes=num_processes) as pool:
        results = pool.map(extract_features, chunks)
    
    return np.concatenate(results, axis=0)
 
if __name__ == '__main__':
    texts = ["text1", "text2", "text3", "text4", "text5", "text6", "text7", "text8"]
    num_processes = 4
    
    features = parallel_feature_extraction(texts, num_processes)
    print(f"Features shape: {features.shape}")
 

主流框架中的应用

Celery(分布式任务队列)

  • Worker进程池‌:通过prefork模式创建子进程池,每个进程独立执行任务‌。
  • ‌任务分发机制‌:父进程通过消息队列(如RabbitMQ)接收任务,通过os.fork()动态分配子进程执行‌。
  • ‌共享内存优化‌:使用mmap实现进程间共享任务状态数据,减少序列化开销‌。
 
# celery/worker/consumer.py
class Consumer:
    def start(self):
        # 创建预分配进程池 ‌:ml-citation{ref="2,5" data="citationList"}
        self.pool = self._create_process_pool()
 
    def _create_process_pool(self):
        return self.controller.spawn_worker_processes(
            max_concurrency=self.max_workers,  # 最大进程数
            on_start=self.on_process_start  # 进程启动回调
        )
 
 
# celery/worker/process.py
class WorkerProcess:
    def _boot(self):
        # 通过os.fork创建子进程 ‌:ml-citation{ref="2,7" data="citationList"}
        pid = os.fork()
        if pid == 0:  # 子进程
            self.run_worker()
 
    def run_worker(self):
        # 独立执行任务的子进程逻辑 ‌:ml-citation{ref="5" data="citationList"}
        while not self.should_stop:
            self.process_task()
 
 
 

Scrapy(爬虫框架)

  • ‌多核利用率优化‌:通过Twisted事件循环+多进程组合,实现I/O与计算并行‌。
  • ‌进程间通信‌:使用multiprocessing.Queue传递抓取结果,父进程统一持久化存储‌。
  • ‌资源隔离机制‌:每个爬虫子进程独立维护DNS解析缓存与请求会话‌。
# scrapy/core/downloader/__init__.py
class Downloader:
    def __init__(self, slot_size=8):
        # 基于CPU核数设置下载槽位 ‌:ml-citation{ref="3,6" data="citationList"}
        self.slots = SlotManager(slot_size * 2)
    
    def _enqueue_request(self, request):
        # 多进程分发下载任务 ‌:ml-citation{ref="6" data="citationList"}
        slot = self.slots.get_slot(request)
        slot.add_request(request)
 

PyTorch

  • DataLoader优化‌:使用fork启动方式+共享内存,避免数据复制到子进程‌。
  • CUDA多进程支持‌:通过torch.multiprocessing实现GPU显存隔离,防止多进程冲突‌。
  • 梯度聚合机制‌:多进程模型参数通过multiprocessing.Manager实现分布式更新‌。
 
# torch/utils/data/_utils/worker.py
def _worker_loop(dataset, index_queue, data_queue):
    # 子进程初始化共享内存 ‌:ml-citation{ref="5,7" data="citationList"}
    dataset = torch.utils.data.get_dataset(dataset)
    while True:
        idx = index_queue.get()
        data = dataset[idx]
        data_queue.put(data)
 
 

Gunicorn(WSGI服务器)

  • ‌Master-Worker模型‌:主进程监控负载,动态fork()子进程处理请求‌。
  • ‌零拷贝优化‌:通过sendfile()系统调用在父子进程间直接传输文件描述符‌。
  • ‌热重启机制‌:使用SIGUSR2信号通知子进程优雅退出,保留TCP连接状态‌。
# gunicorn/arbiter.py
class Arbiter:
    def spawn_workers(self):
        # 基于os.fork生成Worker进程 ‌:ml-citation{ref="7,8" data="citationList"}
        for _ in range(self.num_workers - len(self.workers)):
            pid = os.fork()
            if pid == 0:  # Worker进程
                self.worker_class(self).run()
 
    def kill_workers(self, sig):
        # 通过信号控制进程重启 ‌:ml-citation{ref="8" data="citationList"}
        for pid in self.worker_pids:
            os.kill(pid, sig)
 
 

总结

  • GIL限制:多进程绕过了GIL的限制,适合CPU密集型任务。
  • 内存开销:每个进程有独立的内存空间,内存开销较大。
  • 进程间通信:需要显式地使用Queue或Pipe来通信。

多进程与多线程的对比

  • 多线程:适合I/O密集型任务(如文件操作、网络请求)。
  • 多进程:适合CPU密集型任务(如计算密集型任务)。