定义
多进程是指同时运行多个进程(Process),每个进程有独立的内存空间。Python中使用multiprocessing模块来实现多进程。
核心组件:
- Process:创建进程对象
- Queue/Pipe:进程间通信 (IPC)
- Lock/Event:同步机制
- Pool:进程池管理
- Value/Array:共享内存
创建多进程
import multiprocessing
import time
def worker(name):
print(f"Process {name} started")
time.sleep(2) # 模拟耗时任务
print(f"Process {name} finished")
if __name__ == "__main__":
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start() # 启动进程
for p in processes:
p.join() # 等待所有进程完成
print("All processes finished")
进程间通信
多进程之间可以通过Queue或Pipe进行通信。
使用Queue
import multiprocessing
def producer(queue):
for i in range(5):
queue.put(f"Message {i}") # 向队列中放入数据
queue.put(None) # 发送结束信号
def consumer(queue):
while True:
msg = queue.get() # 从队列中获取数据
if msg is None:
break
print(f"Consumed: {msg}")
if __name__ == "__main__":
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
print("All done")
使用Pipe
import multiprocessing
def sender(conn):
conn.send("Hello from sender") # 发送数据
conn.close()
def receiver(conn):
print(f"Received: {conn.recv()}") # 接收数据
if __name__ == "__main__":
parent_conn, child_conn = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=(child_conn,))
p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))
p1.start()
p2.start()
p1.join()
p2.join()
print("All done")
进程池(Pool)
multiprocessing.Pool提供了更高级的接口,可以方便地管理多个进程。
使用Pool
import multiprocessing
def compute_square(n):
return n * n
if __name__ == '__main__':
with multiprocessing.Pool(4) as pool: # 4个worker进程
# 方法1: map
results = pool.map(compute_square, range(10))
print("Map results:", results)
# 方法2: apply_async(异步)
async_result = pool.apply_async(compute_square, (5,))
print("Async result:", async_result.get())
动态分配任务
import multiprocessing
import time
def task(x):
time.sleep(1) # 模拟任务耗时
return x * x
if __name__ == "__main__":
with multiprocessing.Pool(4) as pool:
results = []
for i in range(10):
result = pool.apply_async(task, args=(i,)) # 异步提交任务
results.append(result)
for result in results:
print(result.get()) # 获取任务结果
进程池的回调函数
可以为任务设置回调函数,任务完成后自动调用。
import multiprocessing
import time
def task(x):
time.sleep(1)
return x * x
def callback(result):
print(f"Callback received: {result}")
if __name__ == "__main__":
with multiprocessing.Pool(4) as pool:
for i in range(10):
pool.apply_async(task, args=(i,), callback=callback)
pool.close()
pool.join()
守护进程(Daemon Process)
守护进程会在主进程结束时自动终止。
import multiprocessing
import time
def daemon_process():
while True:
print("Daemon process is running...")
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=daemon_process)
p.daemon = True # 设置为守护进程
p.start()
time.sleep(5) # 主进程运行5秒后结束
print("Main process finished")
共享内存
from multiprocessing import Value, Array
def modify_shared_data(shared_value, shared_array):
with shared_value.get_lock(): # 需要加锁保证原子性
shared_value.value += 1
for i in range(len(shared_array)):
shared_array[i] *= 2
if __name__ == '__main__':
counter = multiprocessing.Value('i', 0)
arr = multiprocessing.Array('d', [1.0, 2.0, 3.0])
processes = []
for _ in range(3):
p = multiprocessing.Process(target=modify_shared_data, args=(counter, arr))
processes.append(p)
p.start()
for p in processes:
p.join()
print("Final counter:", counter.value)
print("Final array:", list(arr))
同步机制:Lock
from multiprocessing import Lock
def printer(item, lock):
with lock: # 确保同一时刻只有一个进程打印
print(f"Process {os.getpid()} prints: {item}")
if __name__ == '__main__':
lock = multiprocessing.Lock()
items = ['A', 'B', 'C', 'D']
processes = []
for item in items:
p = multiprocessing.Process(target=printer, args=(item, lock))
processes.append(p)
p.start()
for p in processes:
p.join()
应用
圆周率
import multiprocessing
import numpy as np
import math
from multiprocessing import Pool
def monte_carlo_simulation(samples):
"""
执行蒙特卡洛模拟
:param samples: 需要生成的随机点数量
:return: 落在1/4圆内的点数
"""
# 使用numpy生成随机坐标
x = np.random.random(samples)
y = np.random.random(samples)
# 向量化计算点是否在单位圆内
inside = np.sum(x**2 + y**2 <= 1.0)
return inside
def calculate_pi(total_samples, num_processes=4):
"""
使用多进程并行计算圆周率
:param total_samples: 总样本数
:param num_processes: 使用的进程数
:return: 估算的圆周率值
"""
# 计算每个进程需要处理的样本数
samples_per_process = total_samples // num_processes
# 创建进程池
with Pool(processes=num_processes) as pool:
# 生成任务参数列表(每个进程分配等量样本)
tasks = [samples_per_process] * num_processes
# 使用map并行执行任务
results = pool.map(monte_carlo_simulation, tasks)
# 汇总所有落在圆内的点数
total_inside = sum(results)
# 计算圆周率 (4 * 圆内点数 / 总点数)
pi_estimate = 4 * total_inside / total_samples
return pi_estimate
if __name__ == '__main__':
# 参数配置
total_samples = 10_000_000 # 总样本数
num_processes = multiprocessing.cpu_count() # 使用全部CPU核心
# 执行计算
pi_estimate = calculate_pi(total_samples, num_processes)
# 输出结果
print(f"真实圆周率: {math.pi:.8f}")
print(f"使用 {total_samples} 个样本的估算值: {pi_estimate:.8f}")
print(f"绝对误差: {abs(math.pi - pi_estimate):.8f}")
- 使用 numpy 生成随机数:numpy.random.random 可以一次性生成大量随机数,效率远高于逐次调用 random.random()。
- 向量化操作:使用 numpy 的数组操作,可以一次性计算所有点是否在单位圆内,避免逐次判断,显著提高性能。
- 多进程并行:使用 multiprocessing.Pool 创建进程池,将任务分配到多个进程,充分利用多核CPU资源。
并行数值积分
科学计算中的并行数值积分,数值积分是一种常见的科学计算任务,可以使用多进程来加速计算。
import multiprocessing
import numpy as np
from scipy.integrate import quad
def integrate_function(func, a, b, args=()):
return quad(func, a, b, args=args)[0]
def parallel_integration(func, a, b, num_processes=4, intervals=1000):
interval_size = (b - a) / intervals
tasks = [(func, a + i * interval_size, a + (i + 1) * interval_size, args) for i in range(intervals)]
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.starmap(integrate_function, tasks)
return sum(results)
if __name__ == '__main__':
# 定义一个简单的积分函数
def f(x):
return x**2
a, b = 0, 10
num_processes = multiprocessing.cpu_count()
result = parallel_integration(f, a, b, num_processes=num_processes)
print(f"Integral result: {result}")
多进程滤波
图像处理中的多进程滤波,图像处理任务通常可以并行化,例如对图像应用滤波器。
from multiprocessing import Pool
from PIL import Image, ImageFilter
import os
def process_image(image_path):
img = Image.open(image_path)
filtered = img.filter(ImageFilter.BLUR)
save_path = f"processed_{os.path.basename(image_path)}"
filtered.save(save_path)
return save_path
if __name__ == '__main__':
image_paths = ["image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg"]
with Pool(processes=4) as pool:
results = pool.map(process_image, image_paths)
print(f"Processed images: {results}")
多进程数据预处理
深度学习中的多进程数据预处理,在深度学习中,数据预处理是一个耗时的过程,可以利用多进程来加速。
import torch
import torch.multiprocessing as mp
from PIL import Image
import torchvision.transforms as transforms
def process_image(rank, image_path, output_tensor):
image = Image.open(image_path)
transform = transforms.Compose([transforms.Resize((128, 128)), transforms.ToTensor()])
output_tensor[rank] = transform(image)
print(f"Process {rank} processed image")
if __name__ == "__main__":
image_paths = ["image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg"]
output_tensor = torch.zeros((4, 3, 128, 128)).share_memory_()
processes = []
for rank, image_path in enumerate(image_paths):
p = mp.Process(target=process_image, args=(rank, image_path, output_tensor))
p.start()
processes.append(p)
for p in processes:
p.join()
print("Processed images stored in tensor:", output_tensor)
并行特征提取
在机器学习中,特征提取是一个常见的任务,可以使用多进程来加速。
from multiprocessing import Pool
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
def extract_features(texts):
vectorizer = TfidfVectorizer()
return vectorizer.fit_transform(texts).toarray()
def parallel_feature_extraction(texts, num_processes=4):
chunk_size = len(texts) // num_processes
chunks = [texts[i*chunk_size : (i+1)*chunk_size] for i in range(num_processes)]
with Pool(processes=num_processes) as pool:
results = pool.map(extract_features, chunks)
return np.concatenate(results, axis=0)
if __name__ == '__main__':
texts = ["text1", "text2", "text3", "text4", "text5", "text6", "text7", "text8"]
num_processes = 4
features = parallel_feature_extraction(texts, num_processes)
print(f"Features shape: {features.shape}")
主流框架中的应用
Celery(分布式任务队列)
- Worker进程池:通过prefork模式创建子进程池,每个进程独立执行任务。
- 任务分发机制:父进程通过消息队列(如RabbitMQ)接收任务,通过os.fork()动态分配子进程执行。
- 共享内存优化:使用mmap实现进程间共享任务状态数据,减少序列化开销。
# celery/worker/consumer.py
class Consumer:
def start(self):
# 创建预分配进程池 :ml-citation{ref="2,5" data="citationList"}
self.pool = self._create_process_pool()
def _create_process_pool(self):
return self.controller.spawn_worker_processes(
max_concurrency=self.max_workers, # 最大进程数
on_start=self.on_process_start # 进程启动回调
)
# celery/worker/process.py
class WorkerProcess:
def _boot(self):
# 通过os.fork创建子进程 :ml-citation{ref="2,7" data="citationList"}
pid = os.fork()
if pid == 0: # 子进程
self.run_worker()
def run_worker(self):
# 独立执行任务的子进程逻辑 :ml-citation{ref="5" data="citationList"}
while not self.should_stop:
self.process_task()
Scrapy(爬虫框架)
- 多核利用率优化:通过Twisted事件循环+多进程组合,实现I/O与计算并行。
- 进程间通信:使用multiprocessing.Queue传递抓取结果,父进程统一持久化存储。
- 资源隔离机制:每个爬虫子进程独立维护DNS解析缓存与请求会话。
# scrapy/core/downloader/__init__.py
class Downloader:
def __init__(self, slot_size=8):
# 基于CPU核数设置下载槽位 :ml-citation{ref="3,6" data="citationList"}
self.slots = SlotManager(slot_size * 2)
def _enqueue_request(self, request):
# 多进程分发下载任务 :ml-citation{ref="6" data="citationList"}
slot = self.slots.get_slot(request)
slot.add_request(request)
PyTorch
- DataLoader优化:使用fork启动方式+共享内存,避免数据复制到子进程。
- CUDA多进程支持:通过torch.multiprocessing实现GPU显存隔离,防止多进程冲突。
- 梯度聚合机制:多进程模型参数通过multiprocessing.Manager实现分布式更新。
# torch/utils/data/_utils/worker.py
def _worker_loop(dataset, index_queue, data_queue):
# 子进程初始化共享内存 :ml-citation{ref="5,7" data="citationList"}
dataset = torch.utils.data.get_dataset(dataset)
while True:
idx = index_queue.get()
data = dataset[idx]
data_queue.put(data)
Gunicorn(WSGI服务器)
- Master-Worker模型:主进程监控负载,动态fork()子进程处理请求。
- 零拷贝优化:通过sendfile()系统调用在父子进程间直接传输文件描述符。
- 热重启机制:使用SIGUSR2信号通知子进程优雅退出,保留TCP连接状态。
# gunicorn/arbiter.py
class Arbiter:
def spawn_workers(self):
# 基于os.fork生成Worker进程 :ml-citation{ref="7,8" data="citationList"}
for _ in range(self.num_workers - len(self.workers)):
pid = os.fork()
if pid == 0: # Worker进程
self.worker_class(self).run()
def kill_workers(self, sig):
# 通过信号控制进程重启 :ml-citation{ref="8" data="citationList"}
for pid in self.worker_pids:
os.kill(pid, sig)
总结
- GIL限制:多进程绕过了GIL的限制,适合CPU密集型任务。
- 内存开销:每个进程有独立的内存空间,内存开销较大。
- 进程间通信:需要显式地使用Queue或Pipe来通信。
多进程与多线程的对比
- 多线程:适合I/O密集型任务(如文件操作、网络请求)。
- 多进程:适合CPU密集型任务(如计算密集型任务)。
← Previous postPython中的池化技术
Next post →Python中的多线程编程