参考站点
https://www.rabbitmq.com/getstarted.html
https://www.jianshu.com/p/c1bee195a485
topic的意思是主题,topic类型的Exchange会根据通配符对Routing key进行匹配,只要Routing key满足某个通配符的条件,就会被路由到对应的Queue上
Routing key必须是一串字符串,每个单词用“.”分隔;
符号“#”表示匹配一个或多个单词;
符号“”表示匹配一个单词。
例如:“.123” 能够匹配到 “abc.123”,但匹配不到 “abc.def.123”;“#.123” 既能够匹配到 “abc.123”,也能匹配到 “abc.def.123”。
生产者(push.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
require_once __DIR__ . '/../vendor/autoload.php';
$config = [
'host' => "192.168.56.56",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'vhost' => "/",
];
//USER.ABC.ORDER这个Routing key只可以匹配到 “USER.#”, GOODS.ABC.ORDER这个Routing key什么都匹配不到, GOODS.ABC只可以匹配到GOODS.*
$routingKey = 'GOODS.ABC';
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();
$exchange = "testExchange-topic";
// 声明一个交换机
$channel->exchange_declare($exchange, 'topic', false, true, false);
$message = new AMQPMessage("123", array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
//向$exchange交换机中绑定routing_key为$routingKey的队列推送消息
$channel->basic_publish($message, $exchange, $routingKey);
$channel->close();
$connection->close();
消费者(pop.php)
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
require_once __DIR__ . '/../vendor/autoload.php';
$config = [
'host' => "192.168.56.56",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'vhost' => "/",
];
//创建连接
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
//连接的基础上可以创建多个信道
$channel = $connection->channel();
$testQueue1 = "testQueue1-topic";
$testQueue2 = "testQueue2-topic";
$testQueue3 = "testQueue3-topic";
$testQueue4 = "testQueue4-topic";
$exchange = "testExchange-topic";
$queueRk1 = "*.ORDER";
$queueRk2 = "GOODS.*";
$queueRk3 = "#.STOCK";
$queueRk4 = "USER.#";
// 声明3队列
$channel->queue_declare($testQueue1, false, true, false, false);
$channel->queue_declare($testQueue2, false, true, false, false);
$channel->queue_declare($testQueue3, false, true, false, false);
$channel->queue_declare($testQueue4, false, true, false, false);
// 声明一个交换机
$channel->exchange_declare($exchange, 'topic', false, true, false);
//为队列绑定不同的routing key
$channel->queue_bind($testQueue1, $exchange, $queueRk1);
$channel->queue_bind($testQueue2, $exchange, $queueRk2);
$channel->queue_bind($testQueue3, $exchange, $queueRk3);
$channel->queue_bind($testQueue4, $exchange, $queueRk4);
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
echo " [x] Done\n";
$msg->ack();
};
//创建信道的消费者
$channel->basic_consume($testQueue1, '', false, false, false, false, $callback);
$channel->basic_consume($testQueue2, '', false, false, false, false, $callback);
$channel->basic_consume($testQueue3, '', false, false, false, false, $callback);
$channel->basic_consume($testQueue4, '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
运行
#shell 1
php pop.php
#shell 2
php push.php