Browse Source

feat: 更新业务逻辑

master
xc-yjs 8 months ago
parent
commit
8e638de689
  1. 23
      nla-common/src/main/java/cn/nla/common/model/ProductMessage.java
  2. 2
      nla-coupon-service/src/main/java/cn/nla/coupon/service/impl/CouponRecordServiceImpl.java
  3. 6
      nla-product-service/src/main/java/cn/nla/product/ProductApplication.java
  4. 106
      nla-product-service/src/main/java/cn/nla/product/config/RabbitMQConfig.java
  5. 7
      nla-product-service/src/main/java/cn/nla/product/controller/ProductController.java
  6. 19
      nla-product-service/src/main/java/cn/nla/product/feign/OrderFeignService.java
  7. 9
      nla-product-service/src/main/java/cn/nla/product/mapper/ProductMapper.java
  8. 21
      nla-product-service/src/main/java/cn/nla/product/model/request/LockProductRequest.java
  9. 20
      nla-product-service/src/main/java/cn/nla/product/model/request/OrderItemRequest.java
  10. 53
      nla-product-service/src/main/java/cn/nla/product/mq/ProductStockMQListener.java
  11. 16
      nla-product-service/src/main/java/cn/nla/product/service/ProductService.java
  12. 109
      nla-product-service/src/main/java/cn/nla/product/service/impl/ProductServiceImpl.java
  13. 37
      nla-product-service/src/main/resources/application.yml
  14. 10
      nla-product-service/src/main/resources/mapper/ProductMapper.xml

23
nla-common/src/main/java/cn/nla/common/model/ProductMessage.java

@ -0,0 +1,23 @@
package cn.nla.common.model;
import lombok.Data;
@Data
public class ProductMessage {
/**
* 消息队列id
*/
private long messageId;
/**
* 订单号
*/
private String outTradeNo;
/**
* 库存锁定taskId
*/
private long taskId;
}

2
nla-coupon-service/src/main/java/cn/nla/coupon/service/impl/CouponRecordServiceImpl.java

