RabbitMQ - 路由模式 (direct)

官网教程:https://www.rabbitmq.com/tutorials/tutorial-four-php.html

生产者

文件:emit_log_direct.php
<?php
/**
 * Producer for the RabbitMQ "routing" (direct exchange) example.
 *
 * Usage: php emit_log_direct.php [severity] [message words...]
 *
 * Publishes one message to the 'direct_exchange' exchange, using the
 * severity as the routing key. Severity defaults to 'info' and the message
 * body defaults to 'Hello World!!' when they are not given on the command line.
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the exchange. Available exchange types: direct, topic, headers and fanout.
$exchange = 'direct_exchange';
$channel->exchange_declare($exchange, 'direct', false, false, false);

// Severity level, e.g. 'info', 'warning' or 'error'.
// Compared against '' instead of using empty(), because empty('0') is true
// and would silently turn a routing key of "0" into 'info'.
$severity = (isset($argv[1]) && $argv[1] !== '') ? $argv[1] : 'info';

// Everything after the severity is joined into the message body.
$data = implode(' ', array_slice($argv, 2));
if ($data === '') {
    // Only fall back to the default when no body was supplied at all;
    // a literal "0" is a valid message and must be sent unchanged.
    $data = 'Hello World!!';
}

$msg = new AMQPMessage($data);

// The severity is used as the routing key.
$channel->basic_publish($msg, $exchange, $severity);

echo " [x] 发送 ",$severity,':',$data," \n";

$channel->close();
$connection->close();

消费者

文件:receive_logs_direct.php
<?php
/**
 * Consumer for the RabbitMQ "routing" (direct exchange) example.
 *
 * Usage: php receive_logs_direct.php [info] [warning] [error]
 *
 * Declares a broker-named queue, binds it to 'direct_exchange' once per
 * severity given on the command line, then prints every message delivered
 * to that queue until the consumer is stopped.
 */
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Declare the exchange (same name and type as the producer uses).
$exchange = 'direct_exchange';
$channel->exchange_declare($exchange, 'direct', false, false, false);

// Declare a queue with an empty name so the broker generates a random one;
// the generated name is the first element of the returned array.
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    // At least one severity is required: print usage to stderr and exit non-zero.
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

// Create one binding per requested severity.
foreach ($severities as $severity) {
    // queue_bind arguments:
    //   1. queue       - queue name
    //   2. exchange    - exchange name
    //   3. routing_key - routing key (has no effect on fanout exchanges)
    $channel->queue_bind($queue_name, $exchange, $severity);
    echo $queue_name, ' 绑定 routingKey:', $severity, "\n";
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

// Print the routing key and body of every delivered message.
$callback = function ($msg) {
    echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// Keep waiting for deliveries for as long as a consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

运行

生产者:发送三条不同级别的消息
>php emit_log_direct.php error '发生一个错误'
[x] 发送 error:'发生一个错误'

>php emit_log_direct.php info 'Hello world!'
[x] 发送 info:'Hello world!'

>php emit_log_direct.php warning '警告!警告'
[x] 发送 warning:'警告!警告'

消费者一:绑定 error 和 warning
>php receive_logs_direct.php error warning
amq.gen-Pl1yQBt6tjp-zCzNoucYXA 绑定 routingKey:error
amq.gen-Pl1yQBt6tjp-zCzNoucYXA 绑定 routingKey:warning
[*] Waiting for logs. To exit press CTRL+C
[x] error:'发生一个错误'
[x] warning:'警告!警告'

消费者二:绑定 info 和 warning
>php receive_logs_direct.php info warning
amq.gen-SFe2hF_A7i9fjD5ynFUW9A 绑定 routingKey:info
amq.gen-SFe2hF_A7i9fjD5ynFUW9A 绑定 routingKey:warning
[*] Waiting for logs. To exit press CTRL+C
[x] info:'Hello world!'
[x] warning:'警告!警告'