首页 > 其他 > 详细

ActiveMQ

时间:2020-03-24 18:06:20      阅读:71      评论:0      收藏:0      [点我收藏+]

面向消息的中间件(message-oriented middleware)MOM指利用高效可靠的消息传递机制与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储、流量削峰,异步通信,数据同步等功能。

过程:
       发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题topic中,在合适的时候,消息服务器回将消息转发给接受者。在这个过程中,发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然的关系;
尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

异步模式:

       消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或者队列)上;
消息接收者则订阅或者监听该通道。一条消息可能最终转发给一个(队列)或者多个消息接收者(pub/订阅sub模式),这些消息接收者都无需对消息发送者做出同步回应。整个过程都是异步的。

特性 ActiveMQ  RabbitMQ   Kafka  RocketMQ
PRODUCER-CUMSUMER 支持 支持 支持 支持
PUBLISH-SUBSCRIBE 支持 支持 支持 支持
REQUEST-REPLY 支持 支持 - 支持
API完备性 低(静态配置)
多语言支持 支持,Java优先 语言无关 支持,Java优先 支持
单机吞吐量 万级 万级 十万级 单机万级
消息延迟 - 微秒级  毫秒级 -
可用性 高(主从) 高(主从) 非常高(分布式)
消息丢失 - 理论上不会丢失 -
消息重复  - 可控制  理论上会有重复  -
文档的完备性  高  高  高
提供快速入门  有  有  有
首次部署难度 -  高

 

ActiveMQ:

定义:Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

作用:解耦,异步,削峰

Linux启动:解压之后什么都不用干,直接到bin目录下启动就行(./activemq start),不在bin目录启动(路径/activemq start).。只能进行最基本操作。

带日志启动:./activemq start > run_activemq.log

关闭:把start--->stop

 默认端口61616是JMS服务的端口

默认端口8161是管理控制台的端口(可以在浏览器打开)

查看是否启动   ps -ef|grep activemq       netstat -anp|grep 61616(查看端口是否被占用) lsof -i:61616 (查看端口是否被占用)

JMS的结构:

Provider:实现JMS接口和规范的消息中间件也就是MQ服务器

Producer:消息的生产者,创建和发送消息

Consumer:消息的消费者,接收和处理消息

 

 

Message:消息头 消息属性 消息体

消息头:

JMSDestination:消息目的地

JMSDeliveryMode:消息持久化模式

JMSExpiration:消息过期时间 默认是永不过期 0

 

JMSPriority:消息的优先级 消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。

 

JMSMessageID:消息的唯一标识符。可以解决幂等性问题。

说明: 消息的生产者可以set这些属性,消息的消费者可以get这些属性。这些属性在send方法里面也可以设置。

消息属性:

 

如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。

 

他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。

技术分享图片

 

 

 5种消息体格式:  特别说明 :producer方用什么格式发送,consumer就要用什么格式接收

技术分享图片

 

 

 

 

JMS的可靠性:持久化 事务  签收

消息的持久化:

 

       保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

queue:   默认持久

技术分享图片

 

topic:一定要先有订阅者,在生产消息

生产者:

技术分享图片

 

 消费者:

技术分享图片

 

 

事务:生产者必须开启事务并提交,才能发送消息成功。消费者可以不提交,但是可以多次消费,没提交之前的数据MQ不承认

技术分享图片

 

 技术分享图片

 

 技术分享图片

 

签收:

技术分享图片

 

 

 签收与事务:

① 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。

② 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。

③ 生产者事务开启,只有commit后才能将全部消息变为已消费。

④ 事务偏向生产者,签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。

 

Spring+aActivemq:

1.pom.xml:

 

pom.xml文件。灰色背景为必须的

<dependencies>
   <!-- activemq核心依赖包  -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.10.0</version>
    </dependency>
    <!--  嵌入式activemqbroker所需要的依赖包   -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.1</version>
    </dependency>
    <!-- activemq连接池 -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.15.10</version>
    </dependency>
    <!-- spring支持jms的包 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>5.2.1.RELEASE</version>
    </dependency>
    <!--spring相关依赖包-->
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>4.15</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>5.2.1.RELEASE</version>
    </dependency>
    <!-- Spring核心依赖 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>4.3.23.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.3.23.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>4.3.23.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-orm</artifactId>
        <version>4.3.23.RELEASE</version>
    </dependency>
</dependencies>

 

 

2.配置文件:

 

src/main/resources/spring-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">

    <!--  开启包的自动扫描  -->
    <context:component-scan base-package="com.activemq.demo"/>
    <!--  配置生产者  -->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!--      正真可以生产ConnectionConnectionFactory,由对应的JMS服务商提供      -->
            <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://ip:61616"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>

    <!--  这个是队列目的地,点对点的Queue  -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <!--    通过构造注入Queue名    -->
        <constructor-arg index="0" value="spring-queue"/>
    </bean>

    <!--  这个是队列目的地,  发布订阅的主题Topic-->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

    <!--  Spring提供的JMS工具类,他可以进行消息发送,接收等  -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--    传入连接工厂    -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--    传入目的地    -->
        <property name="defaultDestination" ref="destinationQueue"/>
        <!--    消息自动转换器    -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>

  3.producer:

技术分享图片

 

 

 4.consumer:

技术分享图片

 

 5.监听:

技术分享图片

 

 Springboot+ActiveMQ:

1.pom.xml

 

灰色背景是必须导的包,其他包如果项目没有可以导入。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.1.5.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
   </parent>
   <groupId>com.at.boot.activemq</groupId>
   <artifactId>boot_mq_produce</artifactId>
   <version>1.0-SNAPSHOT</version>


   <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
   </properties>

   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
      </dependency>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter</artifactId>
      </dependency>
      <!--spring boot整合activemqjar-->
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-activemq</artifactId>
         <version>2.1.5.RELEASE</version>
      </dependency>
   </dependencies>

   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
