RabbitMQ客户端源码阅读(02)_连接建立

/ 默认分类 / 0 条评论 / 1387浏览

第一部分

继上一篇文章中的例子,我们来详细分析下链接建立的过程,下面代码是我个人阅读源码的笔记,有不正确不准确的地方大家请下方留言评论. rabbitmq第一步建立连接,最终执行如下的代码:

/**
    * Create a new broker connection with a client-provided name, picking the first available address from
    * the list provided by the {@link AddressResolver}.
    *
    * If <a href="https://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
    * is enabled, the connection returned by this method will be {@link Recoverable}. Future
    * reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}.
    *
    * @param executor thread execution service for consumers on the connection
    * @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to
    * @param clientProvidedName application-specific connection name, will be displayed
    *                           in the management UI if RabbitMQ server supports it.
    *                           This value doesn't have to be unique and cannot be used
    *                           as a connection identifier e.g. in HTTP API requests.
    *                           This value is supposed to be human-readable.
    * @return an interface to the connection
    * @throws java.io.IOException if it encounters a problem
    * @see <a href="https://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
    */
   public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
       throws IOException, TimeoutException {
       if(this.metricsCollector == null) {
           this.metricsCollector = new NoOpMetricsCollector();
       }
       // make sure we respect the provided thread factory
       FrameHandlerFactory fhFactory = createFrameHandlerFactory();
       ConnectionParams params = params(executor);
       // set client-provided via a client property
       if (clientProvidedName != null) {
           Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
           properties.put("connection_name", clientProvidedName);
           params.setClientProperties(properties);
       }
       
       //如果是开启了自动连接恢复,连接自动恢复,是rabbitmq java客户端在4.0之后默认开启的,如果客户端和服务端之间的连接中断或者连接失败(非程序主动申请断开,是因为网络原因发生异常断开的情况),会自动恢复连接
       //并且也会自动恢复之前的实体模型,例如队列,交换机等;  
       if (isAutomaticRecoveryEnabled()) {
           // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
           // No Sonar: no need to close this resource because we're the one that creates it
           // and hands it over to the user
           AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector); //NOSONAR

           conn.init();
           return conn;
       } else {
           List<Address> addrs = addressResolver.getAddresses();
           Exception lastException = null;
           for (Address addr : addrs) {
               try {
                   FrameHandler handler = fhFactory.create(addr, clientProvidedName);
                   AMQConnection conn = createConnection(params, handler, metricsCollector);
                   conn.start();
                   this.metricsCollector.newConnection(conn);
                   return conn;
               } catch (IOException e) {
                   lastException = e;
               } catch (TimeoutException te) {
                   lastException = te;
               }
           }
           if (lastException != null) {
               if (lastException instanceof IOException) {
                   throw (IOException) lastException;
               } else if (lastException instanceof TimeoutException) {
                   throw (TimeoutException) lastException;
               }
           }
           throw new IOException("failed to connect");
       }
   }
  1. 方法中的参数AddressResolver是地址解析器
public interface AddressResolver {

    /**
     * Get the potential {@link Address}es to connect to.
     * @return candidate {@link Address}es
     * @throws IOException if it encounters a problem
     */
    List<Address> getAddresses() throws IOException;

}

其中只有一个方法,是用来获取地址列表的,其实newConnection这个方法最先的调用是这样的:

    public Connection newConnection(List<Address> addrs) throws IOException, TimeoutException {
        return newConnection(this.sharedExecutor, addrs, null);
    }

可以看到,这里传入了一个List

,作用是,会在给出的列表地址中,逐个尝试连接,直到获取到第一个成功的连接;

    public Connection newConnection(ExecutorService executor, List<Address> addrs, String clientProvidedName)
            throws IOException, TimeoutException {
        return newConnection(executor, createAddressResolver(addrs), clientProvidedName);
    }

创建地址解析器

    protected AddressResolver createAddressResolver(List<Address> addresses) {
        if(addresses.size() == 1 && !isSSL()) {
            return new DnsRecordIpAddressResolver(addresses.get(0), isSSL());
        } else {
            return new ListAddressResolver(addresses);
        }
    }

可以看到这是一个protect修饰的方法,很明显,让你继承的,可以自己实现,或者简单一点使用最上面的那个构造方法,直接传入一个地址解析器,所以你可以自己实现一个地址解析器,用来获取你配置的rabbitmq的集群地址列表等等;

  1. 上面判断是否开启了使用自动连接恢复,于是这里就会有两种类型的连接

一种是:RecoveryAwareAMQConnection 另一个是:AMQConnection
Connection类中包含下列一些重要的属性,后面就会明白他们的作用

public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {

    private Thread mainLoopThread;
    private final AMQChannel _channel0;
    /** Frame source/sink */
    private final FrameHandler _frameHandler;

    /** Flag controlling the main driver loop's termination */
    private volatile boolean _running = false;

    /** Manages heart-beat sending for this connection */
    private HeartbeatSender _heartbeatSender;

    private final String _virtualHost;
    private final int requestedHeartbeat;
    private final int requestedChannelMax;
    private final int requestedFrameMax;

先来看默认的开启建立自动恢复连接,那么首先是这个init方法:

    public void init() throws IOException, TimeoutException {
        //创建一个自动恢复连接
        this.delegate = this.cf.newConnection();
        this.addAutomaticRecoveryListener(delegate);
    }
    
    这里的delegate即为可自动恢复连接  
    private volatile RecoveryAwareAMQConnection delegate;

addAutomaticRecoveryListener是为当前连接注册断开自动连接的监听器,关于监听器的调用时机,我参考了源码和官方github上的issue 但是还是没能真正清楚的弄明白,我知道的是,mq客户端会注册shutdown监听器,当连接关闭后会调用连接关闭监听器,连接关闭监听器执行后会执行连接 重新连接监听器

private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newConn) {
        final AutorecoveringConnection c = this;
        // this listener will run after shutdown listeners,
        // see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
        RecoveryCanBeginListener starter = cause -> {
            try {
                if (shouldTriggerConnectionRecovery(cause)) {
                    //这是主要的执行连接恢复的逻辑 
                    c.beginAutomaticRecovery();
                }
            } catch (Exception e) {
                newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);
            }
        };
        synchronized (this) {
            newConn.addRecoveryCanBeginListener(starter);
        }
    }
private synchronized void beginAutomaticRecovery() throws InterruptedException {
        this.wait(this.params.getRecoveryDelayHandler().getDelay(0));

        this.notifyRecoveryListenersStarted();
        //创建新的连接 
        final RecoveryAwareAMQConnection newConn = this.recoverConnection();
        if (newConn == null) {
            return;
        }
        LOGGER.debug("Connection {} has recovered", newConn);
        this.addAutomaticRecoveryListener(newConn);
	    this.recoverShutdownListeners(newConn);
	    this.recoverBlockedListeners(newConn);
	    this.recoverChannels(newConn);
	    // don't assign new delegate connection until channel recovery is complete
	    this.delegate = newConn;
	    if (this.params.isTopologyRecoveryEnabled()) {
	        recoverTopology(params.getTopologyRecoveryExecutor());
	    }
		this.notifyRecoveryListenersComplete();
    }

下面这个方法考虑的真的特别全面:

// Returns new connection if the connection was recovered, 
	// null if application initiated shutdown while attempting recovery.  
    private RecoveryAwareAMQConnection recoverConnection() throws InterruptedException {
        int attempts = 0;
        //manuallyClosed初始值为false,当执行程序执行连接的close()方法主动关闭连接后,manuallyClosed变为true,所以这里保证,如果是主动关闭则不重新连接,注意这里是while循环,会一直不断的尝试恢复建立连接  
        while (!manuallyClosed) {
            try {
                attempts++;
                // No Sonar: no need to close this resource because we're the one that creates it
                // and hands it over to the user
                //创建一个新的连接,这个方法可能会抛出获取连接超时异常或者io异常,那么会被这里捕获,然后会记录获取连接失败的原因日志
				RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR
				//使用同步锁保证重新建立的连接唯一(只成功重新建立一次)
				synchronized(recoveryLock) {
				    //如果当前还属于需要重新建立连接的状态,则返回新建立好的连接
					if (!manuallyClosed) {
						// This is the standard case.				
						return newConn;
					}
				}
				// This is the once in a blue moon case.  
				// Application code just called close as the connection
				// was being re-established.  So we attempt to close the newly created connection.
				//这里真的不得不佩服,考虑的非常全面,如果代码到这里,说明新的连接已经建立,但是此时用户程序刚好主动执行了close(),所以manuallyClosed变为true,所以这里关闭放弃这个新建的连接,然后返回空
				newConn.abort();
				return null;
            } catch (Exception e) {
                //每次重新建立连接失败后会睡眠等待一段延时之后才会再次尝试重新建立连接
                Thread.sleep(this.params.getRecoveryDelayHandler().getDelay(attempts));
                this.getExceptionHandler().handleConnectionRecoveryException(this, e);
            }
        }
        //如果一开始用户程序就刚好主动执行了close(),manuallyClosed变为true,直接返回空
		return null;
    }

