rabbitmq延时队列自动解锁库存

一、库存服务自动解锁库存

使用了最终一致性来解决分布式事务
当order服务出现异常回滚,此时ware服务无法回滚,怎么办?

使用seata全局事务虽然能在order服务出现异常导致回滚时使其他服务的也能同时回滚,但在流量大的情况下是使用加锁的方式,效率
低不适合并发量大的情况,也可以使用定时任务轮询去查看订单的状态,但是轮询的方式比较占资源和内存,所以选用最终一致性的方案,使用mq延时队列死信路由,然后做出补救方案,只要订单服务出现故障就通过mq定时去判断,只要能保证库存最终能解锁即可

延时队列自动解锁库存业务逻辑
ware服务在完成锁库存时就给mq发消息,把消息存到死信队列中,这个消息记录了那些商品锁定多少库存,当queue到达存活时间就会把消息交给死信路由交换机,死信路由交换机会把消息发到最终的队列,如果订单支付时间为30分钟,我们就把存活时间设置为40分钟,这样就能保证我们监听的消息一定是超过了支付的时间的,然后让ware库存服务去订阅监听最终的队列即可,只要有消息我们就去检验order订单服务,只要证明订单服务出现异常回滚或者订单超过支付时间未支付的订单我们就去做一个解锁还原库存的操作

1.库存锁定成功,给mq发消息

(1)保存 工作单(订单号)、工作单详情(商品锁了多少库存)
(2)把上面的数据给mq发一份