</project>

 

 2.application.yml

 

灰色背景是activem的配置

# web占用的端口
server:
  port: 6666
spring:
  activemq:
    # activemqbrokerurl
    broker-url: tcp://ip:61616
    # 连接activemqbroker所需的账号和密码
    user: admin   //user password 没设可不配
    password: admin
  jms:
    # 目的地是queue还是topic false(默认) = queue    true =  topic
    pub-sub-domain: false

#  自定义队列名称。这只是个常量
myqueue: boot-activemq-queue

 

  3. 配置目的地的bean

技术分享图片

 

 

 4.producer:

技术分享图片

 

 

 5.consumer:

 技术分享图片

 

 

 test:

技术分享图片

 

 

 

 定时发送:

技术分享图片

 

 

 技术分享图片

 

 

 

 

ActiveMQ的传输协议:

 

ActiveMQ支持的client-broker通讯协议有:TVPNIOUDPSSLHttp(s)VM。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的<transportConnectors>标签之内。

 

activemq传输协议的官方文档:http://activemq.apache.org/configuring-version-5-transports.html

技术分享图片

 

 

 技术分享图片

 

 

 

1.1.1  TCP协议

(1) Transmission Control Protocol(TCP)是默认的。TCPClient监听端口61616

(2) 在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。

(3) TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。

(4) TCP传输的的优点:

TCP协议传输可靠性高,稳定性强

高效率:字节流方式传递,效率很高

有效性、可用性:应用广泛,支持任何平台

(5) 关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/tcp-transport-reference

 

 

1.1.2  NIO协议

(1) New I/O API Protocol(NIO)

(2) NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。

(3) 适合使用NIO协议的场景:

可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。

可能对于Broker有一个很迟钝的网络传输,NIOTCP提供更好的性能。

(4) NIO连接的URI形式:nio://hostname:port?key=value&key=value

(5) 关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html

 

 

技术分享图片

 

 

 

ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型。只有当我们指定使用nio才使用NIOIO模型。

activemq.xml变化:

技术分享图片

 

 

 代码层:

技术分享图片

--------------------------------------------------------------

技术分享图片

 

 

 

修改配置文件activemq.xml

技术分享图片

 

 

 消息的持久化:

技术分享图片

 

 

 

(1) AMQ Message Store

基于文件的存储机制,是以前的默认机制,现在不再使用。

AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本

 

(2) kahaDB

现在默认的。

基于日志文件,从ActiveMQ5.4(含)开始默认的持久化插件。

官网文档:http://activemq.aache.org/kahadb

技术分享图片

 

 

 技术分享图片

 

 

 

(1) KahaDB的存储原理

技术分享图片

 

 

 

 

技术分享图片

 

 

 

 

(3) JDBC消息存储

技术分享图片

 

 

 

1.添加mysql数据库的驱动包到lib文件夹

技术分享图片

 

 

 2.dbcPersistenceAdapter配置

技术分享图片

 

 

 3. 数据库连接池配置

需要我们准备一个mysql数据库,并创建一个名为activemq(可以自己命名)的数据库。

技术分享图片

 

 

 </broker>标签和<import>标签之间插入数据库连接池配置

 

 

 技术分享图片

 

 

 默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。

4.查看表

 

重启activemq。会自动生成如下3张表。如果没有自动生成,需要我们手动执行SQL。我个人建议要自动生成,我在操作过程中查看日志文件,发现了不少问题,最终解决了这些问题后,是能够自动生成的。如果不能自动生成说明你的操作有问题。如果实在不行,手动建表

技术分享图片

 

 

 技术分享图片

 

 

 技术分享图片

 

 

 

 

 

(4) LevelDB消息存储

 技术分享图片

 

 

基于zookeeper+levelDB搭建ActiveMQ集群 ,高可用避免单点故障。

1.zk集群

2.brokerName必须一样

技术分享图片

 

 

3.

技术分享图片

 

jetty.xml设置:

技术分享图片

 

 

 4.tcp改变,真实机器可不变

技术分享图片

 

 5.启动zk>mq

6.验证:进入zk客户端  ls /    

技术分享图片

 

 ls /activemq

技术分享图片

 

 ls /activemq/leveldb-stores

技术分享图片

 

 查看master:

get /activemq/leveldb-stores 00000000001

技术分享图片

 

 查看集群数:ps -ef|grep activemq|grep -v grep|wc -l

 

 

 

 

异步投递:

技术分享图片

 

 

 代码实现:

技术分享图片

 

 

 

(1) 异步发送如何确认发送成功

技术分享图片

 

 

 代码:

技术分享图片

 

 

 

技术分享图片

 

 

 技术分享图片

 

 

必改:

技术分享图片

 

 

 代码:

技术分享图片

 

 

 

消息消费的重试机制

(1) 是什么

官网文档:http://activemq.apache.org/redelivery-policy

是什么: 消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。

 

(2) 具体哪些情况会引发消息重发

① Client用了transactions且再session中调用了rollback

② Client用了transactions且再调用commit之前关闭或者没有commit

③ ClientCLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

 

(3) 请说说消息重发时间间隔和重发次数

间隔:1

次数:6

每秒发6

 

(4) 有毒消息Poison ACK

一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。

属性说明:

技术分享图片

 

 

 自定义重试:

技术分享图片

 

 

 解决幂等性问题:

技术分享图片

 

 

 



 

  
 

 

ActiveMQ

原文:https://www.cnblogs.com/lvym/p/12558934.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!