RabbitMQ-发布/订阅模式(Fanout)

官网教程:https://www.rabbitmq.com/tutorials/tutorial-three-php.html

RabbitMQ 中消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。

生产者只能向交换器发送消息。交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方面它将它们推送到队列中。交换必须确切地知道如何处理它收到的消息。是否应该将其附加到特定队列?它应该附加到许多队列中吗?或者它应该被丢弃。其规则由 交换类型定义。

简单模式 中使用的是默认类型交换机。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建服务器的连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 建立通道
$channel = $connection->channel();

// 创建交换机
$exchange = 'fanout_exchange';
$channel->exchange_declare($exchange, 'fanout', false, false, false);

// 发布消息
$data = 'Hello World!';
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange);

echo " [x] 发送 ", $data, "\n";

// 关闭通道和连接
$channel->close();
$connection->close();

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建服务器的连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 创建交换机 类型:direct、topic、headers 和 fanout
$exchange = 'fanout_exchange';
$channel->exchange_declare($exchange, 'fanout', false, false, false);

// 队列名称为 '' 时,自动创建一个随机队列 (随着消费者的断开自动删除)
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

/**
* 交换机和队列绑定
* @pararm1 queue 队列名称
* @pararm2 exchange 交换机名称
* @pararm3 routing_key 路由 key (fanout 模式没有效果!)
*/
$channel->queue_bind($queue_name, $exchange);

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
echo '消费队列:', $queue_name , "\n";

$callback = function ($msg) {
echo ' [x] 接收 ', $msg->body, "\n";
};
// 接收消息
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();

运行

运行多个消费者后,并运行生产者。

1
2
3
4
C:\Users\Administrator\Code\base\RabbitMQ>php receive_logs.php
[*] Waiting for logs. To exit press CTRL+C
消费队列:amq.gen-T23d5DY-KIxPjeiDkEJNOw
[x] 接收 Hello World!
1
2
3
4
C:\Users\Administrator\Code\base\RabbitMQ>php receive_logs.php
[*] Waiting for logs. To exit press CTRL+C
消费队列:amq.gen-xEHdMTSElljtCqJnTC40xQ
[x] 接收 Hello World!
1
2
3
4
C:\Users\Administrator\Code\base\RabbitMQ>php receive_logs.php
[*] Waiting for logs. To exit press CTRL+C
消费队列:amq.gen-p3Ok8B4BAurCiKLIFoFrQw
[x] 接收 Hello World!