Cyan Blog Cyan Blog
首页
  • Java (opens new window)
  • JUC (opens new window)
  • JVM (opens new window)
  • Redis

    • Redis安装 (opens new window)
    • Redis基础 (opens new window)
    • Redis实战 (opens new window)
    • Redis集群安装 (opens new window)
    • Redis分布式缓存 (opens new window)
    • Redis多级缓存 (opens new window)
    • Redis原理 (opens new window)
  • 管理工具

    • Maven (opens new window)
    • Git (opens new window)
  • SSM

    • Spring (opens new window)
    • SpringBoot (opens new window)
    • Mybatis (opens new window)
    • MybatisPlus (opens new window)
  • 微服务

    • Docker (opens new window)
    • RabbitMQ (opens new window)
    • SpringCloud (opens new window)
    • Dubbo (opens new window)
    • MongoDB (opens new window)
    • Zookeeper (opens new window)
  • Java面试题 (opens new window)
  • JUC面试题 (opens new window)
  • JVM面试题 (opens new window)
  • Linux面试题 (opens new window)
  • SQL面试题 (opens new window)
  • Maven面试题 (opens new window)
  • Redis面试题 (opens new window)
  • SSM面试题 (opens new window)
  • SpringCloud面试题 (opens new window)
  • Linux (opens new window)
  • C++ (opens new window)
  • 数据库

    • MySQL (opens new window)
    • NoSQL (opens new window)
  • 软件测试

    • 软件测试 (opens new window)
  • 加密解密 (opens new window)
  • bilibili字幕提取 (opens new window)
  • 道理 (opens new window)
  • 关于博主

    • Github (opens new window)
    • CSDN (opens new window)
  • 关于本站

    • 如何搭建博客网站 (opens new window)
首页
  • Java (opens new window)
  • JUC (opens new window)
  • JVM (opens new window)
  • Redis

    • Redis安装 (opens new window)
    • Redis基础 (opens new window)
    • Redis实战 (opens new window)
    • Redis集群安装 (opens new window)
    • Redis分布式缓存 (opens new window)
    • Redis多级缓存 (opens new window)
    • Redis原理 (opens new window)
  • 管理工具

    • Maven (opens new window)
    • Git (opens new window)
  • SSM

    • Spring (opens new window)
    • SpringBoot (opens new window)
    • Mybatis (opens new window)
    • MybatisPlus (opens new window)
  • 微服务

    • Docker (opens new window)
    • RabbitMQ (opens new window)
    • SpringCloud (opens new window)
    • Dubbo (opens new window)
    • MongoDB (opens new window)
    • Zookeeper (opens new window)
  • Java面试题 (opens new window)
  • JUC面试题 (opens new window)
  • JVM面试题 (opens new window)
  • Linux面试题 (opens new window)
  • SQL面试题 (opens new window)
  • Maven面试题 (opens new window)
  • Redis面试题 (opens new window)
  • SSM面试题 (opens new window)
  • SpringCloud面试题 (opens new window)
  • Linux (opens new window)
  • C++ (opens new window)
  • 数据库

    • MySQL (opens new window)
    • NoSQL (opens new window)
  • 软件测试

    • 软件测试 (opens new window)
  • 加密解密 (opens new window)
  • bilibili字幕提取 (opens new window)
  • 道理 (opens new window)
  • 关于博主

    • Github (opens new window)
    • CSDN (opens new window)
  • 关于本站

    • 如何搭建博客网站 (opens new window)
  • Redis安装

  • Redis基础
  • Redis实战——黑马点评

    • 短信登录
    • 商户查询缓存
    • 优惠卷秒杀
    • 分布式锁
    • 分布式锁-redission⭐⭐
    • 秒杀优化
    • Redis消息队列
    • 达人探店(ZSET)
    • 好友关注(SET)
    • 附近商户
    • Redis实战
    • 项目优化
      • 配置MQ
      • 接收消息
      • 发送消息
      • 抛出异常
      • 设置消息消费失败处理
  • Redis集群
  • Redis分布式缓存
  • Redis多级缓存
  • Redis原理
  • 案例导入说明
  • 安装OpenResty
  • Redis
  • Redis实战——黑马点评
2025-05-06
0
0
目录

项目优化

使用RabbitMQ来完成异步任务

项目中的秒杀下单业务是通过java的阻塞队列或者redis的stream消息队列实现异步处理订单的,

但是java的阻塞队列会有内存限制,服务器宕机等风险

# 配置MQ

1)添加依赖:

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2)配置MQ地址:

spring:
  rabbitmq:
    host: 192.168.10.130 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /cyan # 虚拟主机
    username: cyan # 用户名
    password: 123456 # 密码

在Virtual Hosts 中创建一个项目专用的virtual host:/cyan

# 接收消息

定义一个消息监听类:

package com.hmdp.listener;

import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import lombok.RequiredArgsConstructor;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class VoucherOrderListener {
    private final IVoucherOrderService voucherOrderService;
    private final RedissonClient redissonClient;
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "seckill.queue", durable = "true"),
            exchange = @Exchange(name = "seckill.direct"),
            key = "seckill.success"
    ))
    public void listenOrderMessage(VoucherOrder voucherOrder){
        handleVoucherOrder(voucherOrder);

    }
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //1.获取用户
        Long userId = voucherOrder.getUserId();
        // 2.创建锁对象
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 3.尝试获取锁
        boolean isLock = redisLock.tryLock();
        // 4.判断是否获得锁成功
        if (!isLock) {
            // 获取锁失败,直接返回失败或者重试
//            log.error("不允许重复下单!");
            return;
        }
        try {
            voucherOrderService.createVoucherOrder(voucherOrder);
        } finally {
            // 释放锁
            redisLock.unlock();
        }
    }
}

# 发送消息

rabbitTemplate.convertAndSend("seckill.direct", "seckill.success", voucherOrder);

为了防止消息过多导致MQ消息积压而丢失消息,开启MQ的lazy模式

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "seckill.queue", durable = "true"),
        exchange = @Exchange(name = "seckill.direct"),
        arguments = @Argument(name = "x-queue-mode", value = "lazy"),
        key = "seckill.success"
))

# 抛出异常

故意抛出异常查看是否能处理正常

发现抛出异常回滚后,MQ检测到异常,但会不断尝试重试进行消费

# 设置重试上限和重试次数

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: false # true无状态;false有状态。如果业务中包含事务,这里改为false

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

# 设置消息转换器

stateless: false # true无状态;false有状态。如果业务中包含事务,这里改为false

因为设置为false报错,需要依赖消息id

设置消息转换器,自动创建消息id

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

# 设置消息消费失败处理

我们使用RepublishMessageRecoverer策略

创建一个配置类ErrorMessageConfig

package com.hmdp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue", true);
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

再次发送消息可以看到消息消费失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

不过!!

acknowledge-mode: auto # 自动ack

auto模式如果抛出的是消息转换异常,消息依然会被删除

上次更新: 2025/5/7 00:24:40
Redis实战
Redis集群

← Redis实战 Redis集群→

最近更新
01
md
06-29
02
Redis
06-29
03
HBase
06-29
更多文章>
Theme by Vdoing | Copyright © 2025-2025 Cyan Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式