php rabbitmq案例-公平模式

管理员

参考站点
https://www.rabbitmq.com/getstarted.html
https://www.kuangstudy.com/zl/rabbitmq#1366709857276755970

$channel->basic_consume(queue, '', false, false, false, false, $callback);

#公平分发需要消费者开启手动应答,关闭自动应答 basic_consume中no_ack设置为false

消费者开启手动应答代码

$callback = function ($msg) {
//消费者开启手动应答代码
$msg->ack();
};

特点:由于消息接收者处理消息的能力不同,存在处理快慢的问题,我们就需要能者多劳,处理快的多处理,处理慢的少处理;

生产者(push.php)

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__ . '/../vendor/autoload.php';

$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];

$routingKey = 'key1';
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();

$exchange = "testExchange";
// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);

for ($i=0;$i<20;$i++) {
    $message = new AMQPMessage($i, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
    //向$exchange交换机中绑定routing_key为$routingKey的队列推送消息
    $channel->basic_publish($message, $exchange, $routingKey);
}
$channel->close();
$connection->close();

消费者1(pop1.php)

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

require_once __DIR__ . '/../vendor/autoload.php';


$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();

$testQueue1 = "testQueue1";
$exchange = "testExchange";
$queueRk1  = "key1";

// 声明队列
$channel->queue_declare($testQueue1, false, true, false, false);

// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);

$channel->basic_qos(null, 1, null);

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    echo " [x] Done\n";
    sleep(5);
    //开启手动应答代码
    $msg->ack();
};
//创建信道的消费者 #no_ack 开启自动应答
$channel->basic_consume($testQueue1, '', false, false, false, false, $callback);


while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();


消费者2(pop2.php)

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

require_once __DIR__ . '/../vendor/autoload.php';


$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();

$testQueue1 = "testQueue1";
$exchange = "testExchange";
$queueRk1  = "key1";

// 声明队列
$channel->queue_declare($testQueue1, false, true, false, false);

// 声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);

$channel->basic_qos(null, 1, null);

$callback = function ($msg) {
    var_dump(1);
    echo ' [x] Received-1 ', $msg->body, "\n";
    echo " [x] Done-1\n";
    sleep(2);
    //开启手动应答代码
    $msg->ack();
};

//创建信道的消费者 #no_ack 开启自动应答
$channel->basic_consume($testQueue1, '', false, false, false, false, $callback);


while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();


运行

#shell 1
php pop1.php
#shell 2
php pop2.php
#shell 3
php push.php
0人点赞
PHP
管理员

全部评论 0

推荐阅读 更多精彩内容