Rabbitmq学习以及基本接口封装
基于工程效率团队线上已有消息中间件在使用,为了统一技术方案,所以采用rabbitmq作为消息中间件。rocketmq和rabbitmq这两种消息中间件略有区别:
Rocketmq |
Rabbitmq |
Java开发,便于二次开发 |
Erlang语言开发 |
管理后台:cluster、topic、producer、consumer、nameserver。无登录账号密码 |
管理后台:connection、channel、exchange、queueus、admin。有登陆账号密码 |
Broker向Nameserver注册topic,producer从namesrver获取topic,进而向关联broker发送消息 |
通过exchange绑定不同queues进行消息派送 |
消费端支持集群消费和广播消费;广播消费下,同一个consumergroup下所有消费实例都会共享消费消息内容 |
同一个queue下所有消费实例会均分消息内容 |
支持事务消息 |
不支持事务消息 |
Broker支持通过tag过滤 |
Exchange绑定特定queues,每个queue为特定服务使用,两个不同服务如果采用同一个queue,那么在从这个队列进行消息消费时只能通过消息具体内容进行区分 |
通过将需要顺序的消息发送到同一个queue里保证顺序消息的功能 |
Exchange绑定固定的queue,实现顺序消息 |
接口封装步骤:
pom添加依赖
通用依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
添加rabbitmq服务配置信息
# rabbitmq配置
spring:
rabbitmq:
addresses: 172.16.4.201
username: admin
password: Pass@1234
Rabbitmq三种交换机模式:
a) Direct Exchange
(直连):传递时需要一个Routekey,通过Routkey寻找对应队列;这种模式Exchange不需要绑定(binding)queue
例子:
BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
b) Fanout Exchange
(广播):任何发送到Fanout Exchange的消息都会转发到与该Exchange绑定(bingding)的所有queue上;这种模式不需要routkey
例子:BindingBuilder.bind(queueA).to(fanoutExchange);
c) Topic Exchange
可以理解为direct exchange+fanout exchange;任何发送到Exchange的消息都会被转发所有关心Routekey中所指定的queue上;
需要指定routekey;exchange需要绑定(binding)queue;
模糊匹配:#表示0个或若干个关键字, “”表示一个关键字。如log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
例子:BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
针对不同交换机模式具体rabbitmq配置文件如下:
public class RabbitConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
//rabbitmq连接池
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
//rabbitTemplate用以简化rabbitmq发送和接收消息
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// 必须是prototype类型
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
/*
* 1、Direct Exchange(直连):传递时需要一个Routekey,通过Routkey寻找对应queue
*/
//获取DirectExchange类型交换机
//交换机-exchange
public static final String Direct_Exchange = "DIERCT_EXCHANGE";
//队列-queue
public static final String Direct_Queue_A = "Direct_QUEUE_A";
public static final String Direct_Queue_B = "Direct_QUEUE_B";
//路由关键字-routingkey
public static final String Direct_RoutingKey_A = "DIERCT_ROUTINGKEY_A";
public static final String Direct_RoutingKey_B = "DIERCT_ROUTINGKEY_B";
@Bean
public DirectExchange directExchange() {
return new DirectExchange(Direct_Exchange);
}
//获取queue
@Bean
public Queue queueA() {
return new Queue(Direct_Queue_A);
}
@Bean
public Queue queueB() {
return new Queue(Direct_Queue_B);
}
//传递routkey给exchange,将queue绑定到exchange上;可以将多个队列绑定到同一个exchange上;
//在生产者发送时需要routkey,格式:RabbitTemplate.convertAndSend(EXCHANGE, ROUTINGKEY, content);
@Bean
public Binding directBindingA() {
return BindingBuilder.bind(queueA()).to(directExchange()).with(Direct_RoutingKey_A);
}
@Bean
public Binding directBindingB() {
return BindingBuilder.bind(queueB()).to(directExchange()).with(Direct_RoutingKey_B);
}
/*
* 2、Fanout Exchange(广播):任何发送到Fanout Exchange的消息都会转发到与该Exchange绑定(bingding)的所有queue上;这种模式不需要routkey
*/
static final String Fanout_Exchange="FANOUT_EXCHANGE";
static final String Fanout_Queue_A="FANOUT_QUEUE_A";
static final String Fanout_Queue_B="FANOUT_QUEUE_B";
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(Fanout_Exchange);
}
@Bean
public Queue fanoutQueueA() {
return new Queue(Fanout_Queue_A);
}
@Bean
public Queue fanoutQueueB() {
return new Queue(Fanout_Queue_B);
}
//广播方式交换机与queue绑定无需routekey
//生产者发送时routkey为空,格式rabbitTemplate.convertAndSend(Exchange,"", content);这样可以将消息广播到在RabbitConfig类中所有绑定的queues上
@Bean
public Binding fanoutBindingA() {
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}
@Bean
public Binding fanoutBingB() {
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}
/*
* 3、Topic Exchange:任何发送到Exchange的消息都会被转发所有关心Routekey中所指定的queue上
*/
static final String Topic_Exchange="TOPIC_EXCHANGE";
static final String Topic_Queue_A="TOPIC_QUEUE_A";
static final String Topic_Queue_B="TOPIC_QUEUE_B";
static final String Topic_Routing_KeyA="TOPIC_#";
static final String Topic_Routing_KeyB="TOPIC_*";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(Topic_Exchange);
}
@Bean
public Queue topicQueueA() {
return new Queue(Topic_Queue_A);
}
@Bean
public Queue topicQueueB() {
return new Queue(Topic_Queue_B);
}
@Bean
public Binding topicBingA() {
return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(Topic_Routing_KeyA);
}
@Bean
public Binding topicBingB() {
return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(Topic_Routing_KeyB);
}
}
消息发送接口:针对不同交换机类型,发送方法参数略有不同
@Component
public class RabbitSender {
@Autowired
private AmqpTemplate rabbitTemplate;
/*
* directExchange类型 需指定routingkey
*/
public void directSend(String sendMessage) {
// 注意 第一个参数是我们交换机的名称 ,第二个参数是routerKey 我们不用管空着就可以,第三个是你要发送的消息
this.rabbitTemplate.convertAndSend(RabbitConfig.Direct_Exchange, RabbitConfig.Direct_RoutingKey_A, sendMessage); // exchange,routkey,message
}
/*
* fanoutExchange类型 无需指定routingkey
*/
public void fanoutSend(String sendMessage) {
for(int i=0;i<1;i++) {
rabbitTemplate.convertAndSend(RabbitConfig.Fanout_Exchange, "", sendMessage+i);
}
}
/*
* topicExchange类型 需指定routingkey
*/
public void topicSend(String sendMessage) {
rabbitTemplate.convertAndSend(RabbitConfig.Topic_Exchange, RabbitConfig.Topic_Routing_KeyA, sendMessage);
}
}
监听器 可以监听某个或者某些queue
@Component
@RabbitListener(queues = {RabbitConfig.Direct_Queue_A})
public class Consumer1 {
/**
* 消息消费
* @RabbitHandler 代表此方法为接受到消息后的处理方法
*/
@RabbitHandler
public void recieved(String message) {
System.out.println("--------:"+JSON.toJSONString(message));
}
}
基于docker rocketmq安装&rocketmq基本接口封装
Rocketmq单机现已部署到qa环境:http://30.16.80.9:8080/#/;
Rocketmq接口封装代码以提交到cap/backed/develop-v2.0分支
具体rocketmq部署以及接口总结文档如下:
一、基于docker rocketmq安装:
1、 拉取rocketmq镜像
docker pull rocketmqinc/rocketmq
2、 拉取rocketmq-console镜像
docker pull styletang/rocketmq-console-ng
3、 启动nameserver
docker run -d -p 9876:9876 -v `pwd`/data/namesrv/logs:/root/logs -v `pwd`/data/namesrv/store:/root/store --name rmqnamesrv rocketmqinc/rocketmq sh mqnamesrv
4、 启动broker
docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/data/broker/logs:/root/logs -v `pwd`/data/broker/store:/root/store --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" rocketmqinc/rocketmq sh mqbroker -c ../conf/broker.conf
由于启动broker时rocketmq默认指定为内网地址,会导致外网无法连接到broker,报出如下错误信息: connect to xxx.xx.xx.xx:10911 failed
解决方案:①docker exec -it xxxxx bash --xxx是指broker对应的containerid
②cd ../conf
③vi broker.conf
④增加brokerIP1 = xxx.xxx.xxx.xxx --这里的ip地址指定为外网地址
⑤重启broker容器
5、 启动rocketmq-console
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=xxx.xxx.xxx.xxx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
--- xxx.xxx.xxx.xxx 为服务器地址
二、rocketmq接口封装
1、 rocketmq结构
2、 rocketmq实例
a) 消费端
i. 普通发送消息
public SendResult send(String topic, String tag, String sendMsg) {
log.info("SendMessage_topic:" + topic + "SendMessage_tag:" + tag + ",sendMsg:" + sendMsg);
SendResult sendResult = null;
Message msgMessage = null;
if (StringUtils.isBlank(topic)) {
throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
}
try {
if (StringUtils.isBlank(tag)) {
msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
} else {
msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
}
sendResult = producer.send(msgMessage);
} catch (Exception e) {
log.error(e.toString());
}
return sendResult;
}
ii. 顺序消息发送
RocketMQ中同一个队列不能被并行消费,但可以并行消费多个队列。基于此,Rocket可以保证将需要排序的内容放在同一个队列中便可以保证消费的顺序进行
public SendResult sendOrderly(String orderId, String topic, String tag, String sendMsg) {
log.info("Orderly SendMessage_topic:" + topic + "SendMessage_tag:" + tag + ",sendMsg:" + sendMsg);
SendResult sendResult = null;
Message msgMessage = null;
if (StringUtils.isBlank(topic)) {
throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
}
try {
if (StringUtils.isBlank(tag)) {
msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
} else {
msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
}
sendResult = producer.send(msgMessage, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
} catch (Exception e) {
log.error(e.toString());
}
return sendResult;
}
b) 消息监听器
i.顺序消费 实现MessageListenerOrderly接口
public class MQConsumeMsgOrderlyListener implements MessageListenerOrderly {
@Value("${rocketmq.consumer.topic}")
private String topic;
@Value("${rocketmq.consumer.tag}")
private String tag;
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
ConsumeOrderlyStatus result=ConsumeOrderlyStatus.SUCCESS;
if (StringUtils.isBlank(topic)) {
throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
}
for(MessageExt msg:msgs) {
try {
if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topic)) { //TODO 这个需要么,consumer订阅会指定topic
String Message = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("MQConsumeMsgListenerProcessor consumeMessage body:" + Message);
// TODO 根据接收到mq消息内容进行其他操作
return result;
}
}catch(Exception e) {
result = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return null;
}
ii. 并发消费 实现MessageListenerConcurrently接口
public class MQConsumeMsgConcurrentListener implements MessageListenerConcurrently {
@Value("${rocketmq.consumer.topic}")
private String topic;
@Value("${rocketmq.consumer.tag}")
private String tag;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
ConsumeConcurrentlyStatus result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
if (StringUtils.isBlank(topic)) {
throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
}
for (MessageExt msg : msgs) {
try {
if (StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topic)) { // TODO
String Message = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("MQConsumeMsgListenerProcessor consumeMessage body:" + Message);
// TODO 根据接收到mq消息内容进行其他操作
return result;
}
} catch (Exception e) {
result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return result;
}
}
c) 消费端
i.) 集群消费
一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息;
在初始化消费者时将consumer设置为集群消费模式:consumer.setMessageModel(MessageModel.CLUSTERING);
ii.) 广播消费
一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次;
在初始化消费者时将consumer设置为广播消费模式:
consumer.setMessageModel(MessageModel.BROADCASTING);