原创

RabbitMQ六种消息模式

温馨提示:
本文最后更新于 2019年03月21日,已超过 2,074 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

pom依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.2</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.10</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
</dependency>

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
</dependency>

RabbitMQ的连接工具(我单独写出来了一个工具类,方便使用):

package com.lzhpo.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:获取Rabbitmq的连接工具</p>
 * <p> Description:</p>
 */
public class ConnectionUtils {

    /**
     * 获取Rabbitmq的连接
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //设置服务地址
        factory.setHost("127.0.0.1");

        //AMQP 5672
        factory.setPort(5672);

        //vhost
        factory.setVirtualHost("/");

        //用户名
        factory.setUsername("guest");

        //密码
        factory.setPassword("guest");

        return factory.newConnection();
    }
}

简单队列

简单队列:一个生产者P发送消息到队列Q,一个消费者C接收。

生产者(Send)

package com.lzhpo.rabbitmq.model5.simplequeues;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:生产者生产消息</p>
 * <p> Description:</p>
 */
public class Send {

    private static final  String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws Exception{
        //获取一个连接
        Connection connection = ConnectionUtils.getConnection();

        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //需要发送的消息
        String msg = "hello simple!";

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

        System.out.println("---send msg:" +msg);

        //关闭
        channel.close();
        connection.close();
    }
}

消费者(Recv)

package com.lzhpo.rabbitmq.model5.simplequeues;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:消费者获取消息</p>
 * <p> Description:</p>
 */
public class Recv {

    private static final  String QUEUE_NAME = "test_simple_queue";

    /**
     * main()入口
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        //oldApi();//老版本api
        newApi();//新版本api
    }

    /**
     * 新版本api
     */
    private static void newApi() throws Exception{
        //获取一个连接
        Connection connection = ConnectionUtils.getConnection();

        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("new api recv:" + msg);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

    /**
     * 老版本api
     * @throws Exception
     */
    private static void oldApi() throws Exception{
        //获取一个连接
        Connection connection = ConnectionUtils.getConnection();

        //从连接中获取一个通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        //监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            String msgString = new String(delivery.getBody());
            System.out.println("[recv] msg: " +msgString);
        }
    }

}

工作队列

轮询分发

【轮询分发】:结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个的分。

生产者(Send)
package com.lzhpo.rabbitmq.model5.workqueues.lunxun;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:【轮询分发】</p>
 * <p> Description:
 * 备注:消费者 1 我们处理时间是 1s ;而消费者 2 中处理时间是 2s;
 * 但是我们看到的现象并不是 1 处理的多 消费者 2 处理的少。
 * -----------------------------------
 * 消费者1【Recv1】:
 *  [1] Received '.0'
 *  [x] Done
 *  [1] Received '.2'
 *  [x] Done
 *  ......
 *  [1] Received '.46'
 *  [x] Done
 *  [1] Received '.48'
 *  [x] Done
 *  消费者 1 将偶数部分处理掉了
 * -----------------------------------
 * 消费者2【Recv2】:
 *  [2] Received '.1'
 *  [x] Done
 *  [2] Received '.3'
 *  [x] Done
 *  ......
 *  [2] Received '.47'
 *  [x] Done
 *  [2] Received '.49'
 *  [x] Done
 * 消费者 2 中将奇数部分处理掉了。
 * -----------------------------------
 * 我想要的是 1 处理的多,而 2 处理的少
 * 测试结果:
 * 1.消费者 1 和消费者 2 获取到的消息内容是不同的,同一个消息只能被一个消费者获取
 * 2.消费者 1 和消费者 2 货到的消息数量是一样的 一个奇数一个偶数
 * 按道理消费者 1 获取的比消费者 2 要多
 * -----------------------------------
 * 这种方式叫做【轮询分发】:结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个的分。
 * -----------------------------------
 * </p>
 */
public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] args) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            //消息内容
            String message = "." +i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" +message +"'");
            Thread.sleep(i*10);
        }

        channel.close();
        connection.close();
    }
}
消费者1(Recv1)
package com.lzhpo.rabbitmq.model5.workqueues.lunxun;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv1 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] args) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义一个消息的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body, "UTF-8");
                System.out.println(" [1] Received '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                }
            }
        };
        boolean autoAck = true; //消息的确认模式自动应答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(1000);
    }
    @SuppressWarnings("unused")
    public static void oldAPi() throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态false 自动true 自动应答 不需要手动确认
        channel.basicConsume(QUEUE_NAME, true, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}