建立失败后等待的时间也是根据当前尝试的次数来定的,具体方案如下:

        public ExponentialBackoffDelayHandler() {
            sequence = Arrays.asList(2000L, 3000L, 5000L, 8000L, 13000L, 21000L, 34000L);
        }

        public long getDelay(int recoveryAttempts) {
            int index = recoveryAttempts >= sequence.size() ? sequence.size() - 1 : recoveryAttempts;
            return sequence.get(index);
        }

很明显,当第一次尝试重新连接失败后,会等待2s然后再次尝试,如果再次失败会等待3s然后再次尝试,以此类推,一直到7次失败后,以后每次都是失败都是需要等待34s才再次尝试重新连接

  1. 建立自动恢复连接的时候,执行的是下面的代码:
    public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
        Exception lastException = null;
        List<Address> shuffled = shuffle(addressResolver.getAddresses());

        for (Address addr : shuffled) {
            try {
                FrameHandler frameHandler = factory.create(addr, connectionName());
                //这里可以看到最终建立的可自动恢复连接是RecoveryAwareAMQConnection,它是普通连接AMQConnection的父类
                RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
                conn.start();
                metricsCollector.newConnection(conn);
                return conn;
            } catch (IOException e) {
                lastException = e;
            } catch (TimeoutException te) {
                lastException = te;
            }
        }

        if (lastException != null) {
            if (lastException instanceof IOException) {
                throw (IOException) lastException;
            } else if (lastException instanceof TimeoutException) {
                throw (TimeoutException) lastException;
            }
        }
        throw new IOException("failed to connect");
    }

想要弄懂上面的代码,还需要搞明白FrameHandler,这是一个接口: 父接口NetworkConnection中有获取主机ip,端口号的方法 FrameHandler的实现类可以理解为数据帧处理器,其可以发送数据帧到与其关联的broker(使用socket), 当然也可以读取来自broker的数据帧,还有其他一些功能如下:

public interface FrameHandler extends NetworkConnection {

    /**
     * Set the underlying socket's read timeout in milliseconds, if applicable.
     * @param timeoutMs The timeout in milliseconds
     */
    void setTimeout(int timeoutMs) throws SocketException;

    /**
     * Get the underlying socket's read timeout in milliseconds.
     * @return The timeout in milliseconds
     */
    int getTimeout() throws SocketException;

    /**
     * Send the initial connection header, thus kickstarting the AMQP
     * protocol version negotiation process and putting the underlying
     * connection in a state such that the next layer of startup can
     * proceed.
     * @throws IOException if there is a problem accessing the connection
     */
    void sendHeader() throws IOException;

    void initialize(AMQConnection connection);

    /**
     * Read a {@link Frame} from the underlying data connection.
     * @return an incoming Frame, or null if there is none
     * @throws IOException if there is a problem accessing the connection
     * @throws SocketTimeoutException if the underlying read times out
     */
    Frame readFrame() throws IOException;

    /**
     * Write a {@link Frame} to the underlying data connection.
     * @param frame the Frame to transmit
     * @throws IOException if there is a problem accessing the connection
     */
    void writeFrame(Frame frame) throws IOException;

    /**
     * Flush the underlying data connection.
     * @throws IOException if there is a problem accessing the connection
     */
    void flush() throws IOException;

    /** Close the underlying data connection (complaint not permitted). */
    void close();
}

比如实现类SocketFrameHandler

public class SocketFrameHandler implements FrameHandler {

//底层关联的socket
    /** The underlying socket */
    private final Socket _socket;

//与socket关联的输入流(数据从broker到客户端)
    /** Socket's inputstream - data from the broker - synchronized on */
    private final DataInputStream _inputStream;

//与socket关联的输出流(数据从客户端到broker)
    /** Socket's outputstream - data to the broker - synchronized on */
    private final DataOutputStream _outputStream;

所以上面创建了FranmeHandle之后执行下面的代码,其实就是将frameHandler等其他数据放入RecoveryAwareAMQConnection构造方法中,创建连接,所以 前面我们介绍的Connection中的属性就包含frameHandler;

RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);

第二部分

