RabbitMQ客户端源码阅读(04)_消息发送的回调和事务

/ 默认分类 / 0 条评论 / 1041浏览

1.创建通道

public String sendDirectMessage() throws IOException, InterruptedException, TimeoutException {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH🇲🇲ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.147");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        } catch (IOException | TimeoutException e) {
            System.out.println("与rabbitmq创建连接信道失败");
        }
        HashMap<String,Object> mapArg = new HashMap<>();
        mapArg.put("testK","testV");
        channel.exchangeDeclare("testExchange","topic",false,false,false,mapArg);
        channel.queueDeclare("testQueue",false,false,false,null);
        channel.queueBind("testQueue","testExchange","testqueue.#");
        channel.confirmSelect();
        //消息成功发送到mq服务端交换机,即发送成功,不管是否成功路由到某一个队列中,如果服务端响应nack,则表示没有发送到交换机,可能是交换机不存在
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("confirm回调-消息NACK(消息发送到rabbitmq失败),消息标志:%d,是否为批量确认:%b",deliveryTag,multiple));
            }
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("confirm回调-消息ACK(消息成功发送到rabbitmq),消息标志:%d,是否为批量确认:%b",deliveryTag,multiple));
            }
        });
        //消息发送到了mq服务端交换机,但是没有找到相应的队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(String.format("return回调(消息发送到mq,但是没有路由到任何一个队列中)-响应code:%d,响应信息:%s,交换机:%s,路由键:%s,消息体:%s",replyCode,replyText,exchange,routingKey,new String(body)));
            }
        });
        for (int i = 0; i < 2; i++) {
            System.out.println(i);
            try{
                channel.basicPublish("testExchange","testqueue1.haha",true,null, SerializationUtils.serialize("hello rabbitmq"));
            }catch(Exception e)
            {
                System.out.println(e.getCause());
            }
            System.out.println("===");
            TimeUnit.MILLISECONDS.sleep(3000);
        }
        channel.close();
        connection.close();
        return "ok";
    }

连接建立后,创建通道(信道)Channel
生成channel编号,然后使用当前connection构造出channel,并且将channel放入connection中的channel集合中

channel = connection.createChannel();

    @Override
    public Channel createChannel() throws IOException {
        ensureIsOpen();
        ChannelManager cm = _channelManager;
        if (cm == null) return null;
        Channel channel = cm.createChannel(this);
        metricsCollector.newChannel(channel);
        return channel;
    }

前面我们讲解连接建立的过程中会首先创建一个_channel0,该通道的编号是0,这个通道专门用于连接的建立和关闭的信息交互通信的
其余的客户端和服务端的信息交互都需要创建新的channel,每一个新创建的channel可以用于当前创建的线程和broker的通信
这样的好处,可以重复利用一条connection,不需要每一个线程与broker通信都单独建立一个socket连接

2. 可编程协议

上面的代码中,我们在客户端自行创建实体模型

        channel.exchangeDeclare("testExchange","topic",false,false,false,mapArg);
        channel.queueDeclare("testQueue",false,false,false,null);
        channel.queueBind("testQueue","testExchange","testqueue.#");

其实也就是使用封装的rpc和broker进行通信,传递amqp命令来完成操作

3. rabbitmq中保证消息投递的回调机制

rabbitmq官方文档地址

上面例子的代码中,将回调监听器注册到信道,来实现消息投递回调

channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("confirm回调-消息NACK(消息发送到rabbitmq失败),消息标志:%d,是否为批量确认:%b",deliveryTag,multiple));
            }
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("confirm回调-消息ACK(消息成功发送到rabbitmq),消息标志:%d,是否为批量确认:%b",deliveryTag,multiple));
            }
        });
        //消息发送到了mq服务端交换机,但是没有找到相应的队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(String.format("return回调(消息发送到mq,但是没有路由到任何一个队列中)-响应code:%d,响应信息:%s,交换机:%s,路由键:%s,消息体:%s",replyCode,replyText,exchange,routingKey,new String(body)));
            }
        });

你可能会记得前面我们说过的创建连接的时候会注册一系列的监听器,那些监听器其实是连接自动恢复监听器和关闭阻塞监听器等等 如下:

    private final List<ShutdownListener> shutdownHooks  = Collections.synchronizedList(new ArrayList<>());
    private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<>());
    private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<>());

    // Records topology changes
    private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<>());
    private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<>());
    private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<>());
    private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<>());
    private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<>());

这个不要弄混了,这里我们说的消息投回调监听是和channel信道关联的.

channel调用注册监听器后,channel中的两个集合会添加监听器

    private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
    /** The ConfirmListener collection. */
    private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();

