项目优化
使用RabbitMQ来完成异步任务
项目中的秒杀下单业务是通过java的阻塞队列或者redis的stream消息队列实现异步处理订单的,
但是java的阻塞队列会有内存限制,服务器宕机等风险
# 配置MQ
1)添加依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)配置MQ地址:
spring:
rabbitmq:
host: 192.168.10.130 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /cyan # 虚拟主机
username: cyan # 用户名
password: 123456 # 密码
在Virtual Hosts 中创建一个项目专用的virtual host:/cyan
# 接收消息
定义一个消息监听类:
package com.hmdp.listener;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class VoucherOrderListener {
private final IVoucherOrderService voucherOrderService;
private final RedissonClient redissonClient;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "seckill.queue", durable = "true"),
exchange = @Exchange(name = "seckill.direct"),
key = "seckill.success"
))
public void listenOrderMessage(VoucherOrder voucherOrder){
handleVoucherOrder(voucherOrder);
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 3.尝试获取锁
boolean isLock = redisLock.tryLock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
// log.error("不允许重复下单!");
return;
}
try {
voucherOrderService.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}
}
# 发送消息
rabbitTemplate.convertAndSend("seckill.direct", "seckill.success", voucherOrder);
为了防止消息过多导致MQ消息积压而丢失消息,开启MQ的lazy模式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "seckill.queue", durable = "true"),
exchange = @Exchange(name = "seckill.direct"),
arguments = @Argument(name = "x-queue-mode", value = "lazy"),
key = "seckill.success"
))
# 抛出异常
故意抛出异常查看是否能处理正常
发现抛出异常回滚后,MQ检测到异常,但会不断尝试重试进行消费
# 设置重试上限和重试次数
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: false # true无状态;false有状态。如果业务中包含事务,这里改为false
本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
# 设置消息转换器
stateless: false # true无状态;false有状态。如果业务中包含事务,这里改为false
因为设置为false报错,需要依赖消息id
设置消息转换器,自动创建消息id
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
# 设置消息消费失败处理
我们使用RepublishMessageRecoverer
策略
创建一个配置类ErrorMessageConfig
package com.hmdp.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
再次发送消息可以看到消息消费失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
不过!!
acknowledge-mode: auto # 自动ack
auto模式如果抛出的是消息转换异常,消息依然会被删除
上次更新: 2025/5/7 00:24:40