首页 > 其他 > 详细

activemq

时间:2019-03-10 20:44:28      阅读:154      评论:0      收藏:0      [点我收藏+]

一、概述

  1.消息中间件:主要解决应用的解耦、异步消息、减轻流量、实现高可用,可伸缩,和最终一致性架构等问题。目前有ActiveMQ、RabbitMQ、Kafka、RocketMQ、ZeroMQ、MetaMQ等。

  2.JMS规范:Java Message Service 是Java中面向消息中间件的api。

  3.组成

    -> 提供者:实现JMS规范的消息中间件服务器,即:存放消息的容器(队列)。

    ->客户端:接收、发送消息的程序。

    ->生产者/发布者:创建并发送消息的客户端,即:向消息容器发送消息。

    ->消费者/订阅者:接收处理消息的客户端。

    ->消息:应用之间传递数据的内容。

    ->消息模式:客户端之间传递消息的方式,JMS中规定了2中方式,点对点(一对一)、发布订阅(一对多)【官方称为主题和队列两种】。

  4.实现原理

                     技术分享图片

                            技术分享图片

  5.针对点对点方式,若有多个消费者对同一个消息进行消费 采用均摊消费。

  6.发布订阅(topic)模式 需要先启动订阅然后在发布 才能获取。  

一、ActivityMq(使用比较少)

   =================================================================================

    注意:生成者以事务方式提交并且commit(),消费以事务接收没有commit()时。第一次消费时:可以接收到消息,但是不会标记为已消费状态,第二次消费时:如果有生产者有新的消息继续发送,消费者接收此时添加commit()后此时才会标记为已消费状态。

   1.点对点(生成者)

 1 package com.mq.activity.point_to_point;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.MessageProducer;
 6 import javax.jms.Queue;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 
