参考站点
https://www.rabbitmq.com/getstarted.html
https://www.kuangstudy.com/zl/rabbitmq#1366709857276755970
$channel->basic_consume(queue, '', false, false, false, false, $callback);
#公平分发需要消费者开启手动应答,关闭自动应答 basic_consume中no_ack设置为false
消费者开启手动应答代码
$callback = function ($msg) {
//消费者开启手动应答代码
$msg->ack();
};
特点:由于消息接收者处理消息的能力不同,存在处理快慢的问题,我们就需要能者多劳,处理快的多处理,处理慢的少处理;
生产者(push.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
require_once __DIR__ . '/../vendor/autoload.php';
$config = [
'host' => "192.168.56.56",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'vhost' => "/",
];
$routingKey = 'key1';
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();
$exchange = "testExchange";
// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
for ($i=0;$i<20;$i++) {
$message = new AMQPMessage($i, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
//向$exchange交换机中绑定routing_key为$routingKey的队列推送消息
$channel->basic_publish($message, $exchange, $routingKey);
}
$channel->close();
$connection->close();
消费者1(pop1.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
require_once __DIR__ . '/../vendor/autoload.php';
$config = [
'host' => "192.168.56.56",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();
$testQueue1 = "testQueue1";
$exchange = "testExchange";
$queueRk1 = "key1";
// 声明队列
$channel->queue_declare($testQueue1, false, true, false, false);
// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
echo " [x] Done\n";
sleep(5);
//开启手动应答代码
$msg->ack();
};
//创建信道的消费者 #no_ack 开启自动应答
$channel->basic_consume($testQueue1, '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
消费者2(pop2.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
require_once __DIR__ . '/../vendor/autoload.php';
$config = [
'host' => "192.168.56.56",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();
$testQueue1 = "testQueue1";
$exchange = "testExchange";
$queueRk1 = "key1";
// 声明队列
$channel->queue_declare($testQueue1, false, true, false, false);
// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
var_dump(1);
echo ' [x] Received-1 ', $msg->body, "\n";
echo " [x] Done-1\n";
sleep(2);
//开启手动应答代码
$msg->ack();
};
//创建信道的消费者 #no_ack 开启自动应答
$channel->basic_consume($testQueue1, '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
运行
#shell 1
php pop1.php
#shell 2
php pop2.php
#shell 3
php push.php