无论是自动恢复连接还是普通连接,和broker建立连接的交互过程都是start方法中执行的
下面是源码:

public void start()
            throws IOException, TimeoutException {
        initializeConsumerWorkService();
        initializeHeartbeatSender();
        this._running = true;
        // Make sure that the first thing we do is to send the header,
        // which should cause any socket errors to show up for us, rather
        // than risking them pop out in the MainLoop
        AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
            new AMQChannel.SimpleBlockingRpcContinuation();
        // We enqueue an RPC continuation here without sending an RPC
        // request, since the protocol specifies that after sending
        // the version negotiation header, the client (connection
        // initiator) is to wait for a connection.start method to
        // arrive.
        //channel0主要的工作就是建立和释放连接的,包括start/startok,open/openok,tune/tuneok这些
        //并且这些命令交互需要按照确定的顺序来交互,就是前面我们抓包的那个顺序,所以这里使用的是一个类似于阻塞队列的数据结构
        //保证连接的正常顺序建立
        _channel0.enqueueRpc(connStartBlocker);
        try {
            // The following two lines are akin to AMQChannel's
            // transmit() method for this pseudo-RPC.
            _frameHandler.setTimeout(handshakeTimeout);
            //发送header,就是客户端建立连接的第一个数据包,包括协议,协议id和版本id
            _frameHandler.sendHeader();
        } catch (IOException ioe) {
            _frameHandler.close();
            throw ioe;
        }
        
        this._frameHandler.initialize(this);
        //下面的代码就是我们抓包显示的过程,客户端收到服务端响应后组装amqp消息发送给服务端,整个连接交互的过程
        AMQP.Connection.Start connStart;
        AMQP.Connection.Tune connTune = null;
        try {
            //服务端->客户端   Start
            connStart =
                    (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();

            _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

            Version serverVersion =
                    new Version(connStart.getVersionMajor(),
                                       connStart.getVersionMinor());

            if (!Version.checkVersion(clientVersion, serverVersion)) {
                throw new ProtocolVersionMismatchException(clientVersion,
                                                                  serverVersion);
            }

            String[] mechanisms = connStart.getMechanisms().toString().split(" ");
            SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
            if (sm == null) {
                throw new IOException("No compatible authentication mechanism found - " +
                                              "server offered [" + connStart.getMechanisms() + "]");
            }

            String username = credentialsProvider.getUsername();
            String password = credentialsProvider.getPassword();

            if (credentialsProvider.getTimeBeforeExpiration() != null) {
                if (this.credentialsRefreshService == null) {
                    throw new IllegalStateException("Credentials can expire, a credentials refresh service should be set");
                }
                if (this.credentialsRefreshService.isApproachingExpiration(credentialsProvider.getTimeBeforeExpiration())) {
                    credentialsProvider.refresh();
                    username = credentialsProvider.getUsername();
                    password = credentialsProvider.getPassword();
                }
            }

            LongString challenge = null;
            LongString response = sm.handleChallenge(null, username, password);

            do {
                Method method = (challenge == null)
                                        ? new AMQP.Connection.StartOk.Builder()
                                                  .clientProperties(_clientProperties)
                                                  .mechanism(sm.getName())
                                                  .response(response)
                                                  .build()
                                        : new AMQP.Connection.SecureOk.Builder().response(response).build();

                try {
                //客户端->服务端 StartOk  //服务端->客户端   Tune
                    Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
                    if (serverResponse instanceof AMQP.Connection.Tune) {
                        connTune = (AMQP.Connection.Tune) serverResponse;
                    } else {
                        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                        response = sm.handleChallenge(challenge, username, password);
                    }
                } catch (ShutdownSignalException e) {
                    Method shutdownMethod = e.getReason();
                    if (shutdownMethod instanceof AMQP.Connection.Close) {
                        AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
                        if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                            throw new AuthenticationFailureException(shutdownClose.getReplyText());
                        }
                    }
                    throw new PossibleAuthenticationFailureException(e);
                }
            } while (connTune == null);
        } catch (TimeoutException te) {
            _frameHandler.close();
            throw te;
        } catch (ShutdownSignalException sse) {
            _frameHandler.close();
            throw AMQChannel.wrap(sse);
        } catch(IOException ioe) {
            _frameHandler.close();
            throw ioe;
        }

        try {
            int negotiatedChannelMax =
                negotiateChannelMax(this.requestedChannelMax,
                                    connTune.getChannelMax());

            int channelMax = ConnectionFactory.ensureUnsignedShort(negotiatedChannelMax);

            if (channelMax != negotiatedChannelMax) {
                LOGGER.warn("Channel max must be between 0 and {}, value has been set to {} instead of {}",
                        MAX_UNSIGNED_SHORT, channelMax, negotiatedChannelMax);
            }

            _channelManager = instantiateChannelManager(channelMax, threadFactory);

            int frameMax =
                negotiatedMaxValue(this.requestedFrameMax,
                                   connTune.getFrameMax());
            this._frameMax = frameMax;

            int negotiatedHeartbeat =
                negotiatedMaxValue(this.requestedHeartbeat,
                                   connTune.getHeartbeat());

            int heartbeat = ConnectionFactory.ensureUnsignedShort(negotiatedHeartbeat);

            if (heartbeat != negotiatedHeartbeat) {
                LOGGER.warn("Heartbeat must be between 0 and {}, value has been set to {} instead of {}",
                        MAX_UNSIGNED_SHORT, heartbeat, negotiatedHeartbeat);
            }

            setHeartbeat(heartbeat);
            //客户端->服务端  Tuneok  客户端->服务端  Open 
            _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                                .channelMax(channelMax)
                                .frameMax(frameMax)
                                .heartbeat(heartbeat)
                              .build());
            _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                                      .virtualHost(_virtualHost)
                                    .build());
        } catch (IOException ioe) {
            _heartbeatSender.shutdown();
            _frameHandler.close();
            throw ioe;
        } catch (ShutdownSignalException sse) {
            _heartbeatSender.shutdown();a
            _frameHandler.close();
            throw AMQChannel.wrap(sse);
        }

        if (this.credentialsProvider.getTimeBeforeExpiration() != null) {
            String registrationId = this.credentialsRefreshService.register(credentialsProvider, () -> {
                // return false if connection is closed, so refresh service can get rid of this registration
                if (!isOpen()) {
                    return false;
                }
                if (this._inConnectionNegotiation) {
                    // this should not happen
                    return true;
                }
                String refreshedPassword = credentialsProvider.getPassword();

                UpdateSecretExtension.UpdateSecret updateSecret = new UpdateSecretExtension.UpdateSecret(
                        LongStringHelper.asLongString(refreshedPassword), "Refresh scheduled by client"
                );
                try {
                    _channel0.rpc(updateSecret);
                } catch (ShutdownSignalException e) {
                    LOGGER.warn("Error while trying to update secret: {}. Connection has been closed.", e.getMessage());
                    return false;
                }
                return true;
            });

            addShutdownListener(sse -> this.credentialsRefreshService.unregister(this.credentialsProvider, registrationId));
        }

        // We can now respond to errors having finished tailoring the connection
        this._inConnectionNegotiation = false;
    }

