RabbitMQ - Topic 模式

官网教程:https://www.rabbitmq.com/tutorials/tutorial-five-php.html

Messages sent to a topic exchange can’t have an arbitrary routing_key - it must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message. A few valid routing key examples: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”. There can be as many words in the routing key as you like, up to the limit of 255 bytes.
发送到主题交换的消息不能有任意的 routing_key —— 它必须是由点分隔的单词列表。这些词可以是任何词,但通常它们指明了与信息相关的一些特征。以下是一些有效的路由关键示例:"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit"。路由键中可以有任意多的单词,最多为 255 个字节。
The binding key must also be in the same form. The logic behind the topic exchange is similar to a direct one - a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key.
绑定键的形式也必须相同。topic 交换机背后的逻辑与 direct 交换机类似 —— 带有特定路由键的消息将被发送到所有绑定了匹配绑定键的队列。

绑定键有两个重要的特殊情况:
- *(星号)可以只代替一个单词。
- #(井号)可以代替零个或多个单词。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchange = 'topic_exchange';
$channel->exchange_declare($exchange, 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, $exchange, $routing_key);

echo " [x] 发送 ",$routing_key,':',$data," \n";

$channel->close();
$connection->close();

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchange = 'topic_exchange';
$channel->exchange_declare($exchange, 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}

foreach($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, $exchange, $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();

运行

1
2
3
4
5
6
7
>php receive_logs_topic.php "#"
[*] Waiting for logs. To exit press CTRL+C
[x] error.order:Error: xxxxxxxxxxxx
[x] order:创建订单
[x] error.user.order:Error:xxx
[x] error.user:Error:user xxxxx
[x] info.user:INFO:user xxxxxxx
1
2
3
4
>php receive_logs_topic.php "error.*"
[*] Waiting for logs. To exit press CTRL+C
[x] error.order:Error: xxxxxxxxxxxx
[x] error.user:Error:user xxxxx
1
2
3
4
>php receive_logs_topic.php "#.user"
[*] Waiting for logs. To exit press CTRL+C
[x] error.user:Error:user xxxxx
[x] info.user:INFO:user xxxxxxx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
>php emit_log_topic.php "error.order" "Error: xxxxxxxxxxxx"
[x] Sent error.order:Error: xxxxxxxxxxxx

>php emit_log_topic.php "order" "创建订单"
[x] Sent order:创建订单

>php emit_log_topic.php "error.user.order" "Error:xxx"
[x] Sent error.user.order:Error:xxx

>php emit_log_topic.php "error.user" "Error:user xxxxx"
[x] Sent error.user:Error:user xxxxx

>php emit_log_topic.php "info.user" "INFO:user xxxxxxx"
[x] Sent info.user:INFO:user xxxxxxx