官网教程:https://www.rabbitmq.com/tutorials/tutorial-two-php.html
轮询调度
- 生产者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
require_once '../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queue_name = 'queue_work';
$channel->queue_declare($queue_name, false, true, false, false);
for ($i = 1; $i <= 20; $i++) {
$msg = new AMQPMessage('消息' . $i);
$channel->basic_publish($msg, '', $queue_name);
}
$channel->close();
$connection->close(); - 消费者:
work_1.phpwork_2.php1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
require_once '../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queue_name = 'queue_work';
$channel->queue_declare($queue_name, false, true, false, false);
echo "work1 开始接收消息……\n";
$channel->basic_consume($queue_name, '',
false, false, false, false, function (AMQPMessage $msg) {
echo '接收到消息:', $msg->body, "\n";
sleep(1); // 模拟延迟 1s
});
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
require_once '../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queue_name = 'queue_work';
$channel->queue_declare($queue_name, false, true, false, false);
echo "work2 开始接收消息……\n";
$channel->basic_consume($queue_name, '',
false, false, false, false, function (AMQPMessage $msg) {
echo '接收到消息:', $msg->body, "\n";
});
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close(); - 运行
1
2
3
4
5
6
7
8
9
10
11
12>php work_1.php
work1 开始接收消息……
接收到消息:消息1
接收到消息:消息3
接收到消息:消息5
接收到消息:消息7
接收到消息:消息9
接收到消息:消息11
接收到消息:消息13
接收到消息:消息15
接收到消息:消息17
接收到消息:消息191
2
3
4
5
6
7
8
9
10
11
12>php work_2.php
work2 开始接收消息……
接收到消息:消息2
接收到消息:消息4
接收到消息:消息6
接收到消息:消息8
接收到消息:消息10
接收到消息:消息12
接收到消息:消息14
接收到消息:消息16
接收到消息:消息18
接收到消息:消息20
公平分发
生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
require_once '../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queue_name = 'queue_work2';
$channel->queue_declare($queue_name, false, true, false, false);
for ($i = 1; $i <= 20; $i++) {
$msg = new AMQPMessage('消息' . $i, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 将消息标记为持久性
]);
$channel->basic_publish($msg, '', $queue_name);
}
$channel->close();
$connection->close();消费者
work_1.php1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24require_once '../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queue_name = 'queue_work2';
$channel->queue_declare($queue_name, false, true, false, false);
echo "work1 开始接收消息……\n";
// 限制每次只接收处理一条消息
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queue_name, '',
false, false, false, false, function (AMQPMessage $msg) {
echo '接收到消息:', $msg->body, "\n";
sleep(1);
$msg->ack(); // 消息确认
});
while (count($channel->callbacks)) {
$channel->wait();
}work_2.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23require_once '../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queue_name = 'queue_work2';
$channel->queue_declare($queue_name, false, true, false, false);
echo "work2 开始接收消息……\n";
// 限制每次只接收处理一条消息
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queue_name, '',
false, false, false, false, function (AMQPMessage $msg) {
echo '接收到消息:', $msg->body, "\n";
$msg->ack(); // 消息确认
});
while (count($channel->callbacks)) {
$channel->wait();
}运行
1
2
3> php work_1.php
work1 开始接收消息……
接收到消息:消息21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21> php work_2.php
work2 开始接收消息……
接收到消息:消息1
接收到消息:消息3
接收到消息:消息4
接收到消息:消息5
接收到消息:消息6
接收到消息:消息7
接收到消息:消息8
接收到消息:消息9
接收到消息:消息10
接收到消息:消息11
接收到消息:消息12
接收到消息:消息13
接收到消息:消息14
接收到消息:消息15
接收到消息:消息16
接收到消息:消息17
接收到消息:消息18
接收到消息:消息19
接收到消息:消息20