要读懂这段代码,需要先了解下面的知识点:

要搞明白上面这段代码需要先弄清楚下面几个类的含义:

public class AMQCommand implements Command {
        /** The assembler for this command - synchronised on - contains all the state */
        private final CommandAssembler assembler;
}

AMQCommand是rabbitmq的java客户端对amqp协议中的命令的封装,就是之前介绍amqp时候的按照class分组的很多命令;
AMQCommand实现了Command接口,我们来看下这个接口:

public interface Command {
    /**
     * Retrieves the {@link Method} held within this Command. Downcast to
     * concrete (implementation-specific!) subclasses as necessary.
     *
     * @return the command's method.
     */
    Method getMethod();

    /**
     * Retrieves the ContentHeader subclass instance held as part of this Command, if any.
     *
     * Downcast to one of the inner classes of AMQP,
     * for instance {@link AMQP.BasicProperties}, as appropriate.
     *
     * @return the Command's {@link ContentHeader}, or null if none
     */
    ContentHeader getContentHeader();

    /**
     * Retrieves the body byte array that travelled as part of this
     * Command, if any.
     *
     * @return the Command's content body, or null if none
     */
    byte[] getContentBody();
}

不难看出,这个接口就是定义的获取amqp数据帧中的数据,包括新消息头,Method,命令,消息体;
下面再来看下AMQCommnd这个实现类:

/**
 * AMQP 0-9-1-specific implementation of {@link Command} which accumulates
 * method, header and body from a series of frames, unless these are
 * supplied at construction time.
 * <h2>Concurrency</h2>
 * This class is thread-safe.
 */
