RabbitMQ基础

organicpanda 发布于10天前 阅读31次
0 条评论

一. RabbitMQ简介

  • 1 . RabbitMQ是一个有Erlang开发的AMQP(Advanced Message Queue)的开源实现

  • 2 . RabbitMQ的官网:http://www.rabbitmq.com

  • 3 . RabbitMQ是一款消息组件,其中一定包含生产者,消费者,消息组件。RabbitMQ中有三个重要组成部分

    • a . Exchange:交换空间

    • b . Queue:数据队列

    • c . RoutingKey:队列路由(如果所有的队列的RoutingKey都一样,则属于广播小,如果不一样,则属于点对点消息)

  • 4 . RabbitMQ中的几个核心概念

    • a . Broker:消息队列的服务主机

    • b . Exchange:消息交换机,用于分发消息到队列

    • c . Queue:消息队列的载体,每个消息都会被投入到一个或多个队列

    • e . Binding:将Exchange与Queue按照RoutingKey规则进行绑定

    • f . RoutingKey:路由Key,Exchange根据RoutingKey进行消息分发

    • g . Vhost:虚拟主机,一个Broker可以有多个Vhost,用于实现用户(权限)的分离

    • h . Producer:消息生产者

    • i . Consumer:消息消费者

    • j . Channel:消息通道,每个Channel代表一个会话任务

二. 环境搭建

  • 1 . 安装Erlang开发环境

    • a . 在这里安装Erlang时遇到的坑较多,个人不推荐下载erlang源码进行解压缩编译安装,因为依赖的库较多(gcc,libncurses5-dev,.eg):

    • 建立erlang目录mkdir -p /usr/local/erlang

    • 进入源码目录 cd /user/local/src/otp_src_19.3

    • 编译配置 ./configure --prefix=/usr/local/erlang

    • 编译安装 make && make install

    • 配置环境变量

    vim /etc/profile
    export ERLANG_HOME=/usr/local/erlang
    export PATH=$PATH:$ERLANG_HOME/bin:
    source /etc/profile

    • b . 本人使用apt-get安装erlang语言环境

    • apt-get install erlang 或者apt-get install erlang-nox

    • c . 测试erlang

    • 输入erl 表示进入erlang环境

    • 输入halt().退出

  • 2 . 安装RabbitMQ

    • a . 根据官网介绍进行安装

      • 相关命令

        echo 'deb http://www.rabbitmq.com/debian/ testing main' |
             sudo tee /etc/apt/sources.list.d/rabbitmq.list
        wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc |
             sudo apt-key add -
        sudo apt-get update
        sudo apt-get install rabbitmq-server
    • b . 后台启动RabbitMQrabbitmq-server start > /dev/null 2>&1 &

    • c . 开启管理页面插件rabbitmq-plugins enable rabbitmq_management

    • d . 添加新用户rabbitmqctl add_user evans 123123(创建一个用户名为evans,密码为123123的用户)

    • e . 将新用户设为管理员rabbitmqctl set_user_tags evans administrator

    • f . 打开浏览器输入访问地址http://192.168.1.1:15672访问RabbitMQ管理页面

    • g . 查看RabbitMQ状态rabbitmqctl status,关闭RabbitMQrabbitmqctl stop

    • h . 设置用户虚拟主机,否则程序无法连接Queue

二. Java基本操作

    • 1 . 在管理界面中增加一个新的Queue

      • a . Name:队列名称

      • b . Durability:持久化选项:Durable(持久化保存),Transient(即时保存),持久化保存在RabbitMQ宕机或者重启后,未消费的消息仍然存在,即时保存在RabbitMQ宕机或者重启后不存在

      • c . Auto delete:自动删除

    • 2 . 引入RabbitMQ的Repository

      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>4.1.0</version>
      </dependency>
    • 3 . 消息生产者MessageProducer.java

      package com.evans.rabbitmq;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      /**
       * Created by Evans 
       */
      public class MessageProducer {
          //队列名称
          private static final String QUEUE_NAME = "first";
          //主机IP
          private static final String HOST="127.0.0.1";
          //端口
          private static final Integer PORT=5672;
          //用户名
          private static final String USERNAME="evans";
          //密码
          private static final String PASSWORD="evans";
      
          public static void main(String[] args) throws Exception {
              //创建工厂类
              ConnectionFactory factory = new ConnectionFactory();
              //设置参数
              factory.setHost(HOST);
              factory.setPort(PORT);
              factory.setUsername(USERNAME);
              factory.setPassword(PASSWORD);
              //创建连接
              Connection connection =factory.newConnection();
              //创建Channel
              Channel channel=connection.createChannel();
              //声明Queue
              /*
               * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
               * 队列名称,是否持久保存,是否为专用的队列,是否允许自动删除,配置参数
               * 此处的配置与RabbitMQ管理界面的配置一致
               */
              channel.queueDeclare(QUEUE_NAME,true,false,true,null);
              Long start = System.currentTimeMillis();
              for (int i=0;i<100;i++){
                  //发布消息
                  /*
                   * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
                   * exchange名称,RoutingKey,消息参数(消息头等)(持久化时需要设置),消息体
                   * MessageProperties有4中针对不同场景可以进行选择
                   */
                  channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,("Message:"+i).getBytes());
              }
              Long end = System.currentTimeMillis();
              System.out.println("System cost :"+(end-start));
              channel.close();
              connection.close();
          }
      }
    • 4 . 运行MessageProduce的Main方法,在管理界面会出现详细的监控数据,此时消息已经成功发送至RabbitMQ的队列中

    • 5 . 消息消费者MessageConsumer.java

      package com.evans.rabbitmq;
      
      import com.rabbitmq.client.*;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      /**
       * Created by Evans on 2017/7/15.
       */
      public class MessageConsumer {
      
          //队列名称
          private static final String QUEUE_NAME = "first";
          //主机IP
          private static final String HOST="10.0.0.37";
          //端口
          private static final Integer PORT=5672;
          //用户名
          private static final String USERNAME="evans";
          //密码
          private static final String PASSWORD="evans";
      
          public static void main(String[] args) throws IOException, TimeoutException {
              //创建工厂类
              ConnectionFactory factory = new ConnectionFactory();
              //设置参数
              factory.setHost(HOST);
              factory.setPort(PORT);
              factory.setUsername(USERNAME);
              factory.setPassword(PASSWORD);
              //创建连接
              Connection connection =factory.newConnection();
              //创建Channel
              Channel channel=connection.createChannel();
              //声明Queue
              /*
               * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
               * 队列名称,是否持久保存,是否为专用的队列,是否允许自动删除,配置参数
               * 此处的配置与RabbitMQ管理界面的配置一致
               */
              channel.queueDeclare(QUEUE_NAME,true,false,true,null);
              //这里需要复写handleDelivery方法进行消息自定义处理
              Consumer consumer = new DefaultConsumer(channel){
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      String message = new String(body);
                      System.out.println("Consume Get Message : "+message);
                  }
              };
              channel.basicConsume(QUEUE_NAME,consumer);
          }
      }
  • 6 . 运行MessageConsumer的Main方法,会进行消息消费处理,此时控制台会输出消费的消息,此时完成了消息的生产与消费的基本操作,当存在多个消费者的处理同一个队列时,RabbitMQ会自动进行均衡负载处理,多个消费者共同来处理消息

    Consume Get Message : Message:0
    Consume Get Message : Message:1
    Consume Get Message : Message:2
    ...
    Consume Get Message : Message:99
  • 7 . RabbitMQ虚拟主机

    • a . 可以在管理界面的admin-vhost下设置多个虚拟主机

    • b . 在程序中通过设置factory参数进行虚拟主机的指定factory.setVirtualHost("firstHost")

  • 8 . Exchange工作模式:topic、direct、fanout

    • a . 广播模式(fanout):一条消息被所有的消费者进行处理

      ① .将消费者与生产者中的`channel.queueDeclare()`方法替换为`channel.exchangeDeclare(EXCHANGE_NAME, "fanout")`方法进行Exchange的指定,channel.basicPublish()方法需要指定exchange
      ② .此时再次运行生产者和多个消费者,则一个消息会被多个消费者进行消费处理
    • b . 直连模式(direct):一跳消息根据RoutingKey进行生产者与消费者的匹配,从而达到指定生产者的消息被指定消费者进行处理

      ① .将生产者中的`channel.queueDeclare()`方法替换为`channel.exchangeDeclare(EXCHANGE_NAME, "direct")`方法进行Exchange的指定,channel.basicPublish()方法需要指定exchange和RoutingKey("mykey")
      ② .将消费者中的`channel.queueDeclare()`方法替换为
      // 定义EXCHANGE的声明String
      channel.exchangeDeclare(EXCHANGE_NAME, "direct") ;
      // 通过通道获取一个队列名称                         
      String queueName= channel.queueDeclare().getQueue() ;
      // 进行绑定处理
      channel.queueBind(queueName, EXCHANGE_NAME, "mykey") ;
      ③ .此时RoutingKey作为唯一标记,这样就可以将消息推送到指定的消费者进行处理
    • c . 主题模式(topic):一条消息被所有的消费者进行处理

      ① .将生产者中的`channel.queueDeclare()`方法替换为`channel.exchangeDeclare(EXCHANGE_NAME, "topic") `方法进行Exchange的指定,channel.basicPublish()方法需要指定exchange和RoutingKey("mykey-01")
      ② .将消费者中的`channel.queueDeclare()`方法替换为
      // 定义EXCHANGE的声明String
      channel.exchangeDeclare(EXCHANGE_NAME, "topic") ;
      // 通过通道获取一个队列名称                         
      String queueName= channel.queueDeclare().getQueue() ;
      // 进行绑定处理
      channel.queueBind(queueName, EXCHANGE_NAME, "mykey-01");
      ③ .此时主题模式即为广播模式与直连模式的混合使用。

三. RabbitMQ整合Spring

  • 1 . 引入srping-rabbit的Repository

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.7.3.RELEASE</version>
    </dependency>
  • 2 . 建立rabbitmq.properties,对RabbitMQ的属性参数进行设置

    # RabbitMQ的主机IP
    mq.rabbit.host=192.168.68.211
    # RabbitMQ的端口
    mq.rabbit.port=5672
    # RabbitMQ的VHost
    mq.rabbit.vhost=hello
    # RabbitMQ的exchange名称
    mq.rabbit.exchange=spring.rabbit
    # 用户名
    mq.rabbit.username=evans
    # 密码
    mq.rabbit.password=evans
  • 3 . 生产者XML(需增加xmlns:rabbit命名空间)

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context 
            http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/rabbit 
            http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
      <context:component-scan base-package="com.evans.rabbitmq"/>
      <!--定义rabbitmq配置的相关属性文件信息-->
      <context:property-placeholderlocation="classpath:rabbitmq.properties"/>
      <!--如果要想进行RabbiMQ的操作管理,则首先一定要准备出一个连接工厂类-->
      <rabbit:connection-factoryid="connectionFactory" host="${mq.rabbit.host}" port="${mq.rabbit.port}" username="${mq.rabbit.username}" password="${mq.rabbit.password}" virtual-host="${mq.rabbit.vhost}"/>
      <!--所有的连接工厂要求被RabbitMQ所管理-->
      <rabbit:adminconnection-factory="connectionFactory"/>
      <!--创建一个队列信息-->
      <rabbit:queueid="myQueue" durable="true" auto-delete="true" exclusive="false" name="queue.first"/>
      <!--下面实现一个直连的操作模式-->
      <rabbit:direct-exchangeid="mq-direct" name="${mq.rabbit.exchange}" durable="true"a uto-delete="true">
        <rabbit:bindings>
          <!--现在要求绑定到指定的队列之中-->
          <rabbit:bindingqueue="myQueue" key="key01"/>
        </rabbit:bindings>
      </rabbit:direct-exchange>
      <!--所有整合的消息系统都会有一个模版-->
      <rabbit:templateid="amqpTemplate" exchange="${mq.rabbit.exchange}" connection-factory="connectionFactory"/>
    </beans>
  • 4 . 消费者XML(需增加xmlns:rabbit命名空间)

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-4.3.xsdhttp://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context-4.3.xsdhttp://www.springframework.org/schema/rabbit
              http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
      <!--定义rabbitmq配置的相关属性文件信息-->
      <context:property-placeholderlocation="classpath:rabbitmq.properties"/>
      <!--如果要想进行RabbiMQ的操作管理,则首先一定要准备出一个连接工厂类-->
      <rabbit:connection-factoryid="connectionFactory" host="${mq.rabbit.host}" port="${mq.rabbit.port}" username="${mq.rabbit.username}" password="${mq.rabbit.password}" virtual-host="${mq.rabbit.vhost}"/>
      <!--所有的连接工厂要求被RabbitMQ所管理-->
      <rabbit:adminconnection-factory="connectionFactory"/>
      <!--创建一个队列信息-->
      <rabbit:queueid="myQueue" durable="true" auto-delete="true" exclusive="false" name="queue.first"/>
      <!--下面实现一个直连的操作模式-->
      <rabbit:direct-exchangeid="mq-direct" name="${mq.rabbit.exchange}" durable="true" auto-delete="true">
        <rabbit:bindings>
          <!--现在要求绑定到指定的队列之中-->
          <rabbit:bindingqueue="myQueue" key="key01"/>
        </rabbit:bindings>
      </rabbit:direct-exchange>
      <!--定义具体的消费处理类-->
      <beanid="messageConsumer" class="cn.evans.rabbitmq.MessageConsumer"/>
      <!--启动消费监听程序-->
      <rabbit:listener-containerconnection-factory="connectionFactory">
        <rabbit:listenerref="messageConsumer"queues="myQueue"/>
      </rabbit:listener-container>
    </beans>
  • 5 . 生产者

    • a . 定义消息Service

      package com.evans.rabbitmq;
      
      /**
       * Created by Evans 
       */
      public interface MessageService {
          /**
           * 发送消息
           * @param message
           */
          public void sendMessage(String message);
      }
    • b . 定义MessageService的实现类

      package com.evans.rabbitmq;
      
      import org.springframework.amqp.core.AmqpTemplate;
      
      import javax.annotation.Resource;
      
      /**
       * Created by Evans
       */
      public class MessageServiceImpl implements MessageService {
          
          @Resource
          private AmqpTemplate template;
          
          @Override
          public void sendMessage(String message) {
              template.convertAndSend("key01",message);
          }
      }
  • 5 . 消费者

    • a .消费者需要实现MessageListener接口

    • b .消息处理类

      package com.evans.rabbitmq;
      
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.core.MessageListener;
      
      /**
       * Created by Evans 
       */
      public class MessageConsumer implements MessageListener {
          
          @Override
          public void onMessage(Message message) {
              System.out.println("Consumer Message: "+ message);    
          }
      }

