RabbitMQ - 死信队列

官网:
https://www.rabbitmq.com/dlx.html
https://www.rabbitmq.com/ttl.html

DLX (Dead Letter Exchanges) 当消息在一个队列中变成 Dead 之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX。

产生原因:
- 使用 basic.rejectbasic.nack 并将 requeue 参数设置为false ,消息被拒绝。
- 消息在队列中停留的时间超过了配置的 TTL
- 队列达到最大长度,先入队的消息会被删除

注意:队列的过期不会死信其中的消息。

DLX 是一个普通的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

创建一个死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<?php
require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建一个处理死信的交换机和队列
$dead_exchange = 'dead_direct_exchange';
$dead_queue_name = 'dead_queue';
$dead_routing_key = 'dead';
$channel->exchange_declare($dead_exchange, 'direct', false, false, false);
$channel->queue_declare($dead_queue_name, false, true, false, false);
$channel->queue_bind($dead_queue_name, $dead_exchange, $dead_routing_key);

$ttl_exchange = 'ttl_exchange';
$ttl_routing_key = 'ttl_message';
$channel->exchange_declare($ttl_exchange, 'direct', false, false, false);

// 创建一个 TTL 消息的队列
$arguments = new AMQPTable();
$arguments->set('x-message-ttl', 5000); // 设置队列中消息的过期时间 5s
$arguments->set('x-dead-letter-exchange', $dead_exchange); // 设置处理死信消息的交换机
$arguments->set('x-dead-letter-routing-key', $dead_routing_key); // 指定一个路由关键字用于死信消息。如果没有设置,则使用消息自己的路由键(fanout 模式不需要)
$ttl_queue_message = 'ttl_queue_message';
$channel->queue_declare($ttl_queue_message, false, true, false, false, false, $arguments);
$channel->queue_bind($ttl_queue_message, $ttl_exchange, $ttl_routing_key);

// 创建一个 TTL 的队列 (过期后不会死信消息!)
$arguments = new AMQPTable();
$arguments->set('x-expires', 10000); // 设置队列的过期时间 10s
$arguments->set('x-dead-letter-exchange', $dead_exchange); // 设置处理死信消息的交换机
$arguments->set('x-dead-letter-routing-key', $dead_routing_key); // 指定一个路由关键字用于死信消息。如果没有设置,则使用消息自己的路由键(fanout 模式不需要)
$ttl_queue = 'ttl_queue';
$channel->queue_declare($ttl_queue, false, true, false, false, false, $arguments);
$channel->queue_bind($ttl_queue, $ttl_exchange, $ttl_routing_key);

// 往队列中发送 10 条消息
for ($i = 1; $i <= 10; $i++) {
$msg = new AMQPMessage('AAA test test——' . $i, [
'expiration' => 10000 // 设置每条消息过期时间。(当同时指定了每队列和每条消息的 TTL 时,将选择两者之间的较低值)
]);
$channel->basic_publish($msg, $ttl_exchange, $ttl_routing_key);
}

$channel->close();
$connection->close();

运行

10s 后: