首页 > 其他 > 详细

rabbitmq 通过fanout模式将消息推送到多个队列

时间:2021-01-10 22:50:20      阅读:67      评论:0      收藏:0      [点我收藏+]
  • 使用场景:

有时我们会遇到这样的情况,多个功能模块都希望得到完整的消息数据。例如一个log的消息,一个我们希望输出在屏幕上实时监控,另外一个用户持久化日志。这时就可以使用fanout模式。fanout模式模式不像direct模式通过routingkey来进行匹配,而是会把消息发送到所以的已经绑定的队列中。

 

  • 消费者
<?php
// 生产者 p_fanout.php
//配置信息
$config = [
    ‘host‘     => ‘localhost‘,
    ‘port‘     => ‘5672‘,
    ‘login‘    => ‘guest‘,
    ‘password‘ => ‘guest‘,
    ‘vhost‘    => ‘/‘
];

$exchangeName = ‘e_fanout‘;
 
//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
$exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

// 向服务器队列推送10条消息
for ($i = 0; $i < 10; $i++) {
    $msg = ‘hello world ‘ . $i;
    $exchange->publish($msg, 
    $routeKey, 
    AMQP_NOPARAM, 
    [‘delivery_mode‘ => 2]);
}

 

  • 消费者c1_fanout
<?php
// 消费者 c1_fanout.php
//配置信息
$config = [
    ‘host‘     => ‘localhost‘,
    ‘port‘     => ‘5672‘,
    ‘login‘    => ‘guest‘,
    ‘password‘ => ‘guest‘,
    ‘vhost‘    => ‘/‘
];

$exchangeName = ‘e_fanout‘;
$queueName    = ‘q_fanout_1‘;
$routeKey     = ‘‘;
 
//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
$exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

// 创建消息队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 设置持久性
$queue->setFlags(AMQP_DURABLE);

// 声明消息队列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收消息并处理回调
// $queue->consume(‘receive‘);

//阻塞模式接收消息
echo "Message:\n";
while(True){
    $queue->consume(‘receive‘);
    //$queue->consume(‘processMessage‘, AMQP_AUTOACK); //自动ACK应答
}

// 处理回调的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "\n";

    // ACK 通知生产者任务完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

 

  • 消费者c2_fanout
<?php
// 消费者 c2_fanout.php
//配置信息
$config = [
    ‘host‘     => ‘localhost‘,
    ‘port‘     => ‘5672‘,
    ‘login‘    => ‘guest‘,
    ‘password‘ => ‘guest‘,
    ‘vhost‘    => ‘/‘
];

$exchangeName = ‘e_fanout‘;
$queueName    = ‘q_fanout_2‘;
$routeKey     = ‘‘;
 
//创建连接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);

// 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
$exchange->setFlags(AMQP_DURABLE);

// 声明交换机
$exchange->declareExchange();

// 创建消息队列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 设置持久性
$queue->setFlags(AMQP_DURABLE);

// 声明消息队列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收消息并处理回调
// $queue->consume(‘receive‘);
//阻塞模式接收消息
echo "Message:\n";
while(True){
    $queue->consume(‘receive‘);
    //$queue->consume(‘processMessage‘, AMQP_AUTOACK); //自动ACK应答
}

// 处理回调的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "\n";

    // ACK 通知生产者任务完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

 

  • 运行结果

技术分享图片

 

 

  •  三个文件的执行顺序

  先启动两个消费者,最后启动生产者,原因是fanout模式下,生产者不会创建消息队列,如果消费者没有创建,则消息没有队列可放。

  

rabbitmq 通过fanout模式将消息推送到多个队列

原文:https://www.cnblogs.com/xiangdongsheng/p/14259020.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!