//从上面的注释可以得知,AMQCommand就是从一些列的数据帧中拼凑出amqp消息(java中封装为了AMQCommand),包括method,header和body,就是我们前面说的消息头,Method,命令,消息体
public class AMQCommand implements Command {

//但是主要用来将数据帧拼装成amqp消息(anqp命令)的是这个封装在AMQCommand中的装配器成员变量CommandAssembler
    /** The assembler for this command - synchronised on - contains all the state */
    private final CommandAssembler assembler;

    //可以看到,这里实现的接口中的方法都是基于装配器CommandAssembler来实现的
    //获取amqp消息中的method部分
    /** Public API - {@inheritDoc} */
    @Override
    public Method getMethod() {
        return this.assembler.getMethod();
    }

    //获取amqp消息中的头部
    /** Public API - {@inheritDoc} */
    @Override
    public AMQContentHeader getContentHeader() {
        return this.assembler.getContentHeader();
    }
    
    //获取amqp消息中的body部分
    /** Public API - {@inheritDoc} */
    @Override
    public byte[] getContentBody() {
        return this.assembler.getContentBody();
    }
    
    //处理数据帧,使用装配器装配,如果amqp消息装配完整了就返回true
    public boolean handleFrame(Frame f) throws IOException {
        return this.assembler.handleFrame(f);
    }

    /**
     * Sends this command down the named channel on the channel's
     * connection, possibly in multiple frames.
     * @param channel the channel on which to transmit the command
     * @throws IOException if an error is encountered
     */
    //向指定的通道上的连接发送当前的amqp命令(当然发送数据包的时候也是一帧帧发送的,不是说组装号了amqp命令,直接一次发出,因为可能amqp命令中的body
    // 就已经很大,但是数据帧设置了最大限制,那么就需要多次发送数据帧了,下面的代码就是这样)
    public void transmit(AMQChannel channel) throws IOException {
        //获取通道编号,connection在建立的时候会有一个0号通道,和broker建立连接的通信和连接关闭的通信都是使用的0号通道,其他每个线程
        //和broker进行消息通信都会有单独的一个建立在当前connection上的channel,并且channel的编号会递增
        int channelNumber = channel.getChannelNumber();
        //获取当前channel的connection
        AMQConnection connection = channel.getConnection();
        
        //线程安全控制,保证数据帧的完整和正确
        synchronized (assembler) {
            //获取amqp命令中的method
            Method m = this.assembler.getMethod();
            if (m.hasContent()) {
                //获取amqp命令中的body
                byte[] body = this.assembler.getContentBody();
                //获取amqp命令中的header
                Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
                //获取当前连接的数据帧的大小限制(这个参数是在连接建立的时候cs双方协商的)
                int frameMax = connection.getFrameMax();
                //如果限制大于0则说明有限制,0表示没有大小限制
                boolean cappedFrameMax = frameMax > 0;
                //计算每个数据帧除了必要部分我们还能使用的空闲空间大小,EMPTY_FRAME_SIZE是无body内容的数据帧占用的大小(这是标准大小,值位8)
                //如果有数据帧最大限制就是最大值减去已经被占用的必须空间,如果没有限制那么我们可以直接使用body内容大小的空间
                int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
                //如果数据帧头部已经超过最大限制,直接报错
                if (cappedFrameMax && headerFrame.size() > frameMax) {
                    String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
                    throw new IllegalArgumentException(msg);
                }
                //发送method
                connection.writeFrame(m.toFrame(channelNumber));
                //发送header
                connection.writeFrame(headerFrame);
                //发送body,每次发送的大小就是前面计算出来的数据帧甚于可用的空间bodyPayloadMax
                for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                    int remaining = body.length - offset;

                    int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                            : bodyPayloadMax;
                    Frame frame = Frame.fromBodyFragment(channelNumber, body,
                            offset, fragmentLength);
                    connection.writeFrame(frame);
                }
            } else {
                connection.writeFrame(m.toFrame(channelNumber));
            }
        }
        connection.flush();
    }
}

现在来详细看下发送数据帧的方法:

    /**
     * Public API - sends a frame directly to the broker.
     */
    public void writeFrame(Frame f) throws IOException {
        _frameHandler.writeFrame(f);
        _heartbeatSender.signalActivity();
    }

这个方法是AMQConnection中的方法,也就是和broker建立的连接提供的方法,注释中可以看出这个方法是直接发送数据帧到broker


来看下Connection中的_channel0属性:

