MQ
Basic
死信队列
每业务队列独享路由 key + 死信队列
步骤 | 说明 | 代码/参数 |
---|---|---|
① 业务队列 | 绑定业务交换机,并设置 DLX & routing-key | x-dead-letter-exchange + x-dead-letter-routing-key |
② 死信交换机 | 可复用同一个(Direct 即可) | exchange_declare('dlx.exchange', 'direct') |
③ 死信队列 | 每个业务队列独享一个,绑定 DLX 且路由 key 唯一 | queue_bind(queue='dlq.order.pay-ttl', exchange='dlx.exchange', routing_key='rk.order.pay-ttl') |
import pika
conn = pika.BlockingConnection()
ch = conn.channel()
# 1. 死信交换机(复用)
ch.exchange_declare('dlx.exchange', exchange_type='direct', durable=True)
# 2. 死信队列(每个业务独享)
dlq_name = 'dlq.order.pay-ttl'
ch.queue_declare(dlq_name, durable=True)
ch.queue_bind(dlq_name, 'dlx.exchange', routing_key='rk.order.pay-ttl')
# 3. 业务队列(带 DLX 配置)
biz_q = 'q.order.pay-ttl'
args = {
'x-message-ttl': 30 * 60 * 1000, # 30 分钟 TTL
'x-dead-letter-exchange': 'dlx.exchange',
'x-dead-letter-routing-key': 'rk.order.pay-ttl'
}
ch.queue_declare(biz_q, durable=True, arguments=args)
# 4. 死信消费者(监听死信队列)
def on_dlq(ch, method, props, body):
order_id = body.decode()
# 查询微信 → 关闭或补偿
ch.basic_ack(method.delivery_tag)
ch.basic_consume(dlq_name, on_dlq, auto_ack=False)
ch.start_consuming()
场景:
- 超时订单:TTL 30 min → DLQ → 查询微信 → 关闭或补偿。
- 重试失败:basicNack(requeue=False) 直接进入对应 DLQ,避免业务队列阻塞。
消息可靠性
消息丢失
产生原因
- 网络故障:消息可能在传输过程中因网络问题而丢失。
- RabbitMQ故障:如果RabbitMQ宕机,消息也可能丢失。
解决
开启事务机制
事务在RabbitMQ中可能会影响性能,因为它们需要在所有节点上同步状态。因此,RabbitMQ尽量避免使用事务。
private static void executeTransaction(Channel channel) throws IOException {
boolean transactionSuccess = false;
try {
// 开启事务
channel.txSelect();
// 执行一系列消息操作,例如:channel.basicPublish(exchange, routingKey, message);
// 提交事务
channel.txCommit();
transactionSuccess = true;
} catch (ShutdownSignalException | IOException e) {
// 回滚事务
if (!transactionSuccess) {
channel.txRollback();
}
throw e;
}
}
生产者确认机制
发布者确认机制允许发布者知道消息 是否已经被RabbitMQ成功接收:
public static void sendPersistentMessage(String host, String queueName, String message) {
try (Connection connection = new ConnectionFactory().setHost(host).newConnection();
Channel channel = connection.createChannel()) {
// 启用发布者确认
channel.confirmSelect();
// 将消息设置为持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.build();
// 添加确认监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息已确认: " + deliveryTag);
// 消息正确到达Broker时的处理逻辑
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息未确认: " + deliveryTag);
// 因为内部错误导致消息丢失时的处理逻辑
}
});
channel.basicPublish("", queueName, properties, message.getBytes());
// 等待消息确认,或者超时
boolean allConfirmed = channel.waitForConfirms();
if (allConfirmed) {
//所有消息都已确认
} else {
//超时或其它
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
消息持久化在RabbitMQ中,消息的持久化它确保消息不仅存储在内存中,而且也安全地保存在磁盘上。
这样,即使在RabbitMQ服务崩溃或重启的情况下,消息也不会丢失,可以从磁盘恢复。
Exchange持久化:
// 设置 durable = true;
channel.exchangeDeclare(exchangeName, "direct", durable);
消息持久化
// 设置 MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Queue持久化
//设置 boolean durable = true;
channel.queueDeclare(queueName, durable, exclusive, false, null);
消费者确认机制
默认情况下,以下3种原因导致消息丢失:
- 网络故障:消费端还没接收到消息之前,发生网络故障导致消息丢失;
- 未接收消息前服务宕机:消费端突然挂机未接收到消息,此时消息会丢失;
- 处理过程中服务宕机:消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。
这是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后,不管消费端是否接收到,是否处理完,就立即删除这条消息,导致消息丢失。
将自动ack机制改为手动ack机制
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//接收消息,业务处理
//设置手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
//发生异常时,可以选择重新发送消息或进行错误处理
// 例如,可以选择负确认(nack),让消息重回队列
// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
//设置autoAck为false,表示关闭自动确认机制,改为手动确认
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
消息补偿机制
以上3种解决办法理论上可靠,但是系统的异常或者故障比较偶然,我们没法做到100%消息不丢失。
因此需要介入补偿机制或者人工干预。这是我们的最后一道防线。如何做消息补偿呢?
其实就是将消息入库,通过定时任务重新发送失败的消息。
- 生产端发送消息;
- 确认失败,将消息保存到数据库中,并设置初始状态0;
- 定时任务以一定频率扫描数据库中status=0 的消息(失败消息);
- 重发消息,可多次;重发成功,更新数据库:status=1;
- 超过固定次数重发仍然失败,人工干预。
超过最大失败次数后,对于无法被正常消费的消息可移入死信队列。
可人工干预手动排查也可自动重试,需要实现一个消费者来从死信队列中获取消息,并根据业务逻辑来决定是否以及如何重新发送消息。
这里涉及到消息去重、幂等性处理等。
消息重复
产生原因
- 网络问题:消费者处理完消息后,因网络问题导致确认信息未能成功发送回消息队列。
- 服务中断:消费者在确认消息之前服务崩溃,消息队列未收到确认信号。
- 确认机制:自动确认模式下,如果确认在消息处理完成前发生,消息可能会被重复消费
解决方案
幂等性设计
设计消费者的消息处理逻辑时,要保证即使消息被多次消费,也不会对系统状态产生不良影响。
幂等性可以通过以下方式实现:
- 数据库唯一约束:使用数据库的主键约束或唯一索引防止插入重复记录。
- 业务逻辑检查:在执行业务操作前,先检查是否已经处理过该消息。
消息去重策略
使用唯一标识符(如订单号、massageID)来识别消息,并在消费者中实现去重逻辑:
- 缓存检查:使用内存缓存(如Redis)存储已处理的消息ID。
- 持久化存储:将消息ID与处理状态保存在数据库中,以便跨服务重启后仍然有效。
手动确认与重试机制
通过手动确认消息,控制消息何时从队列中移除:
- 手动确认:在消息成功处理后,显式调用
channel.basicAck()
方法确认消息。 - 重试机制:如果消息处理失败,可以选择将消息重新入队(
channel.basicReject(requeue=true)
)或丢弃(channel.basicReject(requeue=false)
)。
消费者端去重逻辑
@RabbitListener(queues = "queueName", acknowledgeMode = "MANUAL")
public void receiveMessage(Message message, Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
// 检查消息是否已消费
if (messageAlreadyProcessed(messageId)) {
// 消息已消费,确认消息并返回
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
// 处理消息
try {
processMessage(message);
// 消息处理成功,持久化消息ID并确认消息
persistMessageId(messageId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,可以选择重新入队或丢弃
boolean requeue = shouldRequeue(message);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), requeue);
}
}
生产者端发布确认
void sendWithConfirm(AmqpTemplate amqpTemplate, Message message) throws IOException {
ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
if (!ack) {
// 处理消息发送失败的逻辑
// ...
}
};
amqpTemplate.setConfirmCallback(confirmCallback);
amqpTemplate.convertAndSend("exchangeName", "routingKey", message);
}
消息消费顺序性
在分布式系统中,消息队列(如 RabbitMQ)本身不保证消息的顺序消费,但在某些业务场景(如 订单创建 → 扣减库存)必须保证顺序执行。
以下是可行的解决方案及实现方式:
状态机 + 版本号(乐观锁)
适用场景:
- 需要严格顺序执行,但允许短暂延迟(如订单创建后必须扣减库存)。
- 适用于 事件溯源(Event Sourcing) 或 CQRS 架构。
消息体增加顺序标识:
{
"msg_id": "order_123",
"parent_msg_id": "create_order_123", // 依赖的上一个消息ID
"version": 2, // 版本号(用于乐观锁)
"status": "pending", // 状态机(pending/processing/done)
"data": {"order_id": 123, "action": "deduct_stock"}
}
消费者逻辑:
def consume_message(msg):
# 1. 检查前置消息是否完成(如查询数据库或Redis)
if not check_parent_msg_completed(msg["parent_msg_id"]):
# 未完成则重新入队(或延迟重试)
requeue_message(msg)
return
# 2. 乐观锁检查版本号
if msg["version"] != get_current_version(msg["msg_id"]):
return # 已过期,丢弃
# 3. 执行业务逻辑(如扣减库存)
process_business(msg)
# 4. 更新状态为完成
update_msg_status(msg["msg_id"], "done")
消息堆积
快速诊断
诊断项 | 命令/方法 | 说明 |
---|---|---|
1. 查看堆积数量 | rabbitmqctl list_queues name messages messages_ready messages_unacknowledged | 瞬时快照 |
2. 查看消费速率 | rabbitmqctl list_consumers -p <vhost> | 观察 ACK 速率 |
3. 查看节点资源 | rabbitmqctl status | CPU、内存、磁盘 |
通用解决策略
策略 | 做法 | FastAPI 代码示例 |
---|---|---|
① 扩容消费者 | 水平增加 worker 进程 | celery -A proj worker -Q high -c 20 |
② 限流 & 预取 | 设置 prefetch_count 防止一次性拿太多 | celery.conf.worker_prefetch_multiplier = 1 |
③ 优化业务逻辑 | 减少单次处理耗时(缓存、批处理) | 见下方优化代码 |
④ 消息 TTL & DLQ | 加过期时间 + 死信队列兜底 | 队列声明参数 |
⑤ 临时快速清空 | 新建无逻辑消费者把消息落库 | 见下方应急代码 |
⑥ 拆分队列 | 按优先级/业务拆分,避免单队列瓶颈 | Queue('urgent') , Queue('normal') |
生产
消费者突然下线
- 压测还是有必要的
- 消费者建议加上prefetchcount,对消费端限流,防止出现把消费者压跨
- 一个Channel对应一个队列,万一Channel出现问题也只影响一个队列
- 日志建议把AMQPClient异常整个打印出来,方便定位问题
- 服务端的监控体系得建立起来
PRECONDITION_FAILED 406
消息已经被确认了,没法再不确认 deliveryTag 对应的消息了。
根因:一个消息只能被确认一次;自动确认 与 手动确认 互斥。
// consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量
channel.basicAck(deliveryTag, multiple);
// 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列,该方法reject后,该消费者还是会消费到该条被reject的消息
channel.basicReject(deliveryTag, requeue);
// 不确认 deliveryTag 对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。
channel.basicNack(deliveryTag, multiple, requeue);
// 是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
channel.basicRecover(false);
需要注意的是,如果要使用手动 ACK,一定要关闭主动 ACK 功能。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
type: simple
simple:
default-requeue-rejected: false
acknowledge-mode: manual
如果不设置spring.rabbitmq.listener.direct.acknowledge-mode=manual
等相关手动 ACK 设置,就会发生 406 PRECONDITION_FAILED 异常,
即自动确认了一次消息后,又手动再次 ack 或拒绝消息。
这个错误信息:ERROR CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
通常表明在 RabbitMQ 的通信过程中出现了问题,导致通道(Channel)关闭。
可能原因
- 手动ACK与自动ACK冲突:在 RabbitMQ 中,消息的确认(ACK)可以是手动的也可以是自动的。如果在配置中设置了自动 ACK,但在消费者代码中又手动调用了basicAck方法,就可能导致这个错误。这是因为 RabbitMQ 认为消息已经被处理,但客户端又尝试再次确认,造成了冲突。
- 队列声明异常:如果 RabbitMQ 客户端尝试声明一个已经存在的队列或交换机,并且使用了不同的参数(如不同的交换机类型或队列属性),也可能导致这个错误。RabbitMQ不允许在不匹配的参数下重新声明队列或交换机。
- 网络问题:网络不稳定或中断可能导致 RabbitMQ 客户端与服务器之间的连接出现问题,从而触发通道关闭。
- RabbitMQ服务器配置问题:RabbitMQ 服务器的配置问题,如资源限制、权限设置不当等,也可能导致客户端无法正常通信。
- 消费者处理消息超时:如果消费者处理消息的时间过长,超过了 RabbitMQ 服务器设置的超时时间,服务器可能会关闭通道。
修复方法
- 检查ACK模式:确保在 RabbitMQ 的消费者配置中,ACK 模式是一致的。如果配置为手动 ACK,那么在消费者代码中必须显式调用basicAck方法。如果配置为自动 ACK,则不需要在代码中进行确认。
- 检查队列和交换机声明:确保在客户端代码中声明的队列和交换机与 RabbitMQ 服务器上现有的队列和交换机参数一致。如果需要,可以在客户端代码中添加逻辑来检查队列或交换机是否存在,或者使用ignoreDeclarationExceptions属性来忽略声明异常。
- 网络和服务器检查:检查网络连接是否稳定,以及 RabbitMQ 服务器是否正常运行。可以通过 RabbitMQ 的管理界面或命令行工具来检查服务器状态。
- 调整RabbitMQ服务器配置:如果问题是由于服务器配置不当引起的,需要根据实际情况调整配置。例如,可以增加资源限制,或者调整权限设置。
- 优化消费者处理逻辑:如果消费者处理消息的时间过长,可以尝试优化代码逻辑,减少处理时间。此外,可以考虑增加消费者数量来提高处理能力。
- 监控和日志:增加日志记录,以便在出现问题时能够快速定位原因。同时,可以使用RabbitMQ的监控插件来监控系统状态,及时发现并解决问题。
- 使用RabbitMQ的连接恢复机制:Spring Boot集成 RabbitMQ 时,可以使用CachingConnectionFactory的连接恢复机制。在
application.properties
或application.yml中设置spring.rabbitmq.connection-recovery-enabled=true
,这样在连接出现问题时,RabbitMQ 客户端会自动尝试恢复连接。
消息阻塞
维度 | 推荐做法 | 代码/配置示例 | 风险点 |
---|---|---|---|
ACK 模式 | 统一 手动 ACK(ack-mode=manual ) | channel.basicAck(tag) 在 finally 块 | 自动 ACK 下 QOS 失效 |
消息签收 | try-catch-finally 确保必签收 | python try: ... finally: ch.basicAck(tag) | 业务异常导致消息永远 UnACK |
vhost 隔离 | 每个业务项目独立 vhost | rabbitmqctl add_vhost biz_vhost | 资源/权限冲突 |
死信队列 | 设置 x-dead-letter-exchange | arguments={'x-dead-letter-exchange': 'dlx'} | 消息丢失或无法追踪 |
队列长度限制 | x-max-length 或 x-max-length-bytes | queue_declare(..., arguments={'x-max-length': 10000}) | 超出即丢弃或进入 DLX |
Exchange 选型 | Direct 最快(精确路由) | exchange_declare(type='direct') | Topic/Fanout 路由开销更大 |
监控告警 | 队列深度、UnACK、连接数 | Prometheus + Grafana Dashboard | 无监控等于盲飞 |
必须使用手动 Ack 模式
# Python pika 示例
channel.basic_consume(
queue='order_queue',
on_message_callback=callback,
auto_ack=False # 关闭自动ACK
)
def callback(ch, method, properties, body):
try:
process_order(body) # 业务处理
ch.basic_ack(delivery_tag=method.delivery_tag) # 成功确认
except Exception as e:
logger.error(f"处理失败: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) # 不重试,进入DLQ
生产环境隔离策略
VHost 隔离
# 创建业务专用VHost
rabbitmqctl add_vhost order_vhost
rabbitmqctl set_permissions -p order_vhost app_user ".*" ".*" ".*"
连接参数规范
params = pika.ConnectionParameters(
host='rabbitmq.prod',
virtual_host='order_vhost', # 指定VHost
credentials=pika.PlainCredentials('app_user', 'secure_password'),
heartbeat=600, # 心跳检测连接存活
blocked_connection_timeout=30 # 网络阻塞超时
)
死信队列(DLX)标准化配置
声明死信交换机和队列
# 主队列声明时绑定DLX
channel.queue_declare(
queue='order_queue',
arguments={
'x-dead-letter-exchange': 'dlx.order',
'x-dead-letter-routing-key': 'failed.orders'
}
)
# 死信队列声明
channel.exchange_declare('dlx.order', exchange_type='direct')
channel.queue_declare(queue='dead_letter.order')
channel.queue_bind('dead_letter.order', 'dlx.order', routing_key='failed.orders')
消费死信消息
def process_dlx(ch, method, properties, body):
logger.error(f"死信消息: {body}, 原因: {properties.headers.get('x-death')}")
alert_admin(body) # 通知管理员
ch.basic_ack(delivery_tag=method.delivery_tag)
队列限制与性能优化
队列保护参数
channel.queue_declare(
queue='protected_queue',
arguments={
'x-max-length': 5000, # 消息数限制
'x-max-length-bytes': 10485760, # 10MB大小限制
'x-overflow': 'reject-publish' # 超限拒绝新消息
}
)
参考: