docker部署rabbitmq

#首先输入:
docker search rabbitmq
#然后下载最新的rabbitmq
docker pull rabbitmq:latest
#然后运行rabbitmq,并设置账户和密码为admin(自行修改)
docker run -dit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:latest
#运行过后无法访问rabbitmq,需要进入容器安装插件
docker exec -it rabbit /bin/bash
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
#docker部署的rabbit后台默认没有UI图表显示,需要修改配置文件
#进入容器后
cd /etc/rabbitmq/conf.d/
#然后输入
echo management_agent.disable_metrics_collector = false > management_agent
#退出容器重启即可
docker restart rabbit
#防火墙放行15672和5672端口

消息队列生产消费流程

直连型交换机(Direct Exchange)

根据消息携带的路由键,将消息投递给对应的队列。
大致流程为将一个队列绑定到一个直连交换机上,并且给队列定义一个路由键(routing key)。
当一个消息携带一个路由值发送给交换机时,交换机会去在队列中寻找相同的路由键,然后传递给该队列。

扇形交换机(Fanout Exchange)

扇形交换机没有路由键的概念,就是绑定了路由键也是无效的。这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列

主题交换机(Topic Exchange)

主题交换机和直连交换机差不多,但是它的路由键和绑定键之间是由规则的。

  1. *(星号)用来表示一个必须出现的单词:例如路由键为A.CC.A,队列Q为.CC.,那么队列Q可以收到消息
  2. \#(井号)用来表示任意数量的单词:例如路由键为EE.FAAFWDZXC,队列Q1为EE.\#,那么队列Q1可以收到消息

Springboot整合RabbitMq

测试直连交换机方式

配置交换机和队列路由

package com.demo.Config.RabbitMq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class DirectRabbitConfig {

    //队列 起名:DirectQueue
    @Bean
    public Queue DirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("DirectQueue",true,true,false);

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("DirectQueue",true,false,false);
    }

   //Direct交换机 起名:DirectExchange
    @Bean
    DirectExchange DirectExchange() {
      //  return new DirectExchange("DirectExchange",true,true);
        return new DirectExchange("DirectExchange",true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:DirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(DirectQueue()).to(DirectExchange()).with("DirectRouting");
    }
}

创建生产者

package com.demo.Controller.RabbitMq;

import com.demo.Common.lang.ResultBody;
import org.apache.shiro.authz.annotation.Logical;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author zqf
 * @date 2021/12/23 16:16
 * @description
 */
@RestController
public class RabbitProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sendDirectMessage")
    @RequiresPermissions(logical = Logical.OR,value = {"admin"})
    public ResultBody sendDirectMessage(@RequestParam("word")String word){
        String messageId = String.valueOf(UUID.randomUUID());
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",word);
        map.put("createTime",createTime);
        //将消息携带绑定键值:DirectRouting 发送到交换机DirectExchange
        rabbitTemplate.convertAndSend("DirectExchange", "DirectRouting", map);
        return ResultBody.ok().message("发送成功");
    }
}

运行项目调用接口:

因为没创建消费者,所以生产者推送的消息还存在队列里

创建消费者

package com.demo.Controller.RabbitMq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * @author zqf
 * @date 2021/12/23 21:21
 * @description
 */

@Component
@RabbitListener(queues = "DirectQueue")//监听的队列名称 TestDirectQueue
public class DirectConsumer {

    @RabbitHandler
    public void process(Map Message) {
        System.out.println("DirectReceiver消费者收到消息  : " + Message.toString());
    }
}

重启项目:


刚刚发送的消息就被消费者消费掉了
如果多个消费者同时监听同一个直连交互队列,则会以轮训的方式依次消费,也不会存在重复消费的情况

测试主题交换机方式

创建配置文件绑定交换机和队列

package com.demo.Config.RabbitMq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * @author zqf
 * @date 2021/12/26 13:00
 * @description
 */

@Configuration
public class TopicRabbitConfig {
    //绑定键
    public final static String topicOne = "topic.one";
    public final static String topicTwo = "topic.two";

    @Bean
    public Queue firstQueue(){
        return new Queue(TopicRabbitConfig.topicOne);
    }

    @Bean
    public Queue secondQueue(){
        return new Queue(TopicRabbitConfig.topicTwo);
    }

    @Bean
    TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }

    //将firstQueue和topicExchange绑定,而且绑定的键值为topic.topicOne
    //这样只要是消息携带的路由键是topic.topicOne,才会分发到该队列
    @Bean
    Binding bindingExchangeMessage(){
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(topicOne);
    }

    //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
    // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
    @Bean
    Binding bindingExchangeMessage2(){
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }
}

