RabbitMQ - 简单模式

官网教程:https://www.rabbitmq.com/tutorials/tutorial-one-php.html

安装 php-amqplib

1
composer require php-amqplib

生成者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建服务器的连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 建立通道
$channel = $connection->channel();

/**
* 创建队列
* @pararm1 queue 队列名
* @pararm2 passive
* @pararm3 durable 是否持久化
* @pararm4 exclusive 是否独有的
* @pararm5 auto_delete 是否自动删除
*/
$queue_name = 'queue1';
$channel->queue_declare($queue_name, false, true, false, false);

$data = 'Hello World';
$msg = new AMQPMessage($data);
/**
* 发布消息
* @pararm1 msg 消息
* @pararm2 exchange 交换机名
* @pararm3 routing_key 路由 key (交换机为空时,用队列名)
*/
$channel->basic_publish($msg, '', $queue_name);

echo " [x] 发送 ", $data, " \n";

// 关闭通道和连接
$channel->close();
$connection->close();

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

// 创建服务器的连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 建立通道
$channel = $connection->channel();

// 创建队列
// 由于我们可能在生产者之前启动消费者,确保队列存在。
$queue_name = 'queue1';
$channel->queue_declare($queue_name, false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
echo " [x] 接收 ", $msg->body, "\n";
echo " [x] 完成", "\n";
};
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

运行

查看 RabbitMQ 有哪些队列以及其中有多少消息

1
2
3
4
5
6
7
8
9
10
11
admin_s@SC-201811011347:~$ sudo rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello2 1
queue_work2 0
ttl_queue 0
queue_work 0
dead_queue 0
hello1 0
queue1 1