原理图
任务目标
当用户下单时,会增加与支付金额数相等的积分,在订单模块中完成下单,远程调用用户模块中的增加积分的操作,这里连个模块用rabbimq完成事务管理
环境准备
数据库中准备三张表
changgou_order中有两张,tb_task用于储存等待处理的任务,tb_task_his用于存储已经处理完的任务,日后做数据分析
这两张表用于记录和order订单有关的事务
changgou_user中有一张tb_point_log,用于记录待执行的添加积分的任务
这张表用于记录和积分有关的事务
三张表的创建sql为
DROP TABLE IF EXISTS `tb_task`; CREATE TABLE `tb_task` ( `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT ‘任务id‘, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `delete_time` datetime DEFAULT NULL, `task_type` varchar(32) DEFAULT NULL COMMENT ‘任务类型‘, `mq_exchange` varchar(64) DEFAULT NULL COMMENT ‘交换机名称‘, `mq_routingkey` varchar(64) DEFAULT NULL COMMENT ‘routingkey‘, `request_body` varchar(512) DEFAULT NULL COMMENT ‘任务请求的内容‘, `status` varchar(32) DEFAULT NULL COMMENT ‘任务状态‘, `errormsg` varchar(512) DEFAULT NULL COMMENT ‘任务错误信息‘, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `tb_task_his`; CREATE TABLE `tb_task_his` ( `id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT ‘任务id‘, `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, `delete_time` datetime DEFAULT NULL, `task_type` varchar(32) DEFAULT NULL COMMENT ‘任务类型‘, `mq_exchange` varchar(64) DEFAULT NULL COMMENT ‘交换机名称‘, `mq_routingkey` varchar(64) DEFAULT NULL COMMENT ‘routingkey‘, `request_body` varchar(512) DEFAULT NULL COMMENT ‘任务请求的内容‘, `status` varchar(32) DEFAULT NULL COMMENT ‘任务状态‘, `errormsg` varchar(512) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `tb_point_log`; CREATE TABLE `tb_point_log` ( `order_id` varchar(200) NOT NULL, `user_id` varchar(200) NOT NULL, `point` int(11) NOT NULL, PRIMARY KEY (`order_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
为三张数据表准备对应的实体类
在changgou_service_order_api中
@Table(name="tb_task") public class Task { @Id private Long id; @Column(name = "create_time") private Date createTime; @Column(name = "update_time") private Date updateTime; @Column(name = "delete_time") private Date deleteTime; @Column(name = "task_type") private String taskType; @Column(name = "mq_exchange") private String mqExchange; @Column(name = "mq_routingkey") private String mqRoutingkey; @Column(name = "request_body") private String requestBody; @Column(name = "status") private String status; @Column(name = "errormsg") private String errormsg; //getter,setter略 }
@Table(name="tb_task_his") public class TaskHis { @Id private Long id; @Column(name = "create_time") private Date createTime; @Column(name = "update_time") private Date updateTime; @Column(name = "delete_time") private Date deleteTime; @Column(name = "task_type") private String taskType; @Column(name = "mq_exchange") private String mqExchange; @Column(name = "mq_routingkey") private String mqRoutingkey; @Column(name = "request_body") private String requestBody; @Column(name = "status") private String status; @Column(name = "errormsg") private String errormsg; //getter,setter略 }
在changgou_service_user_api中添加
@Table(name="tb_point_log") public class PointLog { private String orderId; private String userId; private Integer point; //getter,setter略 }
添加rabbitmq配置类
在changgou_service_order中添加配置类
@Configuration public class RabbitMQConfig { //添加积分任务交换机 public static final String EX_BUYING_ADDPOINTUSER = "ex_buying_addpointuser"; //添加积分消息队列 public static final String CG_BUYING_ADDPOINT = "cg_buying_addpoint"; //完成添加积分消息队列 public static final String CG_BUYING_FINISHADDPOINT = "cg_buying_finishaddpoint"; //添加积分路由key public static final String CG_BUYING_ADDPOINT_KEY = "addpoint"; //完成添加积分路由key public static final String CG_BUYING_FINISHADDPOINT_KEY = "finishaddpoint"; /** * 交换机配置 * @return the exchange */ @Bean(EX_BUYING_ADDPOINTUSER) public Exchange EX_BUYING_ADDPOINTUSER() { return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build(); } //声明队列 @Bean(CG_BUYING_FINISHADDPOINT) public Queue QUEUE_CG_BUYING_FINISHADDPOINT() { Queue queue = new Queue(CG_BUYING_FINISHADDPOINT); return queue; } //声明队列 @Bean(CG_BUYING_ADDPOINT) public Queue QUEUE_CG_BUYING_ADDPOINT() { Queue queue = new Queue(CG_BUYING_ADDPOINT); return queue; } /** * 绑定队列到交换机 . * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding BINDING_QUEUE_FINISHADDPOINT(@Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs(); } @Bean public Binding BINDING_QUEUE_ADDPOINT(@Qualifier(CG_BUYING_ADDPOINT) Queue queue, @Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs(); } }
开始操作
修改新增订单方法
taskMapper为
import tk.mybatis.mapper.common.Mapper;
import java.util.Date;
import java.util.List;
public interface TaskMapper extends Mapper<Task> {
//用于定时查询任务表中的数据,获取未执行的任务
@Select("select * from tb_task where update_time<#{currentTime}")
@Results({@Result(column = "create_time",property = "createTime"), //由于表中的字段和实体类中的属性名不一致,我们需要手动配置一下
@Result(column = "update_time",property = "updateTime"),
@Result(column = "delete_time",property = "deleteTime"),
@Result(column = "task_type",property = "taskType"),
@Result(column = "mq_exchange",property = "mqExchange"),
@Result(column = "mq_routingkey",property = "mqRoutingkey"),
@Result(column = "request_body",property = "requestBody"),
@Result(column = "status",property = "status"),
@Result(column = "errormsg",property = "errormsg")})
List<Task> findTaskLessTanCurrentTime(Date currentTime);
}
在订单模块的新增订单方法中,添加如下,将数据写入task数据表
//增加任务表记录 Task task = new Task(); task.setCreateTime(new Date()); task.setUpdateTime(new Date()); task.setMqExchange(RabbitMQConfig.EX_BUYING_ADDPOINTURSE); task.setMqRoutingkey(RabbitMQConfig.CG_BUYING_ADDPOINT_KEY); Map map = new HashMap(); map.put("userName",order.getUsername()); map.put("orderId",order.getId()); map.put("point",order.getPayMoney()); task.setRequestBody(JSON.toJSONString(map)); taskMapper.insertSelective(task);//这是MyBatis提供的内置查询方法
开启定时扫描Task表,获取待处理的任务
在订单模块的引导类上加注解开启定时任务
@EnableScheduling
在订单模块下新建一个task包,新建一个类,执行定时查询
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.List;
@Component
public class QueryPointTask {
@Autowired
private TaskMapper taskMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "0/2 * * * * *")//每两秒执行一次,具体语法参照以前的笔记
public void queryTask(){//每隔一段时间,执行一次查询任务表
//获取小于系统当前时间的数据
List<Task> taskList = taskMapper.findTaskLessTanCurrentTime(new Date());
if(taskList!= null && taskList.size()>0){
for (Task task : taskList) {
//将任务发送到消息队列
rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTUSER,RabbitMQConfig.CG_BUYING_ADDPOINT_KEY, JSON.toJSONString(taskList));
}
}
}
}
1
1
1
原文:https://www.cnblogs.com/wrc-blog/p/14310529.html