参考站点
https://www.rabbitmq.com/getstarted.html
https://www.kuangstudy.com/zl/rabbitmq#1366709857276755970
轮询模式的分发:一个消费者一条,按均分配
开启自动应答 basic_consume中no_ack设置为true
特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成;
生产者(push.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
require_once __DIR__ . '/../vendor/autoload.php';
$config = [
'host' => "192.168.56.56",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'vhost' => "/",
];
$routingKey = 'key1';
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();
$exchange = "testExchange";
// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
for ($i=0;$i<20;$i++) {
$message = new AMQPMessage($i, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
//向$exchange交换机中绑定routing_key为$routingKey的队列推送消息
$channel->basic_publish($message, $exchange, $routingKey);
}
$channel->close();
$connection->close();
消费者1(pop1.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
require_once __DIR__ . '/../vendor/autoload.php';
$config = [
'host' => "192.168.56.56",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();
$testQueue1 = "testQueue1";
$exchange = "testExchange";
$queueRk1 = "key1";
// 声明队列
$channel->queue_declare($testQueue1, false, true, false, false);
// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
echo " [x] Done\n";
sleep(5);
};
//创建信道的消费者 #no_ack 开启自动应答
$channel->basic_consume($testQueue1, '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
消费者2(pop2.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
require_once __DIR__ . '/../vendor/autoload.php';
$config = [
'host' => "192.168.56.56",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();
$testQueue1 = "testQueue1";
$exchange = "testExchange";
$queueRk1 = "key1";
// 声明队列
$channel->queue_declare($testQueue1, false, true, false, false);
// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
sleep(2);
echo ' [x] Received-1 ', $msg->body, "\n";
echo " [x] Done-1\n";
};
//创建信道的消费者 #no_ack 开启自动应答
$channel->basic_consume($testQueue1, '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
运行
#shell 1
php pop1.php
#shell 2
php pop2.php
#shell 3
php push.php
pop1和pop2的消息处理能力不同,但是最后处理的消息条数相同,是“按均分配”。