创建生产者

package com.demo.Controller.RabbitMq.Topic;

import com.demo.Common.lang.ResultBody;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author zqf
 * @date 2021/12/26 14:24
 * @description
 */
@RestController
public class TopicRabbitProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sendTopicMessage")
    public ResultBody sendTopicMessage(@RequestParam String word){
        String messageId = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData", word);
        map.put("createTime",createTime);
    //向topicExchange交换机发送消息,路由键为topic.one
        rabbitTemplate.convertAndSend("topicExchange","topic.one",map);
        return ResultBody.ok().message("发送成功");
    }

    @PostMapping("/sendTopicMessage1")
    public ResultBody sendTopicMessage1(@RequestParam String word){
        String messageId = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData", word);
        map.put("createTime",createTime);
    //向topicExchange交换机发送消息,路由键为topic.random
        rabbitTemplate.convertAndSend("topicExchange","topic.random",map);
        return ResultBody.ok().message("发送成功");
    }
}

创建消费者

package com.demo.Controller.RabbitMq.Topic;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author zqf
 * @date 2021/12/27 16:58
 * @description
 */
@Component
public class TopicRabbitConsumer {
    @RabbitListener(queues = "topic.one")
    public void process(Map message){
        System.out.println("one:topic消费者受到消息:"+message.toString());
    }

    @RabbitListener(queues = "topic.two")
    public void process1(Map message){
        System.out.println("two:topic消费者收到消息"+message.toString());
    }
}

测试

TopicRabbitConsumer中,process监听队列1,绑定键为:topic.one。process1监听队列2,绑定键为:topic.#

执行sendTopicMessage接口:

消费者1和2都成功消费了消息

执行sendTopicMessage1接口

因为队列2路由键为topic.#,而发送的路由键为topic.random,而队列1路由键为topic.one,所以只有消费者2成功消费到了消息

测试扇形交换机方式

创建配置文件绑定交换机和队列

将队列A,B,C绑定到同一个交换机(扇形交换机无需与队列绑定路由键)

package com.demo.Config.RabbitMq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zqf
 * @date 2021/12/27 20:08
 * @description
 */

@Configuration
public class FanoutRabbitConfig {

    @Bean
    public Queue queueA(){
        return new Queue("fanout.A");
    }

    @Bean
    public Queue queueB(){
        return new Queue("fanout.B");
    }

    @Bean
    public Queue queueC(){
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(){
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeB(){
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchangeC(){
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

创建生产者

package com.demo.Controller.RabbitMq.Fanout;

import com.demo.Common.lang.ResultBody;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author zqf
 * @date 2021/12/27 22:51
 * @description
 */
@RestController
public class FanoutProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @PostMapping("/SendFanoutMessage")
    public ResultBody process(@RequestParam String word){
        String messageId = UUID.randomUUID().toString();
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map = new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData", word);
        map.put("createTime",createTime);
        rabbitTemplate.convertAndSend("fanoutExchange",null,map);
        return ResultBody.ok().message("发送成功");
    }
}

创建消费者

package com.demo.Controller.RabbitMq.Fanout;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author zqf
 * @date 2021/12/27 22:55
 * @description
 */
@Component
public class FanoutConsumer {
    @RabbitListener(queues = "fanout.A")
    public void process(Map msg){
        System.out.println("fanoutA受到消息:"+msg.toString());
    }

    @RabbitListener(queues = "fanout.B")
    public void process1(Map msg){
        System.out.println("fanoutB受到消息:"+msg.toString());
    }

    @RabbitListener(queues = "fanout.C")
    public void process2(Map msg){
        System.out.println("fanoutC受到消息:"+msg.toString());
    }
}

测试

与交换机绑定的三个队列都收到了消息

生产者的消息确认

在使用RabbitMQ的时候,可以通过消息的持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,但是我们不知道生产者把消息发送出去之后,消息有没有到达服务器,默认情况下,发送的消息是不会返回给生产者的,也就意味着不知道消息有没有成功发送给服务器。

在RabbitMQ中有两种方法解决这个问题:

  1. 事务机制
  2. 发送方确认机制

在这里我们只讨论消息确认机制

消息确认其实就是消息的回调,指生产者推送消息成功或者消费者接受消息成功。

开启消息确认需要在application.yml文件添加:publisher-confirm-type: correlated

# rabbitmq配置信息
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: 123456
    #确认是否发送到交换机
    publisher-confirm-type: correlated
    #确认是否发送到队列
    publisher-returns: true

添加配置

package com.demo.Config.RabbitMq;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zqf
 * @date 2021/12/28 14:50
 * @description
 */
@Configuration
public class RabbitConfig {
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("ConfirmCallback: 相关数据:"+correlationData);
            System.out.println("ConfirmCallback: 确认情况: "+ack);
            System.out.println("ConfirmCallback: 原因:"+cause);
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("ReturnCallback: 消息:"+message);
            System.out.println("ReturnCallback: 回应码:"+replyCode);
            System.out.println("ReturnCallback: 回应信息:"+replyText);
            System.out.println("ReturnCallback: 交换机:"+exchange);
            System.out.println("ReturnCallback: 路由键:"+routingKey);
        });
        return rabbitTemplate;
    }
}
两个回调函数的触发方式:
  1. 消息成功发送