//上面我们介绍Connection的重要属性的时候,其中有一个就是_channel0,可以看到在这里有使用,我们先来看下他是如何初始化的,在Conection的类中搜索
//_channel = 就可以看到了,如下 
    AMQChannel createChannel0() {
        return new AMQChannel(this, 0) {
            @Override public boolean processAsync(Command c) throws IOException {
                return getConnection().processControlCommand(c);
            }
        };
    }  

首先因为Channel需要关联connection,也就是这样的构造方法:

    public AMQChannel(AMQConnection connection, int channelNumber) {
        this._connection = connection;
        this._channelNumber = channelNumber;
        if(connection.getChannelRpcTimeout() < 0) {
            throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
        }
        this._rpcTimeout = connection.getChannelRpcTimeout();
        this._checkRpcResponseType = connection.willCheckRpcResponseType();
        this._trafficListener = connection.getTrafficListener();
    }

而上面传入的就是this,也就是当前的Connection

SimpleBlockingRpcContinuation可以理解为请求发送和响应获取的队列,保证请求的顺序和响应的数据接收,结构如下:

查看原图


发送完header之后,当前帧处理器调用了initialize方法,参数传入的是当前Connection

connection.startMainLoop();
    public void startMainLoop() {
        MainLoop loop = new MainLoop();
        final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
        mainLoopThread = Environment.newThread(threadFactory, loop, name);
        mainLoopThread.start();
    }

很显然,MainLoop是一个线程执行逻辑,代码如下:

public void run() {
            boolean shouldDoFinalShutdown = true;
            try {
                //_running在建立连接后哦就是设置为true了,所以启动线程后这里的帧处理器就一直在读取帧数据,如果broker有数据响应过来就会读取到
                while (_running) {
                    //从连接关联的输入流读取broker响应的数据帧
                    Frame frame = _frameHandler.readFrame();
                    //处理响应的数据帧
                    readFrame(frame);
                }
            } catch (Throwable ex) {
                if (ex instanceof InterruptedException) {
                    // loop has been interrupted during shutdown,
                    // no need to do it again
                    shouldDoFinalShutdown = false;
                } else {
                    handleFailure(ex);
                }
            } finally {
                if (shouldDoFinalShutdown) {
                    doFinalShutdown();
                }
            }
        }
//_frameHandler.readFrame();readUnsignedByte底层调用的是阻塞的read,这里如果超时,那么读取到null
    public static Frame readFrom(DataInputStream is) throws IOException {
        int type;
        int channel;

        try {
            type = is.readUnsignedByte();
        } catch (SocketTimeoutException ste) {
            // System.err.println("Timed out waiting for a frame.");
            return null; // failed
        }
        
//readFrame(frame);
private void readFrame(Frame frame) throws IOException {
//如果frame不是null,则读取到了响应数据帧
        if (frame != null) {
        //重置_missedHeartbeats为0
            _missedHeartbeats = 0;
            if (frame.type == AMQP.FRAME_HEARTBEAT) {
            //如果当前读取到的是心跳数据帧,那么不需要做任何事了,因为心跳计数器已经被重置了
                // Ignore it: we've already just reset the heartbeat counter.
            } else {
            //如果是连接建立的通道的响应数据
                if (frame.channel == 0) { // the special channel
                    _channel0.handleFrame(frame);
                } else {
                //如果是其他数据交互
                    if (isOpen()) {
                        // If we're still _running, but not isOpen(), then we
                        // must be quiescing, which means any inbound frames
                        // for non-zero channels (and any inbound commands on
                        // channel zero that aren't Connection.CloseOk) must
                        // be discarded.
                        ChannelManager cm = _channelManager;
                        if (cm != null) {
                            ChannelN channel;
                            try {
                                channel = cm.getChannel(frame.channel);
                            } catch(UnknownChannelException e) {
                                // this can happen if channel has been closed,
                                // but there was e.g. an in-flight delivery.
                                // just ignoring the frame to avoid closing the whole connection
                                LOGGER.info("Received a frame on an unknown channel, ignoring it");
                                return;
                            }
                            channel.handleFrame(frame);
                        }
                    }
                }
            }
        } else {
        //如果响应数据帧是null,处理超时,其中的逻辑大致是说,如果超时的次数连续超过一定时间和次数,就直接抛出连接异常的错误,并且记录心跳异常
        //次数会加1(_missedHeartbeats值)  
            // Socket timeout waiting for a frame.
            // Maybe missed heartbeat.
            handleSocketTimeout();
        }
    }

//_channel0.handleFrame(frame);
    public void handleFrame(Frame frame) throws IOException {
        AMQCommand command = _command;
        //如果从当前数据帧装配出完整的amqp命令了,command.handleFrame(frame)之后frame数据帧中的数据已经被组装到了AMQCommand中,这里也就是command变量中,前面有介绍过
        if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
        //重新new一个_command,等待为下一个数据帧处理
            _command = new AMQCommand(); // prepare for the next one
            //处理当前的amqp请求,收到的响应会被放入connStartBlocker中,关于connStartBlocker的结构见上面的图片  
            handleCompleteInboundCommand(command);
        }
    }

