Springboot+RabbitMQ整合示例

2021年1月17日 12点热度 0条评论 来源: 默-存

一、RabbitMQ简介

         MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

1、MQ特点: MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

2、含义:RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议

3、概念:RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。

RabbitMQ的结构图如下:

Broker:简单来说就是消息队列服务器实体。

  •   Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  •   Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  •   Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  •   Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  •   vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  •   producer:消息生产者,就是投递消息的程序。
  •   consumer:消息消费者,就是接受消息的程序。
  •   channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

  •        客户端连接到消息队列服务器,打开一个channel。
  •   客户端声明一个exchange,并设置相关属性。
  •   客户端声明一个queue,并设置相关属性。
  •   客户端使用routing key,在exchange和queue之间建立好绑定关系。
  •   客户端投递消息到exchange。

      exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

      exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

       RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:

  •   exchange持久化,在声明时指定durable => 1
  •   queue持久化,在声明时指定durable => 1
  •   消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

        如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

4、安装(Centos7):https://blog.csdn.net/typ1805/article/details/82744899

访问:http://192.168.0.132:15672/#/queues

 

二、Springboot整合RabbitMQ

1、添加pom.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example.demo.rabbitmq</groupId>
	<artifactId>rabbitmq-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>rabbitmq-demo</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.5.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

2、application.yml配置文件主要是对rabbimq的配置信息

server:
  port: 8081

spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: 192.168.0.132
    port: 5672
    username: admin
    password: admin

3、初始化创建队列、转发器,并把队列绑定到转发器(RabbitConfig.java)

package com.example.demo.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 路径:com.example.demo.rabbitmq.config
 * 类名:
 * 功能:队列配置
 * 备注:
 * 创建人:typ
 * 创建时间:2018/9/23 21:46
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Configuration
public class RabbitConfig {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }

    @Bean
    public Queue userQueue() {
        return new Queue("user");
    }

    //===============以下是验证topic Exchange的队列==========
    @Bean
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
    //===============以上是验证topic Exchange的队列==========


    //===============以下是验证Fanout Exchange的队列==========
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    //===============以上是验证Fanout Exchange的队列==========


    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
}

4、最简单的hello生产和消费实现(单生产者和单消费者)

生产者:

package com.example.demo.rabbitmq.service.oneToOne;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:生产者
 * 备注:单生产者-单消费者
 * 创建人:typ
 * 创建时间:2018/9/23 21:49
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloSender {

    private static final Logger log = LoggerFactory.getLogger(HelloSender.class);

    @Autowired
    public AmqpTemplate amqpTemplate;

    public void send(){
        String context = "hello " + new Date();
        log.info("Sender:" + context);
        this.amqpTemplate.convertAndSend("hello",context);
    }
}

消费者:

package com.example.demo.rabbitmq.service.oneToOne;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:消费者
 * 备注:单生产者-单消费者
 * 创建人:typ
 * 创建时间:2018/9/23 22:14
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloReceiver {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiver.class);

    //监听器监听指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("Receiver:"+hello);

    }

}

controller测试:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.oneToOne.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路径:com.example.demo.rabbitmq.controller
 * 类名:
 * 功能:《用一句描述一下》
 * 备注:单生产者-单消费者
 * 创建人:typ
 * 创建时间:2018/9/23 22:35
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@RestController
public class RabbitOneToOneTest {

    @Autowired
    private HelloSender helloSender;

    @PostMapping("/hello")
    public void hello(){
        helloSender.send();
    }
}

启动程序,执行:

http://localhost:8081/hello

结果如下:

Sender : hello1 Thu September 24 17:23:31 CST 2018
Receiver  : hello1 Thu September 24 17:23:31 CST 2018

5、单生产者-多消费者

生产者:

package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:生产者
 * 备注:单生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 21:49
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloSender1 {

    private static final Logger log = LoggerFactory.getLogger(HelloSender1.class);

    @Autowired
    public AmqpTemplate amqpTemplate;

    public void send(String msg){
        String context = msg + new Date();
        log.info("Sender1:" + context);
        this.amqpTemplate.convertAndSend("hello",context);
    }
}

