池化技术
池化(Pooling)是一种重要的编程技术,用于提高资源利用率和程序性能。
在Python中,池化主要有两种形式:线程池(ThreadPool)和进程池(ProcessPool)。
线程池
特性
- 降低资源创建销毁的开销
- 控制并发数量,防止系统过载
- 简化并发编程模型
- 提高资源利用率
设计要点
- 池管理器(Pool Manager)
- 初始化参数:线程/进程数量、任务队列大小、超时设置等
- 生命周期管理:启动、关闭(优雅关闭/强制关闭)、重启机制
- 资源分配策略:固定大小、动态扩容/缩容
- 任务队列(Task Queue)
- 队列实现:同步队列(线程安全)、优先级队列
- 容量控制:有界队列/无界队列及满队列处理策略
- 任务表示:支持Callable对象、函数+参数、Future对象
- 工作线程/进程(Workers)
- 创建方式:预创建(pre-fork) vs 按需创建
- 工作循环:任务获取、执行、结果处理、异常处理
- 状态管理:空闲、忙碌、终止等状态跟踪
功能设计
- 任务调度
- 提交接口:submit(callable)、map(func, iterable)等
- 调度策略:FIFO、优先级、轮询等
- 超时控制:任务执行超时、队列等待超时
- 结果处理
- 返回机制:Future对象、回调函数、结果队列
- 异常处理:自定义异常处理器、异常传播
- 超时获取:支持带超时的结果获取
- 资源管理
- 动态调整:根据负载动态增减工作线程/进程
- 资源回收:空闲worker回收、内存泄漏防护
- 上下文管理:支持with语句自动清理
高级特性设计
- 容错机制
- Worker崩溃处理:自动重启策略
- 任务重试:可配置的重试逻辑
- 死锁检测:超时自动中断
- 监控统计
- 运行时指标:活跃任务数、队列长度、平均耗时
- 性能分析:任务执行时间分布、资源使用率
- 事件钩子:任务开始/结束/失败时的回调
- 特殊需求支持
- 上下文传递:线程局部变量、进程间共享状态
- 依赖管理:任务间依赖关系处理
- 批量处理:任务分片、合并执行
import threading
import queue
import time
from typing import Callable, Any, Optional, Dict
from concurrent.futures import Future
class AdvancedThreadPool:
    """A thread pool with per-task timeouts, result/exception callbacks and
    graceful shutdown, built on a shared queue.Queue of pending tasks.
    """

    def __init__(self, max_workers: int = None, task_timeout: float = None):
        """Create the pool and pre-start its worker threads.

        Args:
            max_workers: Maximum number of worker threads; defaults to
                CPU count * 5 (a common heuristic for I/O-bound work).
            task_timeout: Per-task timeout in seconds; None disables it.
        """
        # Local import: this snippet's header does not import os, and the
        # original `threading.cpu_count()` does not exist (AttributeError).
        import os

        self.max_workers = max_workers or ((os.cpu_count() or 1) * 5)
        self.task_timeout = task_timeout
        self.task_queue = queue.Queue()
        self.workers = set()
        self.worker_lock = threading.Lock()
        self.shutdown_event = threading.Event()
        self.result_callbacks = []
        self.exception_callbacks = []
        # Pre-create a fixed number of worker threads.
        for _ in range(self.max_workers):
            self._add_worker()

    def _add_worker(self):
        """Create, register and start one daemon worker thread."""
        worker = threading.Thread(
            target=self._worker_loop,
            daemon=True
        )
        with self.worker_lock:
            self.workers.add(worker)
        worker.start()

    def _worker_loop(self):
        """Main loop of a worker: fetch a task, run it, resolve its Future."""
        while True:
            # Keep draining queued tasks after shutdown so that every queued
            # task gets task_done() and shutdown(wait=True)'s
            # task_queue.join() can complete instead of hanging forever.
            if self.shutdown_event.is_set() and self.task_queue.empty():
                break
            try:
                task = self.task_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            future, func, args, kwargs = task
            try:
                # Skip tasks whose Future was cancelled (e.g. by
                # shutdown(cancel_futures=True)): calling set_result on a
                # cancelled Future raises InvalidStateError and would
                # silently kill this worker.
                if not future.set_running_or_notify_cancel():
                    continue
                if self.task_timeout is not None:
                    # Timed execution path.
                    result = self._execute_with_timeout(func, *args, **kwargs)
                else:
                    result = func(*args, **kwargs)
                future.set_result(result)
                self._run_callbacks(self.result_callbacks, future)
            except Exception as e:
                future.set_exception(e)
                self._run_callbacks(self.exception_callbacks, future)
            finally:
                # Runs for every dequeued task, including cancelled ones.
                self.task_queue.task_done()

    def _execute_with_timeout(self, func: Callable, *args, **kwargs) -> Any:
        """Run func in a helper thread; raise TimeoutError if it overruns.

        Note: a timed-out call keeps running in the background (Python
        threads cannot be forcibly killed); the daemon flag merely keeps it
        from blocking interpreter exit.

        Raises:
            TimeoutError: if func does not finish within self.task_timeout.
        """
        result = None
        exception = None

        def runner():
            nonlocal result, exception
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                exception = e

        t = threading.Thread(target=runner, daemon=True)
        t.start()
        t.join(timeout=self.task_timeout)
        if t.is_alive():
            raise TimeoutError(f"Task timed out after {self.task_timeout} seconds")
        if exception is not None:
            raise exception
        return result

    def submit(self, func: Callable, *args, **kwargs) -> Future:
        """Schedule func(*args, **kwargs); return a Future for its result.

        Raises:
            RuntimeError: if the pool is shutting down.
        """
        if self.shutdown_event.is_set():
            raise RuntimeError("ThreadPool is shutting down")
        future = Future()
        self.task_queue.put((future, func, args, kwargs))
        return future

    def map(self, func: Callable, iterable, timeout: float = None):
        """Submit func for every item; yield results in input order."""
        futures = [self.submit(func, arg) for arg in iterable]
        for future in futures:
            yield future.result(timeout=timeout)

    def add_result_callback(self, callback: Callable[[Future], None]):
        """Register a callback invoked with the Future after each success."""
        self.result_callbacks.append(callback)

    def add_exception_callback(self, callback: Callable[[Future], None]):
        """Register a callback invoked with the Future after each failure."""
        self.exception_callbacks.append(callback)

    def _run_callbacks(self, callbacks: list, future: Future):
        """Invoke callbacks best-effort; a failing callback never kills a worker."""
        for callback in callbacks:
            try:
                callback(future)
            except Exception:
                pass  # deliberate: callbacks are best-effort

    def shutdown(self, wait: bool = True, cancel_futures: bool = False):
        """Stop accepting new tasks and optionally wait for completion.

        Args:
            wait: Block until all remaining queued tasks are processed.
            cancel_futures: Drop queued-but-unstarted tasks and cancel
                their Futures.
        """
        self.shutdown_event.set()
        if cancel_futures:
            while True:
                try:
                    future = self.task_queue.get_nowait()[0]
                except queue.Empty:
                    break
                future.cancel()
                # Balance the queue's unfinished-task counter so the
                # join() below cannot block on dropped entries.
                self.task_queue.task_done()
        if wait:
            self.task_queue.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Ensure all queued work is finished when leaving a `with` block.
        self.shutdown(wait=True)
        return False