10 import org.apache.activemq.ActiveMQConnectionFactory;
11 
12 public class Producer {
13 
14     private static final String ADDRESS_URL = "tcp://127.0.0.1:61616";
15     private static final String QUERY_NAME = "quere_test";
16     
17     public static void main(String[] args) throws Exception {
18         
19         // 创建连接工厂
20         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ADDRESS_URL);
21         // 创建连接并开启
22         Connection connection = factory.createConnection();
23         connection.start();
24         // 创立连接回话  参数1 表示是否以事务方式提交,参数2消息的接收方式:自动接收(类似UDP协议)
25         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
26         // 创建目标队列
27         Queue queue = session.createQueue(QUERY_NAME);
28         // 创建生成者
29         MessageProducer producer = session.createProducer(queue);
30         // 设置消息是否持久化到本地文件中 默认不保存
31 //        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
32         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
33         // 创建消息
34         TextMessage message = null;
35         for (int i = 1; i <= 100; i++) {
36             message = session.createTextMessage("product start " + i + " ...");
37             producer.send(message);
38             // connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 如果以事务提交这里需要commit
39             // session.commit();
40         }
41         
42         message = null;
43         // 关闭连接
44         connection.close();
45     }
46     
47 }

  消费者

 1 public class Consumer {
 2 
 3     private static final String ADDRESS_URL = "tcp://127.0.0.1:61616";
 4     private static final String QUERY_NAME = "quere_test";
 5     
 6     public static void main(String[] args) throws Exception {
 7         // 创建连接工厂
 8         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ADDRESS_URL);
 9         // 创建连接并开启
10         Connection connection = factory.createConnection();
11         connection.start();
12         // 创立连接回话  参数1 表示是否以事务方式提交,参数2消息的接收方式:自动接收(类似UDP协议)
13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
14         // 创建目标队列
15         Queue queue = session.createQueue(QUERY_NAME);
16         // 创建消费者
17         MessageConsumer consumer = session.createConsumer(queue);
18         // 创建消息监听(基于观察者模式)
19         consumer.setMessageListener(new MessageListener() {
20             
21             public void onMessage(Message message) {
22                 TextMessage messate = (TextMessage) message;
23                 try {
24                     System.out.println("consumer accept message is " + messate.getText());
25                     // 手动签收
26                     // messate.acknowledge();
27                     // 如消费者以事务提交,需要commit
28                     // session.commit();
29                 } catch (JMSException e) {
30                     e.printStackTrace();
31                 }
32             }
33         });
34         // 基于长连接 不需要关闭连接
35     }
36 }

  2.发布订阅

 1 public class ProductTopic {
 2 
 3     private static final String ADDRESS_URL = "tcp://127.0.0.1:61616";
 4     private static final String TOPIC_NAME = "topic_test";
 5     
 6     public static void main(String[] args) throws Exception {
 7         
 8         // 创建连接工厂
 9         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ADDRESS_URL);
10         // 创建连接并开启
11         Connection connection = factory.createConnection();
12         connection.start();
13         // 创立连接回话  参数1 表示是否以事务方式提交,参数2消息的接收方式:自动接收(类似UDP协议)
14         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
15         // 创建目标队列
16         Topic topic = session.createTopic(TOPIC_NAME);
17         // 创建生成者
18         MessageProducer producer = session.createProducer(topic);
19         // 创建消息
20         TextMessage message = null;
21         for (int i = 1; i <= 100; i++) {
22             message = session.createTextMessage("product start " + i + " ...");
23             producer.send(message);
24         }
25         message = null;
26         // 关闭连接
27         connection.close();
28     }
29     
30 }

  消费者

 1 public class ConsumerTopic {
 2 
 3     private static final String ADDRESS_URL = "tcp://127.0.0.1:61616";
 4     private static final String TOPIC_NAME = "topic_test";
 5     
 6     public static void main(String[] args) throws Exception {
 7         // 创建连接工厂
 8         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ADDRESS_URL);
 9         // 创建连接并开启
10         Connection connection = factory.createConnection();
11         connection.start();
12         // 创立连接回话  参数1 表示是否以事务方式提交,参数2消息的接收方式:自动接收(类似UDP协议)
13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
14         // 创建目标队列
15         Topic topic = session.createTopic(TOPIC_NAME);
16         // 创建消费者
17         MessageConsumer consumer = session.createConsumer(topic);
18         // 创建消息监听(基于观察者模式)
19         consumer.setMessageListener(new MessageListener() {
20             
21             public void onMessage(Message message) {
22                 TextMessage messate = (TextMessage) message;
23                 try {
24                     System.out.println("consumer accept message is " + messate.getText());
25                     // 手动签收
26                 } catch (JMSException e) {
27                     e.printStackTrace();
28                 }
29             }
30         });
31         // 基于长连接 不需要关闭连接
32     }
33     
34 }

springboot整合

 pom文件点对点和订阅发布一样

 1 <parent>
 2         <groupId>org.springframework.boot</groupId>
 3         <artifactId>spring-boot-starter-parent</artifactId>
 4         <version>2.0.1.RELEASE</version>
 5     </parent>
 6 
 7     <!-- 管理依赖 -->
 8     <dependencyManagement>
 9         <dependencies>
10             <dependency>
11                 <groupId>org.springframework.cloud</groupId>
12                 <artifactId>spring-cloud-dependencies</artifactId>
13                 <version>Finchley.M7</version>
14                 <type>pom</type>
15                 <scope>import</scope>
16             </dependency>
17         </dependencies>
18     </dependencyManagement>
19     <dependencies>
20         <!-- SpringBoot整合Web组件 -->
21         <dependency>
22             <groupId>org.springframework.boot</groupId>
23             <artifactId>spring-boot-starter-web</artifactId>
24         </dependency>
25         <!-- SpringBoot Activemq -->
26         <dependency>
27             <groupId>org.springframework.boot</groupId>
28             <artifactId>spring-boot-starter-activemq</artifactId>
29         </dependency>
30         <dependency>
31             <groupId>com.alibaba</groupId>
32             <artifactId>fastjson</artifactId>
33             <version>1.2.49</version>
34         </dependency>
35         <dependency>
36             <groupId>org.springframework.boot</groupId>
37             <artifactId>spring-boot-starter-mail</artifactId>
38         </dependency>
39 
40         <dependency>
41             <groupId>org.apache.commons</groupId>
42             <artifactId>commons-lang3</artifactId>
43         </dependency>
44 
45     </dependencies>
46     <!-- 注意: 这里必须要添加, 否者各种依赖有问题 -->
47     <repositories>
48         <repository>
49             <id>spring-milestones</id>
50             <name>Spring Milestones</name>
51             <url>https://repo.spring.io/libs-milestone</url>
52             <snapshots>
53                 <enabled>false</enabled>
54             </snapshots>
55         </repository>
56     </repositories>

  springboot 点对点 application.yml

 1 spring:
 2   activemq:
 3   ###MQ连接通讯地址
 4     broker-url: tcp://127.0.0.1:61616
 5   ###账号
 6     user: admin
 7   ###密码  
 8     password: admin
 9 