协商参数详细过程,主要是这段代码:

do {
                Method method = (challenge == null)
                                        ? new AMQP.Connection.StartOk.Builder()
                                                  .clientProperties(_clientProperties)
                                                  .mechanism(sm.getName())
                                                  .response(response)
                                                  .build()
                                        : new AMQP.Connection.SecureOk.Builder().response(response).build();

                try {
                    Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
                    if (serverResponse instanceof AMQP.Connection.Tune) {
                        connTune = (AMQP.Connection.Tune) serverResponse;
                    } else {
                        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                        response = sm.handleChallenge(challenge, username, password);
                    }
                } catch (ShutdownSignalException e) {
                    Method shutdownMethod = e.getReason();
                    if (shutdownMethod instanceof AMQP.Connection.Close) {
                        AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
                        if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                            throw new AuthenticationFailureException(shutdownClose.getReplyText());
                        }
                    }
                    throw new PossibleAuthenticationFailureException(e);
                }
            } while (connTune == null);

协商最大通道数
如果当前默认值this.requestedChannelMax(构造Connection时可以指定)或者服务端给的数值有一个为0,则取其中最大的数值,如果都不是0,取两个中最小的,所以 只有服务端和客户端都确认通道数没有限制(0),那么真正建立好的连接的通道才会是通道无限制的;

   int negotiatedChannelMax =
                negotiateChannelMax(this.requestedChannelMax,
                                    connTune.getChannelMax());
                                    
    private static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0) ?
            Math.max(clientValue, serverValue) :
            Math.min(clientValue, serverValue);
    }

后面还有协商最大帧数据大小和心跳间隔,逻辑都是一样的

第三部分-总结

下面是简单总结的两个图片:

连接建立过程图
[查看原图] (http://54zh.cn/upload/2021/08/vsrr980loqhdcpq2018f2g2eka.png) 连接属性关系图
查看原图

  1. rabbimq使用的是amqp协议进行消息传递通信
  2. amqp协议需要遵循规范,在java客户端中封住了AMQCommand
  3. rabbitmq4后默认建立可自动恢复连接
  4. rabbitmq中会有一个独立的线程不断读取broker响应的数据,并且心跳维持也是在这个线程中处理
  5. rabbitmq在建立连接的过程中会和服务端协商很多连接参数
  6. rabbitmq的java客户端对amqp进行了对象封装,包括数据帧处理器,amqp命令组装器,Method等等
  7. 等等等等

补充

这里需要补充说明下,rabbitmq java客户端中的远程调用方式:
和上面我们分析的接收Start响应的方式基本一致,这里也是使用的SimpleBlockingRpcContinuation来获取响应的数据

    public AMQCommand exnWrappingRpc(Method m)
        throws IOException
    {
        try {
            return privateRpc(m);
        } catch (AlreadyClosedException ace) {
            // Do not wrap it since it means that connection/channel
            // was closed in some action in the past
            throw ace;
        } catch (ShutdownSignalException ex) {
            throw wrap(ex);
        }
    }
private AMQCommand privateRpc(Method m)
        throws IOException, ShutdownSignalException
    {
        SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
        //最终调用的也是我们前面分析的transmit方法,向指定的通道发送数据  
        rpc(m, k);
        // At this point, the request method has been sent, and we
        // should wait for the reply to arrive.
        //
        // Calling getReply() on the continuation puts us to sleep
        // until the connection's reader-thread throws the reply over
        // the fence or the RPC times out (if enabled)
        //等待响应,这里可能会超时  
        if(_rpcTimeout == NO_RPC_TIMEOUT) {
            return k.getReply();
        } else {
            try {
                return k.getReply(_rpcTimeout);
            } catch (TimeoutException e) {
                throw wrapTimeoutException(m, e);
            }
        }
    }

这个数据发送在后面建立非0编号的信道中都会使用,发送数据都会使用这样的方式