    二:DirectReceiver消费者收到消息  : {createTime=2021-12-28 15:56:43, messageId=db6bc11f-9c77-4718-a11f-32fd119ba6a2, messageData=消息}
    ConfirmCallback: 相关数据:null
    ConfirmCallback: 确认情况: true
    ConfirmCallback: 原因:null
  2. 未找到交换机

    向不存在的交换机发送消息

    package com.demo.Controller.RabbitMq.ProducerConfirm;
    
    import com.demo.Common.lang.ResultBody;
    import org.apache.shiro.authz.annotation.Logical;
    import org.apache.shiro.authz.annotation.RequiresPermissions;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    /**
     * @author zqf
     * @date 2021/12/23 16:16
     * @description
     */
    @RestController
    public class ConfirmTestProducer {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostMapping("/SendConfirmMessage")
        @RequiresPermissions(logical = Logical.OR,value = {"admin"})
        public ResultBody sendDirectMessage(@RequestParam("word")String word){
            String messageId = String.valueOf(UUID.randomUUID());
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map = new HashMap<>();
            map.put("messageId",messageId);
            map.put("messageData",word);
            map.put("createTime",createTime);
            //将消息携带绑定键值:DirectRouting 发送到交换机DirectExchange
            rabbitTemplate.convertAndSend("ConfirmTestExchange", "DirectRouting", map);
            return ResultBody.ok().message("发送成功");
        }
    }
    
    2021-12-28 15:59:55.371 ERROR 14936 --- [35.180.189:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'NOT-EXCHANGE' in vhost '/', class-id=60, method-id=40)
    ConfirmCallback: 相关数据:null
    ConfirmCallback: 确认情况: false
    ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'NOT-EXCHANGE' in vhost '/', class-id=60, method-id=40)
    
  3. 找到交换机,未找到队列

    创建一个交换机,不绑定队列

    package com.demo.Config.RabbitMq;
    
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author zqf
     * @date 2021/12/28 15:41
     * @description
     */
    @Configuration
    public class ConfirmTestConfig {
    
        @Bean
        DirectExchange ConfirmTestExchange() {
            return new DirectExchange("ConfirmTestExchange");
        }
    
    }
    

    然后发送向这个交换机发送消息

    ConfirmCallback: 相关数据:null
    ConfirmCallback: 确认情况: true
    ConfirmCallback: 原因:null
    ReturnCallback: 消息:(Body:'{createTime=2021-12-28 16:12:18, messageId=f0519b66-c574-48e4-b75c-f37ea2691811, messageData=消息}' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
    ReturnCallback: 回应码:312
    ReturnCallback: 回应信息:NO_ROUTE
    ReturnCallback: 交换机:ConfirmTestExchange
    ReturnCallback: 路由键:DirectRouting
结论
  • 消息从 producer 到 RabbitMQ broker cluster 成功,则会返回一个 confirmCallback
  • 消息从 exchange 到 queue 投递失败,则会返回一个 returnCallback

消费者的消息确认

消费者确认指的就是RabbitMQ需要确认消息到底有没有被收到,来确定要不要将该条消息从队列中删除掉。这就需要消费者来告诉RabbitMQ。

消费者确认有两种方式:

  1. 自动应答(RabbitMQ消费者确认默认就是自动应答):消费者在消费消息的时候,如果设定应答模式为自动,则消费者收到消息后,消息就会立即被 RabbitMQ 从 队列中删除掉。
  2. 手动应答:消费者在收到消息后:
  • 可以在既定的正常情况下进行确认(告诉 RabbitMQ,我已经消费过该消息了,你可以删除该条数据了);
  • 可以在既定的异常情况下不进行确认(RabbitMQ 会继续保留该条数据),这样下一次可以继续消费该条数据。
最后修改:2022 年 01 月 30 日
如果觉得我的文章对你有用,请随意赞赏