电商项目中,如果后台添加商品信息,该信息放到数据库。 我们同时,需要更新搜索引擎的倒排索引,同时,假如有商品页面的静态化处理,也需要更新该页面信息
应该怎么解决?
方式一 :可以在后台添加商品的方法中,如果数据插入数据库成功,就调用更新倒排索引的方法,接着调用更新静态化页面的方法。
代码应该是:
Long goodsId = addGoods(goods);
if (goodsId != null) {
refreshInvertedIndex(goods);
refreshStaticPage(goods);
}
问题:假如更新倒排索引失败,该怎么办?假如更新静态页面失败怎么办?
解决方式: 如果更新倒排索引失败,重试;如果更新静态页面失败,重试
代码应该是这样:
public Long saveGoods() {
Long goodsId = addGoods(goods);
if (goodsId != null) {
// 调用递归的方法,实现重试
boolean indexFlag = refreshInvertedIndex(goods);
// 调用递归的方法,实现重试
boolean pageFlag = refreshStaticPage(goods);
}
}
private boolean refreshInvertedIndex(Goods goods) {
// 调用服务的方法
boolean flag = indexService.refreshIndex(goods);
if (!flag) {
refreshInvertedIndex(goods);
}
}
private boolean refreshStaticPage(Goods goods) {
// 调用服务的方法
boolean flag = staticPageService.refreshStaticPage(goods);
if (!flag) {
refreshStaticPage(goods);
}
}
以上代码在执行中的问题:
或许可以加上迭代的等待时间,迭代的次数加以限制,减少CPU消耗。
或许还可以加上多线程,同时执行更新的操作,减少执行的时间。
但是都是基于该调用一定在可见的时间内调用成功。
还是老问题:如果更新失败怎么办?
归根到底,是同步调用处理不当。这个问题在分布式架构中尤为严重。
方式二:可以先执行添加商品的方法,商品添加成功,将更新索引和更新静态页面的任务缓存到一个公共的位置,然后由相应的服务从该位置获取任务来执行。
Long goodsId = addGoods(goods);
if (goodsId != null) {
goodsTaskService.cache(goods);
}
此时,由于添加商品仅仅是将数据插入数据库,然后将任务信息缓存,调用立刻返回。
对于添加商品方法的调用,不会存在线程阻塞,不会存在调用栈崩溃。
再考虑远一点:由于更新倒排索引的的服务和更新静态页面的服务要从公共的缓存或者叫任务池中取出任务并执行,它们也会有执行失败的问题,也需要重试。如果一直更新失败,也需要一个方式来处理。
比如如果更新失败,则每隔 3 秒钟重试一次,重试三次都失败则放弃执行。然后将错误结果放到另一个公共的地方,等待后续的补偿,无论是手工还是自动的。
还有问题:
看来真是解决了一个问题,引进来三个问题。
如果上述的问题都由我们从 0 开始解决,开发难度可想而知。
分布式服务中,由于业务拆分,应用也需要拆分,甚至数据库分库分表。
但是完成一个业务处理,往往要设计到多个模块之间的协调处理。此时模块之间,服务与服务之间以及客户端与服务端之间的通信将变得非常复杂。
比较典型的 生产者消费者模式 ,可以跨平台、支持异构系统,通常借助消息中间件来完成。
优点 :系统间解耦,并具有一定的可恢复性,支持异构系统,下游通常可并发执行,系统具备弹性。服务解耦、流量削峰填谷等
缺点 :消息中间件存在一些瓶颈和一致性问题,对于开发来讲不直观且不易调试,有额外成本。
使用异步消息模式需要注意的问题:
维基百科对消息中间件的解释:面向消息的系统(消息中间件)是在分布式系统中完成消息的发送和接收的基础软件。
消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。
消息中间件就是在通信的上下游之间截断:break it,Broker
然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统。
体会一下:“必有歹人从中作梗”,”定有贵人从中相助“
异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动 等需求的场景都可以使用消息中间件。
并发编程领域经典面试题:请使用 Java 代码来实现 “生产者消费者模式”。
BlockingQueue
(阻塞队列)是 Java 中常见的容器,在多线程编程中被广泛使用。
put
take
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KouZhao {
private String id;
private String type;
}
public class Producer implements Runnable {
private final BlockingQueue<KouZhao> blockingQueue;
public Producer(BlockingQueue<KouZhao> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(200);
if (blockingQueue.remainingCapacity() > 0) {
KouZhao kouZhao = new KouZhao(UUID.randomUUID().toString(), "N95");
blockingQueue.add(kouZhao);
System.out.println("我在生产口罩,当前库存是:" + blockingQueue.size());
} else {
System.out.println("我的仓库已经堆满了" + blockingQueue.size() + "个口罩,快来买口罩啊!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Consumer implements Runnable {
private final BlockingQueue<KouZhao> blockingQueue;
public Consumer(BlockingQueue<KouZhao> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(100);
long startTime = System.currentTimeMillis(); // 获取开始时间
KouZhao kouZhao = blockingQueue.take();
long endTime = System.currentTimeMillis(); // 获取结束时间
System.out.println("我消费了口罩:" + kouZhao + ", 等到货时我阻塞了" + (endTime - startTime) + "ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class App {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<KouZhao> queue = new ArrayBlockingQueue<>(20);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
Thread.sleep(20000);
new Thread(consumer).start();
}
}
上述代码放到生产环境显然是不行的,比如没有集群,没有分布式,玩法太单一,不能满足企业级应用的要求
比如:
首先,产品应该是开源的。开源意味着如果队列使用中遇到 bug ,可以很快修改,而不用等待开发者的更新。
其次,产品必须是近几年比较流行的,要有一个活跃的社区。这样遇到问题很快就可以找到解决方法。同时流行也意味着 bug 较少。流行的产品一般跟周边系统兼容性比较好。
最后,作为消息队列,要具备以下几个特性:
RabbitMQ 开始是用在 电信业务 的可靠通信的,也是少有的几款支持 AMQP 协议的产品之一。
优点:
缺点:
RabbitMQ | RocketMQ | Kafka | |
---|---|---|---|
单机吞吐量 | 1w 量级 | 10w 量级 | 10w 量级 |
开发语言 | Erlang | Java | Java 和 Scala |
消息延迟 | 微秒 | 毫秒 | 毫秒 |
消息丢失 | 可能性很低 | 参数优化后可以 0 丢失 | 参数优化后可以 0 丢失 |
消费模式 | 推拉 | 推拉 | 仅支持拉取 |
主题数量对吞吐量的影响 | \ | 几百上千个主题会对吞吐量有一个小的影响 | 几十上百个主题会极大影响吞吐量 |
可用性 | 高(主从) | 很高(主从) | 很高(分布式) |
消息中间件的使用场景非常广泛,比如,12306 购票的排队锁座,电商秒杀,大数据实时计算等。
比如 6.18 ,活动从 0:00 开始,仅限前 200 名,秒杀即将开始时,用户会疯狂刷新 APP 或者浏览器来保证自己能够尽早的看到商品。
系统应该如何应对高并发的读请求:
系统应该如何应对高并发的写请求?生成订单,扣减库存,用户这些操作不经过缓存直达数据库。如果在 1s 内,有 1 万个数据连接同时到达,系统的数据库会濒临崩溃。如何解决这个问题呢?我们可以使用 消息队列。
消息队列的作用:
将秒杀请求暂存于消息队列,业务服务器响应用户 “秒杀结果正在处理中。。。”,释放系统资源去处理其它用户的请求。
削峰填谷,削平短暂的流量高峰,消息堆积会造成请求延迟处理,但秒杀用户对于短暂延迟有一定容忍度。
秒杀商品有 1000 件,处理一次购买请求的时间是 500ms,那么总共就需要 500s 的时间。这时你部署 10 个队列处理程序,那么秒杀请求的处理时间就是 50s,也就是说用户需要等待 50s 才可以看到秒杀的结果,这是可以接受的。这时会并发 10 个请求到达数据库,并不会对数据库造成很大的压力。
先处理主要的业务,异步处理次要的业务。
如主要流程是生成订单、扣减库存;次要流程比如购买成功之后会给用户发优惠券,增加用户的积分。
此时秒杀只要处理生成订单,扣减库存的耗时,发放优惠券、增加用户积分异步去处理了。
将秒杀数据同步给数据团队,有两种思路:
使用 HTTP 或者 RPC 同步调用,即提供一个接口,实时将数据推送给数据服务
问题:系统的耦合度高,如果其中一个服务有问题,可能会导致另一个服务不可用
使用消息队列
将数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理
拉勾网站分 B 端和 C 端, B 端面向企业用户, C 端面向求职者。
这两个模块业务处理逻辑不同,数据库表结构不同,实际上是处于解耦的状态。
但是各自又需要对方的数据,需要共享:如
无论是 B 端还是 C 端,都有各自的搜索引擎和缓存, B 端需要获取 C 端的更新以更新搜索引擎和缓存; C 端需要获取 B 端的更新以更新 C 端的搜索引擎与缓存。
如何解决 B 端 C 端数据共享的问题?
使用同步方式, B 端和 C 端耦合比较紧密,如果其中一个服务有问题,可能会导致另一个服务不可用。比如 C 端的 RPC 挂掉,企业用户有可能无法发布新的职位信息,因为发布了对方也看不到; B 端的 RPC 挂掉,求职者可能无法更新简历,因为即使简历更新了,对方也看不到。
你可能会想,可以让 B 端或 C 端在对方 RPC 挂掉的时候,先将该通知消息缓存起来,等对方服务恢复之后再进行同步。
这正是引入异步方式,使用消息队列的目的。
使用消息队列的异步方式,对 B 端 C 端进行解耦,只要消息队列可用,双方都可以将需要同步的信息发送到消息队列,对方在收到消息队列推送来的消息的时候,各自更新自己的搜索引擎,更新自己的缓存数据。
如上图,用户在支付宝购买了一张电影票后很快就收到消息推送和短信(电影院地址、几号厅、座位号、场次时间等),同时用户会积累一定的会员积分。
这里,交易系统并不需要一直等待消息送达等动作都完成后才返回成功,允许一定延迟和瞬时不一致(最终一致性),而且后面两个动作通常可以并发执行。
如果后期监控大盘想要获取实时交易数据,只需要新增个消费者程序并订阅该消息即可,交易系统对此并不感知,松耦合。
JMS 即 Java 消息服务( Java Message Service )应用程序接口,是一个 Java 平台中关于面向消息中间件( MOM , Message oriented Middleware )的 API ,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。与具体平台无关的 API ,绝大多数 MOM 提供商都支持。
它类似于 JDBC ( Java Database Connectivity )。
JMS 报文头全部字段:
字段名称 | 含义 |
---|---|
JMSDestination |
JMSDestination 字段包含了消息要发送到的目的地。 |
JMSDeliveryMode |
JMSDeliveryMode 字段包含了消息在发送的时候指定的投递模式。 |
JMSMessageID |
该字段包含了服务器发送的每个消息的唯一标识。 |
JMSTimestamp |
该字段包含了消息封装完成要发往服务器的时间。不是真正向服务器发送的时间,因为真正的发送时间,可能会由于事务或客户端消息排队而延后。 |
JMSCorrelationID |
客户端使用该字段的值与另一条消息关联。一个典型的场景是使用该字段将响应消息与请求消息关联。JMSCorrelationID 可以包含如下值:- 服务器规定的消息 ID - 应用指定的字符串 - 服务器原生的 byte[] 值 |
JMSReplyTo |
该字段包含了在客户端发送消息的时候指定的 Destination 。即对该消息的响应应该发送到该字段指定的 Destination 。设置了该字段值的消息一般期望收到一个响应。 |
JMSRedelivered |
如果这个字段是 true ,则告知消费者应用这条消息已经发送过了,消费端应用应该小心别重复处理了。 |
JMSType |
消息发送的时候用于标识该消息的类型。具体有哪些类型,由 JMS 实现厂商决定。 |
JMSExpiration |
发送消息时,其到期时间将计算为 send 方法上指定的生存时间值与当前 GMT 值之和。 从 send 方法返回时,消息的 JMSExpiration 标头字段包含此 值。 收到消息后,其 JMSExpiration 标头字段包含相同的值。 |
JMSPriority |
JMS定义了一个十级优先级值,最低优先级为 0 ,最高优先级为 9 。 此外, 客户端应将优先级 0-4 视为正常优先级,将优先级 5-9 视为快速优先级。 JMS 不需要服务器严格执行消息的优先级排序; 但是,它应该尽力在普通 消息之前传递加急消息。 |
消息主体则携带着应用程序的数据或有效负载。
根据有效负载的类型来划分,可以将消息分为几种类型:
TextMessage
)ObjectMessage
)MapMessage
)BytesMessage
)StreamMessage
)Message
)JMS 由以下元素组成:
JMS 元素 | 描述 |
---|---|
JMS 供应商产品 | JMS 接口的一个实现。该产品可以是 Java 的 JMS 实现,也可以是非 Java 的面向消息中间件的适配器 |
JMS Client | 生产或消费基于消息的 Java 的应用程序或对象 |
JMS Producer | 创建并发送消息的 JMS 客户 |
JMS Consumer | 接收消息的 JMS 客户 |
JMS Message | 包括可以在 JMS 客户之间传递的数据的对象 |
JMS Queue | 缓存消息的容器。消息的接受顺序并不一定要与消息的发送顺序相同。消息被消费后将从队列中移除 |
JMS Topic | Pub/Sub 模式 |
对象 | 角色 | 描述 |
---|---|---|
ConnectionFactory 接口 |
连接工厂 | 用户用来创建到 JMS 提供者的连接的被管对象。 JMS 客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在 JNDI 名字空间中配置连接工厂,这样, JMS 客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。 |
Connection 接口 |
连接 | 连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与 JMS 提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。 |
Destination 接口 |
目标 | 目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。 JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者 / 订阅者模型的主题。 |
Session 接口 |
会话 | 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。 |
MessageConsumer 接口 |
消息消费者 | 由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻塞)接收队列和主题类型的消息。 |
MessageProducer 接口 |
消息生产者 | 由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。 |
Message 接口 |
消息 | 是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分: -- 消息头(必须):包含用于识别和为消息寻找路由的操作设置。 -- 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。 -- 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。 |
Java 消息服务应用程序结构支持两种模式:
在点对点或队列模型下
一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列,概括为:
发布/订阅模式
在发布者和订阅者之间存在时间依赖性:
JMS 有两种传递消息的方式:标记为 NON_PERSISTENT
的消息最多投递一次,而标记为 PERSISTENT
的消息将使用暂存后再转送的机制投递。
如果一个 JMS 服务下线,持久性消息不会丢失,等该服务恢复时再传递。
默认的消息传递方式是非持久性的。使用非持久性消息可能降低内务和需要的存储器,当不需要接收所有消息时使用。
开源软件:
专有的供应商包括:
生产中应用基本上都是以集群部署的。在 Queue 模式下,消息的消费没有什么问题,因为不同节点的相同应用会抢占式地消费消息,这样还能分摊负载。
如果使用 Topic 广播模式?对于一个消息,不同节点的相同应用都会收到该消息,进行相应的操作,这样就重复消费了。
方案一:选择 Queue 模式,创建多个一样的 Queue ,每个应用消费自己的 Queue
弊端 :浪费空间,生产者还需要关注下游到底有几个消费者,违反了 解耦 的初衷
方案二:选择 Topic 模式,在业务上做散列,或者通过分布式锁等方式来实现不同节点间的竞争
弊端 :对业务侵入较大,不是优雅的解决方法
ActiveMQ 通过“虚拟主题”解决了这个问题。
生产中似乎需要结合这两种模式:即不同节点的相同应用间存在竞争,会部分消费( P2P ),而不同的应用都需要消费到全量的消息( Topic )模式。这样就可以避免重复消费。
JMS规范文档(jms-1_1-fr-spec.pdf)下载地址
JMS 是 JEE 平台的标准消息传递 API 。它可以在商业和开源实现中使用。每个实现都包括一个 JMS 服务器,一个 JMS 客户端库,以及用于管理消息传递系统的其他特定于实现的组件。 JMS 提供程序可以是消息传递服务的独立实现,也可以是非 JMS 消息传递系统的桥梁。
JMS 客户端 API 是标准化的,因此 JMS 应用程序可在供应商的实现之间移植。但是:
AMQP 全称高级消息队列协议( Advanced Message Queuing Protocol ),是一种标准,类似于 JMS 。目前 RabbitMQ 主流支持 AMQP 0-9-1 , 3.8.4 版本支持 AMQP 1.0 。
元素 | 描述 |
---|---|
Publisher |
消息发送者,将消息发送到 Exchange 并指定 RoutingKey ,以便 queue 可以接收到指定的消息 |
Consumer |
消息消费者,从 queue 获取消息,一个 Consumer 可以订阅多个 queue 以从多个 queue 中接收消息 |
Server |
一个具体的 MQ 服务实例,也称为 Broker |
Virtual host |
虚拟主机,一个 Server 下可以有多个虚拟主机,用于隔离不同项目,一个 Virtual host 通常包含多个 Exchange 、 Message Queue |
Exchange |
交换器,接收 Producer 发送来的消息,把消息转发到对应的 Message Queue 中 |
Routing key |
路由键,用于指定消息路由规则( Exchange 将消息路由到具体的 queue 中),通常需要和具体的 Exchange 类型、 Binding 的 Routing key 结合起来使用 |
Bindings |
指定了 Exchange 和 Queue 之间的绑定关系。 Exchange 根据消息的 Routing key 和 Binding 配置( 绑定关系、 Binding 、 Routing key 等 )来决定把消息分派到哪些具体的 queue 中。这依赖于 Exchange 类型 |
Message Queue |
实际存储消息的容器,并把消息传递给最终的 Consumer |
AMQP 是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。
我们假定有一个可靠的面向流的网络传输层( TCP/IP 或等价的协议)。
在一个单一的 Socket 连接中,可能有多个相互独立的控制线程,称为 channel 。每个数据帧使用通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序列传输。
我们使用小的数据类型来构造数据帧,如 bit
, integer
, string
以及字段表。数据帧的字段做了轻微的封装,不会让传输变慢或解析困难。根据协议规范机械地生成数据帧层相对简单。
线级别(wire-level,线路层)的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是 AMQP )。我们假定 AMQP 会扩展,改进以及随时间的其他变化,并要求 wire-level 格式支持这些变化。
AMQP 使用的数据类型如下:
Integers
(数值范围 1-8 的十进制数字):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐。Bits
(统一为 8 个字节):用于表示 开/关 值。Short strings
:用于保存简短的文本属性,字符串个数限制为 255 , 8 个字节Long strings
:用于保存二进制数据块。Field tables
:包含键值对,字段值一般为字符串,整数等。AMQP 客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。
协议协商是一个很有用的技术手段,因为它可以让我们断言假设和前置条件。
在 AMQP 中,我们需要协商协议的一些特殊方面:
对限制条件的认同可能会导致双方重新分配 key 的缓存,避免死锁。每个发来的数据帧要么遵守认同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。出色地践行了 要么一切工作正常,要么完全不工作 的 RabbitMQ 哲学。
协商双方认同限制到一个小的值,如下:
TCP/IP 是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决:
RabbitMQ 的 JMS 客户端用 RabbitMQ Java 客户端实现,既与 JMS API 兼容,也与 AMQP 0-9-1 协议兼容。
存在局限性:
RabbitMQ JMS 客户端不支持某些 JMS 1.1 功能:
RabbitMQ 使用 AMQP 协议, JMS 规范仅对于 Java 的使用作出的规定,跟其他语言无关,AMQP 协议是语言无关的,只要语言实现了该协议,就可以做客户端。如此,则不同语言之间互操作性得以保证。
原文:https://www.cnblogs.com/huangwenjie/p/14516098.html