上一篇文章中有介绍过,broker响应的数据,会首先经过过滤器的处理,也就是processAsync(Command command)
这个方法在Channel中,这里我摘了部分代码:

            Method method = command.getMethod();
            if (method instanceof Basic.Deliver) {
                processDelivery(command, (Basic.Deliver) method);
                return true;
            } else if (method instanceof Basic.Return) {
                callReturnListeners(command, (Basic.Return) method);
                return true;
            } else if (method instanceof Channel.Flow) {
                Channel.Flow channelFlow = (Channel.Flow) method;
                synchronized (_channelMutex) {
                    _blockContent = !channelFlow.getActive();
                    transmit(new Channel.FlowOk(!_blockContent));
                    _channelMutex.notifyAll();
                }
                return true;
            } else if (method instanceof Basic.Ack) {
                Basic.Ack ack = (Basic.Ack) method;
                callConfirmListeners(command, ack);
                handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
                return true;
            } else if (method instanceof Basic.Nack) {
                Basic.Nack nack = (Basic.Nack) method;
                callConfirmListeners(command, nack);
                handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
                return true;

从第一篇文章中介绍的抓包过程可以知道,broker响应的消息回调的amqp消息是这样的:

上面的抓包图可以看出,publish的消息会收到broker的响应,其中包括Ack,Return;
这是因为我们在代码中开启了ack确认机制和return返回机制;
channel.confirmSelect()开启ack
basicPublish(exchange, routingKey, mandatory, props, body); 中的mandatory强制标志设置为true表示开启消息无法路由到任何队列时直接原路返回消息还给客户端
因为我们发送到了一个没有的路由,所以消息无法投递到队列,所以受到了return响应,上面的抓包响应还可以看出来,原因是NO_ROUTE,并且还带了消息内容
并且还可以发现,ack确认机制只有返回deliveryTag,该值会根据当前信道上的发送的消息数不断递增,multiple表示的当前是否为批量消息确认,所以没有消息内容返回
这就涉及到一个问题,如果我们希望使用ack机制来保证消息从客户端到服务端投递的可靠性,那么就不能单纯的只添加一个ack回调机制,因为ack回调只会返回当前消息的deliveryTag,也就是消息的seq序列号,所以需要 程序员自己维护一个消息和seq序列号的对应关系,这样才能确定当前ack,nack或者没有收到回调的消息具体是哪一个! (事实上,这个问题,spring已经帮我们解决了,spring封装的rabbitmq客户端rabbitTemplate可以在 confirm回调中获取到消息,具体细节后面会介绍)

即confirm回调中如果broker响应了ack则表示消息发送到了交换机,如果是nack则表示消息没有发送到交换机,可能是交换机不存在.
但是如果在发送消息的时候,broker挂了,又只有一个服务,那么就需要捕获异常,因为此时客户端会抛出网络IO连接异常;

关于ack回调(confirm回调)有下面几个重要的接口:

public interface ConfirmListener {
    void handleAck(long deliveryTag, boolean multiple)
        throws IOException;

    void handleNack(long deliveryTag, boolean multiple)
        throws IOException;
}
@FunctionalInterface
public interface ConfirmCallback {

    void handle(long deliveryTag, boolean multiple) throws IOException;

}

所以channel中有下面两种方式为信道设置confirm监听器

	@Override
	public void addConfirmListener(ConfirmListener listener) {
		this.delegate.addConfirmListener(listener);
	}

	@Override
	public ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) {
		return this.delegate.addConfirmListener(ackCallback, nackCallback);
	}

这里的add也就是向confirmListeners集合中添加confirm监听器,confirm监听器可以有多个,在回调的时候会遍历挨个执行

return回调的监听也是一样的道理,只不过在处理回调的时候,broker会返回这个路由失败消息的消息内容,但是return回调的前提是我们publish消息的时候设置了mandatory标志为true

总结一下,confirm的ack回调机制如果开启了,消息发送后都会受到broker的确认响应,但是响应的数据只有seqid没有消息内容;return回调如果开启了,只有在消息无法被路由到任何队列的时候 才会回调,并且会将消息原路返回;

所以上面的发送回调机制就可以保证消息从客户端到broker这个过程的投递的可靠性;

4. rabbitmq中的事务机制

上面我们使用的是回调的方式来检查消息是否成功投递,其实amqp协议中还提供了事务,客户端也可以使用事务来判断当前的消息是否投递成功

事务相关操作的Class是tx,关于它的详细信息可以查看官方文档
因为amqp中的事务处理处理非常耗时,调用后线程会阻塞等待broker的响应,所以不常使用,我在这里简单总结一下:

        channel.txSelect();
        channel.txCommit();
        channel.txRollback();

上面是事务的三种操作,开启事务,提交事务,回滚事务;
每一个操作都是向broker发送一个amqp命令,并且等待broker的响应;
事务提交后,事务回滚后,都是默认会自动再次开启一个新事务;

try{
    channel.txSelect(); //开启事务
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
    channel.txCommit(); //提交事务
    //channel.txRollback(); //如果在这里回滚而不是提交,则前面的publish将会被放弃,broker不会收到任何本次消息的投递
}catch (Exception e){
    e.printStackTrace();
    log.error("msg send fail");
}finally {
    channel.close();
    conn.close();
}

事务提交后,如果broker成功收到了本次事务中的消息,那么会响应客户端commitok,如果没有收到,则说明当前信道通信有问题,本次commit就会抛出IO异常, 业务中捕获这个异常也就知道当前消息投递失败。