php rabbitmq案例-topic模式

管理员

参考站点
https://www.rabbitmq.com/getstarted.html
https://www.jianshu.com/p/c1bee195a485

topic的意思是主题,topic类型的Exchange会根据通配符对Routing key进行匹配,只要Routing key满足某个通配符的条件,就会被路由到对应的Queue上

Routing key必须是一串字符串,每个单词用“.”分隔;
符号“#”表示匹配一个或多个单词;
符号“”表示匹配一个单词。
例如:“
.123” 能够匹配到 “abc.123”,但匹配不到 “abc.def.123”;“#.123” 既能够匹配到 “abc.123”,也能匹配到 “abc.def.123”。

生产者(push.php)

<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__ . '/../vendor/autoload.php';

$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];


//USER.ABC.ORDER这个Routing key只可以匹配到 “USER.#”, GOODS.ABC.ORDER这个Routing key什么都匹配不到, GOODS.ABC只可以匹配到GOODS.*
$routingKey = 'GOODS.ABC';
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();

$exchange = "testExchange-topic";
// 声明一个交换机
$channel->exchange_declare($exchange, 'topic', false, true, false);
$message = new AMQPMessage("123", array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
//向$exchange交换机中绑定routing_key为$routingKey的队列推送消息
$channel->basic_publish($message, $exchange, $routingKey);
$channel->close();
$connection->close();

消费者(pop.php)

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;

require_once __DIR__ . '/../vendor/autoload.php';

$config = [
    'host' => "192.168.56.56",
    'port' => "5672",
    'user' => "admin",
    'password' => "admin",
    'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();

$testQueue1 = "testQueue1-topic";
$testQueue2 = "testQueue2-topic";
$testQueue3 = "testQueue3-topic";
$testQueue4 = "testQueue4-topic";
$exchange = "testExchange-topic";
$queueRk1  = "*.ORDER";
$queueRk2 = "GOODS.*";
$queueRk3 = "#.STOCK";
$queueRk4 = "USER.#";
// 声明3队列
$channel->queue_declare($testQueue1, false, true, false, false);
$channel->queue_declare($testQueue2, false, true, false, false);
$channel->queue_declare($testQueue3, false, true, false, false);
$channel->queue_declare($testQueue4, false, true, false, false);
// 声明一个交换机
$channel->exchange_declare($exchange, 'topic', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);
$channel->queue_bind($testQueue2, $exchange, $queueRk2);
$channel->queue_bind($testQueue3, $exchange, $queueRk3);
$channel->queue_bind($testQueue4, $exchange, $queueRk4);

$channel->basic_qos(null, 1, null);

$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    echo " [x] Done\n";
    $msg->ack();
};
//创建信道的消费者
$channel->basic_consume($testQueue1, '', false, false, false, false, $callback);
$channel->basic_consume($testQueue2, '', false, false, false, false, $callback);
$channel->basic_consume($testQueue3, '', false, false, false, false, $callback);
$channel->basic_consume($testQueue4, '', false, false, false, false, $callback);


while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

运行

#shell 1
php pop.php
#shell 2
php push.php
0人点赞
PHP
管理员

全部评论 0

推荐阅读 更多精彩内容