首页 > 其他 > 详细

消息队列实现分布式事务管理

时间:2021-01-21 23:59:28      阅读:51      评论:0      收藏:0      [点我收藏+]

原理图

技术分享图片


 

任务目标

当用户下单时,会增加与支付金额数相等的积分,在订单模块中完成下单,远程调用用户模块中的增加积分的操作,这里连个模块用rabbimq完成事务管理

环境准备

数据库中准备三张表

changgou_order中有两张,tb_task用于储存等待处理的任务,tb_task_his用于存储已经处理完的任务,日后做数据分析

这两张表用于记录和order订单有关的事务

changgou_user中有一张tb_point_log,用于记录待执行的添加积分的任务

这张表用于记录和积分有关的事务

三张表的创建sql为

DROP TABLE IF EXISTS `tb_task`;
CREATE TABLE `tb_task` (
  `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT 任务id,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `delete_time` datetime DEFAULT NULL,
  `task_type` varchar(32) DEFAULT NULL COMMENT 任务类型,
  `mq_exchange` varchar(64) DEFAULT NULL COMMENT 交换机名称,
  `mq_routingkey` varchar(64) DEFAULT NULL COMMENT routingkey,
  `request_body` varchar(512) DEFAULT NULL COMMENT 任务请求的内容,
  `status` varchar(32) DEFAULT NULL COMMENT 任务状态,
  `errormsg` varchar(512) DEFAULT NULL COMMENT 任务错误信息,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

DROP TABLE IF EXISTS `tb_task_his`;
CREATE TABLE `tb_task_his` (
  `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT 任务id,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `delete_time` datetime DEFAULT NULL,
  `task_type` varchar(32) DEFAULT NULL COMMENT 任务类型,
  `mq_exchange` varchar(64) DEFAULT NULL COMMENT 交换机名称,
  `mq_routingkey` varchar(64) DEFAULT NULL COMMENT routingkey,
  `request_body` varchar(512) DEFAULT NULL COMMENT 任务请求的内容,
  `status` varchar(32) DEFAULT NULL COMMENT 任务状态,
  `errormsg` varchar(512) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

 

DROP TABLE IF EXISTS `tb_point_log`;
CREATE TABLE `tb_point_log` (
  `order_id` varchar(200) NOT NULL,
  `user_id` varchar(200) NOT NULL,
  `point` int(11) NOT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

为三张数据表准备对应的实体类

在changgou_service_order_api中

@Table(name="tb_task")
public class Task {
    @Id
    private Long id;
    @Column(name = "create_time")
    private Date createTime;
    @Column(name = "update_time")
    private Date updateTime;
    @Column(name = "delete_time")
    private Date deleteTime;
    @Column(name = "task_type")
    private String taskType;
    @Column(name = "mq_exchange")
    private String mqExchange;
    @Column(name = "mq_routingkey")
    private String mqRoutingkey;
    @Column(name = "request_body")
    private String requestBody;
    @Column(name = "status")
    private String status;
    @Column(name = "errormsg")
    private String errormsg;
    //getter,setter略
}

 

@Table(name="tb_task_his")
public class TaskHis {
    @Id
    private Long id;
    @Column(name = "create_time")
    private Date createTime;
    @Column(name = "update_time")
    private Date updateTime;
    @Column(name = "delete_time")
    private Date deleteTime;
    @Column(name = "task_type")
    private String taskType;
    @Column(name = "mq_exchange")
    private String mqExchange;
    @Column(name = "mq_routingkey")
    private String mqRoutingkey;
    @Column(name = "request_body")
    private String requestBody;
    @Column(name = "status")
    private String status;
    @Column(name = "errormsg")
    private String errormsg;
    //getter,setter略
}

 

在changgou_service_user_api中添加

@Table(name="tb_point_log")
public class PointLog {
    private String orderId;
    private String userId;
    private Integer point;
    //getter,setter略
}

 

添加rabbitmq配置类

在changgou_service_order中添加配置类

@Configuration
public class RabbitMQConfig {
    //添加积分任务交换机
    public static final String EX_BUYING_ADDPOINTUSER = "ex_buying_addpointuser";
    //添加积分消息队列
    public static final String CG_BUYING_ADDPOINT = "cg_buying_addpoint";
    //完成添加积分消息队列
    public static final String CG_BUYING_FINISHADDPOINT = "cg_buying_finishaddpoint";
    //添加积分路由key
    public static final String CG_BUYING_ADDPOINT_KEY = "addpoint";
    //完成添加积分路由key
    public static final String CG_BUYING_FINISHADDPOINT_KEY = "finishaddpoint";
    /**
     * 交换机配置
     * @return the exchange
     */
    @Bean(EX_BUYING_ADDPOINTUSER)
    public Exchange EX_BUYING_ADDPOINTUSER() {
        return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();
    }
    //声明队列
    @Bean(CG_BUYING_FINISHADDPOINT)
    public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {
        Queue queue = new Queue(CG_BUYING_FINISHADDPOINT);
        return queue;
    }
    //声明队列
    @Bean(CG_BUYING_ADDPOINT)
    public Queue QUEUE_CG_BUYING_ADDPOINT() {
        Queue queue = new Queue(CG_BUYING_ADDPOINT);
        return queue;
    }
    /**
     * 绑定队列到交换机 .
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding BINDING_QUEUE_FINISHADDPOINT(@Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();
    }
    @Bean
    public Binding BINDING_QUEUE_ADDPOINT(@Qualifier(CG_BUYING_ADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();
    }
}

 


 

开始操作

修改新增订单方法

taskMapper为


import tk.mybatis.mapper.common.Mapper;

import java.util.Date;
import java.util.List;

public interface TaskMapper extends Mapper<Task> {

//用于定时查询任务表中的数据,获取未执行的任务
@Select("select * from tb_task where update_time<#{currentTime}")
@Results({@Result(column = "create_time",property = "createTime"), //由于表中的字段和实体类中的属性名不一致,我们需要手动配置一下
@Result(column = "update_time",property = "updateTime"),
@Result(column = "delete_time",property = "deleteTime"),
@Result(column = "task_type",property = "taskType"),
@Result(column = "mq_exchange",property = "mqExchange"),
@Result(column = "mq_routingkey",property = "mqRoutingkey"),
@Result(column = "request_body",property = "requestBody"),
@Result(column = "status",property = "status"),
@Result(column = "errormsg",property = "errormsg")})
List<Task> findTaskLessTanCurrentTime(Date currentTime);
}
 

 

在订单模块的新增订单方法中,添加如下,将数据写入task数据表

//增加任务表记录
Task task = new Task();
task.setCreateTime(new Date());
task.setUpdateTime(new Date());
task.setMqExchange(RabbitMQConfig.EX_BUYING_ADDPOINTURSE);
task.setMqRoutingkey(RabbitMQConfig.CG_BUYING_ADDPOINT_KEY);

Map map = new HashMap();
map.put("userName",order.getUsername());
map.put("orderId",order.getId());
map.put("point",order.getPayMoney());
task.setRequestBody(JSON.toJSONString(map));
taskMapper.insertSelective(task);//这是MyBatis提供的内置查询方法

 

开启定时扫描Task表,获取待处理的任务

在订单模块的引导类上加注解开启定时任务

@EnableScheduling

 

在订单模块下新建一个task包,新建一个类,执行定时查询


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.xml.crypto.Data;
import java.util.Date;
import java.util.List;

@Component
public class QueryPointTask {
@Autowired
private TaskMapper taskMapper;
@Autowired
private RabbitTemplate rabbitTemplate;

@Scheduled(cron = "0/2 * * * * *")//每两秒执行一次,具体语法参照以前的笔记
public void queryTask(){//每隔一段时间,执行一次查询任务表

//获取小于系统当前时间的数据
List<Task> taskList = taskMapper.findTaskLessTanCurrentTime(new Date());
if(taskList!= null && taskList.size()>0){
for (Task task : taskList) {
//将任务发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTUSER,RabbitMQConfig.CG_BUYING_ADDPOINT_KEY, JSON.toJSONString(taskList));
}
}
}
}
 

 

1

1

1

 

消息队列实现分布式事务管理

原文:https://www.cnblogs.com/wrc-blog/p/14310529.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!