RabbitMQ自学笔记

2021年3月23日 13点热度 0条评论 来源: xiuzhiwu

MQ(message queue)

AMQP协议
AMQP ladvanced message queuing protool)‘在2003年时被提出,最早用于解决金融领不同平台之间的消
息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protoco1(链接协
议),这是其和UNS的本质差别,AMOP不从API层进行限定,而是直接定义网络交换的数据格式。这使
得实现了AMOP的provider天然性就是跨平台的。以下是ANOP协议模型:


rabbitMq安装:(安装版本为3.8.14.1,系统为centOS7)

	进入官网下载rabbitmq-server-3.8.14-1.el7.noarch.rpm和erlang-23.2.1-1.el7.x86_64.rpm
	官网地址:https://packagecloud.io/rabbitmq
	利用ftp工具将文件放入linux系统,socat安装:
	yum install -y update
	yum install -y socat
	启动rabbitMq:systemctl start rabbitmq-server或者rabbitmq-server -detached           //启动rabbitmq,-detached代表后台守护进程方式启动
	查看rabbitMq状态:rabbitmqctl status
	查看rabbitmq是否启动:systemctl status rabbitmq-server
	停止rabbitMq:rabbitmqctl stop
	列出角色:rabbitmqctl list_users
	配置 rabbitmq 监控地址: rabbitmq-plugins enable rabbitmq_management
	关闭防火墙:systemctl stop firewalld.service
	添加用户:rabbitmqctl add_user admin admin 
	# rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" //添加权限
	# rabbitmqctl set_user_tags admin administrator  //修改用户角色
	rabbitmqctl help:查看更多命令
本地访问:虚拟机地址:15672	

项目实例:
1.创建项目,引入依赖

<!-- 引入rabbitMq相关依赖 -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.7.2</version>
    </dependency>

第一种模型(直连):

	p:生产者,也就是要发送消息的的程序
	c:消费者,消息的接受者,会一直等待消息的到来
	queue:消息队列,类似于一个邮箱,可以缓存消息。生产者网其中投放消息,消费者从中取出消息。

第二种模型(work queue):

	当消息处理比较耗时时,可能生产消息的速度远远大于消费消息的速度,长此以往,
就会造成消息堆积,这种情况下,就可以使用工作队列。让多个消费者绑定到一个队列,共同消费队列中的消息。获取消息方式类似于轮询。

第三种模型(fanout):

	fanout:扇出,也称为广播
	在广播模式下,消息发送流程是这样的:
	· 可以有多个消费者
	· 每个消费者有自己的队列
	· 每个队列都要绑定到交换机
	· 生产者发送的消息只能发送到交换机,交换机来决定发送到哪个队列,生产者无法决定
	· 交换机把消息发送给绑定过的队列
	· 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

第四种模型(Routing):

Routing之订阅模型-Direct(直连)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
	·队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
	·消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey 。
	·Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息流程:Puttina it all toaether

routing之订阅模型-Topic

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符! 这种模型 
Routingkey一般都是由一个或多个单词组成,多个单词之间以""分割,例如: item.insert

SpringBoot整合rabbitMq使用:
1.创建项目

	创建项目,在导包的时候搜索 spring fot rabbitMq,导入amqp依赖包

2.项目配置(yml文件):

spring:
  application:
    name: rabbitmq_springBoot
  rabbitmq:
    host: 192.168.1.31  // 虚拟机主机的ip
    port: 5672               //端口默认5672
    virtual-host: /ems    //自己创建的虚拟
    username: admin   //用户名和密码
    password: admin

第一种模型:hello模型在整合后的使用:

provider:生产者代码
	public void testHello(){
        rabbitTemplate.convertAndSend("hello","hello world");
    }

customer:消费者代码
	@Component
	@RabbitListener(queuesToDeclare = @Queue("hello")) // 默认名称hello,durable="true",autodelete="false"
	public class Customer {

    	@RabbitHandler
    	public void receive1(String message){
        	System.out.println("message:"+message);
    	}
	}

第二种模型:workQueue使用

生产者代码:
	 // work queue
    @Test
    public void testWork(){
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work","work message"+i);
        }
    }

消费者代码:
	@Component
	public class WorkCustomer {
	
	    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
	    public void receive1(String message){
	        System.out.println("message1:"+message);
	    }
	
	    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
	    public void receive2(String message){
	        System.out.println("message2:"+message);
	    }
	}

第三种模型:fanout广播

生产者代码:
	public void testFanout(){
        rabbitTemplate.convertAndSend("logs","","这是广播模型的消息");
    }
