pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
application.yml
#Springboot启动端口
server:
port: 7777
#ActiveMQ配置
spring:
activemq:
broker-url: tcp://192.168.181.128:61616 #ActiveMQ服务器IP
user: admin #ActiveMQ连接用户名
password: admin #ActiveMQ连接密码
jms:
#指定连接队列还是主题
pub-sub-domain: false # false = Queue | true = Topic
#定义服务上的队列名
myQueueName: springboot-activemq-queue
配置类
@Component
@EnableJms //开启Springboot的Jms
@EnableScheduling
public class QueueConfigBean {
@Value("${myQueueName}")
private String myQueueName;
@Bean
public ActiveMQQueue queue() {
// 创建一个ActiveMQQueue
return new ActiveMQQueue(myQueueName);
}
}
生产者代码
@Slf4j
@Component
public class Queue_Producer {
// JmsMessagingTemplate是Springboot的Jms模板,Spring的是JmsTemplate
private JmsMessagingTemplate jmsMessagingTemplate;
// 把ConfigBean类的ActiveMQQueue注入进来
private ActiveMQQueue activeMQQueue;
// 构造注入对象(推荐)
public Queue_Producer(JmsMessagingTemplate jmsMessagingTemplate, ActiveMQQueue activeMQQueue) {
this.jmsMessagingTemplate = jmsMessagingTemplate;
this.activeMQQueue = activeMQQueue;
}
// 发送Queue的方法
public void producerMsg() {
jmsMessagingTemplate.convertAndSend(activeMQQueue, "**************" + UUID.randomUUID().toString());
log.info("Queue生产者发送消息。。。");
}
// 间隔3秒投递,SpringBoot的Scheduled用来定时执行
@Scheduled(fixedDelay = 3000)
public void producerMsgScheduled() {
jmsMessagingTemplate.convertAndSend(activeMQQueue, "**************Scheduled" + UUID.randomUUID().toString());
log.info("Queue生产者定时投递Scheduled。。。");
}
}
生产者测试代码
@SpringBootTest(classes = BootproducerApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {
@Autowired
private Queue_Producer queue_producer;
@Test
public void testSend() {
queue_producer.producerMsg();
}
}
主启动类
@SpringBootApplication
public class BootproducerApplication {
public static void main(String[] args) {
SpringApplication.run(BootproducerApplication.class, args);
}
}
pom.xml
同上7.1.
application.yml
除端口外,其他同上7.1.1.
消费者代码
@Component
public class Queue_Consummer {
@JmsListener(destination = "${myQueueName}") // 注解监听
public void receive(TextMessage textMessage) throws Exception {
System.out.println(" *** 消费者收到消息 ***" + textMessage.getText());
}
}
生产者配置类上有@EnableScheduling
,通过SpringBoot启动类启动有定时投递的功能,每3s发送一条消息。
也可以通过生产者测试代码手动发送消息,运行一次发送两条消息,其中一条是定时投递功能发送的。
启动消费者SpringBoot启动类后,消费掉所有消息。
pom.xml
同上
application.yml
#Springboot启动端口
server:
port: 7777
#ActiveMQ配置
spring:
activemq:
broker-url: tcp://192.168.181.128:61616 #ActiveMQ服务器IP
user: admin #ActiveMQ连接用户名
password: admin #ActiveMQ连接密码
jms:
#指定连接队列还是主题
pub-sub-domain: true # false = Queue | true = Topic
#定义服务上的队列名
myTopicName: springboot-activemq-topic
配置类
@Component
@EnableJms //开启Springboot的Jms
public class ActiveMQConfigBean {
@Value("${myTopicName}")
private String topicName;
@Bean
public ActiveMQTopic activeMQTopic() {
return new ActiveMQTopic(topicName);
}
}
生产者代码
@Slf4j
@Component
@EnableScheduling
public class Topic_Producer {
private JmsMessagingTemplate jmsMessagingTemplate;
private ActiveMQTopic activeMQTopic;
public Topic_Producer(JmsMessagingTemplate jmsMessagingTemplate, ActiveMQTopic activeMQTopic) {
this.jmsMessagingTemplate = jmsMessagingTemplate;
this.activeMQTopic = activeMQTopic;
}
@Scheduled(fixedDelay = 3000)
public void producer() {
jmsMessagingTemplate.convertAndSend(activeMQTopic, "主题消息: " + UUID.randomUUID().toString());
log.info("Topic生产者发送消息。。。");
}
}
pom.xml
同上
application.yml
除端口外,同7.2.1.
普通消费者代码
@Slf4j
@Component
public class Topic_Consumer {
@JmsListener(destination = "${myTopicName}")
public void consumer(TextMessage textMessage) throws JMSException {
log.info("订阅者收到消息: " + textMessage.getText());
}
}
持久化订阅配置类
package boot.activemq.topicconsumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
/**
* 设置持久化订阅
* 配置文件的方式无法进行配置持久化订阅。所以需要自己去生成一个持久化订阅
*/
@Component
@EnableJms
public class ActiveMQConfigBean {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String user;
@Value("${spring.activemq.password}")
private String password;
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(user);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean(name = "jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory());
defaultJmsListenerContainerFactory.setSubscriptionDurable(true);
defaultJmsListenerContainerFactory.setClientId("我是持久订阅者一号");
return defaultJmsListenerContainerFactory;
}
}
持久化订阅消费者代码
@Slf4j
@Component
public class Topic_Consumer {
//需要在监听方法指定连接工厂
@JmsListener(destination = "${myTopicName}",containerFactory = "jmsListenerContainerFactory")
public void consumer(TextMessage textMessage) throws JMSException {
log.info("订阅者收到消息: " + textMessage.getText());
}
}
先启动消费者,再启动生产者。
消费者代码中,有普通消费者,有持久化订阅消费者。持久化订阅消费者需要配置类,普通消费者不需要。
20200202 ActiveMQ 7. SpringBoot整合ActiveMQ
原文:https://www.cnblogs.com/huangwenjie/p/12251041.html