四. RabbitMQ整合SpringBoot

  • 1 . 引入SpringBoot的RabbitMQ脚手架

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  • 2 . 配置Application.yml

    spring:
      rabbitmq:
        host: 10.0.0.37
        port: 5672
        username: evans
        password: evans
  • 3 . 配置类

    package com.evans.rabbitmq;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Created by Evans 
     */
    @Configuration
    public class RabbitConfigure {
        @Bean
        public Queue firstQueue(){
            return new Queue("firstQueue");
        }
    }
  • 4 . 生产者

    package com.evans.rabbitmq;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.time.LocalDateTime;
    
    /**
     * Created by Evans
     */
    @Component
    public class MessageProducer {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
        
        public void send(){
            LocalDateTime current =LocalDateTime.now();
            System.out.println("Send Message : "+current);
            rabbitTemplate.convertAndSend("firstQueue","Send Message"+ current);
        }
    }
  • 5 . 消费者

    package com.evans.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Created by Evans 
     */
    @Component
    @RabbitListener(queues = "firstQueue")
    public class MessageConsumer {
    
        @RabbitHandler
        public void consumer(String message){
            System.out.println("Consumer Message : "+message);
        }
    }
  • 6 . FanoutExchange配置

    @Configuration
    public class FanoutConfiguration {
    
        @Bean
        public Queue fanoutFirstQueue() {
            return new Queue("fanout.first");
        }
    
        @Bean
        public Queue fanoutSecondQueue() {
            return new Queue("fanout.second");
        }
    
        @Bean
        public Queue fanoutThirdQueue() {
            return new Queue("fanout.third");
        }
    
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        @Bean
        public Binding bindingExchangeFanoutFirst(Queue fanoutFirstQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanoutFirstQueue).to(fanoutExchange);
        }
    
        @Bean
        public Binding bindingExchangeFanoutSecond(Queue fanoutSecondQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanoutSecondQueue).to(fanoutExchange);
        }
    
        @Bean
        public Binding bindingExchangeFanoutThird(Queue fanoutThirdQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanoutThirdQueue).to(fanoutExchange);
        }
    
    }
  • 7 . TopicExchange配置

    @Configuration
    public class TopicConfiguration {
    
        @Bean
        public Queue topicFirstQueue() {
            return new Queue("topic.first");
        }
    
        @Bean
        public Queue topicAnyQueue() {
            return new Queue("topic.any");
        }
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
    
        @Bean
        public Binding bindingExchangeTopicFirst(Queue topicFirstQueue, TopicExchange topicExchange) {
            return BindingBuilder.bind(topicFirstQueue).to(topicExchange).with("topic.first");
        }
    
        @Bean
        public Binding bindingExchangeTopicAny(Queue topicAnyQueue, TopicExchange topicExchange) {
            return BindingBuilder.bind(topicAnyQueue).to(topicExchange).with("topic.#");
        }
    
    }

查看原文: RabbitMQ基础

  • ticklishfish
需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。