10 
11 ###自定义队列    
12 query_name: springboot-queue
13 server:
14   port: 8082

springboot 点对点 生产者

 1 @Component
 2 public class Config {
 3     
 4     @Value("${query_name}")
 5     private String queryName;
 6     
 7     @Bean
 8     public Queue queue() {
 9         return new ActiveMQQueue(queryName);
10     }
11 
12 }
 1 @Component
 2 public class Produce {
 3 
 4     @Autowired
 5     private Queue queue;
 6     
 7     @Autowired
 8     private JmsMessagingTemplate jmsMessagingTemplate;
 9     
10     @Scheduled(fixedDelay = 5000)
11     public void send() {
12         String msg = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
13         jmsMessagingTemplate.convertAndSend(queue, msg);
14     }
15 }

springboot 点对点 消费者

1 @Component
2 public class Consumer {
3     
4     @JmsListener(destination = "${query_name}")
5     public void accept(String msg) {
6         System.out.println("consumer accept " + msg + "...");
7     }
8     
9 }
 1 spring:
 2   activemq:
 3   ###MQ连接通讯地址
 4     broker-url: tcp://127.0.0.1:61616
 5   ###账号
 6     user: admin
 7   ###密码  
 8     password: admin
 9 
10 
11 ###自定义队列    
12 query_name: springboot-queue
13 server:
14   port: 8083

springboot 发布订阅 生产者

 1 spring:
 2   activemq:
 3   ###MQ连接通讯地址
 4     broker-url: tcp://127.0.0.1:61616
 5   ###账号
 6     user: admin
 7   ###密码  
 8     password: admin
 9 
10 
11 ###自定义队列    
12 query_name: springboot-topic
13 server:
14   port: 8083
 1 @Component
 2 public class Config {
 3     
 4     @Value("${query_name}")
 5     private String queryName;
 6     
 7     @Bean
 8     public Topic queue() {
 9         return new ActiveMQTopic(queryName);
10     }
11 
12 }
 1 @Component
 2 public class Produce {
 3 
 4     @Autowired
 5     private Topic topic;
 6     
 7     @Autowired
 8     private JmsMessagingTemplate jmsMessagingTemplate;
 9     
10     @Scheduled(fixedDelay = 5000)
11     public void send() {
12         String msg = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
13         jmsMessagingTemplate.convertAndSend(topic, msg);
14     }
15 }

springboot 发布订阅 消费者

 1 spring:
 2   activemq:
 3   ###MQ连接通讯地址
 4     broker-url: tcp://127.0.0.1:61616
 5   ###账号
 6     user: admin
 7   ###密码  
 8     password: admin
 9 ### springboot默认没有开启发布订阅权限
10   jms:
11     pub-sub-domain: true
12 
13 ###自定义队列    
14 query_name: springboot-topic
15 server:
16   port: 8082
1 @Component
2 public class Consumer {
3     
4     @JmsListener(destination = "${query_name}")
5     public void accept(String msg) {
6         System.out.println("consumer accept " + msg + "...");
7     }
8     
9 }

 

activemq

原文:https://www.cnblogs.com/0ziyu0/p/10506040.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!