首页 > 其他 > 详细

了解RabbitMQ

时间:2020-03-21 18:58:34      阅读:65      评论:0      收藏:0      [点我收藏+]

一. 简介

1. 基本概念

RabbitMQ是AMQP协议的一个开源实现;

Message:消息是不具名的,由消息头和消息体组成;

Publisher:消息生产者,向交换器发布消息的客户端应用程序;

Exchange:交换器,用来接收消息,并将消息路由给队列;

Binding:用于消息队列和交换器之间的关联;

Queue:消息队列,用了保存消息知道发送给消费者;

Connection:网络连接,比如一个TCP连接;

Channel:信道,多路复用连接中的一条独立的双向数据流通道;鼓励在一个连接中创建多个通道,通道的创建和销毁比Connection小很多;避免线程间共享通道;

Consumer:消费者,从队列中区的消息的客户端应用程序;

Virtual Host:虚拟主机,一批交换器,消息队列和相关对象;

Broker:消息队列的服务实体;

2. 交换器类型

Headers:匹配AMQP消息的Header而不是路由键,此外和Direct完全一致,但性能相差很多,几乎不用;

Direct:消息中的路由键routing-key和binding中的绑定键bingding-key一致,交换器将消息发送到对应的队列中,完全匹配,单播的模式;

Fanout:不处理路由键,简单的将队列绑定到交换器,发送到交换器的每条消息都会被转发到所有队列中,速度很快;

Topic:队列需要绑定一种模式,通过模式匹配消息的路由键属性,消息的路由器和字符串用“.”分割为两部分,队列的模式会识别“#”和“*”两个通配符;

二. 简单实例

1. 消息生产者

ConnectionFactory factory = new ConnectionFactory(); // 创建连接工厂
factory.setUsername(""); 
factory.setPassword("");
factory.setHost(""); // 设置rabbitMQ地址
factory.setVirtualHost("");
Connection conn = factory.newConnection(); // 建立到代理服务器的连接
Channel channel = conn.creatChannel(); // 创建信道
String exchangeName = ""; // 声明交换器
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "routingKey";
byte[] msg = "".getBytes();
channel.basicPublish(exchangeName, routingKey, null, msg); // 发布消息
channel.close();
conn.close();

2. 消息消费者

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("");
factory.setPassword("");
factory.setHost("");
factory.setVirtualHost("");
Connection conn = factory.newConnection();
Channel channel = conn.creatChannel();
String exchangeName = "";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "routingKey";
String queneName = channel.queueDeclare().getQqueue(); // 声明队列
channel.queueBind(queneName, exchangeName, routingKey); // 绑定队列,通过路由键将队列和交换器绑定

while(true) {
    boolean autoAck = false;
    String consumerTag = "";
    channel.basicConsume(queneName, autoAck, consumerTag, new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String routingKey = envelope.getRoutingKey(); // 路由键
            String contentType = properties.getContentType(); // 内容类型
            long deliveryTag = envelope.getDeliveryTag();
            channel.basicAck(deliveryTag, false); // 确认消息
            String bodyStr = new String(body, "UTF-8"); // 消息体内容
        }
    });
}

3. Spring整合

在生产者和消费者中有很多重复代码,很多都是配置信息,基于此,Spring框架集成了RabbitMQ,用于简化使用操作;

主要API:

MessageListenerContainer:用来监听容器,为消息入队提供异步处理;

RabbitTemplate:用来发送和接收消息;

RabbitAdmin:用来声明队列,交换器,绑定;

三. 实践建议

1. 虚拟主机

客户端在连接消息服务器时必须指定一个虚拟主机,RabbitMQ中的权限控制是以vhost为单位的;

这种方式既能把同一台RabbitMQ服务器的不同业务应用区分开,又可以避免其内部队列,交换器的命名冲突;

可以通过RabbitMQ提供的rabbitmqctl工具管理vhost;

2. 消息保存

消息的保存方式有disk和RAM两种;

disk:一种情况是发布消息时指明需要写入磁盘;二是当服务器内存紧张时会将部分内存中的消息转移到磁盘;

RAM:只在RAM中保存内部数据库表数据,不会保存消息,消息存储索引,队列索引和其他节点状态等数据;

3. 消息确认模式

生产者确认消息正确到达broker的方式:通过AMQP协议中的事务机制;把信道设置为确认模式;

事务机制需要生产者应用同步等待broker的执行结果,在性能上极大降低消息服务器的吞吐量;

确认模式会把信道上发布的消息都分配一个唯一ID,消息被成功投递后,信道会向生产者发布包括ID的确认消息,异步进行,对性能影响小;

4. 消费者应答

要求消费者在消费完消息后发送一个回执给RabbitMQ服务器;

自动回执:服务器成功发送消息给消费者后会立即把消息从队列中删除;

手动回执:等到消费者回送的确认消息(向Broker显式发送ACK)后才会删除,如果因为意外没有发送ACK,Broker会把消息转发给其他消费者,如果没有则缓存起来指定有新的消费者注册;

当消费者处理消息失败或当期不能处理消息时,可以给broker发送一个拒绝一条或多条消息的指令,要求broker将消息丢弃或重新放入队列中;

可通过设置预取数量(Prefetch Count)限制每个消费者在发送ACK回执前一次最多可以接收多少条消息;

 

了解RabbitMQ

原文:https://www.cnblogs.com/bbbbs/p/12540648.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!