php rabbitmq案例-fanout模式

管理员

参考站点
https://www.rabbitmq.com/getstarted.html
https://www.jianshu.com/p/c1bee195a485

fanout是扇形的意思,该类型通常叫作广播类型。fanout类型的Exchange不处理Routing key,而是会将发送给它的消息路由到所有与它绑定的Queue上。

生产者(push.php)

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__ . '/../vendor/autoload.php';

$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];

//routing_key 为key2的也会收到推送消息
$routingKey = 'key1';
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();

$exchange = "testExchange-fanout";
// 声明一个交换机
$channel->exchange_declare($exchange, 'fanout', false, true, false);
$message = new AMQPMessage("123");
//向$exchange交换机中绑定routing_key为$routingKey的队列推送消息
$channel->basic_publish($message, $exchange, $routingKey);
$channel->close();
$connection->close();

消费者(pop.php)

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

require_once __DIR__ . '/../vendor/autoload.php';


$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();

$testQueue1 = "testQueue1-fanout";
$testQueue2 = "testQueue2-fanout";
$testQueue3 = "testQueue3-fanout";
$exchange = "testExchange-fanout";
$queueRk1 = "key1";
$queueRk2 = "key2";
// 声明3队列
$channel->queue_declare($testQueue1, false, true, false, false);
$channel->queue_declare($testQueue2, false, true, false, false);
$channel->queue_declare($testQueue3, false, true, false, false);
// 声明一个交换机
$channel->exchange_declare($exchange, 'fanout', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);
$channel->queue_bind($testQueue2, $exchange, $queueRk2);
$channel->queue_bind($testQueue3, $exchange, $queueRk2);

$channel->basic_qos(null, 1, null);

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    echo " [x] Done\n";
    $msg->ack();
};
//创建信道的消费者
$channel->basic_consume($testQueue1, '', false, false, false, false, $callback);
$channel->basic_consume($testQueue2, '', false, false, false, false, $callback);
$channel->basic_consume($testQueue3, '', false, false, false, false, $callback);


while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();


运行

#shell 1
php pop.php
#shell 2
php push.php
0人点赞
PHP
管理员

全部评论 0

推荐阅读 更多精彩内容