消费者1:

package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:消费者1
 * 备注:单生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 22:14
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloReceiver1 {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiver1.class);

    //监听器监听指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("Receiver1:"+hello);
    }

}

消费者2:

package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:消费者2
 * 备注:单生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 22:14
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloReceiver2 {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiver2.class);

    //监听器监听指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("Receiver2:"+hello);

    }

}

controller测试:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.oneToMany.HelloSender1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路径:com.example.demo.rabbitmq.controller
 * 类名:
 * 功能:《用一句描述一下》
 * 备注:单生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 22:35
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@RestController
public class RabbitOneToManyTest {

    @Autowired
    private HelloSender1 helloSender;
    
    /**
     * 方法名:
     * 功能:单生产者-多消费者
     * 描述:
     * 创建人:typ
     * 创建时间:2018/9/23 22:46
     * 修改人:
     * 修改描述:
     * 修改时间:
     */
    @PostMapping("/oneToMany")
    public void ontToMany(){
        for (int i=0;i<10;i++){
            helloSender.send("hello smg:"+i);
        }
    }
}

6、多生产者-多消费者

生产者1:

package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:生产者1
 * 备注:多生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 21:49
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloSenderA {

    private static final Logger log = LoggerFactory.getLogger(HelloSenderA.class);

    @Autowired
    public AmqpTemplate amqpTemplate;

    public void send(String msg){
        String context = msg + new Date();
        log.info("SenderA:" + context);
        this.amqpTemplate.convertAndSend("hello",context);
    }
}

生产者2:

package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:生产者2
 * 备注:多生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 21:49
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloSenderB {

    private static final Logger log = LoggerFactory.getLogger(HelloSenderB.class);

    @Autowired
    public AmqpTemplate amqpTemplate;

    public void send(String msg){
        String context = msg + new Date();
        log.info("SenderB:" + context);
        this.amqpTemplate.convertAndSend("hello",context);
    }
}

消费者1:

package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:消费者1
 * 备注:多生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 22:14
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloReceiverA {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiverA.class);

    //监听器监听指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("ReceiverA:"+hello);
    }

}

消费者2:

package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service
 * 类名:
 * 功能:消费者2
 * 备注:多生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 22:14
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class HelloReceiverB {

    private static final Logger log = LoggerFactory.getLogger(HelloReceiverB.class);

    //监听器监听指定的Queue
    @RabbitListener(queues="hello")
    public void process(String hello){
        log.info("ReceiverB:"+hello);
    }

}

controller测试:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.manyToMany.HelloSenderA;
import com.example.demo.rabbitmq.service.manyToMany.HelloSenderB;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路径:com.example.demo.rabbitmq.controller
 * 类名:
 * 功能:《用一句描述一下》
 * 备注:多生产者-多消费者
 * 创建人:typ
 * 创建时间:2018/9/23 22:35
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@RestController
public class RabbitManyToManyTest {

    @Autowired
    private HelloSenderA helloSenderA;

    @Autowired
    private HelloSenderB helloSenderB;

    /**
     * 方法名:
     * 功能:多生产者-多消费者
     * 描述:
     * 创建人:typ
     * 创建时间:2018/9/23 22:46
     * 修改人:
     * 修改描述:
     * 修改时间:
     */
    @PostMapping("/manyToMany")
    public void ontToMany(){
        for (int i=0;i<10;i++){
            helloSenderA.send("hello smg:"+i);
            helloSenderB.send("hello smg:"+i);
        }
    }
}

7、实体类传输,springboot完美的支持对象的发送和接收,不需要格外的配置。

实体类(必须实现序列化接口):

package com.example.demo.rabbitmq.service.entity;

import java.io.Serializable;

