RabbitMQ客户端源码阅读(05)_Consumer和spring amqp的封装原理

/ 默认分类 / 1 条评论 / 1637浏览

Consumer

一.吐槽

网上很多‘大佬’,对技术实在是太过于表面,比如这个问题 ‘@RabbitListener的使用及造成报错死循环的分析’ ,我以前也遇到过,下面先来看下网络上的‘大佬’们是如何解答的,不得不说,表面技术人石锤了! 1
2
3
4
5
关于上面的问题,下面我会慢慢解答,顺便捋一捋amqp中的消费者

二.前言

amqp中connection官方文档

Rabbitmq客户端建立的Connection是tcp长连接,在客户端维护了一个心跳检测任务,保证Connection的可用性,并且还可以建立自动恢复连接;
关于tcp长连接可以参考我的这篇文章

所以rabbitmq官方也说明:

Since connections are meant to be long-lived, clients usually consume messages by registering a subscription and having messages delivered (pushed) to them instead of polling.

在消费者中,也是通过broker push的方式接收到的 (rabbitmq也提供了consumer自己轮询pull的方法,但这基本不用)

三.Consumer详解

每一个消费者都可以设置一个消费标签,消费标签最好设置为唯一,这样就能在rabbitmq后台界面唯一监控消费者,可以在连接创建后,在当前连接上 声明消费者的时候指定标签(Consumer Tags)

官方文档_consumer说明
consumer_java客户端实现

下面写了一个简单的例子 (basicConsume发布消费者后,因为使用的是维护的tcp长连接,所以不要关闭这条channel, 这和前面我们在生产者中声明拓扑结构和发送消息后关闭channel不同,如果这里关闭了channel,那么当前消费者就和broker失去连接了,无法收到消息)

 public static void main(String[] args) throws IOException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.147");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        } catch (IOException | TimeoutException e) {
            System.out.println("与rabbitmq创建连接信道失败");
        }
        boolean autoAck = false;
        Channel finalChannel = channel;
        if (channel != null) {
            channel.basicConsume("testQueue", autoAck, "myConsumerTag1", new DefaultConsumer(finalChannel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body)
                        throws IOException
                {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    long deliveryTag = envelope.getDeliveryTag();
                    // (process the message components here ...)
                    finalChannel.basicAck(deliveryTag, false);
                }
            });
        }
    }

和之前介绍的方式一样,这里我也将wireshark抓包截图放在下方,可以很清楚的看到,客户端和broker交互的完整过程:






这里好像有一个bug,就是在设置发送basicNack的时候,修改了重新入队的布尔值,但是wireshark抓包显示的amqp数据包却没有改变,但事实上是有效果的,我不知道这是为什么?有知道的大佬欢迎下方告诉我哦!

关于rabbitmq java 客户端消费者的实现不做过多分析,可以参考之前我的另一篇rabbitmq java客户端生产者源码分析,这里说明下基本逻辑(这里我分析的源码都是基于rabbitmq官方的java客户端,spring封装的后面我们会介绍)

首先,我们在调用basicConsume创建一个消费者的时候,这个消费者会被关联到当前channel中的消费者集合中:

    private final Map<String, Consumer> _consumers =
        Collections.synchronizedMap(new HashMap<String, Consumer>());

如上所示,键是当前消费者的标签,所以前面我们说最好每一个消费者指定的标签不要重复

    String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
    _consumers.put(actualConsumerTag, callback);

还是前面文章中我们介绍过的MainLoop线程,这个线程是客户端和broker通信的接收器,对于消息的消费,从amqp规范中可以知道,使用的是Deliver,后面我会放上抓包截图 这里还是之前说的前置处理器,这里会处理Deliver数据响应