消费者2(Recv2)
package com.lzhpo.rabbitmq.model5.workqueues.lunxun;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] args) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //定义一个消息的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body, "UTF-8");
                System.out.println(" [2] Received '" + message + "'");
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                }
            }
        };
        boolean autoAck = true; //
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

公平分发

使用公平分发,必须关闭自动应答,改为手动应答。

生产者(Send)
package com.lzhpo.rabbitmq.model5.workqueues.gongping;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:【公平分发】</p>
 * <p> Description:
 * 虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有 2 个消费者,所有的偶数的消息都是繁忙的,而
 * 奇数则是轻松的。按照轮询的方式,偶数的任务交给了第一个消费者,所以一直在忙个不停。奇数的任务交给另一
 * 个消费者,则立即完成任务,然后闲得不行。
 *
 * 而 RabbitMQ 则是不了解这些的。他是不知道你消费者的消费能力的,这是因为当消息进入队列,RabbitMQ 就会分派
 * 消息。而 rabbitmq 只是盲目的将消息轮询的发给消费者。你一个我一个的这样发送.
 *
 * 为了解决这个问题,我们使用 basicQos( prefetchCount = 1)方法,来限制 RabbitMQ 只发不超过 1 条的消息给同
 * 一个消费者。当消息处理完毕后,有了反馈 ack,才会进行第二次发送。(也就是说需要手动反馈给 Rabbitmq )
 *
 * 还有一点需要注意,使用【公平分发】,必须关闭自动应答,改为手动应答。
 *
 *
 * 这时候现象就是消费者 1 速度大于消费者 2
 * </p>
 */
public class Send {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 指定一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        int prefetchCount = 1;

        //每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息
        //限制发给同一个消费者不得超过1条消息
        channel.basicQos(prefetchCount);
        // 发送的消息
        for (int i = 0; i < 50; i++) {
            String message = "." + i;
            // 往队列中发出一条消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}
消费者1(Recv1)
package com.lzhpo.rabbitmq.model5.workqueues.gongping;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv1 {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] args) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//保证一次只分发一个
        //定义一个消息的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body, "UTF-8");
                System.out.println(" [1] Received '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        /**手动应答**/
        boolean autoAck = false; //手动确认消息
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(1000);
    }
}
消费者2(Recv2)
package com.lzhpo.rabbitmq.model5.workqueues.gongping;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:
 * Message acknowledgment(消息应答):
 *
 * boolean autoAck = false;
 * channel.basicConsume(QUEUE_NAME, autoAck, consumer);
 *
 * boolean autoAck = true;(自动确认模式)一旦 RabbitMQ 将消息分发给了消费者,就会从内存中删除。
 * 在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消
 * 费者但尚未处理的消息。
 *
 * boolean autoAck = false; (手动确认模式) 我们不想丢失任何任务,如果有一个消费者挂掉了,那么
 * 我们应该将分发给它的任务交付给另一个消费者去处理。 为了确保消息不会丢失,RabbitMQ 支持消
 * 息应答。消费者发送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了。RabbitMQ 可
 * 以删除它了。
 *
 * 消息应答是默认打开的。也就是 boolean autoAck =false;
 *
 * </p>
 */
public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_work";
    public static void main(String[] args) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//保证一次只分发一个
        //定义一个消息的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body, "UTF-8");
                System.out.println(" [2] Received '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);/**关闭自动确认应答,手动应答**/
                }
            }
        };
        /**关闭自动应答**/
        boolean autoAck = false; //关闭自动确认
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(2000);
    }
    public static void oldAPi() throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);/**关闭自动确认应答,手动应答**/
        }
    }
}

消息订阅模式