/**
 * 路径:com.example.demo.rabbitmq.service.entity
 * 类名:
 * 功能:《用一句描述一下》
 * 备注:
 * 创建人:typ
 * 创建时间:2018/9/24 19:59
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class User implements Serializable{

    private String name;
    private String pass;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPass() {
        return pass;
    }

    public void setPass(String pass) {
        this.pass = pass;
    }
}

生产者:

package com.example.demo.rabbitmq.service.entity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.entity
 * 类名:
 * 功能:实体类传输
 * 备注:生产者
 * 创建人:typ
 * 创建时间:2018/9/24 20:01
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class UserSender {

    private static final Logger log = LoggerFactory.getLogger(UserSender.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        User user = new User();
        user.setName("test");
        user.setPass("123456");
        log.info("user Sender:" + user.getName() + "," + user.getPass());
        amqpTemplate.convertAndSend("user", user);
    }
}

消费者:

package com.example.demo.rabbitmq.service.entity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.entity
 * 类名:
 * 功能:实体类传输
 * 备注:消费者
 * 创建人:typ
 * 创建时间:2018/9/24 20:07
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
@RabbitListener(queues = "user")
public class UserReceiver {

    private static final Logger log = LoggerFactory.getLogger(UserReceiver.class);

    @RabbitHandler
    public void process(User user) {
        log.info("user Receive:" + user.getName() + "," + user.getPass());
    }
}

controller测试:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.entity.UserSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路径:com.example.demo.rabbitmq.controller
 * 类名:
 * 功能:实体类传输测试
 * 备注:
 * 创建人:typ
 * 创建时间:2018/9/24 20:09
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@RestController
public class RabbitUserTest {

    @Autowired
    private UserSender userSender;

    @PostMapping("/userTest")
    public void userTets(){
        userSender.send();
    }
}

8、topic ExChange示例

     topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列。首先对topic规则配置,这里使用两个队列来测试(也就是在Application类中创建和绑定的topic.message和topic.messages两个队列),其中topic.message的bindting_key为“topic.message”,topic.messages的binding_key为“topic.#”。

生产者:

package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.topic
 * 类名:
 * 功能:topic ExChange示例------生产者
 * 备注:topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列
 * 创建人:typ
 * 创建时间:2018/9/24 20:12
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class TopicSender {

    private static final Logger log = LoggerFactory.getLogger(TopicSender.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msg1 = "I am topic.mesaage msg1!";
        log.info("sender1 : " + msg1);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);

        String msg2 = "I am topic.mesaages msg2!";
        log.info("sender2 : " + msg2);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
    }
}

消费者1:

package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.topic
 * 类名:
 * 功能:topic ExChange示例
 * 备注:消费者1(topic.message)
 * 创建人:typ
 * 创建时间:2018/9/24 20:12
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

    private static final Logger log = LoggerFactory.getLogger(TopicReceiver1.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("topicReceiver1: " +msg);
    }

}

消费者2:

package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.topic
 * 类名:
 * 功能:topic ExChange示例
 * 备注:消费者2(topic.messages)
 * 创建人:typ
 * 创建时间:2018/9/24 20:12
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    private static final Logger log = LoggerFactory.getLogger(TopicReceiver2.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("topicReceiver2 : " +msg);
    }

}

controller测试:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.topic.TopicSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路径:com.example.demo.rabbitmq.controller
 * 类名:
 * 功能:topic ExChange示例
 * 备注:
 * 创建人:typ
 * 创建时间:2018/9/24 20:21
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@RestController
public class RabbitTopicTest {

    @Autowired
    private TopicSender topicSender;

    @PostMapping("/topicTest")
    public void topicTest(){
        topicSender.send();
    }
}

9、fanout ExChange示例

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。

生产者:

package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.fanout
 * 类名:
 * 功能:fanout ExChange示例---生产者
 * 备注:Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。
 * 创建人:typ
 * 创建时间:2018/9/24 21:10
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class FanoutSender {

    private static final Logger log = LoggerFactory.getLogger(FanoutSender.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msg="fanoutSender :hello i am fanout";
        log.info(msg);
        this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msg);
    }
}

消费者1:

package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.fanout
 * 类名:
 * 功能:fanout ExChange示例
 * 备注:消费者A
 * 创建人:typ
 * 创建时间:2018/9/24 21:10
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    private static final Logger log = LoggerFactory.getLogger(FanoutReceiverA.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("FanoutReceiverA  : " + msg);
    }

}

消费者2:

package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.fanout
 * 类名:
 * 功能:fanout ExChange示例
 * 备注:消费者B
 * 创建人:typ
 * 创建时间:2018/9/24 21:10
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    private static final Logger log = LoggerFactory.getLogger(FanoutReceiverB.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("FanoutReceiverB  : " + msg);
    }

}

消费者3:

package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.fanout
 * 类名:
 * 功能:fanout ExChange示例
 * 备注:消费者C
 * 创建人:typ
 * 创建时间:2018/9/24 21:10
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    private static final Logger log = LoggerFactory.getLogger(FanoutReceiverC.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("FanoutReceiverC  : " + msg);
    }

}

controller测试:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.fanout.FanoutSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路径:com.example.demo.rabbitmq.controller
 * 类名:
 * 功能:fanout ExChange示例
 * 备注:
 * 创建人:typ
 * 创建时间:2018/9/24 22:11
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@RestController
public class RabbitFanoutTest {

    @Autowired
    private FanoutSender fanoutSender;

    @PostMapping("/fanoutTest")
    public void fanoutTest() {
        fanoutSender.send();
    }
}

10、callback的消息发送

       增加回调处理,这里不再使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息。

rabbitmq配置类:

package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;

/**
 * 路径:com.example.demo.rabbitmq.service.callback
 * 类名:
 * 功能:增加回调处理,这里不再使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息。
 * 备注:
 * 创建人:typ
 * 创建时间:2018/9/24 20:09
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
public class RabbitConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class);
    
    @Value("${spring.rabbitmq.host}")
    private String addresses;
    
    @Value("${spring.rabbitmq.port}")
    private String port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean publisherConfirms;
    
    @Bean
    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses+":"+port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);

        //如果要进行消息回调,则这里必须要设置为true
        connectionFactory.setPublisherConfirms(publisherConfirms);
        return connectionFactory;
    }

    //因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplatenew() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

}

生产者:

package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * 路径:com.example.demo.rabbitmq.service.callback
 * 类名:CallBackSender
 * 功能:callback的消息发送-----生产者
 * 创建人:typ
 * 创建时间:2018/9/24 20:09
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
public class CallBackSender implements  RabbitTemplate.ConfirmCallback{

    private static final Logger log = LoggerFactory.getLogger(CallBackSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplatenew;

    public void send() {
        rabbitTemplatenew.setConfirmCallback(this);
        String msg="callbackSender : i am callback sender";
        log.info(msg);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("callbackSender UUID: " + correlationData.getId());
        this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);  
    }

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("callbakck confirm: " + correlationData.getId());
    }
}

消费者:

package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 路径:com.example.demo.rabbitmq.service.callback
 * 类名:
 * 功能:callback的消息发送
 * 备注:消费者
 * 创建人:typ
 * 创建时间:2018/9/24 20:12
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@Component
@RabbitListener(queues = "topic.messages")
public class CallBackReceiver {

    private static final Logger log = LoggerFactory.getLogger(CallBackReceiver.class);

    @RabbitHandler
    public void process(String msg) {
        log.info("CallBackReceiver : " +msg);
    }

}

controller测试:

package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.callback.CallBackSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 路径:com.example.demo.rabbitmq.controller
 * 类名:
 * 功能:callback的消息发送
 * 备注:
 * 创建人:typ
 * 创建时间:2018/9/24 22:20
 * 修改人:
 * 修改备注:
 * 修改时间:
 */
@RestController
public class RabbitCallBackTest {

    @Autowired
    private CallBackSender callBackSender;

    //执行代码可以看出callbackSender发出的UUID,收到了回应,又传回来了。
    @PostMapping("/callback")
    public void callbak() {
        callBackSender.send();
    }
}

 

欢迎关注

 

    原文作者:默-存
    原文地址: https://blog.csdn.net/typ1805/article/details/82835318
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。