@ -97,7 +97,7 @@ public class CouponRecordServiceImpl extends ServiceImpl<CouponRecordMapper, Cou
LoginUser loginUser = LoginInterceptor.threadLocal.get();
String orderOutTradeNo = recordRequest.getOrderOutTradeNo();
List<Long> lockCouponRecordIds = recordRequest.getLockCouponRecordIds();
// 锁定优惠券记录
// 锁定优惠券记录(校验优惠券存在可用)
int updateRows = baseMapper.lockUseStateBatch(loginUser.getId(), CouponStateEnum.USED.name(), lockCouponRecordIds);
//task表插入记录
List<CouponTaskEntity> couponTaskList = lockCouponRecordIds.stream().map(obj -> {

6
nla-product-service/src/main/java/cn/nla/product/ProductApplication.java

@ -3,10 +3,16 @@ package cn.nla.product;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@MapperScan("cn.nla.*.mapper")
@EnableFeignClients
@EnableDiscoveryClient
@EnableTransactionManagement
@ComponentScan(basePackages = {"cn.nla.*"})
public class ProductApplication {
public static void main(String[] args) {

106
nla-product-service/src/main/java/cn/nla/product/config/RabbitMQConfig.java

@ -0,0 +1,106 @@
package cn.nla.product.config;
import lombok.Data;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交换机
*/
@Value("${mq.config.stock_event_exchange}")
private String eventExchange;
/**
* 第一个队列延迟队列
*/
@Value("${mq.config.stock_release_delay_queue}")
private String stockReleaseDelayQueue;
/**
* 第一个队列的路由key
* 进入队列的路由key
*/
@Value("${mq.config.stock_release_delay_routing_key}")
private String stockReleaseDelayRoutingKey;
/**
* 第二个队列被监听恢复库存的队列
*/
@Value("${mq.config.stock_release_queue}")
private String stockReleaseQueue;
/**
* 第二个队列的路由key
*
* 即进入死信队列的路由key
*/
@Value("${mq.config.stock_release_routing_key}")
private String stockReleaseRoutingKey;
/**
* 过期时间
*/
@Value("${mq.config.ttl}")
private Integer ttl;
/**
* 消息转换器
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 创建交换机 Topic类型也可以用dirct路由
* 一般一个微服务一个交换机
*/
@Bean
public Exchange stockEventExchange(){
return new TopicExchange(eventExchange,true,false);
}
/**
* 延迟队列
*/
@Bean
public Queue stockReleaseDelayQueue(){
Map<String,Object> args = new HashMap<>(3);
args.put("x-message-ttl",ttl);
args.put("x-dead-letter-exchange",eventExchange);
args.put("x-dead-letter-routing-key",stockReleaseRoutingKey);
return new Queue(stockReleaseDelayQueue,true,false,false,args);
}
/**
* 死信队列普通队列用于被监听
*/
@Bean
public Queue stockReleaseQueue(){
return new Queue(stockReleaseQueue,true,false,false);
}
/**
* 第一个队列即延迟队列的绑定关系建立
*/
@Bean
public Binding stockReleaseDelayBinding(){
return new Binding(stockReleaseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,stockReleaseDelayRoutingKey,null);
}
/**
* 死信队列绑定关系建立
*/
@Bean
public Binding stockReleaseBinding(){
return new Binding(stockReleaseQueue,Binding.DestinationType.QUEUE,eventExchange,stockReleaseRoutingKey,null);
}
}

7
nla-product-service/src/main/java/cn/nla/product/controller/ProductController.java

@ -3,6 +3,7 @@ package cn.nla.product.controller;
import cn.nla.common.util.JsonData;
import cn.nla.product.model.VO.ProductVO;
import cn.nla.product.model.request.LockProductRequest;
import cn.nla.product.service.ProductService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@ -45,5 +46,11 @@ public class ProductController {
return JsonData.buildSuccess(productService.findDetailById(productId));
}
@ApiOperation("rpc-锁定,商品库存锁定")
@PostMapping("lock_products")
public JsonData lockProducts(@ApiParam("商品库存锁定") @RequestBody LockProductRequest lockProductRequest){
return productService.lockProductStock(lockProductRequest);
}
}

19
nla-product-service/src/main/java/cn/nla/product/feign/OrderFeignService.java

@ -0,0 +1,19 @@
package cn.nla.product.feign;
import cn.nla.common.util.JsonData;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
/**
* Feign调用优惠券服务接口
*/
@FeignClient(name = "nla-order-service")
public interface OrderFeignService {
/**
* 查询订单状态
*/
@GetMapping("/odr/product/v1/query_state")
JsonData queryProductOrderState(@RequestParam("out_trade_no") String outTradeNo);
}

9
nla-product-service/src/main/java/cn/nla/product/mapper/ProductMapper.java

@ -2,6 +2,7 @@ package cn.nla.product.mapper;
import cn.nla.product.model.entity.ProductEntity;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
/**
* <p>
@ -13,4 +14,12 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
*/
public interface ProductMapper extends BaseMapper<ProductEntity> {
/**
* 锁定商品库存
*/
int lockProductStock(@Param("productId") long productId, @Param("buyNum") int buyNum);
/**
* 解锁商品存储
*/
void unlockProductStock(@Param("productId")Long productId, @Param("buyNum")Integer buyNum);
}

21
nla-product-service/src/main/java/cn/nla/product/model/request/LockProductRequest.java

@ -0,0 +1,21 @@
package cn.nla.product.model.request;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.List;
@ApiModel(value = "商品锁定对象",description = "商品锁定对象协议")
@Data
public class LockProductRequest {
@ApiModelProperty(value = "订单id",example = "12312312312")
@JsonProperty("order_out_trade_no")
private String orderOutTradeNo;
@ApiModelProperty(value = "订单项")
@JsonProperty("order_item_list")
private List<OrderItemRequest> orderItemList;
}

20
nla-product-service/src/main/java/cn/nla/product/model/request/OrderItemRequest.java

@ -0,0 +1,20 @@
package cn.nla.product.model.request;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@ApiModel(value = "商品子项")
@Data
public class OrderItemRequest {
@ApiModelProperty(value = "商品id",example = "1")
@JsonProperty("product_id")
private long productId;
@ApiModelProperty(value = "购买数量",example = "2")
@JsonProperty("buy_num")
private int buyNum;
}

53
nla-product-service/src/main/java/cn/nla/product/mq/ProductStockMQListener.java

@ -0,0 +1,53 @@
package cn.nla.product.mq;
import cn.nla.common.model.ProductMessage;
import cn.nla.product.service.ProductService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = "${mq.config.stock_release_queue}")
public class ProductStockMQListener {
@Resource
private ProductService productService;
/**
*
* 重复消费-幂等性
*
* 消费失败重新入队后最大重试次数
* 如果消费失败不重新入队可以记录日志然后插到数据库人工排查
*
* 消费者这块还有啥问题大家可以先想下然后给出解决方案
*/
@RabbitHandler
public void releaseProductStock(ProductMessage productMessage, Message message, Channel channel) throws IOException {
log.info("监听到消息:releaseProductStock消息内容:{}", productMessage);
long msgTag = message.getMessageProperties().getDeliveryTag();
boolean flag = productService.releaseProductStock(productMessage);
try {
if (flag) {
//确认消息消费成功
channel.basicAck(msgTag, false);
}else {
channel.basicReject(msgTag,true);
log.error("释放商品库存失败 flag=false,{}",productMessage);
}
} catch (IOException e) {
log.error("释放商品库存异常:{},msg:{}",e,productMessage);
channel.basicReject(msgTag,true);
}
}
}

16
nla-product-service/src/main/java/cn/nla/product/service/ProductService.java

@ -1,8 +1,11 @@
package cn.nla.product.service;
import cn.nla.common.model.PageResult;
import cn.nla.common.model.ProductMessage;
import cn.nla.common.util.JsonData;
import cn.nla.product.model.VO.ProductVO;
import cn.nla.product.model.entity.ProductEntity;
import cn.nla.product.model.request.LockProductRequest;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
@ -30,4 +33,17 @@ public interface ProductService extends IService<ProductEntity> {
* 批量查询
*/
List<ProductVO> findProductsByIdBatch(List<Long> productIdList);
/**
* 锁定商品库存<br>
* 1)遍历商品锁定每个商品购买数量<br>
* 2)每一次锁定的时候都要发送延迟消息<br>
**/
JsonData lockProductStock(LockProductRequest lockProductRequest);
/**
* 释放商品库存
*/
boolean releaseProductStock(ProductMessage productMessage);
}

109
nla-product-service/src/main/java/cn/nla/product/service/impl/ProductServiceImpl.java

@ -1,20 +1,35 @@
package cn.nla.product.service.impl;
import cn.nla.common.enums.BizCodeEnum;
import cn.nla.common.enums.ProductOrderStateEnum;
import cn.nla.common.enums.StockTaskStateEnum;
import cn.nla.common.exception.BizException;
import cn.nla.common.model.PageResult;
import cn.nla.common.model.ProductMessage;
import cn.nla.common.util.JsonData;
import cn.nla.product.config.RabbitMQConfig;
import cn.nla.product.feign.OrderFeignService;
import cn.nla.product.mapper.ProductTaskMapper;
import cn.nla.product.model.VO.ProductVO;
import cn.nla.product.model.entity.ProductEntity;
import cn.nla.product.mapper.ProductMapper;
import cn.nla.product.model.entity.ProductTaskEntity;
import cn.nla.product.model.request.LockProductRequest;
import cn.nla.product.model.request.OrderItemRequest;
import cn.nla.product.service.ProductService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -25,8 +40,22 @@ import java.util.stream.Collectors;
* @author YJs
* @since 2024-08-09
*/
@Slf4j
@Service
public class ProductServiceImpl extends ServiceImpl<ProductMapper, ProductEntity> implements ProductService {
@Resource
private ProductTaskMapper productTaskMapper;
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private RabbitMQConfig rabbitMQConfig;
@Resource
private OrderFeignService orderFeignService;
@Override
public ProductVO findDetailById(Long productId) {
return beanProcess(baseMapper.selectById(productId));
@ -45,6 +74,84 @@ public class ProductServiceImpl extends ServiceImpl<ProductMapper, ProductEntity
.stream().map(this::beanProcess).collect(Collectors.toList());
}
@Override
public JsonData lockProductStock(LockProductRequest lockProductRequest) {
String outTradeNo = lockProductRequest.getOrderOutTradeNo();
List<OrderItemRequest> itemList = lockProductRequest.getOrderItemList();
//一行代码,提取对象里面的id并加入到集合里面
List<Long> productIdList = itemList.stream().map(OrderItemRequest::getProductId).collect(Collectors.toList());
//批量查询
List<ProductVO> productVOList = this.findProductsByIdBatch(productIdList);
//分组
Map<Long,ProductVO> productMap = productVOList.stream().collect(Collectors.toMap(ProductVO::getId, Function.identity()));
for(OrderItemRequest item:itemList){
//锁定商品记录
int rows = baseMapper.lockProductStock(item.getProductId(),item.getBuyNum());
if(rows != 1){
throw new BizException(BizCodeEnum.ORDER_CONFIRM_LOCK_PRODUCT_FAIL);
}else {
//插入商品product_task
ProductVO productVO = productMap.get(item.getProductId());
ProductTaskEntity productTask = new ProductTaskEntity();
productTask.setBuyNum(item.getBuyNum());
productTask.setLockState(StockTaskStateEnum.LOCK.name());
productTask.setProductId(item.getProductId());
productTask.setProductName(productVO.getTitle());
productTask.setOutTradeNo(outTradeNo);
productTaskMapper.insert(productTask);
log.info("商品库存锁定-插入商品product_task成功:{}",productTask);
// 发送MQ延迟消息,介绍商品库存
ProductMessage productMessage = new ProductMessage();
productMessage.setOutTradeNo(outTradeNo);
productMessage.setTaskId(productTask.getId());
rabbitTemplate.convertAndSend(rabbitMQConfig.getEventExchange(),rabbitMQConfig.getStockReleaseDelayRoutingKey(),productMessage);
log.info("商品库存锁定信息延迟消息发送成功:{}",productMessage);
}
}
return JsonData.buildSuccess();
}
@Override
public boolean releaseProductStock(ProductMessage productMessage) {
//查询工作单状态
ProductTaskEntity taskDO = productTaskMapper.selectOne(new QueryWrapper<ProductTaskEntity>().eq("id",productMessage.getTaskId()));
if(taskDO == null){
log.warn("工作单不存在,消息体为:{}",productMessage);
}
//lock状态才处理
assert taskDO != null;
if(taskDO.getLockState().equalsIgnoreCase(StockTaskStateEnum.LOCK.name())){
//查询订单状态
JsonData jsonData = orderFeignService.queryProductOrderState(productMessage.getOutTradeNo());
if(jsonData.getCode() == 0){
String state = jsonData.getData().toString();
if(ProductOrderStateEnum.NEW.name().equalsIgnoreCase(state)){
//状态是NEW新建状态,则返回给消息队,列重新投递
log.warn("订单状态是NEW,返回给消息队列,重新投递:{}",productMessage);
return false;
}
//如果是已经支付
if(ProductOrderStateEnum.PAY.name().equalsIgnoreCase(state)){
//如果已经支付,修改task状态为finish
taskDO.setLockState(StockTaskStateEnum.FINISH.name());
productTaskMapper.update(taskDO,new QueryWrapper<ProductTaskEntity>().eq("id",productMessage.getTaskId()));
log.info("订单已经支付,修改库存锁定工作单FINISH状态:{}",productMessage);
return true;
}
}
//订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复优惠券使用记录为NEW
log.warn("订单不存在,或者订单被取消,确认消息,修改task状态为CANCEL,恢复商品库存,message:{}",productMessage);
taskDO.setLockState(StockTaskStateEnum.CANCEL.name());
productTaskMapper.update(taskDO,new QueryWrapper<ProductTaskEntity>().eq("id",productMessage.getTaskId()));
//恢复商品库存,集锁定库存的值减去当前购买的值
baseMapper.unlockProductStock(taskDO.getProductId(),taskDO.getBuyNum());
} else {
log.warn("工作单状态不是LOCK,state={},消息体={}",taskDO.getLockState(),productMessage);
}
return true;
}
private ProductVO beanProcess(ProductEntity product) {
ProductVO result = new ProductVO();
BeanUtils.copyProperties(product, result);

37
nla-product-service/src/main/resources/application.yml

@ -14,6 +14,28 @@ spring:
url: jdbc:mysql://117.72.43.105:3306/p_nla_product?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: Yuan625621105.
#注册中心地址
cloud:
nacos:
discovery:
server-addr: 117.72.43.105:8848
# username: nacos
# password: sW5U%pxecL#p
# namespace: yjs
#消息队列
rabbitmq:
host: 192.168.30.130
port: 5672
virtual-host: /
password: admin
username: admin
#开启手动确认消息
listener:
simple:
acknowledge-mode: manual
#配置plus打印sql⽇志
mybatis-plus:
configuration:
@ -24,3 +46,18 @@ mybatis-plus:
# level:
# root: INFO
##自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue
mq:
config:
#延迟队列,不能被监听消费
stock_release_delay_queue: stock.release.delay.queue
#延迟队列的消息过期后转发的队列
stock_release_queue: stock.release.queue
#交换机
stock_event_exchange: stock.event.exchange
#进入延迟队列的路由key
stock_release_delay_routing_key: stock.release.delay.routing.key
#消息过期,进入释放队列的key
stock_release_routing_key: stock.release.routing.key
#消息过期时间,毫秒,临时改为6分钟
ttl: 360000

10
nla-product-service/src/main/resources/mapper/ProductMapper.xml

@ -20,4 +20,14 @@
id, title, cover_img, detail, old_price, price, stock, create_time, lock_stock
</sql>
<!--锁定商品库存-->
<update id="lockProductStock">
update product set lock_stock = lock_stock + #{buyNum}
where id = #{productId} and stock - lock_stock>=#{buyNum}
</update>
<update id="unlockProductStock">
update product set lock_stock = lock_stock-#{buyNum} where id = #{productId}
</update>
</mapper>

Loading…
Cancel
Save