RabbitMQ - 工作队列

官网教程:https://www.rabbitmq.com/tutorials/tutorial-two-php.html

轮询调度

  • 生产者:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <?php
    require_once '../../vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $queue_name = 'queue_work';
    $channel->queue_declare($queue_name, false, true, false, false);

    for ($i = 1; $i <= 20; $i++) {
    $msg = new AMQPMessage('消息' . $i);
    $channel->basic_publish($msg, '', $queue_name);
    }

    $channel->close();
    $connection->close();
  • 消费者:
    work_1.php
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    <?php
    require_once '../../vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $queue_name = 'queue_work';
    $channel->queue_declare($queue_name, false, true, false, false);

    echo "work1 开始接收消息……\n";

    $channel->basic_consume($queue_name, '',
    false, false, false, false, function (AMQPMessage $msg) {
    echo '接收到消息:', $msg->body, "\n";
    sleep(1); // 模拟延迟 1s
    });

    while (count($channel->callbacks)) {
    $channel->wait();
    }

    $channel->close();
    $connection->close();
    work_2.php
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    <?php
    require_once '../../vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $queue_name = 'queue_work';
    $channel->queue_declare($queue_name, false, true, false, false);
    echo "work2 开始接收消息……\n";

    $channel->basic_consume($queue_name, '',
    false, false, false, false, function (AMQPMessage $msg) {
    echo '接收到消息:', $msg->body, "\n";
    });

    while (count($channel->callbacks)) {
    $channel->wait();
    }

    $channel->close();
    $connection->close();
  • 运行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    >php work_1.php
    work1 开始接收消息……
    接收到消息:消息1
    接收到消息:消息3
    接收到消息:消息5
    接收到消息:消息7
    接收到消息:消息9
    接收到消息:消息11
    接收到消息:消息13
    接收到消息:消息15
    接收到消息:消息17
    接收到消息:消息19
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    >php work_2.php
    work2 开始接收消息……
    接收到消息:消息2
    接收到消息:消息4
    接收到消息:消息6
    接收到消息:消息8
    接收到消息:消息10
    接收到消息:消息12
    接收到消息:消息14
    接收到消息:消息16
    接收到消息:消息18
    接收到消息:消息20

公平分发

  • 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    <?php
    require_once '../../vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $queue_name = 'queue_work2';
    $channel->queue_declare($queue_name, false, true, false, false);

    for ($i = 1; $i <= 20; $i++) {
    $msg = new AMQPMessage('消息' . $i, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 将消息标记为持久性
    ]);
    $channel->basic_publish($msg, '', $queue_name);
    }

    $channel->close();
    $connection->close();
  • 消费者
    work_1.php

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    require_once '../../vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $queue_name = 'queue_work2';
    $channel->queue_declare($queue_name, false, true, false, false);
    echo "work1 开始接收消息……\n";

    // 限制每次只接收处理一条消息
    $channel->basic_qos(null, 1, null);
    $channel->basic_consume($queue_name, '',
    false, false, false, false, function (AMQPMessage $msg) {
    echo '接收到消息:', $msg->body, "\n";
    sleep(1);
    $msg->ack(); // 消息确认
    });

    while (count($channel->callbacks)) {
    $channel->wait();
    }

    work_2.php

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    require_once '../../vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $queue_name = 'queue_work2';
    $channel->queue_declare($queue_name, false, true, false, false);
    echo "work2 开始接收消息……\n";

    // 限制每次只接收处理一条消息
    $channel->basic_qos(null, 1, null);
    $channel->basic_consume($queue_name, '',
    false, false, false, false, function (AMQPMessage $msg) {
    echo '接收到消息:', $msg->body, "\n";
    $msg->ack(); // 消息确认
    });

    while (count($channel->callbacks)) {
    $channel->wait();
    }
  • 运行

    1
    2
    3
    > php work_1.php
    work1 开始接收消息……
    接收到消息:消息2
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    > php work_2.php
    work2 开始接收消息……
    接收到消息:消息1
    接收到消息:消息3
    接收到消息:消息4
    接收到消息:消息5
    接收到消息:消息6
    接收到消息:消息7
    接收到消息:消息8
    接收到消息:消息9
    接收到消息:消息10
    接收到消息:消息11
    接收到消息:消息12
    接收到消息:消息13
    接收到消息:消息14
    接收到消息:消息15
    接收到消息:消息16
    接收到消息:消息17
    接收到消息:消息18
    接收到消息:消息19
    接收到消息:消息20