消息队列的消费幂等性如何保证

2020年12月2日 92点热度 0条评论 来源: linyb极客之路

什么是幂等?

任意多次执行所产生的影响均与一次执行的影响相同就可以称为幂等

什么是消息幂等?

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响

为什么我们要保证幂等性,不保证幂等性,会不会有问题?

这个问题其实没法准确回答。回答这个问题的根源得从业务场景上进行分析。比如正常业务情况下,我们是不允许同个订单重复支付,这种业务场景我们就需要确保幂等性。再比如日志记录,这种业务场景,我们可能就不需要做幂等判断。

因此是否要保证幂等性,得基于业务进行考量

消息队列的消费幂等性如何保证?

没法保证。前面说了要保证幂等性,得基于业务场景进行考量。消息队列他本身就不是给你用来做业务幂等性用的。如果你要实现业务幂等性,靠消息队列是没法帮你完成的,你自己得根据自身业务场景,来实现幂等。

常用的业务幂等性保证方法

1、利用数据库的唯一约束实现幂等

比如将订单表中的订单编号设置为唯一索引,创建订单时,根据订单编号就可以保证幂等

2、去重表

这个方案本质也是根据数据库的唯一性约束来实现。其实现大体思路是:首先在去重表上建唯一索引,其次操作时把业务表和去重表放在同个本地事务中,如果出现重现重复消费,数据库会抛唯一约束异常,操作就会回滚

3、利用redis的原子性

每次操作都直接set到redis里面,然后将redis数据定时同步到数据库中

4、多版本(乐观锁)控制

此方案多用于更新的场景下。其实现的大体思路是:给业务数据增加一个版本号属性,每次更新数据前,比较当前数据的版本号是否和消息中的版本一致,如果不一致则拒绝更新数据,更新数据的同时将版本号+1

5、状态机机制

此方案多用于更新且业务场景存在多种状态流转的场景

6、token机制

生产者发送每条数据的时候,增加一个全局唯一的id,这个id通常是业务的唯一标识,比如订单编号。在消费端消费时,则验证该id是否被消费过,如果还没消费过,则进行业务处理。处理结束后,在把该id存入redis,同时设置状态为已消费。如果已经消费过了,则不进行处理。

演示

例子使用springboot2加kafka来演示一下使用token机制如何实现消费端幂等

1、application.yml

spring:
  redis:
    host: localhost
    port: 6379
    # 连接超时时间(毫秒)
    timeout: 10000
    jedis:
      pool:
        # 连接池中的最大空闲连接
        max-idle: 8
        # 连接池中的最小空闲连接
        min-idle: 10
        # 连接池最大连接数(使用负值表示没有限制)
        max-active: 100
        # 连接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: -1
    password:
  kafka:
    # 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)
    bootstrap-servers: localhost:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: com.github.lybgeek.kafka.serialization.ObjectSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: com.github.lybgeek.kafka.serialization.ObjectDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 1
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate

2、实现kafka的自定义序列和反序列

:kakfa默认的序列化和反序列方式是StringSerializer和StringDeserializer。我们要改造成支持对象的序列化和反序列化

a、序列化

public class ObjectSerializer implements Serializer<Object> {


    @Override
    public byte[] serialize(String topic, Object object) {
        return BeanUtils.serialize(object);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

b、反序列化

public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] bytes) {
        return BeanUtils.deserialize(bytes);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }
}

3、消息对象

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDTO<T> implements Serializable {

    private String messageId;


    private T data;
}

4、生产者

:本例子简单模拟生产者多次生产同个消息,进而达到多次消费的效果

@Slf4j
@Component
public class KafkaProducer implements CommandLineRunner {


    @Autowired
    private KafkaTemplate kafkaTemplate;

    private int threadNum = 2;

    private ExecutorService executorService = Executors.newFixedThreadPool(threadNum);

    private CountDownLatch countDownLatch = new CountDownLatch(threadNum);


    @Override
    public void run(String... args) throws Exception {
          send();
    }


    private void send(){
        for(int i = 0; i < threadNum; i++){
            executorService.submit(()->{
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                   log.error(e.getMessage(),e);
                }
                String messageId = "b14701b8-4b08-4bbd-8a4e-70f76a432e99";

                MessageDTO messageDTO = MessageDTO.builder().messageId(messageId).data("hello").build();
                kafkaTemplate.send(Constant.TOPIC,messageDTO);
            });

            countDownLatch.countDown();
        }

    }
}

5、消费者

@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private RedisUtils redisUtils;

    @KafkaListener(id = "msgId",topics = {Constant.TOPIC})
    public void receive(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){

        boolean isRepeateConsume = checkRepeateConsume(record.value().getMessageId());
        if(isRepeateConsume){
            log.error("重复消费。。。。");
            //手工确认
            ack.acknowledge();
            return;
        }


       doBiz(record,ack);
    }

    private boolean checkRepeateConsume(String messageId){
        Object consumeStatus = redisUtils.get(messageId);
        /**
         * 1、如果redis存在消息ID,且消费状态为已消费,则说明是重复消费,此时消费端丢弃该消息
         */
        if(Objects.nonNull(consumeStatus) && "已消费".equals(consumeStatus.toString())){
           // log.error("重复消费。。。。");
            return true;
        }

        /**
         * 2、如果redis不存在消息id,或者状态不是已消费,则从业务方面进行判重
         *
         *  业务判重的可以考虑如下方法:
         *  如果该业务是存在状态流转,则采用状态机策略进行判重。
         *  如果该业务不是状态流转类型,则在新增时,根据业务设置一个唯一的属性,比如根据订单编号的唯一性;
         *  更新时,可以采用多版本策略,在需要更新的业务表上加上版本号
         */
        return checkRepeateByBiz(messageId);
    }



    /**
     * 模拟业务消费
     * @param messageDTO
     * @param ack
     */
    private void doBiz(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){
        System.out.println("------模拟业务处理-----------");
        System.out.println("--------执行业务处理:"+record.value()+"------------");
        System.out.println("--------------1、业务处理完毕-----------");
        try {
            redisUtils.setEx(record.value().getMessageId(), "已消费",12, TimeUnit.HOURS);
            System.out.println("-------------2、业务处理完毕后,把全局ID存入redis,并设置值为已消费");
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("----------3、业务处理完毕后,消费端手工确认");
        //手工确认
        ack.acknowledge();

    }

}

6、效果

2020-08-09 16:25:32.701  INFO 9552 --- [    msgId-0-C-1] io.lettuce.core.KqueueProvider           : Starting without optional kqueue library
------模拟业务处理-----------
--------执行业务处理:MessageDTO(messageId=b14701b8-4b08-4bbd-8a4e-70f76a432e99, data=hello)------------
--------------1、业务处理完毕-----------
-------------2、业务处理完毕后,把全局ID存入redis,并设置值为已消费
----------3、业务处理完毕后,消费端手工确认
2020-08-09 16:25:36.021 ERROR 9552 --- [    msgId-0-C-1] c.g.l.kafka.consumer.KafkaConsumer       : 重复消费。。。。

总结

消息队列没法帮你做到消费端的幂等性,消费端的幂等性得基于业务场景进行实现。不过消息队列必须得保证消息不能丢,至少保证被消费一次,不然消息都丢了,没数据搞啥业务幂等。在实现消费端处理业务时,要确保消费端是采用手工确认应答机制,而不是自动应答机制。这样能够确保消费端一旦业务处理失败,生产者还能再次发送同个消息给消费端

demo链接

https://github.com/lyb-geek/s...

    原文作者:linyb极客之路
    原文地址: https://segmentfault.com/a/1190000023555975
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。