18 changed files with 573 additions and 25 deletions
@ -0,0 +1,17 @@ |
|||
package cn.nla.common.enums; |
|||
|
|||
public enum StockTaskStateEnum { |
|||
/** |
|||
* 锁定 |
|||
*/ |
|||
LOCK, |
|||
/** |
|||
* 完成 |
|||
*/ |
|||
FINISH, |
|||
/** |
|||
* 取消,释放库存 |
|||
*/ |
|||
CANCEL; |
|||
|
|||
} |
@ -0,0 +1,22 @@ |
|||
package cn.nla.common.model; |
|||
|
|||
import lombok.Data; |
|||
|
|||
/** |
|||
* 消息对象 |
|||
**/ |
|||
@Data |
|||
public class CouponRecordMessage { |
|||
/** |
|||
* 消息id |
|||
*/ |
|||
private String messageId; |
|||
/** |
|||
* 订单号 |
|||
*/ |
|||
private String outTradeNo; |
|||
/** |
|||
* 库存锁定任务id |
|||
*/ |
|||
private Long taskId; |
|||
} |
@ -0,0 +1,100 @@ |
|||
package cn.nla.coupon.config; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.amqp.core.Binding; |
|||
import org.springframework.amqp.core.Exchange; |
|||
import org.springframework.amqp.core.Queue; |
|||
import org.springframework.amqp.core.TopicExchange; |
|||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; |
|||
import org.springframework.amqp.support.converter.MessageConverter; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
|
|||
@Configuration |
|||
@Data |
|||
public class RabbitMQConfig { |
|||
/** |
|||
* 交换机 |
|||
*/ |
|||
@Value("${mq.config.coupon_event_exchange}") |
|||
private String eventExchange; |
|||
/** |
|||
* 第一个队列 延迟队列, |
|||
*/ |
|||
@Value("${mq.config.coupon_release_delay_queue}") |
|||
private String couponReleaseDelayQueue; |
|||
/** |
|||
* 第一个队列的路由key |
|||
* 进入队列的路由key |
|||
*/ |
|||
@Value("${mq.config.coupon_release_delay_routing_key}") |
|||
private String couponReleaseDelayRoutingKey; |
|||
/** |
|||
* 第二个队列,被监听恢复库存的队列 |
|||
*/ |
|||
@Value("${mq.config.coupon_release_queue}") |
|||
private String couponReleaseQueue; |
|||
/** |
|||
* 第二个队列的路由key |
|||
* |
|||
* 即进入死信队列的路由key |
|||
*/ |
|||
@Value("${mq.config.coupon_release_routing_key}") |
|||
private String couponReleaseRoutingKey; |
|||
/** |
|||
* 过期时间 |
|||
*/ |
|||
@Value("${mq.config.ttl}") |
|||
private Integer ttl; |
|||
/** |
|||
* 消息转换器 |
|||
*/ |
|||
@Bean |
|||
public MessageConverter messageConverter(){ |
|||
return new Jackson2JsonMessageConverter(); |
|||
} |
|||
/** |
|||
* 创建交换机 Topic类型,也可以用dirct路由 |
|||
* 一般一个微服务一个交换机 |
|||
*/ |
|||
@Bean |
|||
public Exchange couponEventExchange(){ |
|||
return new TopicExchange(eventExchange,true,false); |
|||
} |
|||
/** |
|||
* 延迟队列 |
|||
*/ |
|||
@Bean |
|||
public Queue couponReleaseDelayQueue(){ |
|||
Map<String,Object> args = new HashMap<>(3); |
|||
args.put("x-message-ttl",ttl); |
|||
args.put("x-dead-letter-routing-key",couponReleaseRoutingKey); |
|||
args.put("x-dead-letter-exchange",eventExchange); |
|||
return new Queue(couponReleaseDelayQueue,true,false,false,args); |
|||
} |
|||
/** |
|||
* 死信队列,普通队列,用于被监听 |
|||
*/ |
|||
@Bean |
|||
public Queue couponReleaseQueue(){ |
|||
return new Queue(couponReleaseQueue,true,false,false); |
|||
} |
|||
/** |
|||
* 第一个队列,即延迟队列的绑定关系建立 |
|||
*/ |
|||
@Bean |
|||
public Binding couponReleaseDelayBinding(){ |
|||
return new Binding(couponReleaseDelayQueue,Binding.DestinationType.QUEUE,eventExchange,couponReleaseDelayRoutingKey,null); |
|||
} |
|||
/** |
|||
* 死信队列绑定关系建立 |
|||
*/ |
|||
@Bean |
|||
public Binding couponReleaseBinding(){ |
|||
return new Binding(couponReleaseQueue,Binding.DestinationType.QUEUE,eventExchange,couponReleaseRoutingKey,null); |
|||
} |
|||
} |
@ -0,0 +1,18 @@ |
|||
package cn.nla.coupon.feign; |
|||
|
|||
import cn.nla.common.util.JsonData; |
|||
import org.springframework.cloud.openfeign.FeignClient; |
|||
import org.springframework.web.bind.annotation.*; |
|||
|
|||
|
|||
/** |
|||
* Feign调用优惠券服务接口 |
|||
*/ |
|||
@FeignClient(name = "nla-order-service") |
|||
public interface OrderFeignService { |
|||
/** |
|||
* 查询订单状态 |
|||
*/ |
|||
@GetMapping("/odr/product/v1/query_state") |
|||
JsonData queryProductOrderState(@RequestParam("out_trade_no") String outTradeNo); |
|||
} |
@ -0,0 +1,22 @@ |
|||
package cn.nla.coupon.model.request; |
|||
|
|||
import io.swagger.annotations.ApiModel; |
|||
import io.swagger.annotations.ApiModelProperty; |
|||
import lombok.Data; |
|||
|
|||
import java.util.List; |
|||
|
|||
@ApiModel(value = "优惠券锁定对象",description = "优惠券锁定对象") |
|||
@Data |
|||
public class LockCouponRecordRequest { |
|||
/** |
|||
* 优惠券记录id列表 |
|||
*/ |
|||
@ApiModelProperty(value = "优惠券记录id列表",example = "[1,2,3]") |
|||
private List<Long> lockCouponRecordIds; |
|||
/** |
|||
* 订单号 |
|||
*/ |
|||
@ApiModelProperty(value = "订单号",example = "3234fw234rfd232") |
|||
private String orderOutTradeNo; |
|||
} |
@ -0,0 +1,66 @@ |
|||
package cn.nla.coupon.mq; |
|||
|
|||
import cn.nla.common.model.CouponRecordMessage; |
|||
import cn.nla.coupon.service.CouponRecordService; |
|||
import com.rabbitmq.client.Channel; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.amqp.core.Message; |
|||
import org.springframework.amqp.rabbit.annotation.RabbitHandler; |
|||
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.Resource; |
|||
import java.io.IOException; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
@RabbitListener(queues = "${mq.config.coupon_release_queue}") |
|||
public class CouponMQListener { |
|||
|
|||
@Resource |
|||
private CouponRecordService couponRecordService; |
|||
|
|||
// @Resource
|
|||
// private RedissonClient redissonClient;
|
|||
|
|||
/** |
|||
* 重复消费-幂等性 |
|||
* <p> |
|||
* 消费失败,重新入队后最大重试次数: |
|||
* 如果消费失败,不重新入队,可以记录日志,然后插到数据库人工排查 |
|||
* <p> |
|||
* 消费者这块还有啥问题,大家可以先想下,然后给出解决方案 |
|||
*/ |
|||
@RabbitHandler |
|||
public void releaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException { |
|||
//防止同个解锁任务并发进入;如果是串行消费不用加锁;加锁有利也有弊,看项目业务逻辑而定
|
|||
//Lock lock = redissonClient.getLock("lock:coupon_record_release:"+recordMessage.getTaskId());
|
|||
//lock.lock();
|
|||
log.info("监听到消息:releaseCouponRecord消息内容:{}", recordMessage); |
|||
long msgTag = message.getMessageProperties().getDeliveryTag(); |
|||
boolean flag = couponRecordService.releaseCouponRecord(recordMessage); |
|||
try { |
|||
if (flag) { |
|||
//确认消息消费成功
|
|||
channel.basicAck(msgTag, false); |
|||
} else { |
|||
log.error("释放优惠券失败 flag=false,{}", recordMessage); |
|||
channel.basicReject(msgTag, true); |
|||
} |
|||
} catch (IOException e) { |
|||
log.error("释放优惠券记录异常:{},msg:{}", e, recordMessage); |
|||
channel.basicReject(msgTag, true); |
|||
} |
|||
// finally {
|
|||
// lock.unlock();
|
|||
// }
|
|||
|
|||
} |
|||
|
|||
// @RabbitHandler
|
|||
// public void releaseCouponRecord2(String msg,Message message, Channel channel) throws IOException {
|
|||
// log.info(msg);
|
|||
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
|
|||
// }
|
|||
|
|||
} |
@ -0,0 +1,74 @@ |
|||
server: |
|||
port: 9002 |
|||
spring: |
|||
application: |
|||
name: nla-coupon-service |
|||
# 缓存 |
|||
redis: |
|||
host: 127.0.0.1 |
|||
port: 6379 |
|||
database: 0 |
|||
password: yuan123456 |
|||
#数据库配置 |
|||
datasource: |
|||
driver-class-name: com.mysql.cj.jdbc.Driver |
|||
url: jdbc:mysql://117.72.43.105:3306/p_nla_coupon?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai |
|||
username: root |
|||
password: Yuan625621105. |
|||
#注册中心地址 |
|||
cloud: |
|||
nacos: |
|||
discovery: |
|||
server-addr: 117.72.43.105:8848 |
|||
# username: nacos |
|||
# password: sW5U%pxecL#p |
|||
# namespace: yjs |
|||
|
|||
#消息队列 |
|||
rabbitmq: |
|||
host: 192.168.30.130 |
|||
port: 5672 |
|||
virtual-host: / |
|||
password: admin |
|||
username: admin |
|||
#开启手动确认消息 |
|||
listener: |
|||
simple: |
|||
acknowledge-mode: manual |
|||
|
|||
#配置plus打印sql⽇志 |
|||
mybatis-plus: |
|||
configuration: |
|||
log-impl: |
|||
org.apache.ibatis.logging.stdout.StdOutImpl |
|||
#设置⽇志级别,ERROR/WARN/INFO/DEBUG,默认是INFO以上才显示 |
|||
#logging: |
|||
# level: |
|||
# root: INFO |
|||
|
|||
|
|||
#seata配置 |
|||
#seata: |
|||
# tx-service-group: ${spring.application.name}-group |
|||
# service: |
|||
# grouplist: |
|||
# nla: 127.0.0.1:8091 |
|||
# vgroup-mapping: |
|||
# nla-coupon-service-group: nla |
|||
|
|||
|
|||
##自定义消息队列配置,发送锁定库存消息-》延迟exchange-》lock.queue-》死信exchange-》release.queue |
|||
mq: |
|||
config: |
|||
#延迟队列,不能被监听消费 |
|||
coupon_release_delay_queue: coupon.release.delay.queue |
|||
#延迟队列的消息过期后转发的队列 |
|||
coupon_release_queue: coupon.release.queue |
|||
#交换机 |
|||
coupon_event_exchange: coupon.event.exchange |
|||
#进入延迟队列的路由key |
|||
coupon_release_delay_routing_key: coupon.release.delay.routing.key |
|||
#消息过期,进入释放死信队列的key |
|||
coupon_release_routing_key: coupon.release.routing.key |
|||
#消息过期时间,毫秒,临时改为6分钟 |
|||
ttl: 360000 |
@ -0,0 +1,31 @@ |
|||
package cn.nla.coupon; |
|||
|
|||
import cn.nla.common.model.CouponRecordMessage; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.Test; |
|||
import org.junit.runner.RunWith; |
|||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
|||
import org.springframework.boot.test.context.SpringBootTest; |
|||
import org.springframework.test.context.junit4.SpringRunner; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
@RunWith(SpringRunner.class) |
|||
@SpringBootTest(classes = CouponApplication.class) |
|||
@Slf4j |
|||
public class CouponApplicationTests { |
|||
|
|||
@Resource |
|||
private RabbitTemplate rabbitTemplate; |
|||
|
|||
@Test |
|||
public void send() { |
|||
// rabbitTemplate.convertAndSend("coupon.event.exchange", "coupon.release.delay.routing.key", "测试数据");
|
|||
CouponRecordMessage message = new CouponRecordMessage(); |
|||
message.setOutTradeNo("123456abc"); |
|||
message.setTaskId(1L); |
|||
rabbitTemplate.convertAndSend("coupon.event.exchange", "coupon.release.delay.routing.key", message); |
|||
} |
|||
|
|||
} |
|||
|
Loading…
Reference in new issue