一、ActivityMq的介绍:
1.什么是消息中间件?与传统的传输通讯有什么区别?
异步,无需等待,消息存放在队列里面。
2.为什么要使用消息中间件?
消息中间件可以解决高并发。
两种通讯方式:01.点对点通讯。(Queue)
02.发布订阅。(Topic)
生产者:发送消息,提供接口的,主要向队列中发送消息
队列:存放消息地址
消息:发送的报文信息
消费者:调用接口的,主要从队列中获取消息
3.步骤:
01、生产者向队列进行发送消息,如果消费者不在,队列就会将消息缓存
02、消费者向队列中获取到消息之后,消费成功之后, 该消息队列直接被清除掉。(不清除就会产生重复消费问题)
4.生产者向队列中生产高并发流量,消费者会不会挂掉?
不会,因为队列会缓存消息。
5.为什么MQ能够解决高并发问题?
不会立马处理那么多的消息,队列会进行缓存,进行排队。
6.JMS:java发送消息,客户端与服务器进行通讯的方式,可以理解成java的消息 中间件
消息模型:
01、点对点通讯方式:
流程:生产者 队列 消费者。
特点:一对一 异步通讯,生产者生产的消息 只允许有一个消费者进行消费。
02、发布订阅:
流程:生产者 主题 消费者
特点:发布订阅 一个生产者 可以多个消费者 一对多。
二、ActivityMq安装:
1.下载ActivityMQ
官方网站:http://activemq.apache.org/download.html
2.运行ActivityMQ
解压apache-activemq-5.15.9-bin.zip,进入该文件夹的bin目录。
有两种方式启动ActivityMQ服务
01、在bin目录下用cmd命令activemq start 启动服务,关掉黑窗口服务即停止
02、进入bin目录下对应电脑位数的文件夹,64位进入win64,双击InstallService.bat批处理文件安装ActiveMQ服务,然后打开任务管理器启动ActiveMQ服务
启动服务后打开浏览器输入:http://localhost:8161/admin/ 输入默认设置的账户:admin密码admin
点击队列(Queues),输入队列名称(Queue Name)FirstQueue,然后点创建(Craete)
3.创建maven项目
添加一个activemq-all-5.15.3.jar即可,在pom.xml加入
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency> </dependencies>
三、创建producer.java和consumer.java
producer.java
package main; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { // 默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认连接地址 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { // 连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); try { // 连接 Connection connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建session Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 消息目的地 Destination destination = session.createQueue("FirstQueue"); // 消息生产者 MessageProducer producer = session.createProducer(null); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 发送消息 for (int i = 0; i < 10; i++) { // 创建一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ: 这是第 " + i + " 条消息"); // Destination destination = session.createTopic("FirstQueueTopic"); // 生产者发送消息 producer.send(destination, message); } session.commit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
consumer.java
package main; import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { // 默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认连接地址 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { // 连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); try { // 连接 Connection connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息目的地 Destination destination = session.createQueue("FirstQueue"); // Destination destination = session.createTopic("FirstQueueTopic"); // 消息消费者 MessageConsumer consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(); if (message != null) { System.out.println("接收到消息: " + message.getText()); } else { break; } } session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
四、测试:
先运行producer.java,再运行consumer.java。
五、项目中使用:
spring-activemq.xml配置文件:
生产者
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq.brokerURL}" userName="${activemq.username}" password="${activemq.password}" /> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true" /> </bean> </beans>
config.properties配置用户名,密码,url等信息。
activemq.brokerURL=failover:(tcp://10.135.100.59:9203,tcp://10.135.100.60:9203) #activemq.brokerURL=tcp://10.135.100.59:9203 activemq.username=admin activemq.password=admin
代码中的使用:
@Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate;// 通过@Qualifier修饰符来注入对应的bean /** * @Title: send 发送一条消息到指定的队列(目标) * @Description: * @param queueName 队列名称 * @param message 消息内容 * @date 2016年10月12日 上午10:15:56 */ public void send(String queueName, final String message) { jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); }
原文:https://www.cnblogs.com/lyb0103/p/10908482.html