@Transactional@Overridepublic boolean orderStock(OrderStockRequest orderStockRequest) {//保存工作单WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();wareOrderTaskEntity.setOrderSn(orderStockRequest.getOrderSn());wareOrderTaskService.save(wareOrderTaskEntity);List<OrderItemVo> itemVos = orderStockRequest.getItemVos();List<SkuStockfromWare> collect = itemVos.stream().map(item -> {SkuStockfromWare skuStockfromWare = new SkuStockfromWare();skuStockfromWare.setSkuId(item.getSkuId());skuStockfromWare.setNum(item.getCount());//查询该商品在那些仓库有库存List<Long> wareId = wareSkuDao.skuStockfromWare(item.getSkuId());skuStockfromWare.setWareId(wareId);return skuStockfromWare;}).collect(Collectors.toList());//根据skuId遍历for (SkuStockfromWare skuStockfromWare : collect) {//判断是否锁定成功boolean flag = false;//判断该商品是否有仓库存在库存List<Long> wareIdList = skuStockfromWare.getWareId();if (wareIdList.size() < 0 || wareIdList == null){throw new NoWareStockException(skuStockfromWare.getSkuId());}for (Long wareId : wareIdList) {Long count = wareSkuDao.LockedStockFromWare(skuStockfromWare.getSkuId(),wareId,skuStockfromWare.getNum());if (count.equals(1L)){//锁定成功flag = true;//保存工作单详情WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();wareOrderTaskDetailEntity.setSkuId(skuStockfromWare.getSkuId());wareOrderTaskDetailEntity.setSkuNum(skuStockfromWare.getNum());wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());wareOrderTaskDetailEntity.setWareId(wareId);wareOrderTaskDetailEntity.setLockStatus(1);wareOrderTaskDetailService.save(wareOrderTaskDetailEntity);//TODO 库存锁定成功->发消息给交换机StockLocked stockLocked = new StockLocked();stockLocked.setTaskId(wareOrderTaskEntity.getId());WareOrderTaskDetailTo wareOrderTaskDetailTo = new WareOrderTaskDetailTo();BeanUtils.copyProperties(wareOrderTaskDetailEntity,wareOrderTaskDetailTo);stockLocked.setDetailTo(wareOrderTaskDetailTo);//convertAndSend(String exchange, String routingKey, Object object)rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",);//该商品锁定库存成功就执行下一个商品break;}}//如果没有一个仓库扣成功,代表此skuId的库存不足if (!flag){throw new SkuNoStockException(skuStockfromWare.getSkuId());}}return true;}

2.监听队列,解锁库存

(1)判断工作单是否存在
不存在代表锁库存操作已回滚,不做处理
(2)查询订单是否存在
如果订单不存在,表示下订单操作已回滚,执行解锁库存操作
如果存在,查询订单状态是否为 4-已关闭,如果是 4-已关闭,执行解锁库存操作,订单其他状态不做处理
(3)解锁前判断工作单的状态是否为 1-已锁定,证明只做了锁定库存操作
(4)解锁库存,修改工作单详情状态为 已解锁

/*** 解锁库存*/
@RabbitListener(queues = {"stock.release.stock.queue"})
@Service
public class UnLockStockListener {@AutowiredWareSkuService wareSkuService;@RabbitHandlerpublic void UnLockStock(StockLockedTo lockedTo, Channel channel, Message message) throws IOException {try {wareSkuService.unlockStock(lockedTo);//签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {//拒签,让消息重新归队,等待服务器重启进行下一次解锁channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}

解锁操作

/*** 解锁库存** (1).判断工作单是否存在*      不存在代表已回滚,不做处理*      (2).查询订单是否存在*          如果订单不存在,表示已回滚*             (3).执行解锁库存操作*          如果存在,查询订单状态是否为 4-已关闭*                如果是 4-已关闭,执行解锁库存操作,订单其他状态不做处理*/@Overridepublic void unlockStock(StockLockedTo lockedTo) {WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(lockedTo.getTaskId());//已回滚不做处理if (taskEntity != null){//查询订单是否存在R<OrderVo> r = orderFeignService.orderStatus(taskEntity.getOrderSn());if (r.getCode() == 0){OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {});if (orderVo == null || orderVo.getStatus() == 4){WareOrderTaskDetailTo detailTo = lockedTo.getDetailTo();//判断工作单的状态是否为 1-已锁定,证明只做了锁定库存操作if (detailTo.getLockStatus() == 1){//恢复库存unlock(detailTo.getId(),detailTo.getSkuNum(),detailTo.getSkuId(),detailTo.getWareId());}}}else {throw new OrderFeignException();}}}/*** 解锁库存* UPDATE `wms_ware_sku` SET stock_locked = stock_locked - ?* WHERE sku_id = ? AND ware_id = ?*/private void unlock(Long id,Integer skuNum, Long skuId, Long wareId) {wareSkuDao.unlock(skuNum,skuId,wareId);//修改状态为 已解锁WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();wareOrderTaskDetailEntity.setId(id);wareOrderTaskDetailEntity.setLockStatus(2);}

二、订单服务关闭订单同时也执行解锁库存操作

rabbitmq延时队列自动解锁库存

一、库存服务自动解锁库存

使用了最终一致性来解决分布式事务
当order服务出现异常回滚,此时ware服务无法回滚,怎么办?

使用seata全局事务虽然能在order服务出现异常导致回滚时使其他服务的也能同时回滚,但在流量大的情况下是使用加锁的方式,效率
低不适合并发量大的情况,也可以使用定时任务轮询去查看订单的状态,但是轮询的方式比较占资源和内存,所以选用最终一致性的方案,使用mq延时队列死信路由,然后做出补救方案,只要订单服务出现故障就通过mq定时去判断,只要能保证库存最终能解锁即可

延时队列自动解锁库存业务逻辑
ware服务在完成锁库存时就给mq发消息,把消息存到死信队列中,这个消息记录了那些商品锁定多少库存,当queue到达存活时间就会把消息交给死信路由交换机,死信路由交换机会把消息发到最终的队列,如果订单支付时间为30分钟,我们就把存活时间设置为40分钟,这样就能保证我们监听的消息一定是超过了支付的时间的,然后让ware库存服务去订阅监听最终的队列即可,只要有消息我们就去检验order订单服务,只要证明订单服务出现异常回滚或者订单超过支付时间未支付的订单我们就去做一个解锁还原库存的操作

1.库存锁定成功,给mq发消息

(1)保存 工作单(订单号)、工作单详情(商品锁了多少库存)
(2)把上面的数据给mq发一份

@Transactional@Overridepublic boolean orderStock(OrderStockRequest orderStockRequest) {//保存工作单WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity();wareOrderTaskEntity.setOrderSn(orderStockRequest.getOrderSn());wareOrderTaskService.save(wareOrderTaskEntity);List<OrderItemVo> itemVos = orderStockRequest.getItemVos();List<SkuStockfromWare> collect = itemVos.stream().map(item -> {SkuStockfromWare skuStockfromWare = new SkuStockfromWare();skuStockfromWare.setSkuId(item.getSkuId());skuStockfromWare.setNum(item.getCount());//查询该商品在那些仓库有库存List<Long> wareId = wareSkuDao.skuStockfromWare(item.getSkuId());skuStockfromWare.setWareId(wareId);return skuStockfromWare;}).collect(Collectors.toList());//根据skuId遍历for (SkuStockfromWare skuStockfromWare : collect) {//判断是否锁定成功boolean flag = false;//判断该商品是否有仓库存在库存List<Long> wareIdList = skuStockfromWare.getWareId();if (wareIdList.size() < 0 || wareIdList == null){throw new NoWareStockException(skuStockfromWare.getSkuId());}for (Long wareId : wareIdList) {Long count = wareSkuDao.LockedStockFromWare(skuStockfromWare.getSkuId(),wareId,skuStockfromWare.getNum());if (count.equals(1L)){//锁定成功flag = true;//保存工作单详情WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();wareOrderTaskDetailEntity.setSkuId(skuStockfromWare.getSkuId());wareOrderTaskDetailEntity.setSkuNum(skuStockfromWare.getNum());wareOrderTaskDetailEntity.setTaskId(wareOrderTaskEntity.getId());wareOrderTaskDetailEntity.setWareId(wareId);wareOrderTaskDetailEntity.setLockStatus(1);wareOrderTaskDetailService.save(wareOrderTaskDetailEntity);//TODO 库存锁定成功->发消息给交换机StockLocked stockLocked = new StockLocked();stockLocked.setTaskId(wareOrderTaskEntity.getId());WareOrderTaskDetailTo wareOrderTaskDetailTo = new WareOrderTaskDetailTo();BeanUtils.copyProperties(wareOrderTaskDetailEntity,wareOrderTaskDetailTo);stockLocked.setDetailTo(wareOrderTaskDetailTo);//convertAndSend(String exchange, String routingKey, Object object)rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",);//该商品锁定库存成功就执行下一个商品break;}}//如果没有一个仓库扣成功,代表此skuId的库存不足if (!flag){throw new SkuNoStockException(skuStockfromWare.getSkuId());}}return true;}

2.监听队列,解锁库存

(1)判断工作单是否存在
不存在代表锁库存操作已回滚,不做处理
(2)查询订单是否存在
如果订单不存在,表示下订单操作已回滚,执行解锁库存操作
如果存在,查询订单状态是否为 4-已关闭,如果是 4-已关闭,执行解锁库存操作,订单其他状态不做处理
(3)解锁前判断工作单的状态是否为 1-已锁定,证明只做了锁定库存操作
(4)解锁库存,修改工作单详情状态为 已解锁

/*** 解锁库存*/
@RabbitListener(queues = {"stock.release.stock.queue"})
@Service
public class UnLockStockListener {@AutowiredWareSkuService wareSkuService;@RabbitHandlerpublic void UnLockStock(StockLockedTo lockedTo, Channel channel, Message message) throws IOException {try {wareSkuService.unlockStock(lockedTo);//签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {//拒签,让消息重新归队,等待服务器重启进行下一次解锁channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}

解锁操作

/*** 解锁库存** (1).判断工作单是否存在*      不存在代表已回滚,不做处理*      (2).查询订单是否存在*          如果订单不存在,表示已回滚*             (3).执行解锁库存操作*          如果存在,查询订单状态是否为 4-已关闭*                如果是 4-已关闭,执行解锁库存操作,订单其他状态不做处理*/@Overridepublic void unlockStock(StockLockedTo lockedTo) {WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(lockedTo.getTaskId());//已回滚不做处理if (taskEntity != null){//查询订单是否存在R<OrderVo> r = orderFeignService.orderStatus(taskEntity.getOrderSn());if (r.getCode() == 0){OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {});if (orderVo == null || orderVo.getStatus() == 4){WareOrderTaskDetailTo detailTo = lockedTo.getDetailTo();//判断工作单的状态是否为 1-已锁定,证明只做了锁定库存操作if (detailTo.getLockStatus() == 1){//恢复库存unlock(detailTo.getId(),detailTo.getSkuNum(),detailTo.getSkuId(),detailTo.getWareId());}}}else {throw new OrderFeignException();}}}/*** 解锁库存* UPDATE `wms_ware_sku` SET stock_locked = stock_locked - ?* WHERE sku_id = ? AND ware_id = ?*/private void unlock(Long id,Integer skuNum, Long skuId, Long wareId) {wareSkuDao.unlock(skuNum,skuId,wareId);//修改状态为 已解锁WareOrderTaskDetailEntity wareOrderTaskDetailEntity = new WareOrderTaskDetailEntity();wareOrderTaskDetailEntity.setId(id);wareOrderTaskDetailEntity.setLockStatus(2);}

二、订单服务关闭订单同时也执行解锁库存操作