一、概述
1.消息中间件:主要解决应用的解耦、异步消息、减轻流量、实现高可用,可伸缩,和最终一致性架构等问题。目前有ActiveMQ、RabbitMQ、Kafka、RocketMQ、ZeroMQ、MetaMQ等。
2.JMS规范:Java Message Service 是Java中面向消息中间件的api。
3.组成
-> 提供者:实现JMS规范的消息中间件服务器,即:存放消息的容器(队列)。
->客户端:接收、发送消息的程序。
->生产者/发布者:创建并发送消息的客户端,即:向消息容器发送消息。
->消费者/订阅者:接收处理消息的客户端。
->消息:应用之间传递数据的内容。
->消息模式:客户端之间传递消息的方式,JMS中规定了2中方式,点对点(一对一)、发布订阅(一对多)【官方称为主题和队列两种】。
4.实现原理
5.针对点对点方式,若有多个消费者对同一个消息进行消费 采用均摊消费。
6.发布订阅(topic)模式 需要先启动订阅然后在发布 才能获取。
一、ActivityMq(使用比较少)
=================================================================================
注意:生成者以事务方式提交并且commit(),消费以事务接收没有commit()时。第一次消费时:可以接收到消息,但是不会标记为已消费状态,第二次消费时:如果有生产者有新的消息继续发送,消费者接收此时添加commit()后此时才会标记为已消费状态。
1.点对点(生成者)
1 package com.mq.activity.point_to_point; 2 3 import javax.jms.Connection; 4 import javax.jms.DeliveryMode; 5 import javax.jms.MessageProducer; 6 import javax.jms.Queue; 7 import javax.jms.Session; 8 import javax.jms.TextMessage; 9 10 import org.apache.activemq.ActiveMQConnectionFactory; 11 12 public class Producer { 13 14 private static final String ADDRESS_URL = "tcp://127.0.0.1:61616"; 15 private static final String QUERY_NAME = "quere_test"; 16 17 public static void main(String[] args) throws Exception { 18 19 // 创建连接工厂 20 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ADDRESS_URL); 21 // 创建连接并开启 22 Connection connection = factory.createConnection(); 23 connection.start(); 24 // 创立连接回话 参数1 表示是否以事务方式提交,参数2消息的接收方式:自动接收(类似UDP协议) 25 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 26 // 创建目标队列 27 Queue queue = session.createQueue(QUERY_NAME); 28 // 创建生成者 29 MessageProducer producer = session.createProducer(queue); 30 // 设置消息是否持久化到本地文件中 默认不保存 31 // producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 32 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 33 // 创建消息 34 TextMessage message = null; 35 for (int i = 1; i <= 100; i++) { 36 message = session.createTextMessage("product start " + i + " ..."); 37 producer.send(message); 38 // connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 如果以事务提交这里需要commit 39 // session.commit(); 40 } 41 42 message = null; 43 // 关闭连接 44 connection.close(); 45 } 46 47 }
消费者
1 public class Consumer { 2 3 private static final String ADDRESS_URL = "tcp://127.0.0.1:61616"; 4 private static final String QUERY_NAME = "quere_test"; 5 6 public static void main(String[] args) throws Exception { 7 // 创建连接工厂 8 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ADDRESS_URL); 9 // 创建连接并开启 10 Connection connection = factory.createConnection(); 11 connection.start(); 12 // 创立连接回话 参数1 表示是否以事务方式提交,参数2消息的接收方式:自动接收(类似UDP协议) 13 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 14 // 创建目标队列 15 Queue queue = session.createQueue(QUERY_NAME); 16 // 创建消费者 17 MessageConsumer consumer = session.createConsumer(queue); 18 // 创建消息监听(基于观察者模式) 19 consumer.setMessageListener(new MessageListener() { 20 21 public void onMessage(Message message) { 22 TextMessage messate = (TextMessage) message; 23 try { 24 System.out.println("consumer accept message is " + messate.getText()); 25 // 手动签收 26 // messate.acknowledge(); 27 // 如消费者以事务提交,需要commit 28 // session.commit(); 29 } catch (JMSException e) { 30 e.printStackTrace(); 31 } 32 } 33 }); 34 // 基于长连接 不需要关闭连接 35 } 36 }
2.发布订阅
1 public class ProductTopic { 2 3 private static final String ADDRESS_URL = "tcp://127.0.0.1:61616"; 4 private static final String TOPIC_NAME = "topic_test"; 5 6 public static void main(String[] args) throws Exception { 7 8 // 创建连接工厂 9 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ADDRESS_URL); 10 // 创建连接并开启 11 Connection connection = factory.createConnection(); 12 connection.start(); 13 // 创立连接回话 参数1 表示是否以事务方式提交,参数2消息的接收方式:自动接收(类似UDP协议) 14 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 15 // 创建目标队列 16 Topic topic = session.createTopic(TOPIC_NAME); 17 // 创建生成者 18 MessageProducer producer = session.createProducer(topic); 19 // 创建消息 20 TextMessage message = null; 21 for (int i = 1; i <= 100; i++) { 22 message = session.createTextMessage("product start " + i + " ..."); 23 producer.send(message); 24 } 25 message = null; 26 // 关闭连接 27 connection.close(); 28 } 29 30 }
消费者
1 public class ConsumerTopic { 2 3 private static final String ADDRESS_URL = "tcp://127.0.0.1:61616"; 4 private static final String TOPIC_NAME = "topic_test"; 5 6 public static void main(String[] args) throws Exception { 7 // 创建连接工厂 8 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ADDRESS_URL); 9 // 创建连接并开启 10 Connection connection = factory.createConnection(); 11 connection.start(); 12 // 创立连接回话 参数1 表示是否以事务方式提交,参数2消息的接收方式:自动接收(类似UDP协议) 13 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 14 // 创建目标队列 15 Topic topic = session.createTopic(TOPIC_NAME); 16 // 创建消费者 17 MessageConsumer consumer = session.createConsumer(topic); 18 // 创建消息监听(基于观察者模式) 19 consumer.setMessageListener(new MessageListener() { 20 21 public void onMessage(Message message) { 22 TextMessage messate = (TextMessage) message; 23 try { 24 System.out.println("consumer accept message is " + messate.getText()); 25 // 手动签收 26 } catch (JMSException e) { 27 e.printStackTrace(); 28 } 29 } 30 }); 31 // 基于长连接 不需要关闭连接 32 } 33 34 }
springboot整合
pom文件点对点和订阅发布一样
1 <parent> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-parent</artifactId> 4 <version>2.0.1.RELEASE</version> 5 </parent> 6 7 <!-- 管理依赖 --> 8 <dependencyManagement> 9 <dependencies> 10 <dependency> 11 <groupId>org.springframework.cloud</groupId> 12 <artifactId>spring-cloud-dependencies</artifactId> 13 <version>Finchley.M7</version> 14 <type>pom</type> 15 <scope>import</scope> 16 </dependency> 17 </dependencies> 18 </dependencyManagement> 19 <dependencies> 20 <!-- SpringBoot整合Web组件 --> 21 <dependency> 22 <groupId>org.springframework.boot</groupId> 23 <artifactId>spring-boot-starter-web</artifactId> 24 </dependency> 25 <!-- SpringBoot Activemq --> 26 <dependency> 27 <groupId>org.springframework.boot</groupId> 28 <artifactId>spring-boot-starter-activemq</artifactId> 29 </dependency> 30 <dependency> 31 <groupId>com.alibaba</groupId> 32 <artifactId>fastjson</artifactId> 33 <version>1.2.49</version> 34 </dependency> 35 <dependency> 36 <groupId>org.springframework.boot</groupId> 37 <artifactId>spring-boot-starter-mail</artifactId> 38 </dependency> 39 40 <dependency> 41 <groupId>org.apache.commons</groupId> 42 <artifactId>commons-lang3</artifactId> 43 </dependency> 44 45 </dependencies> 46 <!-- 注意: 这里必须要添加, 否者各种依赖有问题 --> 47 <repositories> 48 <repository> 49 <id>spring-milestones</id> 50 <name>Spring Milestones</name> 51 <url>https://repo.spring.io/libs-milestone</url> 52 <snapshots> 53 <enabled>false</enabled> 54 </snapshots> 55 </repository> 56 </repositories>
springboot 点对点 application.yml
1 spring: 2 activemq: 3 ###MQ连接通讯地址 4 broker-url: tcp://127.0.0.1:61616 5 ###账号 6 user: admin 7 ###密码 8 password: admin 9 10 11 ###自定义队列 12 query_name: springboot-queue 13 server: 14 port: 8082
springboot 点对点 生产者
1 @Component 2 public class Config { 3 4 @Value("${query_name}") 5 private String queryName; 6 7 @Bean 8 public Queue queue() { 9 return new ActiveMQQueue(queryName); 10 } 11 12 }
1 @Component 2 public class Produce { 3 4 @Autowired 5 private Queue queue; 6 7 @Autowired 8 private JmsMessagingTemplate jmsMessagingTemplate; 9 10 @Scheduled(fixedDelay = 5000) 11 public void send() { 12 String msg = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase(); 13 jmsMessagingTemplate.convertAndSend(queue, msg); 14 } 15 }
springboot 点对点 消费者
1 @Component 2 public class Consumer { 3 4 @JmsListener(destination = "${query_name}") 5 public void accept(String msg) { 6 System.out.println("consumer accept " + msg + "..."); 7 } 8 9 }
1 spring: 2 activemq: 3 ###MQ连接通讯地址 4 broker-url: tcp://127.0.0.1:61616 5 ###账号 6 user: admin 7 ###密码 8 password: admin 9 10 11 ###自定义队列 12 query_name: springboot-queue 13 server: 14 port: 8083
springboot 发布订阅 生产者
1 spring: 2 activemq: 3 ###MQ连接通讯地址 4 broker-url: tcp://127.0.0.1:61616 5 ###账号 6 user: admin 7 ###密码 8 password: admin 9 10 11 ###自定义队列 12 query_name: springboot-topic 13 server: 14 port: 8083
1 @Component 2 public class Config { 3 4 @Value("${query_name}") 5 private String queryName; 6 7 @Bean 8 public Topic queue() { 9 return new ActiveMQTopic(queryName); 10 } 11 12 }
1 @Component 2 public class Produce { 3 4 @Autowired 5 private Topic topic; 6 7 @Autowired 8 private JmsMessagingTemplate jmsMessagingTemplate; 9 10 @Scheduled(fixedDelay = 5000) 11 public void send() { 12 String msg = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase(); 13 jmsMessagingTemplate.convertAndSend(topic, msg); 14 } 15 }
springboot 发布订阅 消费者
1 spring: 2 activemq: 3 ###MQ连接通讯地址 4 broker-url: tcp://127.0.0.1:61616 5 ###账号 6 user: admin 7 ###密码 8 password: admin 9 ### springboot默认没有开启发布订阅权限 10 jms: 11 pub-sub-domain: true 12 13 ###自定义队列 14 query_name: springboot-topic 15 server: 16 port: 8082
1 @Component 2 public class Consumer { 3 4 @JmsListener(destination = "${query_name}") 5 public void accept(String msg) { 6 System.out.println("consumer accept " + msg + "..."); 7 } 8 9 }
原文:https://www.cnblogs.com/0ziyu0/p/10506040.html