Vanson's Eternal Blog

Pooling Techniques in Python


Pooling Techniques

Pooling is a technique that reuses expensive resources (threads, processes, connections) instead of creating and destroying them per use, improving resource utilization and program performance.

In Python, pooling mainly takes two forms: the thread pool (ThreadPool) and the process pool (ProcessPool).
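
Both come ready-made in the standard library's concurrent.futures module; a minimal taste before building our own:

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x: int) -> int:
    return x * x

if __name__ == "__main__":
    # Thread pool: workers share one interpreter and memory space
    with ThreadPoolExecutor(max_workers=4) as pool:
        print(list(pool.map(square, range(5))))  # [0, 1, 4, 9, 16]

    # Process pool: workers are separate interpreter processes
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(list(pool.map(square, range(5))))  # [0, 1, 4, 9, 16]
```

Both executors expose the same interface, so switching between them is usually a one-line change.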

Thread Pools

Features

  • Reduces the overhead of creating and destroying resources
  • Limits concurrency, preventing system overload
  • Simplifies the concurrent programming model
  • Improves resource utilization
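
The second point is easy to demonstrate: a pool with max_workers=2 never runs more than two tasks at once, however many are submitted. The counter names here are illustrative, using the stdlib ThreadPoolExecutor:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

active = 0   # tasks currently running
peak = 0     # highest concurrency observed
lock = threading.Lock()

def task(_):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)          # simulate some work
    with lock:
        active -= 1

with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(task, range(8)))

print(peak)  # never exceeds 2
```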

Design Points

  1. Pool Manager
  • Initialization parameters: thread/process count, task queue size, timeouts, etc.
  • Lifecycle management: startup, shutdown (graceful or forced), restart
  • Resource allocation strategy: fixed size vs dynamic grow/shrink
  2. Task Queue
  • Queue implementation: synchronized (thread-safe) queue, priority queue
  • Capacity control: bounded/unbounded queues and full-queue handling policies
  • Task representation: callables, function plus arguments, Future objects
  3. Workers
  • Creation: pre-created (pre-fork) vs created on demand
  • Work loop: fetch task, execute, handle result, handle exceptions
  • State tracking: idle, busy, terminated
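
The three components above — a manager, a shared queue, and workers — fit together in a few dozen lines. This is a deliberately minimal sketch for illustration, not production code:

```python
import queue
import threading

class MiniPool:
    """Minimal pool: a pool manager, a shared task queue, pre-created workers."""

    def __init__(self, num_workers: int = 4):
        self.tasks = queue.Queue()               # thread-safe task queue
        self.workers = [
            threading.Thread(target=self._loop, daemon=True)
            for _ in range(num_workers)          # pre-created (pre-fork style)
        ]
        for w in self.workers:
            w.start()

    def _loop(self):
        # Worker loop: fetch, execute, report completion
        while True:
            func, args = self.tasks.get()
            if func is None:                     # sentinel: terminate this worker
                break
            try:
                func(*args)
            finally:
                self.tasks.task_done()

    def submit(self, func, *args):
        self.tasks.put((func, args))

    def shutdown(self):
        # Graceful shutdown: wait for queued work, then stop each worker
        self.tasks.join()
        for _ in self.workers:
            self.tasks.put((None, ()))           # one sentinel per worker
        for w in self.workers:
            w.join()
```

The sentinel trick (one `None` per worker) is a common way to shut down workers that block on `Queue.get()`.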

Functional Design

  1. Task scheduling
  • Submission interfaces: submit(callable), map(func, iterable), etc.
  • Scheduling policies: FIFO, priority, round-robin
  • Timeout control: task-execution timeouts, queue-wait timeouts
  2. Result handling
  • Return mechanisms: Future objects, callbacks, result queues
  • Exception handling: custom exception handlers, exception propagation
  • Timed retrieval: fetching results with a timeout
  3. Resource management
  • Dynamic sizing: grow or shrink the worker count based on load
  • Resource reclamation: reclaim idle workers, guard against memory leaks
  • Context management: automatic cleanup via the with statement
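
Much of this functional surface already exists in concurrent.futures; a brief tour of submit, completion callbacks, and timed retrieval (the function names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def slow_add(a: int, b: int) -> int:
    return a + b

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns a Future immediately, without blocking
    future = pool.submit(slow_add, 2, 3)

    # Completion callback: runs once the future resolves
    future.add_done_callback(lambda f: print("done:", f.result()))

    # Timed retrieval: raises TimeoutError if the result is not ready in time
    try:
        print(future.result(timeout=1.0))  # 5
    except FutureTimeout:
        print("still running")

    # map() submits in bulk and yields results in input order
    print(list(pool.map(slow_add, [1, 2], [10, 20])))  # [11, 22]
```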

Advanced Features

  1. Fault tolerance
  • Worker crash handling: automatic restart policies
  • Task retry: configurable retry logic
  • Deadlock detection: abort on timeout
  2. Monitoring and statistics
  • Runtime metrics: active task count, queue length, average latency
  • Profiling: task-duration distribution, resource usage
  • Event hooks: callbacks on task start, finish, and failure
  3. Special requirements
  • Context propagation: thread-local variables, shared state between processes
  • Dependency management: handling dependencies between tasks
  • Batching: task sharding and merged execution
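
Of the fault-tolerance items, task retry is the easiest to bolt on from outside the pool. A small, hypothetical retry decorator (the names and defaults are this sketch's own):

```python
import time
from functools import wraps

def with_retries(max_attempts: int = 3, delay: float = 0.1):
    """Decorator: re-run a task up to max_attempts times before giving up."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    last_exc = exc
                    if attempt < max_attempts:
                        time.sleep(delay)   # back off before retrying
            raise last_exc                  # every attempt failed
        return wrapper
    return decorator

calls = {"n": 0}

@with_retries(max_attempts=3, delay=0.01)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(flaky())  # "ok" — the third attempt succeeds
```

A decorated task can be submitted to any pool unchanged, since the wrapper is just another callable.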
A fuller thread-pool sketch that ties these design points together:

import os
import queue
import threading
import time
from typing import Callable, Any, Optional, Dict
from concurrent.futures import Future
 
class AdvancedThreadPool:
    def __init__(self, max_workers: int = None, task_timeout: float = None):
        """
        高级线程池实现
        
        Args:
            max_workers: 最大工作线程数,默认CPU核心数*5
            task_timeout: 任务超时时间(秒),None表示不超时
        """
        self.max_workers = max_workers or (threading.cpu_count() * 5)
        self.task_timeout = task_timeout
        self.task_queue = queue.Queue()
        self.workers = set()
        self.worker_lock = threading.Lock()
        self.shutdown_event = threading.Event()
        self.result_callbacks = []
        self.exception_callbacks = []
        
        # Start the worker threads
        for _ in range(self.max_workers):
            self._add_worker()
 
    def _add_worker(self):
        """添加新的工作线程"""
        worker = threading.Thread(
            target=self._worker_loop,
            daemon=True
        )
        with self.worker_lock:
            self.workers.add(worker)
        worker.start()
 
    def _worker_loop(self):
        """Main loop executed by each worker thread."""
        while not self.shutdown_event.is_set():
            try:
                # Fetch a task; the short timeout lets us re-check the shutdown flag
                task = self.task_queue.get(timeout=0.1)
                future, func, args, kwargs = task
                
                try:
                    # Execute the task, with or without a timeout
                    if self.task_timeout is not None:
                        result = self._execute_with_timeout(func, *args, **kwargs)
                    else:
                        result = func(*args, **kwargs)
                    
                    future.set_result(result)
                    self._run_callbacks(self.result_callbacks, future)
                    
                except Exception as e:
                    future.set_exception(e)
                    self._run_callbacks(self.exception_callbacks, future)
                    
                finally:
                    self.task_queue.task_done()
                    
            except queue.Empty:
                continue
 
    def _execute_with_timeout(self, func: Callable, *args, **kwargs) -> Any:
        """Run a task in a helper thread and enforce a timeout."""
        result = None
        exception = None
        
        def worker():
            nonlocal result, exception
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                exception = e
        
        t = threading.Thread(target=worker, daemon=True)
        t.start()
        t.join(timeout=self.task_timeout)
        
        if t.is_alive():
            # Python threads cannot be killed; the helper keeps running in the
            # background after we give up waiting for it. This is an inherent
            # limitation of thread-based timeouts.
            raise TimeoutError(f"Task timed out after {self.task_timeout} seconds")
        elif exception is not None:
            raise exception
        else:
            return result
 
    def submit(self, func: Callable, *args, **kwargs) -> Future:
        """提交任务到线程池"""
        if self.shutdown_event.is_set():
            raise RuntimeError("ThreadPool is shutting down")
            
        future = Future()
        self.task_queue.put((future, func, args, kwargs))
        return future
 
    def map(self, func: Callable, iterable, timeout: float = None):
        """批量提交任务"""
        futures = [self.submit(func, arg) for arg in iterable]
        for future in futures:
            yield future.result(timeout=timeout)
 
    def add_result_callback(self, callback: Callable[[Future], None]):
        """添加结果回调"""
        self.result_callbacks.append(callback)
 
    def add_exception_callback(self, callback: Callable[[Future], None]):
        """添加异常回调"""
        self.exception_callbacks.append(callback)
 
    def _run_callbacks(self, callbacks: list, future: Future):
        """执行回调函数"""
        for callback in callbacks:
            try:
                callback(future)
            except Exception:
                pass
 
    def shutdown(self, wait: bool = True, cancel_futures: bool = False):
        """Shut down the pool. With wait=True, block until queued tasks finish."""
        if cancel_futures:
            self.shutdown_event.set()
            # Drain the queue and cancel pending futures; task_done() keeps the
            # queue's unfinished-task count consistent so join() can return.
            while True:
                try:
                    future = self.task_queue.get_nowait()[0]
                    future.cancel()
                    self.task_queue.task_done()
                except queue.Empty:
                    break
        if wait:
            self.task_queue.join()
        # Set the flag last so workers drain the queue before exiting;
        # setting it first would strand queued tasks and hang join().
        self.shutdown_event.set()
 
    def __enter__(self):
        return self
 
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False

Best Practices

  • Size the pool sensibly: typically 2-5x the number of CPU cores
  • Clean up resources: make sure everything is released when the pool shuts down
  • Handle errors: deal properly with exceptions raised inside tasks
  • Avoid shared state: especially shared mutable state across threads
  • IO-bound vs CPU-bound workloads:
    • IO-bound: thread pools are a good fit
    • CPU-bound: process pools are a good fit
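
The last rule of thumb comes down to the GIL: threads can overlap time spent waiting (IO) but not time spent executing Python bytecode (CPU). A minimal sketch of matching the pool to the workload (timings are indicative, not exact):

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_task(_):
    time.sleep(0.1)                           # stands in for a network/disk wait

def cpu_task(n: int) -> int:
    return sum(i * i for i in range(n))       # pure-Python computation

if __name__ == "__main__":
    # IO-bound: 8 workers overlap 8 sleeps, so the batch takes ~0.1s, not ~0.8s
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=8) as pool:
        list(pool.map(io_task, range(8)))
    print(f"IO-bound with threads: {time.perf_counter() - start:.2f}s")

    # CPU-bound: separate processes sidestep the GIL and use multiple cores
    start = time.perf_counter()
    with ProcessPoolExecutor() as pool:
        list(pool.map(cpu_task, [200_000] * 8))
    print(f"CPU-bound with processes: {time.perf_counter() - start:.2f}s")
```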

Process Pools

A process pool has to cover task submission, result collection, dynamic scaling up and down, and exception handling.

Design Points

  • Dynamic process management:

    • Adjust the number of processes automatically based on load
    • Reclaim idle processes
    • Enforce minimum/maximum process counts
  • Task scheduling:

    • Priority-queue support
    • Timeout control
    • Bulk submission (a map interface)
  • Exception handling:

    • Capture and propagate task exceptions
    • Handle worker-process crashes
    • Terminate timed-out tasks
  • Resource management:

    • Graceful shutdown
    • Context-manager support (the with statement)
    • Process status monitoring
  • Result handling:

    • Future-based results
    • Asynchronous result collection
    • Result callbacks
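
On the priority-queue point: within a single process, queue.PriorityQueue orders tasks by any sortable key (sharing priorities across processes takes more machinery, such as a manager process). A small sketch:

```python
import queue

pq = queue.PriorityQueue()

# (priority, sequence, payload): lower priority numbers come out first;
# the sequence number breaks ties so payloads are never compared directly
for seq, (prio, name) in enumerate([(5, "cleanup"), (1, "urgent"), (3, "normal")]):
    pq.put((prio, seq, name))

order = []
while not pq.empty():
    prio, _, name = pq.get()
    order.append(name)

print(order)  # ['urgent', 'normal', 'cleanup']
```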
 
A process-pool sketch along the same lines:

import logging
import multiprocessing
import os
import queue
import signal
import threading
import time
from typing import Callable, Any, Optional, Union, List, Dict, Tuple
from concurrent.futures import Future
 
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AdvancedProcessPool")
 
class ProcessPool:
    """高级进程池实现
    
    特性:
    - 动态调整进程数量
    - 任务超时控制
    - 优雅关闭
    - 任务优先级支持
    - 完善的异常处理
    - 结果回调机制
    """
    
    def __init__(self, 
                 max_workers: Optional[int] = None,
                 min_workers: int = 1,
                 task_timeout: Optional[float] = None,
                 idle_timeout: float = 60.0):
        """
        初始化进程池
        
        Args:
            max_workers: 最大工作进程数,默认CPU核心数
            min_workers: 最小工作进程数,默认1
            task_timeout: 任务超时时间(秒),None表示不限制
            idle_timeout: 空闲进程回收时间(秒)
        """
        # 参数校验
        if max_workers is None:
            max_workers = multiprocessing.cpu_count()
        if min_workers < 1:
            raise ValueError("min_workers must be at least 1")
        if max_workers < min_workers:
            raise ValueError("max_workers must be >= min_workers")
        
        self.max_workers = max_workers
        self.min_workers = min_workers
        self.task_timeout = task_timeout
        self.idle_timeout = idle_timeout
        
        # Process management. Caveat: these dicts live in the parent process,
        # so status updates made inside worker processes are not visible here;
        # a production version would share state via multiprocessing.Manager().
        self.workers: Dict[int, multiprocessing.Process] = {}
        self.worker_status: Dict[int, Dict[str, Any]] = {}
        
        # Task queue. Each task carries a priority, but a plain
        # multiprocessing.Queue is FIFO, so priorities are not enforced here.
        self.task_queue = multiprocessing.Queue()
        
        # Result queue
        self.result_queue = multiprocessing.Queue()
        
        # Control signals
        self.shutdown_event = multiprocessing.Event()
        self.management_lock = multiprocessing.Lock()
        
        # Start the initial worker processes
        self._init_workers(self.min_workers)
        
        # Start the management thread
        self.manager_thread = threading.Thread(
            target=self._manage_pool,
            daemon=True
        )
        self.manager_thread.start()
    
    def _init_workers(self, num_workers: int) -> None:
        """Spawn new worker processes, up to max_workers.
        
        Assumes the 'fork' start method: a bound method and its self are
        inherited by the child rather than pickled.
        """
        with self.management_lock:
            for _ in range(num_workers):
                if len(self.workers) >= self.max_workers:
                    break
                
                # max()+1 avoids reusing the id of a reclaimed worker
                worker_id = max(self.workers, default=0) + 1
                process = multiprocessing.Process(
                    target=self._worker_loop,
                    args=(worker_id,),
                    # Non-daemonic: daemonic processes may not spawn the helper
                    # process used by _execute_with_timeout. Always call
                    # shutdown() (e.g. via the with statement).
                    daemon=False
                )
                process.start()
                self.workers[worker_id] = process
                self.worker_status[worker_id] = {
                    'last_active': time.time(),
                    'busy': False
                }
                logger.info(f"Started worker process {worker_id}")
    
    def _worker_loop(self, worker_id: int) -> None:
        """Main loop executed in each worker process."""
        # Ignore SIGINT; the parent process handles interrupts
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        
        while not self.shutdown_event.is_set():
            try:
                # Fetch a task
                try:
                    priority, task_id, func, args, kwargs = self.task_queue.get_nowait()
                    
                    # Mark this worker busy. Note: this updates the child's
                    # copy of worker_status; the parent process does not see it.
                    with self.management_lock:
                        self.worker_status[worker_id]['busy'] = True
                        self.worker_status[worker_id]['last_active'] = time.time()
                    
                    # Execute the task
                    result = None
                    exception = None
                    start_time = time.time()
                    
                    try:
                        if self.task_timeout is not None:
                            # Run with a timeout in a helper process
                            result = self._execute_with_timeout(func, *args, **kwargs)
                        else:
                            result = func(*args, **kwargs)
                    except Exception as e:
                        exception = e
                    
                    # Ship the result back to the parent
                    self.result_queue.put((
                        task_id,
                        result,
                        exception,
                        time.time() - start_time,
                        worker_id
                    ))
                    
                except queue.Empty:
                    # No work available; back off briefly
                    time.sleep(0.1)
                    continue
                    
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception as e:
                logger.error(f"Worker {worker_id} encountered error: {str(e)}")
                self.result_queue.put((
                    None,  # task_id
                    None,  # result
                    e,     # exception
                    0,     # duration
                    worker_id
                ))
            finally:
                # Mark this worker idle again (child-local state, see above)
                with self.management_lock:
                    self.worker_status[worker_id]['busy'] = False
    
    def _execute_with_timeout(self, func: Callable, *args, **kwargs) -> Any:
        """Run a task in a helper process and enforce a timeout.
        
        The result travels back over a Pipe: a child process cannot mutate the
        parent's variables, so `nonlocal` would silently fail here. Assumes the
        'fork' start method, since the closure below is not picklable.
        """
        recv_end, send_end = multiprocessing.Pipe(duplex=False)
        
        def worker(conn):
            try:
                conn.send((func(*args, **kwargs), None))
            except Exception as e:
                conn.send((None, e))
        
        p = multiprocessing.Process(target=worker, args=(send_end,))
        p.start()
        p.join(timeout=self.task_timeout)
        
        if p.is_alive():
            p.terminate()
            p.join()
            raise TimeoutError(f"Task timed out after {self.task_timeout} seconds")
        
        result, exception = recv_end.recv()
        if exception is not None:
            raise exception
        return result
    
    def _manage_pool(self) -> None:
        """Management thread: resizes the pool and dispatches results."""
        last_adjustment_time = time.time()
        
        while not self.shutdown_event.is_set():
            # Periodically reconsider the pool size
            current_time = time.time()
            if current_time - last_adjustment_time > 5.0:  # every 5 seconds
                self._adjust_pool_size()
                last_adjustment_time = current_time
            
            # Dispatch results to their futures
            try:
                task_id, result, exception, duration, worker_id = self.result_queue.get_nowait()
                
                if task_id is not None:
                    future = self._get_future(task_id)
                    if future is not None:
                        if exception is not None:
                            future.set_exception(exception)
                        else:
                            future.set_result(result)
                
                # Record that this worker was recently active
                with self.management_lock:
                    if worker_id in self.worker_status:
                        self.worker_status[worker_id]['last_active'] = time.time()
                
            except queue.Empty:
                time.sleep(0.1)
                continue
    
    def _adjust_pool_size(self) -> None:
        """Grow or shrink the pool based on the current load."""
        grow_by = 0
        with self.management_lock:
            # Measure the load. Note: qsize() is approximate and raises
            # NotImplementedError on some platforms (e.g. macOS).
            busy_workers = sum(1 for w in self.worker_status.values() if w['busy'])
            total_workers = len(self.workers)
            queue_size = self.task_queue.qsize()
            
            # Simple heuristic for the target worker count
            target_workers = min(
                max(
                    self.min_workers,
                    busy_workers + (queue_size // 2)
                ),
                self.max_workers
            )
            
            # Reclaim idle workers
            if total_workers > target_workers:
                idle_workers = [
                    (wid, status['last_active'])
                    for wid, status in self.worker_status.items()
                    if not status['busy']
                    and (time.time() - status['last_active']) > self.idle_timeout
                ]
                
                # Reclaim the longest-idle workers first
                idle_workers.sort(key=lambda x: x[1])
                
                for wid, _ in idle_workers[:total_workers - target_workers]:
                    if wid in self.workers:
                        self.workers[wid].terminate()
                        del self.workers[wid]
                        del self.worker_status[wid]
                        logger.info(f"Terminated idle worker {wid}")
            
            # Grow the pool
            elif total_workers < target_workers:
                grow_by = target_workers - total_workers
        
        # _init_workers acquires the (non-reentrant) management lock itself,
        # so it must be called after the lock above has been released.
        if grow_by:
            self._init_workers(grow_by)
    
    def submit(self, 
               func: Callable, 
               *args, 
               priority: int = 0,
               **kwargs) -> Future:
        """Submit a task to the process pool.
        
        Args:
            func: the callable to execute
            *args: positional arguments
            priority: task priority (lower numbers first; note that the plain
                FIFO queue used here does not actually enforce this)
            **kwargs: keyword arguments
            
        Returns:
            A Future that resolves to the task's result
        """
        if self.shutdown_event.is_set():
            raise RuntimeError("ProcessPool is shutting down")
        
        future = Future()
        task_id = id(future)
        self._register_future(task_id, future)
        
        self.task_queue.put((
            priority,
            task_id,
            func,
            args,
            kwargs
        ))
        
        # Grow the pool if no worker appears idle. The _init_workers call sits
        # outside the with-block because it takes the non-reentrant lock itself.
        with self.management_lock:
            idle_workers = sum(1 for w in self.worker_status.values() if not w['busy'])
            need_worker = idle_workers == 0 and len(self.workers) < self.max_workers
        if need_worker:
            self._init_workers(1)
        
        return future
    
    def map(self, func: Callable, iterable, timeout: Optional[float] = None) -> List:
        """Apply func to each item of iterable in parallel.
        
        Args:
            func: the function to map
            iterable: the inputs
            timeout: per-result timeout in seconds
            
        Returns:
            A list of results (or exceptions), in input order
        """
        futures = [self.submit(func, arg) for arg in iterable]
        results = []
        
        for future in futures:
            try:
                results.append(future.result(timeout=timeout))
            except Exception as e:
                results.append(e)
        
        return results
    
    def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
        """Shut down the process pool.
        
        Args:
            wait: block until in-flight tasks finish
            cancel_futures: discard tasks that have not started
        """
        self.shutdown_event.set()
        
        if cancel_futures:
            # Drain the task queue
            while not self.task_queue.empty():
                try:
                    self.task_queue.get_nowait()
                except queue.Empty:
                    break
        
        if wait:
            # Wait for workers to finish their current tasks. As noted above,
            # the parent's 'busy' flags are not updated by the children, so
            # this check is best-effort in the current sketch.
            while any(status['busy'] for status in self.worker_status.values()):
                time.sleep(0.1)
        
        # Terminate all worker processes
        with self.management_lock:
            for worker_id, process in self.workers.items():
                if process.is_alive():
                    process.terminate()
                    process.join()
                    logger.info(f"Terminated worker {worker_id}")
            
            self.workers.clear()
            self.worker_status.clear()
        
        # Wait for the management thread to exit
        if self.manager_thread.is_alive():
            self.manager_thread.join()
    
    # Future bookkeeping
    def _register_future(self, task_id: int, future: Future) -> None:
        """Register a Future so the manager thread can resolve it."""
        with self.management_lock:
            if not hasattr(self, '_futures'):
                self._futures = {}
            self._futures[task_id] = future
    
    def _get_future(self, task_id: int) -> Optional[Future]:
        """Look up a registered Future by task id."""
        with self.management_lock:
            if hasattr(self, '_futures'):
                return self._futures.get(task_id)
        return None
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
 
 
# Usage example
if __name__ == "__main__":
    import random
    from functools import partial
    
    def sample_task(task_id: int, duration: float) -> str:
        """Example task."""
        print(f"Task {task_id} started (pid: {os.getpid()})")
        time.sleep(duration)
        if random.random() < 0.2:  # fail 20% of the time
            raise ValueError(f"Task {task_id} failed")
        return f"Task {task_id} completed in {duration:.2f}s"

    with ProcessPool(max_workers=4, task_timeout=5.0) as pool:
        # Submit a batch of tasks
        futures = [pool.submit(sample_task, i, random.uniform(0.5, 3.0)) 
                  for i in range(10)]
        
        # Submit a high-priority task (the FIFO queue will not actually reorder it)
        urgent_future = pool.submit(sample_task, 99, 1.0, priority=-1)
        
        # Collect results
        for future in futures:
            try:
                result = future.result()
                print(f"Result: {result}")
            except Exception as e:
                print(f"Error: {str(e)}")
        
        try:
            print(f"Urgent task result: {urgent_future.result()}")
        except Exception as e:
            print(f"Urgent task failed: {str(e)}")
        
        # Use map; partial pins the duration, since map passes one argument per task
        results = pool.map(partial(sample_task, duration=0.5), range(5))
        print(f"Map results: {results}")
 

Implementations in Frameworks

SQLAlchemy's Connection Pool

from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
 
# Create a database engine backed by a connection pool
engine = create_engine(
    'postgresql://user:password@localhost/dbname',
    poolclass=QueuePool,
    pool_size=5,       # connections kept open in the pool
    max_overflow=10,   # extra connections allowed under load
    pool_timeout=30    # seconds to wait for a free connection
)
 
# Borrow a connection from the pool; raw SQL strings need text() in SQLAlchemy 1.4+
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))
    print(result.fetchall())

HTTP Request Pools

The requests library implements connection pooling through its Session object:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
 
session = requests.Session()
 
# Configure the pool sizes and a retry policy
adapter = HTTPAdapter(
    pool_connections=10,   # number of host pools to cache
    pool_maxsize=100,      # maximum connections per host pool
    max_retries=Retry(total=3, backoff_factor=1)
)
session.mount('http://', adapter)
session.mount('https://', adapter)
 
# Requests to the same host reuse pooled connections
urls = ['https://example.com'] * 10
for url in urls:
    response = session.get(url)
    print(response.status_code)

Celery Worker Pools

from celery import Celery
 
app = Celery('tasks', broker='pyamqp://guest@localhost//')
 
@app.task
def process_data(data):
    # Data-processing logic
    return data.upper()
 
# Specify the concurrency (pool size) when starting the worker:
# celery -A tasks worker --loglevel=info --concurrency=4