RabbitMQ快速入门

Java基础

浏览数:210

2019-5-11

AD:资源代下载服务

一、前言

RabbitMQ其实是我最早接触的一个MQ框架,我记得当时是在大学的时候跑到图书馆一个人去看,由于RabbitMQ官网的英文还不算太难,因此也是参考官网学习的,一共有6章,当时是用Node来开发的,当时花了一下午看完了,也理解了。而现在回过头来再看,发现已经忘记了个差不多了,现在再回过头来继续看看,然乎记之。以防再忘,读者看时最好有一定的MQ基础。

二、RabbitMQ

首先我们需要知道的是RabbitMQ它是基于高级队列协议(AMQP)的,它是Elang编写的,下面将围绕RabbitMQ队列、交换机、RPC三个重点进行展开。

2.1、队列

存储消息的地方,多个生产者可以将消息发送到一个队列,多个消费者也可以消费同一个队列的消息。

注意:当多个消费者监听一个队列,此时生产者发送消息到队列只有一个消费者被消费,并且消费端的消费方式是按照消费端在内部启动的顺序轮询(round-robin)。

2.2、消费者

消费消息的一方

public class Send {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername("admin");
        factory.setPassword("admin");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class Recv {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";

    public static void main(String[] args) {
        try {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP);
            factory.setUsername("admin");
            factory.setPassword("admin");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.3、小结

1、Rabbit是如何保证消息被消费的?
答:通过ack机制。每当一个消息被消费端消费的时候,消费端可以发送一个ack给RabbitMQ,这样RabbitMQ就知道了该条消息已经被完整消费并且可以被delete了。;如果一条消息被消费但是没有发送ack,那么此时RabbitMQ将会认为需要重新消费该消息,如果此时还有其它的消费者,那么此时RabbitMQ将会把这条消息交给它处理。

注意:开启ack机制的是autoAck=
false;

2、消息如何进行持久化?

  • 将queue持久化,即设置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二个参数durable为true
  • 设置消息持久化,即设置MessageProperties.PERSISTENT_TEXT_PLAIN

注意:消息持久化并不一定保证消息不会被丢失

3、RabbitMQ如何避免两个消费者一个非常忙一个非常闲的情况?
通过如下设置,保证一个消费者一次只能消费一个消息,只有当它消费完成并且返回ack给RabbitMQ之后才给它派发新的消息。

int prefetchCount = 1 ;
channel.basicQos(prefetchCount)

4、RabbitMQ异常情况下如何保证消息不会被重复消费?
需要业务自身实现密等性,RabbitMQ没有提供比较好的方式去保证。

2.2、交换机

在RabbitMQ中,生产者其实从来不会发送消息到队列,甚至,它不知道消息被发送到了哪个队列。那它被发送到了哪里呢?就是本节的重点:交换机,下面就是它在RabbitMQ中的介绍图。(X就是交换机)生产者发送消息给交换机,然后由交换机将消息转发给队列。

从上图就产生一个问题:X怎么将消息发给queue呢?它是把消息发给所有queue还是发给一个指定的queue或者丢弃消息呢?这就是看交换机的类型了。下面一起谈谈这几种类型

2.2.1、fanout

fanout:广播模式,这个比较好理解,就是所有的队列都能收到交换机的消息。

如上面,两个队列都能收到交换机的消息。

2.2.2、direct

这个模式相当于发布/订阅模式的一种,当交换机类型为direct的时候,此时我们需要设置两个参数:

  1. channel.basicPublish(EXCHANGE_NAME, “”, null, message.getBytes(“UTF-8”));第二个参数,我们可以把它称呼为routeKey
  2. channel.queueBind(queueName, EXCHANGE_NAME, “”);第三个参数,我们把它称呼为bindKey

有了这两个参数,我们就可以指定我们订阅哪些消息了。


如图,Q1订阅了orange的消息,Q2订阅了black、green的消息。

2.2.3、topic

其实topic和direct有一点类似,它相当于对direct作了增强。在direct中,我们上面所说的bind routeKey为black、green的它是有限制的,它只能绝对的等于routeKey,但是有时候我们的需求不是这样,我们可能想要的是正则匹配即可,那么Topic就派上用场了。

当类型为topic时,它的bindKey对应字符串需要是以“.”分割,同时RabbitMQ还提供了两个符号:

  • 星号(*):表示1个单词
  • 井号(#):表示0、多个单词

上图的意思是:所有第二个单词为orange的消息发送个Q1,所有最后一个单词为rabbit或者第一个单词为lazy的消息发送给Q2。

2.2.4、header

这一种类型官方demo没有过多解释,这里也不研究了。

2.3、RPC

RabbitMQ 还可以实现RPC(远程过程调用)。什么是RPC,简单来说就是local调用remote方法。对应于RabbitMQ中则是Client发送一个request message,Server处理完成之后将其返回给Client。这里就有了一个疑问?Server是如何将response返回给Client的,这里RabbitMQ定义了一个概念:Callback Queue。
Callback Queue
注意这个队列是独一无二的String replyQueueName = channel.queueDeclare().getQueue();
首先我们需要明白一点的是为什么需要这个queue?我们知道在RabbitMQ作消息队列的时候,Client只需要将消息投放到queue中,然后Server从queue去取就可以了。但是在RabbitMQ作为RPC的时候多了一点就是,Client还需要返回结果,这时Server端怎么知道把消息发送给Client,这就是Callback Queue的用处了。
Correlation Id
在上面我们知道Server返回数据给Client是通过Callback Queue的,那么是为每一个request都创建一个queue吗?这未免太过浪费资源,RabbitMQ有更好的方案。在我们发送request,绑定一个唯一ID(correlationId),然后在消息被处理返回的时候取出这个ID和发出去的ID进行匹配。这样来说一个Callback Queue是Client级别而不是request级别的了。

实现
上面介绍了RabbitMQ实现RPC最重要的两个概念,具体代码比较简单还是贴下把。
client 端

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) throws Exception{
        RPCClient fibonacciRpc = new RPCClient();
        try {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

服务端

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

三、总结

这次回头再看RabbitMQ,再次重新理解了以下RabbitMQ,有些东西还是要慢慢嚼的。当然这些也都是官网的入门例子,后续有机会的话再深入研究。

作者:leekwe