if (isOpen()) {
            // We're in normal running mode.

            if (method instanceof Basic.Deliver) {
                processDelivery(command, (Basic.Deliver) method);
                return true;

所以对于消费,接收到数据帧后主要的处理逻辑就是processDelivery()

    @Override
    protected void processDelivery(Command command, AMQImpl.Basic.Deliver method) {
        long tag = method.getDeliveryTag();
        if(tag > maxSeenDeliveryTag) {
            maxSeenDeliveryTag = tag;
        }
        super.processDelivery(command, offsetDeliveryTag(method));
    }
protected void processDelivery(Command command, Basic.Deliver method) {
        Basic.Deliver m = method;

        Consumer callback = _consumers.get(m.getConsumerTag());
        this.dispatcher.handleDelivery(callback,
                                       m.getConsumerTag(),
                                       envelope,
                                       (BasicProperties) command.getContentHeader(),
                                       command.getContentBody());
}


public void handleDelivery(final Consumer delegate,
                               final String consumerTag,
                               final Envelope envelope,
                               final AMQP.BasicProperties properties,
                               final byte[] body) throws IOException {
        executeUnlessShuttingDown(
        new Runnable() {
            @Override
            public void run() {
                try {
                //很明显,最终调用的就是我们实现的消费者中的handleDelivery方法  
                    delegate.handleDelivery(consumerTag,
                            envelope,
                            properties,
                            body);

这里我截取的是部分逻辑代码,如上,当接收到broker推送的消息时,按照消息中的consumerTag在客户端查出consumer处理逻辑(前面的map集合中保存了),找到了 consumer回调(其实就是我们自己写的消费者实现),然后就会执行当前消费者的消费逻辑对这条消息进行处理。

这里只是主要介绍下,rabbitmq消费者在使用中的一些特性和客户端消费的大体源码,之后再另一篇文章中,会详细介绍rabbitmq中怎样保证消息可靠性,那才是重点,哈哈哈,所以说前面 几篇rabbitmq博客只是打基础哦

下面就是上面我们调用的发布消费者的方法签名:

	public String basicConsume(String queue, boolean autoAck,String consumerTag, Consumer callback) throws IOException 

queue就是需要订阅的队列
consumeTag就是创建的当前这个消费者指定的消费者标签
autoAck表示是否开启自动确认
callback是当前客户端消费者的具体实现,下面我们来逐个介绍下:

  1. queue

如果指定的订阅的队列再broker中是不存在的,那么当前连接的channel会抛出一个channel级别的异常404 Not Found

Exception in thread "main" java.io.IOException
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:486)
	at cc.zuohui.controller.MyConsume.main(MyConsume.java:31)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'testQueue11' in vhost '/', class-id=60, method-id=20)
  1. consumerTag

不多介绍了,设置了唯一标签在管理监控界面消费者列表就可以很快找到

  1. autoAck

和生产者类似,消费者也可以设置消息接收模式,在消费者者中包含下面两种消息接收模式:

Automatic (deliveries require no acknowledgement, a.k.a. "fire and forget") Manual (deliveries require client acknowledgement)

  1. callback

callback参数是接口Consumer的实现,这就是真正的客户端消费实现,先来看下Consumer接口:

public interface Consumer {
    /**
    * 创建当前消费者的请求收到broker的响应后调用   即上面抓包图中的consumer-ok  
     * Called when the consumer is registered by a call to any of the
     * {@link Channel#basicConsume} methods.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleConsumeOk(String consumerTag);

    /**
    * 注册监听器的channel调用basicCancel关闭指定的conusmeTag的消费者成功后会回调(上面的抓包截图中也有展示)
    * 这里需要注意,每一个消费者都关联了一个channel,使用basicCancel关闭消费者时一定要保证时当时创建消费者时候使用的channel  
    * 否则会报错,找不到当前的消费者标签  
     * Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleCancelOk(String consumerTag);

    /**
    * 消费者被动关闭会触发这个回调,比如broker删除了当前订阅的队列(如上面的抓包接收到的Basic.cancel)  
     * Called when the consumer is cancelled for reasons <i>other than</i> by a call to
     * {@link Channel#basicCancel}. For example, the queue has been deleted.
     * See {@link #handleCancelOk} for notification of consumer
     * cancellation due to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @throws IOException
     */
    void handleCancel(String consumerTag) throws IOException;

    /**
     * Called when either the channel or the underlying connection has been shut down.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    /**
    * 当客户端channel调用channel.basicRecover();broker会响应recover-ok,接收到响应后就会执行这个回调,并且broker会
    * 将当前的所有未ack的消息重新推送给消费者  
     * Called when a <code><b>basic.recover-ok</b></code> is received
     * in reply to a <code><b>basic.recover</b></code>. All messages
     * received before this is invoked that haven't been <i>ack</i>'ed will be
     * re-delivered. All messages received afterwards won't be.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleRecoverOk(String consumerTag);

    /**
    * 当broker推送消息过来就会触发这个回调,这里就是消费消息的处理逻辑
     * Called when a <code><b>basic.deliver</b></code> is received for this consumer.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}

下面来实现一个消费者:

ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.147");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        } catch (IOException | TimeoutException e) {
            System.out.println("与rabbitmq创建连接信道失败");
        }
        conusmerChannel = channel;
        //关闭自动ack
        boolean autoAck = false;
        Channel finalChannel = channel;
        if (channel != null) {
            channel.basicConsume("testQueue", autoAck, "myConsumerTag1", new Consumer1(channel));
        }
        return "ok";
public class Consumer1 extends DefaultConsumer {

    /**
     * DefaultConsumer中没有默认无参构造方法,子类需要实现
     * @param channel
     */
    public Consumer1(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        long deliveryTag = envelope.getDeliveryTag();
        System.out.println(getConsumerTag()+"消费了消息:"+new String(body));
        //消息消费后手动发送ack确认   
        getChannel().basicAck(deliveryTag, true);
    }

    @Override
    public Channel getChannel() {
        return super.getChannel();
    }

    @Override
    public String getConsumerTag() {
        return super.getConsumerTag();
    }
}

简单起见,这里我只实现了消费逻辑,另外需要明确一点:

  1. 如果消费设置了autoack为true(消费自动确认),那么消费者接收到消息后就会和broker自动ack,broker就会把这条消息删除,表示已经被消费了,并且ack确认了 (无论消费者消费逻辑是异常还是如何,只要消息发送到消费者后就自动ack了)
  2. 如果autoack为false,那么表示消息投递确认模式为手动模式(Manual),那么如果消费中没有手动basicAck(deliveryTag, false);发送消费ack确认,或者在发送手动ack代码执行前消费抛出异常并且未捕获也导致手动ack发送代码没有执行到,或者发送ack由于网络异常导致 消费ack失败,那么服务端就会多一条nack消息,少一条ready消息(broker只会将ready状态的消息推送给订阅的消费者),所以当前这条nack的消息是不会被重复推送的,但是如果当前消费者断开后,nack消息会变为ready,下一次有消费者订阅这个队列就会收到这条nack->ready状态的消息
  3. 如果autoack为false,即为手动ack,那么如果在处理逻辑中消费失败,并且手动ack执行的是basicNack(long deliveryTag, boolean multiple, boolean requeue) ,也就是手动nack,按照amqp协议规范,可以指定是否需要重新入队,如果设置为true,则broker收到 nack回应后该条消息就会立刻被重新推送到消费者(这其中都是按照deliveryTag来标记这一条消息的)

ps:后面我会说明下,spring封装的rabbitmq客户单在推送数据时,nack一般会重新入队,所以导致一个循环报错的问题,但是网上的博客基本都是表面分析,根本没有从rabbitmq的本质看这个问题

上面介绍了rabbitmq客户端的消费确认,下面来看下spring封装的rabbitmq框架,其中的消费是怎样的,首先来看下面一段代码:

@RabbitListener(id = "consume1", autoStartup = "true",queues = {"testQueue"})
@Component
@Slf4j
public class TestRbListener {
//
//
    @RabbitHandler
    public void onMessage(byte[] msg) {
        try{

//            throw new RuntimeException("消费失败,"+new String(msg)+"抛出异常");
//            System.out.println("消费了消息:"+new String(msg));
        }catch(Exception e)
        {
            e.printStackTrace();
        }
    }

    @RabbitHandler
    public void onMessage(String message) {
        try{
            throw new RuntimeException("消费失败,"+message+"抛出异常");
//            System.out.println("消费了消息:"+message);
        }catch(Exception e)
        {
            e.printStackTrace();
        }
    }
}

对于如何使用就不做介绍,无论是springboot的官网或者网上的那些博客,有很多关于

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

spring封装的rabbitmq客户端的使用方法,这里主要简单分析下大体的原理结构.
首先在RabbitListener注解中,可以指定很多属性,其中消费者监听的队列等等,其中一个和我们本节介绍比较相关的就是ackMode,也就是消息确认机制

  1. 如果没有指定ackMode,那么默认是autoAck=true,也就是默认开启自动确认
    @RabbitListener注解主要起作用的处理器就是org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor
    扫描如下:
private TypeMetadata buildMetadata(Class<?> targetClass) {
		Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
		final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
		final List<ListenerMethod> methods = new ArrayList<>();
		final List<Method> multiMethods = new ArrayList<>();
		ReflectionUtils.doWithMethods(targetClass, method -> {
			Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
			if (listenerAnnotations.size() > 0) {
				methods.add(new ListenerMethod(method,
						listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
			}
			if (hasClassLevelListeners) {
				RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
				if (rabbitHandler != null) {
					multiMethods.add(method);
				}
			}
		}, ReflectionUtils.USER_DECLARED_METHODS);
		if (methods.isEmpty() && multiMethods.isEmpty()) {
			return TypeMetadata.EMPTY;
		}
		return new TypeMetadata(
				methods.toArray(new ListenerMethod[methods.size()]),
				multiMethods.toArray(new Method[multiMethods.size()]),
				classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
	}

前面我们定义了两个RabbitHandler,但是参数一个是String类型来接收消息,另一个是byte数组
这里寻找使用哪一个处理方法的逻辑如下:

    if (this.messagingMessageConverter.method == null && amqpMessage != null) {
        amqpMessage.getMessageProperties()
                .setTargetMethod(this.handlerAdapter.getMethodFor(message.getPayload()));
    }

    public Method getMethodFor(Object payload) {
        return getHandlerForPayload(payload.getClass()).getMethod();
    }

	protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> payloadClass) {
		InvocableHandlerMethod handler = this.cachedHandlers.get(payloadClass);
		if (handler == null) {
			handler = findHandlerForPayload(payloadClass);
			if (handler == null) {
				throw new AmqpException("No method found for " + payloadClass);
			}
			this.cachedHandlers.putIfAbsent(payloadClass, handler); //NOSONAR
			setupReplyTo(handler);
		}
		return handler;
	}

payloadClass参数就是当前接收到的消息的消息体对象的Class对象
这里我接收到的是byte[]数组类型的消息,但是我再消费逻辑中没有设置这样的方法,那么就会报错如下:

Caused by: org.springframework.amqp.AmqpException: No method found for class [B  
//另外有一个小知识点补充一下,这里为什么显示for Class [B呢?在java中调用数组的getClass()方法获取class名称得到的就是'[数组类型的第一个字母大写',这里是byte数组.所以就是[B

如果我们设置的ackMode为自动确认模式,那么因为这里再寻找处理消息的方法的时候就报错了,框架中处理这个异常并打印信息

	@Override
	public void handleError(Throwable t) {
		log(t);

并且出现异常后会执行下面的回滚,再回滚中会发送nack

public void rollbackOnExceptionIfNecessary(Throwable ex) {

		boolean ackRequired = !this.acknowledgeMode.isAutoAck()
				&& (!this.acknowledgeMode.isManual() || ContainerUtils.isRejectManual(ex));
		try {
			if (this.transactional) {
				if (logger.isDebugEnabled()) {
					logger.debug("Initiating transaction rollback on application exception: " + ex);
				}
				RabbitUtils.rollbackIfNecessary(this.channel);
			}
			if (ackRequired) {
				OptionalLong deliveryTag = this.deliveryTags.stream().mapToLong(l -> l).max();
				if (deliveryTag.isPresent()) {
				//这段代码很关键
					this.channel.basicNack(deliveryTag.getAsLong(), true,
							ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));
				}
				if (this.transactional) {
					// Need to commit the reject (=nack)
					RabbitUtils.commitIfNecessary(this.channel);
				}
			}
		}

如上,我单独拿出这段比较关键的代码:

    this.channel.basicNack(deliveryTag.getAsLong(), true,
            ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger));

可以看出,这就是前面博客和上文中介绍的amqp中的nack命令,这里的参数是否重新入队是ContainerUtils.shouldRequeue这个方法获取的 前方高能,进入一段比较关键的判断逻辑

/**
	 * Determine whether a message should be requeued; returns true if the throwable is a
	 * {@link MessageRejectedWhileStoppingException} or defaultRequeueRejected is true and
	 * there is not an {@link AmqpRejectAndDontRequeueException} in the cause chain or if
	 * there is an {@link ImmediateRequeueAmqpException} in the cause chain.
	 * @param defaultRequeueRejected the default requeue rejected.
	 * @param throwable the throwable.
	 * @param logger the logger to use for debug.
	 * @return true to requeue.
	 */
	public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
		boolean shouldRequeue = defaultRequeueRejected ||
				throwable instanceof MessageRejectedWhileStoppingException;
		Throwable t = throwable;
		while (t != null) {
			if (t instanceof AmqpRejectAndDontRequeueException) {
				shouldRequeue = false;
				break;
			}
			else if (t instanceof ImmediateRequeueAmqpException) {
				shouldRequeue = true;
				break;
			}
			t = t.getCause();
		}
		if (logger.isDebugEnabled()) {
			logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")");
		}
		return shouldRequeue;
	}

可以看出,springamqp框架中,nack后是如果需要重新入队那么条件需要满足下面的一种:

  1. 消费发生的异常为MessageRejectedWhileStoppingException
  2. defaultRequeueRejected参数配置为true 并且抛出的异常不是AmqpRejectAndDontRequeueException(可以没有异常)
    defaultRequeueRejected这个参数在框架是是可以配置的:
    listener:
      simple:
        default-requeue-rejected: 

所以为什么使用springamqp的rabbitmq框架开发消费监听时会发生消费报错死循环?

正确的回答如下: 如果消费监听使用的是自动确认模式,并且消费逻辑方法的入参没有能处理当前接收到的消息的类型,那么消费就会抛出找不到执行方法的异常,之后异常处理逻辑执行回滚, 因为设置了自动确认模式,所以会发送nack,nack中因为默认的requeue为true,所以当前的nack的消息会再次重新进入消费,所以这就进入了一个死循环;

所以要怎样解决呢?
首先这不是框架的bug,这样是正常的,是合理的,因为你使用了自动确认,并且默认重新入队,所以你需要做的就是修改为手动确认,自己消费逻辑,或者配置重新入队为false,如下所示

 @RabbitHandler
    public void onMessage(byte[] message, Channel channel,Message ms) {
        try{
            System.out.println("消费了消息:"+ new String(message));
            channel.basicNack(ms.getMessageProperties().getDeliveryTag(),false,true);
//            throw new RuntimeException("消费失败,"+new String(msg)+"抛出异常");
        }catch(Exception e)
        {
            e.printStackTrace();
        }
    }

四.使用spring amqp框架来自定义消息转换器

另外,网上应该有的文章也有说到,消息格式转换器,其实这里的消息接收参数最终都是经过消息格式转换得到的(默认使用的是SimpleMessageConverter,转为字节数组,如上所示),如果想要使用自己自定义的消息体格式对象,直接将消息转为自定义对象,那么可以设置自定义的消息格式转换器
那么要怎样自定义呢? 其实源码中已经告诉我们很多方法了:

	/**
	 * Set up the default strategies. Subclasses can override if necessary.
	 */
	protected void initDefaultStrategies() {
		setMessageConverter(new SimpleMessageConverter());
	}

这是RabbitTemplate中的方法,所以你可以使用子类然后重写rabbitTemplate的initDefaultStrategies()来替换默认的消息转换器

从现有的默认的消息转换器可以看出,如果要实现一个消息转换器可以直接实现接口MessageConverter ,或者继承AbstractMessageConverter

	@Override
	public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception { // NOSONAR
	    //将amqp消息使用转换器转为Message
		Message<?> message = toMessagingMessage(amqpMessage);
		//按照转换后的消息体类型寻找对应的参数类型的消费逻辑方法
		invokeHandlerAndProcessResult(amqpMessage, channel, message);
	}
protected void invokeHandlerAndProcessResult(@Nullable org.springframework.amqp.core.Message amqpMessage,
			Channel channel, Message<?> message) throws Exception { // NOSONAR

		if (logger.isDebugEnabled()) {
			logger.debug("Processing [" + message + "]");
		}
		InvocationResult result = null;
		try {
			if (this.messagingMessageConverter.method == null && amqpMessage != null) {
				amqpMessage.getMessageProperties()
						.setTargetMethod(this.handlerAdapter.getMethodFor(message.getPayload()));
			}
			result = invokeHandler(amqpMessage, channel, message);
			if (result.getReturnValue() != null) {
				handleResult(result, amqpMessage, channel, message);
			}
			else {
				logger.trace("No result object given - no result to handle");
			}
		}

查看上述的toMessagingMessage()逻辑就可以返现,可以再项目中注入一个自定义的消息转换器,这样就可以自动将amqp的消息内容转为payload是指定的java对象的 Message,这样只要再消费逻辑中加入对应的参数的方法即可直接赋值消费了

上面说了会按照从amqp消息中获得消息内容payload(这个过程就会执行消息转换器),如果发送的消息指定的消息头content_typetext/plain
那么payload会是String类型,所以消费逻辑中参数为String的方法就会被匹配上然后执行,如果消息指定的消息头content_typeapplication/json
那么默认就会是字节数组byte[],所以就会匹配相应的消费方法

如果希望传递的json数据直接被解析为java对象,可以自定义消息转换器,如下所示:

消费逻辑如下:

@RabbitListener(id = "consume1", autoStartup = "true",queues = {"testQueue"})
@Component
@Slf4j
public class ProcessCreateRbListener {
    @RabbitHandler
    public void onMessage(byte[] message, Channel channel,Message ms) {
        try{
            System.out.println("消费了消息:"+ new String(message));
//            channel.basicNack(ms.getMessageProperties().getDeliveryTag(),false,true);
//            throw new RuntimeException("消费失败,"+new String(msg)+"抛出异常");
        }catch(Exception e)
        {
            e.printStackTrace();
        }
    }

    @RabbitHandler
    public void onMessage(String message) {
        try{
            System.out.println("消费"+message);
        }catch(Exception e)
        {
            e.printStackTrace();
        }
    }

    @RabbitHandler
    public void onMessage(Student message) {
        try{
        System.out.println("消费:"+message);
        }catch(Exception e)
        {
            e.printStackTrace();
        }
    }
}

消息转换器,下面是简单的实现例子
springamqp文档地址

@Component
public class TestMessageConverter implements MessageConverter {

    //消息发送时:消息内容封装到Message
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    //这里就是rabbitTemplate发送消息时设置消息参数的逻辑,也就是消息转换器中的 convert to Message,可以参考amqp文档的convertmessage部分
        return new Message(object.toString().getBytes(),messageProperties);
    }
    
    //消费接收消息时, Message->消息内容
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        if(!message.getMessageProperties().getContentType().equals("text/plain")) {
            String jsonStr = new String(message.getBody());
            return JSONObject.parseObject(jsonStr, Student.class);
        }else{
            return new SimpleMessageConverter().fromMessage(message);
        }
        ........
    }
}

这里可以添加很多自己的逻辑,可以让消息生产者在消息参数中带上指定的java模型名称,然后在转换器中判断来组装出不同的java对象

ps:使用springamqp发送消息的时候,也是可以使用自定义的消息转换器,统一为消息设置消息参数,下面时发送的框架源码

	@Override
	public void convertAndSend(String exchange, String routingKey, final Object object,
			@Nullable CorrelationData correlationData) throws AmqpException {

		send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
	}
	
    protected Message convertMessageIfNecessary(final Object object) {
        if (object instanceof Message) {
            return (Message) object;
        }
        //这里的toMessage就可以是上面我们自己实现的 ,springamqp默认发送的消息参数参考这个类MessageProperties 
        return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

五.总结

待补充....

  1. 11111