Spring Boot + RabbitMQ + rabbitmq_delayed_message_exchange插件实现延迟队列

2021年1月17日 22点热度 0条评论 来源: 林旭南

最近在实现一个业务,就是需要做到消息延迟推送,在Java 多线程并发开发过程中,了解到DelayQueue类的:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。然后再开一个扫描线程去轮询,就可以实现延迟消息的处理了,但是这样子做有一个弊端,就是得开一个线程时刻轮询,比较好资源,于是就寻找到了另外一种解决方案,现在就开始做下记录分享

解决方案就是RabbitMQ + rabbitmq_delayed_message_exchange插件(这种解决方案的好处还有就是可以解决队列先进先出的情况,比如第一个进到队列的到期时间是30分钟,第二个进入队列的到期时间是20分钟,但是由于队列的先进先出原则,第二个进入的会被阻塞了,等到第一个到期了才会被一起延迟处理)

  1. 安装erlang环境以及rabbitmq(这一步网上教程很多,跳过)
  2. 下载rabbitmq_delayed_message_exchange,官网地址 https://www.rabbitmq.com/community-plugins.htm
  3. 先解压,然后放置到rabbitmq安装的路径下的plugins文件夹中
  4. Enable 插件
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

     

  5. 重启rabbitmq
    net stop rabbitmq
    net start rabbitmq

     

  6. springboot 项目中引入相应的包
     

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

     

  7. 配置类
     

    @Configuration
    public class XdelayConfig {
    
        public static final String DELAY_EXCHANGE = "delay_exchange";
        public static final String DELAY_KEY = "delay_key";
        public static final String DELAY_QUEUE = "delay_queue";
    
        /**
         * 延时队列交换机
         * 注意这里的交换机类型:CustomExchange
         *
         * @return
         */
        @Bean
        public CustomExchange delayExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
        }
    
        /**
         * 延时队列
         *
         * @return
         */
        @Bean
        public Queue delayQueue() {
            return new Queue(DELAY_QUEUE, true);
        }
    
        /**
         * 给延时队列绑定交换机
         *
         * @return
         */
        @Bean
        public Binding cfgDelayBinding(Queue cfgDelayQueue, CustomExchange cfgUserDelayExchange) {
            return BindingBuilder.bind(cfgDelayQueue).to(cfgUserDelayExchange).with(DELAY_KEY).noargs();
        }
    }

     

  8. 延迟消息发送类
     

    @Service
    @Slf4j
    public class XdelaySender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(Booking booking, int delayTime) {
            //这里的消息可以是任意对象,无需额外配置,直接传即可
            log.info("===============延时队列生产消息====================");
            log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), booking.getBookingName());
            this.rabbitTemplate.convertAndSend(
                    XdelayConfig.DELAY_EXCHANGE,
                    XdelayConfig.DELAY_KEY,
                    booking,
                    message -> {
                        //注意这里时间可以使long,而且是设置header
                        message.getMessageProperties().setHeader("x-delay", delayTime * 60000);//设置延迟多少分钟
                        return message;
                    }
            );

     

  9. 延迟消息接收处理类
     

    @Component
    @Configuration
    @Slf4j
    public class XdelayReceiver {
    
        @RabbitListener(queues = XdelayConfig.DELAY_QUEUE)
        public void cfgUserReceiveDealy(Booking booking, Message message) throws IOException {
            log.info("===============接收队列接收消息====================");
            log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), booking.getBookingName());
        }
    }

     

  10. 入口main函数也需要添加@EnableRabbit注解
     

    @SpringBootApplication
    @EnableRabbit
    public class SpringBootRabbitMqApplication {
    
    	public static void main(String[] args) {
    		SpringApplication.run(SpringBootRabbitMqApplication.class, args);
    	}
    }

     

  11. 测试实体类
     

    @Getter
    @Setter
    @ToString
    public class Booking implements Serializable {
        private static final long serialVersionUID = 1L;
        private String bookingName;
        private Date bookingTime;
        private String bookingContent;
        private String operatorName;
    }

     

  12. 测试接口
     

    @RestController
    public class TestController {
    
        @Autowired
        private XdelaySender xdelaySender;
    
    
        @GetMapping("/sendDelay")
        public Object sendDelay() {
            Booking booking1 = new Booking();
            booking1.setBookingContent("hhaha");
            booking1.setBookingName("第一本书");
            booking1.setBookingTime(new Date());
            booking1.setOperatorName("hellen");
            xdelaySender.send(booking1, 2);
            Booking booking2 = new Booking();
            booking2.setBookingContent("hhaha");
            booking2.setBookingName("第二本书");
            booking2.setBookingTime(new Date());
            booking2.setOperatorName("hellen");
            xdelaySender.send(booking2, 1);
            return "ok";
        }
    }

    记录完毕,自己亲手试下

    原文作者:林旭南
    原文地址: https://blog.csdn.net/qq_33749799/article/details/88938166
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。