参考站点
https://www.rabbitmq.com/getstarted.html
https://www.jianshu.com/p/c1bee195a485
fanout 意为“扇出”，这种类型通常称为广播类型。fanout 类型的 Exchange 不处理 Routing key，而是把收到的每条消息路由到所有与它绑定的 Queue 上。
生产者(push.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
require_once __DIR__ . '/../vendor/autoload.php';
// Producer: publishes one test message to a durable fanout exchange.

// Connection settings for the RabbitMQ broker.
$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];
// A fanout exchange does not route on the routing key, so queues bound with
// "key2" receive this message as well.
$routingKey = 'key1';
$exchange = "testExchange-fanout";

// Open the connection; several channels can be created on top of one connection.
$connection = new AMQPStreamConnection(
    $config['host'],
    $config['port'],
    $config['user'],
    $config['password'],
    $config['vhost']
);

// try/finally guarantees the channel and connection are closed even when
// declaring the exchange or publishing throws.
try {
    $channel = $connection->channel();
    try {
        // Declare the exchange: type fanout, passive=false, durable=true, auto_delete=false.
        $channel->exchange_declare($exchange, 'fanout', false, true, false);

        $message = new AMQPMessage("123");
        // Publish to the exchange; it forwards the message to every queue bound to it.
        $channel->basic_publish($message, $exchange, $routingKey);
    } finally {
        $channel->close();
    }
} finally {
    $connection->close();
}
消费者(pop.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
require_once __DIR__ . '/../vendor/autoload.php';
// Consumer: declares three durable queues, binds them to the fanout exchange
// and prints/acknowledges every message delivered to any of them.

// Connection settings for the RabbitMQ broker.
$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];
$exchange = "testExchange-fanout";
// Queue name => binding key. The exchange is declared as fanout, which does not
// route on the binding key, so every queue listed here receives each message.
$bindings = [
    "testQueue1-fanout" => "key1",
    "testQueue2-fanout" => "key2",
    "testQueue3-fanout" => "key2",
];

// Print the payload of each delivery, then acknowledge it manually.
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    echo " [x] Done\n";
    $msg->ack();
};

// Open the connection; several channels can be created on top of one connection.
$connection = new AMQPStreamConnection(
    $config['host'],
    $config['port'],
    $config['user'],
    $config['password'],
    $config['vhost']
);

// try/finally guarantees the channel and connection are closed even when
// setup, wait() or the callback throws.
try {
    $channel = $connection->channel();
    try {
        // Declare the queues: passive=false, durable=true, exclusive=false, auto_delete=false.
        foreach (array_keys($bindings) as $queue) {
            $channel->queue_declare($queue, false, true, false, false);
        }

        // Declare the exchange: type fanout, passive=false, durable=true, auto_delete=false.
        $channel->exchange_declare($exchange, 'fanout', false, true, false);

        // Bind each queue to the exchange with its binding key.
        foreach ($bindings as $queue => $bindingKey) {
            $channel->queue_bind($queue, $exchange, $bindingKey);
        }

        // Prefetch count 1: at most one unacknowledged message per consumer.
        $channel->basic_qos(null, 1, null);

        // Register one consumer per queue on this channel; no_ack=false, so the
        // callback must acknowledge each message itself.
        foreach (array_keys($bindings) as $queue) {
            $channel->basic_consume($queue, '', false, false, false, false, $callback);
        }

        // Block and dispatch deliveries for as long as the channel stays open.
        while ($channel->is_open()) {
            $channel->wait();
        }
    } finally {
        $channel->close();
    }
} finally {
    $connection->close();
}
运行
#shell 1
php pop.php
#shell 2
php push.php