最佳实践
- 合理设置池大小:通常设置为CPU核心数的2-5倍
- 资源清理:确保池关闭时释放所有资源
- 错误处理:妥善处理任务中的异常
- 避免共享状态:特别是线程池中避免共享可变状态
- IO密集型 vs CPU密集型:
- IO密集型:适合线程池
- CPU密集型:适合进程池
进程池
进程池的核心功能包括:任务提交、结果收集、动态扩缩容与异常处理。
设计要点
动态进程管理:
- 根据负载自动调整进程数量
- 空闲进程回收机制
- 最小/最大进程数限制
任务调度:
- 优先级队列支持
- 超时控制机制
- 批量任务提交(map接口)
异常处理:
- 任务异常捕获和传播
- 进程崩溃处理
- 超时任务终止
资源管理:
- 优雅关闭机制
- 上下文管理支持(with语句)
- 进程状态监控
结果处理:
- Future模式支持
- 异步结果收集
- 结果回调机制
import logging
import multiprocessing
import os
import queue
import random
import signal
import threading
import time
from concurrent.futures import Future
from functools import partial
from typing import Callable, Any, Optional, Union, List, Dict, Tuple
# Logging setup (module-level side effect: configures the root logger;
# a library would normally leave basicConfig to the application).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AdvancedProcessPool")
class ProcessPool:
    """Advanced process pool implementation.

    Intended features:
    - dynamic worker-count adjustment
    - per-task timeout control
    - graceful shutdown
    - task priority support
    - exception capture and propagation
    - result callback machinery

    NOTE(review): several features do not work as advertised — see the
    method-level notes. Most importantly, state written by worker
    *processes* (e.g. self.worker_status) mutates each child's private copy
    of this object and is never visible to the parent process.
    """

    def __init__(self,
                 max_workers: Optional[int] = None,
                 min_workers: int = 1,
                 task_timeout: Optional[float] = None,
                 idle_timeout: float = 60.0):
        """Initialize the process pool.

        Args:
            max_workers: maximum worker processes; defaults to CPU count
            min_workers: minimum worker processes (default 1)
            task_timeout: per-task timeout in seconds; None means unlimited
            idle_timeout: seconds after which an idle worker may be reclaimed

        Raises:
            ValueError: if min_workers < 1 or max_workers < min_workers.
        """
        # Parameter validation
        if max_workers is None:
            max_workers = multiprocessing.cpu_count()
        if min_workers < 1:
            raise ValueError("min_workers must be at least 1")
        if max_workers < min_workers:
            raise ValueError("max_workers must be >= min_workers")
        self.max_workers = max_workers
        self.min_workers = min_workers
        self.task_timeout = task_timeout
        self.idle_timeout = idle_timeout
        # Process bookkeeping.
        # NOTE(review): plain dicts living in the parent; children inherit
        # copies, so parent and children never observe each other's updates.
        self.workers: Dict[int, multiprocessing.Process] = {}
        self.worker_status: Dict[int, Dict[str, Any]] = {}  # per-worker state
        # Task queue.
        # NOTE(review): despite the original "priority queue" comment, this
        # is a plain FIFO multiprocessing.Queue — the priority value carried
        # in each task tuple is never used for ordering.
        self.task_queue = multiprocessing.Queue()
        # Result queue (workers -> manager thread)
        self.result_queue = multiprocessing.Queue()
        # Control signals
        self.shutdown_event = multiprocessing.Event()
        self.management_lock = multiprocessing.Lock()
        # Start the initial worker processes
        self._init_workers(self.min_workers)
        # Start the management thread.
        # NOTE(review): `threading` is not imported by this snippet's
        # original header — without that import this line raises NameError.
        self.manager_thread = threading.Thread(
            target=self._manage_pool,
            daemon=True
        )
        self.manager_thread.start()

    def _init_workers(self, num_workers: int) -> None:
        """Spawn up to num_workers new workers (capped at max_workers).

        NOTE(review): management_lock is a non-reentrant
        multiprocessing.Lock; submit() and _adjust_pool_size() call this
        method while already holding it, which deadlocks.
        """
        with self.management_lock:
            for _ in range(num_workers):
                if len(self.workers) >= self.max_workers:
                    break
                # NOTE(review): len(self.workers) + 1 can collide with an
                # existing id after _adjust_pool_size has deleted a worker
                # (e.g. live ids {1, 3} -> len + 1 == 3 again).
                worker_id = len(self.workers) + 1
                # NOTE(review): the target is a bound method, so `self` is
                # shipped to the child (inherited on fork, pickled on
                # spawn); on spawn platforms pickling fails once
                # manager_thread exists, since threads are not picklable.
                process = multiprocessing.Process(
                    target=self._worker_loop,
                    args=(worker_id,),
                    daemon=True
                )
                process.start()
                self.workers[worker_id] = process
                self.worker_status[worker_id] = {
                    'last_active': time.time(),
                    'busy': False
                }
                logger.info(f"Started worker process {worker_id}")

    def _worker_loop(self, worker_id: int) -> None:
        """Main loop of a worker process.

        NOTE(review): all worker_status writes below land in this child's
        private copy of the dict — the parent's load statistics never
        change, so every parent-side `busy` check reads stale False.
        """
        # Ignore SIGINT in workers (interrupts are handled by the parent)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        while not self.shutdown_event.is_set():
            try:
                # Fetch a task
                try:
                    priority, task_id, func, args, kwargs = self.task_queue.get_nowait()
                    # Mark this worker busy (child-local only; see above)
                    with self.management_lock:
                        self.worker_status[worker_id]['busy'] = True
                        self.worker_status[worker_id]['last_active'] = time.time()
                    # Execute the task
                    result = None
                    exception = None
                    start_time = time.time()
                    try:
                        if self.task_timeout is not None:
                            # Timed execution
                            result = self._execute_with_timeout(func, *args, **kwargs)
                        else:
                            result = func(*args, **kwargs)
                    except Exception as e:
                        exception = e
                    # Ship the outcome back to the parent.
                    # NOTE(review): result/exception must be picklable or
                    # this put() fails inside the child.
                    self.result_queue.put((
                        task_id,
                        result,
                        exception,
                        time.time() - start_time,
                        worker_id
                    ))
                except queue.Empty:
                    # No task available: brief sleep, then poll again
                    time.sleep(0.1)
                    continue
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception as e:
                logger.error(f"Worker {worker_id} encountered error: {str(e)}")
                self.result_queue.put((
                    None,  # task_id
                    None,  # result
                    e,  # exception
                    0,  # duration
                    worker_id
                ))
            finally:
                # Mark this worker idle again (child-local copy only)
                with self.management_lock:
                    self.worker_status[worker_id]['busy'] = False

    def _execute_with_timeout(self, func: Callable, *args, **kwargs) -> Any:
        """Run func with a timeout by spawning a helper process.

        NOTE(review): this cannot work as written. `result`/`exception` are
        assigned inside the *helper process*, so the parent closure's
        variables stay None and a successful call returns None. Daemonic
        worker processes are also forbidden from creating children —
        multiprocessing raises "daemonic processes are not allowed to have
        children" on p.start().

        Raises:
            TimeoutError: if the helper process outlives self.task_timeout.
        """
        result = None
        exception = None

        def worker():
            nonlocal result, exception
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                exception = e

        p = multiprocessing.Process(target=worker)
        p.start()
        p.join(timeout=self.task_timeout)
        if p.is_alive():
            p.terminate()
            p.join()
            raise TimeoutError(f"Task timed out after {self.task_timeout} seconds")
        elif exception is not None:
            raise exception
        else:
            return result

    def _manage_pool(self) -> None:
        """Management thread: periodically resizes the pool and dispatches results."""
        last_adjustment_time = time.time()
        while not self.shutdown_event.is_set():
            # Resize check every 5 seconds
            current_time = time.time()
            if current_time - last_adjustment_time > 5.0:
                self._adjust_pool_size()
                last_adjustment_time = current_time
            # Drain one result, resolving its Future
            try:
                task_id, result, exception, duration, worker_id = self.result_queue.get_nowait()
                if task_id is not None:
                    # Normal task result
                    future = self._get_future(task_id)
                    if future is not None:
                        if exception is not None:
                            future.set_exception(exception)
                        else:
                            future.set_result(result)
                    # NOTE(review): resolved futures are never removed from
                    # self._futures, so the registry grows without bound.
                # Refresh the worker's activity timestamp
                with self.management_lock:
                    if worker_id in self.worker_status:
                        self.worker_status[worker_id]['last_active'] = time.time()
            except queue.Empty:
                time.sleep(0.1)
                continue

    def _adjust_pool_size(self) -> None:
        """Resize the pool based on current load.

        NOTE(review): `busy` flags are only ever set inside worker
        processes, so busy_workers always reads 0 here; queue length is the
        only signal that actually varies. qsize() is approximate and
        unimplemented on some platforms (e.g. macOS). The _init_workers call
        at the bottom re-acquires the non-reentrant management_lock already
        held here — a deadlock.
        """
        with self.management_lock:
            # Current load
            busy_workers = sum(1 for w in self.worker_status.values() if w['busy'])
            total_workers = len(self.workers)
            queue_size = self.task_queue.qsize()
            # Desired worker count, clamped to [min_workers, max_workers]
            target_workers = min(
                max(
                    self.min_workers,
                    busy_workers + (queue_size // 2)  # simple heuristic
                ),
                self.max_workers
            )
            # Reclaim idle workers
            if total_workers > target_workers:
                idle_workers = [
                    (wid, status['last_active'])
                    for wid, status in self.worker_status.items()
                    if not status['busy']
                    and (time.time() - status['last_active']) > self.idle_timeout
                ]
                # Reclaim the longest-idle workers first
                idle_workers.sort(key=lambda x: x[1])
                for wid, _ in idle_workers[:total_workers - target_workers]:
                    if wid in self.workers:
                        # NOTE(review): terminate() can kill a worker that
                        # just dequeued a task, and may corrupt the shared
                        # queues' internal state.
                        self.workers[wid].terminate()
                        del self.workers[wid]
                        del self.worker_status[wid]
                        logger.info(f"Terminated idle worker {wid}")
            # Grow the pool
            elif total_workers < target_workers:
                self._init_workers(target_workers - total_workers)

    def submit(self,
               func: Callable,
               *args,
               priority: int = 0,
               **kwargs) -> Future:
        """Submit a task to the pool.

        Args:
            func: the callable to execute (must be picklable)
            *args: positional arguments
            priority: task priority (lower value = higher priority).
                NOTE(review): carried in the task tuple but never honored —
                the underlying queue is FIFO.
            **kwargs: keyword arguments

        Returns:
            A Future that resolves to the task's result.

        Raises:
            RuntimeError: if the pool is shutting down.
        """
        if self.shutdown_event.is_set():
            raise RuntimeError("ProcessPool is shutting down")
        future = Future()
        # NOTE(review): id(future) is only unique while the Future is
        # alive; CPython recycles ids after garbage collection.
        task_id = id(future)
        self._register_future(task_id, future)
        self.task_queue.put((
            priority,
            task_id,
            func,
            args,
            kwargs
        ))
        # Scale up if no worker appears idle.
        # NOTE(review): _init_workers re-acquires the non-reentrant
        # management_lock held here — this branch deadlocks when taken.
        with self.management_lock:
            idle_workers = sum(1 for w in self.worker_status.values() if not w['busy'])
            if idle_workers == 0 and len(self.workers) < self.max_workers:
                self._init_workers(1)
        return future

    def map(self, func: Callable, iterable, timeout: Optional[float] = None) -> List:
        """Apply func to every item in parallel.

        Args:
            func: mapping function (called with one positional argument)
            iterable: items to process
            timeout: per-result timeout in seconds

        Returns:
            A list in input order; a failed item yields its exception
            *object* in place of a result (exceptions are returned, not
            raised).
        """
        futures = [self.submit(func, arg) for arg in iterable]
        results = []
        for future in futures:
            try:
                results.append(future.result(timeout=timeout))
            except Exception as e:
                results.append(e)
        return results

    def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
        """Shut down the pool.

        Args:
            wait: wait for in-flight tasks to finish
            cancel_futures: discard tasks that have not started
        """
        self.shutdown_event.set()
        if cancel_futures:
            # Drain the task queue.
            # NOTE(review): the dequeued futures are dropped without being
            # cancelled, so callers waiting on them block forever.
            while not self.task_queue.empty():
                try:
                    self.task_queue.get_nowait()
                except queue.Empty:
                    break
        if wait:
            # Wait for workers to finish their current task.
            # NOTE(review): parent-side `busy` flags are never set True
            # (see _worker_loop), so this loop exits immediately.
            while any(status['busy'] for status in self.worker_status.values()):
                time.sleep(0.1)
        # Terminate all worker processes
        with self.management_lock:
            for worker_id, process in self.workers.items():
                if process.is_alive():
                    process.terminate()
                    process.join()
                    logger.info(f"Terminated worker {worker_id}")
            self.workers.clear()
            self.worker_status.clear()
        # Wait for the management thread to exit
        if self.manager_thread.is_alive():
            self.manager_thread.join()

    # Future bookkeeping (parent process only)
    def _register_future(self, task_id: int, future: Future) -> None:
        """Remember a Future under its task id (lazily creates the registry)."""
        with self.management_lock:
            if not hasattr(self, '_futures'):
                self._futures = {}
            self._futures[task_id] = future

    def _get_future(self, task_id: int) -> Optional[Future]:
        """Look up a Future by task id; None if unknown."""
        with self.management_lock:
            if hasattr(self, '_futures'):
                return self._futures.get(task_id)
            return None

    def __enter__(self):
        # Context-manager support: `with ProcessPool(...) as pool:`
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
# Usage example
def sample_task(task_id: int, duration: float) -> str:
    """Example task: sleep `duration` seconds, failing 20% of the time.

    Defined at module level (not inside the __main__ guard) so worker
    processes can resolve it by qualified name when unpickling tasks on
    spawn-based platforms.
    """
    print(f"Task {task_id} started (pid: {os.getpid()})")
    time.sleep(duration)
    if random.random() < 0.2:  # fail with 20% probability
        raise ValueError(f"Task {task_id} failed")
    return f"Task {task_id} completed in {duration:.2f}s"


if __name__ == "__main__":
    with ProcessPool(max_workers=4, task_timeout=5.0) as pool:
        # Submit several tasks
        futures = [pool.submit(sample_task, i, random.uniform(0.5, 3.0))
                   for i in range(10)]
        # Submit a high-priority task
        urgent_future = pool.submit(sample_task, 99, 1.0, priority=-1)
        # Collect the results
        for future in futures:
            try:
                result = future.result()
                print(f"Result: {result}")
            except Exception as e:
                print(f"Error: {str(e)}")
        try:
            print(f"Urgent task result: {urgent_future.result()}")
        except Exception as e:
            print(f"Urgent task failed: {str(e)}")
        # map() takes a single-argument callable and ONE iterable, so bind
        # the extra `duration` argument with functools.partial; the original
        # call passed [0.5]*5 as the `timeout` parameter by mistake.
        results = pool.map(partial(sample_task, duration=0.5), range(5))
        print(f"Map results: {results}")
框架中的实现
SQLAlchemy的连接池
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Create a database engine backed by a connection pool
engine = create_engine(
    'postgresql://user:password@localhost/dbname',
    poolclass=QueuePool,  # QueuePool is the default; named here for clarity
    pool_size=5,          # connections kept open in the pool
    max_overflow=10,      # extra connections allowed under load
    pool_timeout=30       # seconds to wait for a free connection
)

# Borrow a connection from the pool (returned automatically on exit)
with engine.connect() as conn:
    # SQLAlchemy 2.0 requires textual SQL to be wrapped in text();
    # passing a raw string to execute() raises ObjectNotExecutableError.
    result = conn.execute(text("SELECT * FROM users"))
    print(result.fetchall())
网络请求池
使用requests的Session对象实现连接池
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# A Session reuses underlying TCP connections via urllib3's pool
session = requests.Session()

# Transport adapter carrying the pool sizing and a retry policy
retry_policy = Retry(total=3, backoff_factor=1)
pooled_adapter = HTTPAdapter(
    pool_connections=10,
    pool_maxsize=100,
    max_retries=retry_policy
)
for prefix in ('http://', 'https://'):
    session.mount(prefix, pooled_adapter)

# Every request below shares the session's connection pool
urls = ['https://example.com'] * 10
for url in urls:
    response = session.get(url)
    print(response.status_code)
Celery工作池
from celery import Celery

# Broker connection for the task queue (RabbitMQ via py-amqp)
app = Celery('tasks', broker='pyamqp://guest@localhost//')


@app.task
def process_data(data):
    """Example Celery task: normalize incoming data to upper case."""
    transformed = data.upper()
    return transformed


# The worker pool size (concurrency) is chosen at worker start time:
# celery -A tasks worker --loglevel=info --concurrency=4
← Previous post: Python中的上下文管理
Next post →: Python中的网络编程