【订阅模式】:一个消息被多个消费者消费。

  1. 一个生产者,多个消费者。
  2. 每一个消费者都有自己的队列。
  3. 生产者没有直接把消息发送到队列,而是发送到了交换机、转发器exchange。
  4. 每个队列都要绑定到交换机上。
  5. 生产者发送的消息经过交换机到达队列,就能实现一个消息被多个消费者消费。

生产者(Send)

package com.lzhpo.rabbitmq.model5.subscribeModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:
 * 先运行Send创建交换器
 *
 * 但是这个发送的消息到哪了呢? 消息丢失了!!!因为交换机没有存储消息的能力,在 rabbitmq 中只有队列存储消息的
 * 能力.因为这时还没有队列,所以就会丢失;
 * 小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!
 *
 * 【订阅模式】:一个消息被多个消费者消费。
 * 1.一个生产者,多个消费者。
 * 2.每一个消费者都有自己的队列。
 * 3.生产者没有直接把消息发送到队列,而是发送到了交换机、转发器exchange
 * 4.每个队列都要绑定到交换机上
 * 5.生产者发送的消息经过交换机到达队列,就能实现一个消息被多个消费者消费。
 *
 * 邮件->注册->短信
 *
 * </p>
 */
public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange 交换机 转发器
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //订阅模式

        // 消息内容
        String msg = "Hello PB";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        System.out.println("Send: " +msg);

        channel.close();
        connection.close();
    }
}

消费者1(Recv1)

package com.lzhpo.rabbitmq.model5.subscribeModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv1 {

    private final static String QUEUE_NAME = "test_queue_fanout_email";
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /** 绑定队列到交换机 **/
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        //------------下面逻辑和work模式一样-----
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义一个消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[1] Recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done ");
                    // 手动回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2(Recv2)

package com.lzhpo.rabbitmq.model5.subscribeModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_fanout_sms";
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /** 绑定队列到交换机 **/
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        // 同一时刻服务器只会发一条消息给消费者
        // 定义一个消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done ");
                    // 手动回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

路由模式

  1. 发送消息到交换机并且要指定路由key 。
  2. 消费者将队列绑定到交换机时需要指定路由key。

生产者(Send)

package com.lzhpo.rabbitmq.model5.routingModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:
 * 先运行Send创建交换器
 * </p>
 */
public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息内容
        String msg = "hello direct!";
        //routingKey
        //String routingKey = "error";//error两个都可以收到
        //String routingKey = "info";//info只有Recv2能收到
        String routingKey = "warning";//warning只有Recv2能收到
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        System.out.println("-------------send: " +msg);

        channel.close();
        connection.close();
    }
}

消费者1(Recv1)

package com.lzhpo.rabbitmq.model5.routingModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv1 {

    private final static String QUEUE_NAME = "test_queue_direct_1";
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /** 绑定队列到交换机 **/
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[1] Recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done ");
                    // 手动回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2(Recv2)

package com.lzhpo.rabbitmq.model5.routingModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_direct_2";
    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /** 绑定队列到交换机 **/
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        //定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done ");
                    // 手动回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

主题模式

Topic主题模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。

生产者(Send)

package com.lzhpo.rabbitmq.model5.topicModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:
 * Topic主题模式:将路由键和某种模式匹配。
 * </p>
 */
public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 消息内容
        String message = "id=1001";
        channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消费者1(Recv1)

package com.lzhpo.rabbitmq.model5.topicModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv1 {

    private final static String QUEUE_NAME = "test_queue_topic_1";
    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done ");
                    // 手动回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2(Recv2)

package com.lzhpo.rabbitmq.model5.topicModel;

import com.lzhpo.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * <p> Author:lzhpo </p>
 * <p> Title:</p>
 * <p> Description:</p>
 */
public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_2";
    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /** 绑定队列到交换机 **/
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");//全匹配:item.#
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[2] Recv msg:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done ");
                    // 手动回执
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

RPC远程调用模式

前面学习了如何使用work队列在多个worker之间分配任务,但是如果需要在远程机器上运行个函数并等待结果,就需要使用RPC(远程过程调用)模式来实现。

参考官网教程【模拟RPC服务来返回斐波那契数列】:https://www.rabbitmq.com/tutorials/tutorial-six-java.html

本文目录