rabbitMQ框架(消费幂等,消息可靠性等)

2021年3月27日 15点热度 0条评论 来源: t-m_a_c

一.架构图

二.开启相应mq配置

1.启动类中开启mq配置注解

2.绑定生产者消费者管道


3.在你的配置中心(consul,Apollo等)添加基础配置


三.生产者执行流程

1.首先定义一个发送消息的接口

2.实现消息的接口

3.通过BinderAwareChannelResolver中获取massageChannel实例调用send方法通过管道发送消息

四.消费者消费

1.streamlistener监听消息
2.消费后无论成功与否会调用channel中的ack或nack方法,让生产者确认消费者是否消费成功,移除队列中的消息或将消息存放到死信队列

父类中的方法

里面的分布式锁时处理重复消费的后面提

五.架构层面的消息拦截处理包括(生产数据消费数据持久化mongo,解决消费幂等性,消息投递可靠性,消息事务方面的补偿)

1.生产者发送消息的时候进行代理拦截

拦截 (二.2)中消息key

将key存放到threadLocal中 目的是数据的存储

2.将生产者发送消息的方法进行拦截




由上面代码可知,发布订阅模式采用的时topic模式通过routingkey 的#通配符进行发布订阅等同于fanout模式,并将我们的数据存放到mongo中,方便业务来查看发送数据排查业务问题


这里的代码主要是实现生产者的一个消息confirm主要目为了将消息存到生产者的mongo库中,并存在生产者对应的私信队列

六.消费者listenStreamer的一个拦截过程

1.此段代码逻辑就是将header中的信息存放到userThreadLocal中,便于消费者的业务方获取token数据


2.这一段逻辑主要将消费者接受的数据存放到mongo中,便于消费业务方来处理业务问题

分布式锁的这段逻辑主要处理消息幂等性问题(据说没有一款消息中间件能够保证消息不会被重复消费)在生产者方已经生成了mqid(uuid)+mqkey作为redis的key,在消费者这一块来进行判断是否已经消费。最后面的代码设置threadlocal的参数,目的就是为了后续消息的补偿,分布式事务采用的时saga数据补偿的机制(这里可能不是最优秀的,会给业务方增加工作量)。
代理拦截的方法执行完了,最后还要走after,这里面主要就是判断消息消费的ack的一个状态,如果重试状态次数大于三的或者拒接的我们就放到死信队列中,通过自定义的事务补偿机制来进行补偿,最后!!!不要忘记释放最开始的那把分布式锁,别让其他线程拿不到锁

3.接下来又拦截了一次这个listenStreamer(可能jar中从1.0升级到2.0,没有在上次方法中继续写逻辑,也可能减少代码的臃肿提取出各自的业务逻辑)


这里就是mq业务的最后了,主要时上报状态给分布式事务那面处理这些死信队列中的数据。

    原文作者:t-m_a_c
    原文地址: https://blog.csdn.net/qq_40354931/article/details/115268023
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。