消费者代码
	@Component
	public class FanoutCustomer {
	    @RabbitListener(bindings = {
	            @QueueBinding(value = @Queue, //@Queue代表一个临时队列
	                    exchange = @Exchange(value = "logs",type = "fanout"))
	    })
	    public void receive1(String message){
	        System.out.println("message1:"+message);
	    }
	
	    @RabbitListener(bindings = {
	            @QueueBinding(value = @Queue, //@Queue代表一个临时队列
	                    exchange = @Exchange(value = "logs",type = "fanout"))
	    })
	    public void receive2(String message){
	        System.out.println("message2:"+message);
	    }
	}

第四种模型:Routing之订阅模型-Direct(直连)

生产者代码:
 	public void testRouting(){
        rabbitTemplate.convertAndSend("directs","error","routingKey message");
    }
消费者代码:
	@Component
	public class DirectCustomer {
	    @RabbitListener(bindings = {
	            @QueueBinding(
	                    value = @Queue,
	                    exchange = @Exchange(value = "directs",type = "direct"),
	                    key = {"info","error","warn"}
	            )
	    })
	    public void receive1(String message){
	        System.out.println("message1:"+message);
	    }
	
	
	    @RabbitListener(bindings = {
	            @QueueBinding(
	                    value = @Queue,
	                    exchange = @Exchange(value = "directs",type = "direct"),
	                    key = {"error"}
	            )
	    })
	    public void receive2(String message){
	        System.out.println("message2:"+message);
	    }
	}

rabbitmq集群
集群架构
–普通集群

默认情况下 :RabitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管它们可以从所有节点看到和访问
	
核心解决问题:当集群中某一时刻master节点宕机,可以对Quene中信息,进行备份

主从配置:
rabbitmq-server -detached无法启动rabbitMq的,可以参考:
https://blog.51cto.com/13777111/2300898

虚拟机:192.168.1.32 主
	192.168.1.33 从
	192.168.1.34 从
1.ip映射
	vim /etc/hosts  加入
	192.168.1.32 mq1
	192.168.1.33 mq2
	192.168.1.34 mq3
	修改完成之后将所有虚拟机hosts文件同步 scp /etc/hosts root@mq2:/etc/,断点续传也行
2.机器安装rabbitmq和erlang,如果克隆了安装了这两个的虚拟机(VMware),则可以省略
3.查看cookie是否一致(VMware克隆的机器是一样的,不一样就用scp同步)
	cat /var/lib/rabbitmq/.erlang.cookie
4.启动rabbitMq,成功访问管理界面
	rabbitmq-server -detached
5.在node2和node3执行加入集群命令:
	1.关闭  rabbitmqctl stop_app
	2.加入集群  rabbitmqctl join_cluster rabbit@mq1
	3.启动服务:rabbitmqctl start_app
6.查看集群状态,任意节点执行:
	rabbitmqctl cluster_status
7.如果出现如下显示,集群搭建成功:
	Cluster status of node rabbit@mq3 ...
	[ {nodes, [ idisc, [ rabbit@mq1 , rabbit@mq2 , rabbit@mq3]}]》.frunning_nodes , [ rabbit@mq1 , rabbit@mq2 , rabbit@mq3 ]}.{cluster_name , <<" rabbit@mq1">>},
	{partitions, [ ]},
	{alarms, [ {rabbit@mq1, [ ]},{rabbit@mq2,[] },{rabbit@mq3, []》]}]

镜像集群:
1.镜像架构图

2.配置集群架构

策略说明
	rabbitmqctl set.policy [-p vhost>] [--priority <priority ] [--apply-to <apply-to>] <names <pattern<definition>
	-p Vhost:可选参数,针对指定vhost下的queue进行设置
	Name :policy的名称
	Pattern : queue的匹配模式(正则表达式)
	Definition:镜像定义,包括三个部分ha-mode,ha-params,ha-sync-mode
		ha-mode:指明镜像队列的模式,有效值为all/exactly/nodes
			all:表示在集群中所有的节点上进行镜像
			exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
			nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
		ha-params: ha-mode模式需要用到的参数
		ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
		priority:可选参数, policy的优先级
1.查看当前策略
	rabbitmqctl list_policies
2.添加策略
	rabbitmqctl set_policy ha-all ' ^hello' '{" ha-mode" : "all" , " ha-sync-mode" : " automatic" }'
	说明:策略正则表达式为“A”表示所有匹配所有队列名称^hello:匹配hello开头队列
3.删除策略
	rabbitmqctl set_policy ha-all '^hello' ' { " ha-mode " : "all" , " ha-sync-mode " : " automatic" }'

    原文作者:xiuzhiwu
    原文地址: https://blog.csdn.net/weixin_45301190/article/details/111701647
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。