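Continuing with the example from the previous article, let's analyze in detail how a connection is established. The code below comes from my personal notes on reading the source; if anything is incorrect or imprecise, please leave a comment below. The first step in RabbitMQ is establishing a connection, which ultimately executes the following code: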

    * Create a new broker connection with a client-provided name, picking the first available address from
    * the list provided by the {@link AddressResolver}.
    * If <a href="https://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
    * is enabled, the connection returned by this method will be {@link Recoverable}. Future
    * reconnection attempts will pick a random accessible address provided by the {@link AddressResolver}.
    * @param executor thread execution service for consumers on the connection
    * @param addressResolver discovery service to list potential addresses (hostname/port pairs) to connect to
    * @param clientProvidedName application-specific connection name, will be displayed
    *                           in the management UI if RabbitMQ server supports it.
    *                           This value doesn't have to be unique and cannot be used
    *                           as a connection identifier e.g. in HTTP API requests.
    *                           This value is supposed to be human-readable.
    * @return an interface to the connection
    * @throws java.io.IOException if it encounters a problem
    * @see <a href="https://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
   public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
       throws IOException, TimeoutException {
       if(this.metricsCollector == null) {
           this.metricsCollector = new NoOpMetricsCollector();
       // make sure we respect the provided thread factory
       FrameHandlerFactory fhFactory = createFrameHandlerFactory();
       ConnectionParams params = params(executor);
       // set client-provided via a client property
       if (clientProvidedName != null) {
           Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
           properties.put("connection_name", clientProvidedName);
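       // If automatic connection recovery is enabled (the RabbitMQ Java client enables it by default since 4.0), a connection between client and server that is interrupted or fails (not closed by the application, but dropped abnormally, e.g. by a network failure) is recovered automatically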
       if (isAutomaticRecoveryEnabled()) {
           // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
           // No Sonar: no need to close this resource because we're the one that creates it
           // and hands it over to the user
           AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector); //NOSONAR

           return conn;
       } else {
           List<Address> addrs = addressResolver.getAddresses();
           Exception lastException = null;
           for (Address addr : addrs) {
               try {
                   FrameHandler handler = fhFactory.create(addr, clientProvidedName);
                   AMQConnection conn = createConnection(params, handler, metricsCollector);
                   return conn;
               } catch (IOException e) {
                   lastException = e;
               } catch (TimeoutException te) {
                   lastException = te;
           if (lastException != null) {
               if (lastException instanceof IOException) {
                   throw (IOException) lastException;
               } else if (lastException instanceof TimeoutException) {
                   throw (TimeoutException) lastException;
           throw new IOException("failed to connect");
  1. The AddressResolver parameter of this method is the address resolver
public interface AddressResolver {

     * Get the potential {@link Address}es to connect to.
     * @return candidate {@link Address}es
     * @throws IOException if it encounters a problem
    List<Address> getAddresses() throws IOException;



    public Connection newConnection(List<Address> addrs) throws IOException, TimeoutException {
        return newConnection(this.sharedExecutor, addrs, null);



    public Connection newConnection(ExecutorService executor, List<Address> addrs, String clientProvidedName)
            throws IOException, TimeoutException {
        return newConnection(executor, createAddressResolver(addrs), clientProvidedName);


    protected AddressResolver createAddressResolver(List<Address> addresses) {
        if(addresses.size() == 1 && !isSSL()) {
            return new DnsRecordIpAddressResolver(addresses.get(0), isSSL());
        } else {
            return new ListAddressResolver(addresses);
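
The non-recovery branch of newConnection shown earlier boils down to "try each candidate address in order, return the first one that works, otherwise rethrow the last failure". Here is a minimal, self-contained sketch of that pattern. The `Connector` interface, the `FirstReachable` class and the host names are hypothetical stand-ins invented for illustration; they are not part of the RabbitMQ client.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for "open a connection to one address". */
interface Connector<C> {
    C connect(String address) throws IOException;
}

final class FirstReachable {
    /** Tries each address in order; returns the first success or rethrows the last failure. */
    static <C> C connectToFirst(List<String> addresses, Connector<C> connector) throws IOException {
        IOException lastException = null;
        for (String address : addresses) {
            try {
                return connector.connect(address);
            } catch (IOException e) {
                lastException = e; // remember the failure and move on to the next candidate
            }
        }
        if (lastException != null) {
            throw lastException;
        }
        throw new IOException("failed to connect"); // the address list was empty
    }

    public static void main(String[] args) throws IOException {
        List<String> addresses = Arrays.asList("down-host:5672", "up-host:5672");
        String result = connectToFirst(addresses, address -> {
            if (address.startsWith("down")) {
                throw new IOException("unreachable: " + address);
            }
            return "connected to " + address;
        });
        System.out.println(result); // prints "connected to up-host:5672"
    }
}
```

Only the last exception is kept, so when every address fails the caller sees the error of the final candidate, which matches the behaviour of the loop in the excerpt.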


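  1. The code above checks whether automatic connection recovery is enabled, so there are two types of connection here

One is RecoveryAwareAMQConnection (held as the delegate of AutorecoveringConnection); the other is AMQConnection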

public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {

    private Thread mainLoopThread;
    private final AMQChannel _channel0;
    /** Frame source/sink */
    private final FrameHandler _frameHandler;

    /** Flag controlling the main driver loop's termination */
    private volatile boolean _running = false;

    /** Manages heart-beat sending for this connection */
    private HeartbeatSender _heartbeatSender;

    private final String _virtualHost;
    private final int requestedHeartbeat;
    private final int requestedChannelMax;
    private final int requestedFrameMax;


    public void init() throws IOException, TimeoutException {
        this.delegate = this.cf.newConnection();
    private volatile RecoveryAwareAMQConnection delegate;

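addAutomaticRecoveryListener registers, on the current connection, the listener that triggers automatic reconnection after a disconnect. As for exactly when this listener is invoked, I consulted the source code and the issue on the official GitHub repository but still could not work it out completely. What I do know is that the client registers shutdown listeners; when the connection closes, the shutdown listeners are called, and after they have run, the recovery (reconnection) listener runs.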

private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newConn) {
        final AutorecoveringConnection c = this;
        // this listener will run after shutdown listeners,
        // see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
        RecoveryCanBeginListener starter = cause -> {
            try {
                if (shouldTriggerConnectionRecovery(cause)) {
            } catch (Exception e) {
                newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);
        synchronized (this) {
private synchronized void beginAutomaticRecovery() throws InterruptedException {

        final RecoveryAwareAMQConnection newConn = this.recoverConnection();
        if (newConn == null) {
        LOGGER.debug("Connection {} has recovered", newConn);
	    // don't assign new delegate connection until channel recovery is complete
	    this.delegate = newConn;
	    if (this.params.isTopologyRecoveryEnabled()) {


// Returns new connection if the connection was recovered, 
	// null if application initiated shutdown while attempting recovery.  
    private RecoveryAwareAMQConnection recoverConnection() throws InterruptedException {
        int attempts = 0;
        while (!manuallyClosed) {
            try {
                // No Sonar: no need to close this resource because we're the one that creates it
                // and hands it over to the user
				RecoveryAwareAMQConnection newConn = this.cf.newConnection(); //NOSONAR
				synchronized(recoveryLock) {
					if (!manuallyClosed) {
						// This is the standard case.				
						return newConn;
				// This is the once in a blue moon case.  
				// Application code just called close as the connection
				// was being re-established.  So we attempt to close the newly created connection.
				return null;
            } catch (Exception e) {
                this.getExceptionHandler().handleConnectionRecoveryException(this, e);
		return null;


        public ExponentialBackoffDelayHandler() {
            sequence = Arrays.asList(2000L, 3000L, 5000L, 8000L, 13000L, 21000L, 34000L);

        public long getDelay(int recoveryAttempts) {
            int index = recoveryAttempts >= sequence.size() ? sequence.size() - 1 : recoveryAttempts;
            return sequence.get(index);
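
To make the delay lookup above concrete, here is a small runnable sketch that reuses the same sequence and prints the wait time for the first ten recovery attempts. The class name `BackoffSchedule` is hypothetical; only the numbers and the clamping rule come from the excerpt.

```java
import java.util.Arrays;
import java.util.List;

final class BackoffSchedule {
    // Same delay sequence as in the excerpt above, in milliseconds.
    private static final List<Long> SEQUENCE =
            Arrays.asList(2000L, 3000L, 5000L, 8000L, 13000L, 21000L, 34000L);

    /** Returns the delay for the given attempt, clamping to the last entry of the sequence. */
    static long getDelay(int recoveryAttempts) {
        int index = Math.min(recoveryAttempts, SEQUENCE.size() - 1);
        return SEQUENCE.get(index);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 10; attempt++) {
            System.out.println("attempt " + attempt + " -> wait " + getDelay(attempt) + " ms");
        }
    }
}
```

Note that the values grow like Fibonacci numbers rather than by doubling, and that every attempt from the seventh onward waits the same 34 seconds.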


  1. When an automatically recovering connection is established, the following code is executed:
    public RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
        Exception lastException = null;
        List<Address> shuffled = shuffle(addressResolver.getAddresses());

        for (Address addr : shuffled) {
            try {
                FrameHandler frameHandler = factory.create(addr, connectionName());
                RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
                return conn;
            } catch (IOException e) {
                lastException = e;
            } catch (TimeoutException te) {
                lastException = te;

        if (lastException != null) {
            if (lastException instanceof IOException) {
                throw (IOException) lastException;
            } else if (lastException instanceof TimeoutException) {
                throw (TimeoutException) lastException;
        throw new IOException("failed to connect");

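To understand the code above, we also need to understand FrameHandler, which is an interface. Its parent interface NetworkConnection provides methods for getting the host IP address and port. An implementation of FrameHandler can be thought of as a frame processor: it sends frames to its associated broker (over a socket) and reads frames coming from the broker. Its other functions are as follows: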

public interface FrameHandler extends NetworkConnection {

     * Set the underlying socket's read timeout in milliseconds, if applicable.
     * @param timeoutMs The timeout in milliseconds
    void setTimeout(int timeoutMs) throws SocketException;

     * Get the underlying socket's read timeout in milliseconds.
     * @return The timeout in milliseconds
    int getTimeout() throws SocketException;

     * Send the initial connection header, thus kickstarting the AMQP
     * protocol version negotiation process and putting the underlying
     * connection in a state such that the next layer of startup can
     * proceed.
     * @throws IOException if there is a problem accessing the connection
    void sendHeader() throws IOException;

    void initialize(AMQConnection connection);

     * Read a {@link Frame} from the underlying data connection.
     * @return an incoming Frame, or null if there is none
     * @throws IOException if there is a problem accessing the connection
     * @throws SocketTimeoutException if the underlying read times out
    Frame readFrame() throws IOException;

     * Write a {@link Frame} to the underlying data connection.
     * @param frame the Frame to transmit
     * @throws IOException if there is a problem accessing the connection
    void writeFrame(Frame frame) throws IOException;

     * Flush the underlying data connection.
     * @throws IOException if there is a problem accessing the connection
    void flush() throws IOException;

    /** Close the underlying data connection (complaint not permitted). */
    void close();


public class SocketFrameHandler implements FrameHandler {

    /** The underlying socket */
    private final Socket _socket;

    /** Socket's inputstream - data from the broker - synchronized on */
    private final DataInputStream _inputStream;

    /** Socket's outputstream - data to the broker - synchronized on */
    private final DataOutputStream _outputStream;
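
Since SocketFrameHandler reads and writes frames through a DataInputStream and a DataOutputStream, it may help to see what a frame looks like on the wire. In AMQP 0-9-1 a frame is a 1-byte type, a 2-byte channel number, a 4-byte payload size, the payload itself and a closing 0xCE byte. The sketch below encodes and decodes that layout; `SimpleFrame` is a hypothetical class written for this article, not the client's own Frame class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class SimpleFrame {
    static final int FRAME_END = 0xCE; // frame-end octet defined by AMQP 0-9-1

    final int type;
    final int channel;
    final byte[] payload;

    SimpleFrame(int type, int channel, byte[] payload) {
        this.type = type;
        this.channel = channel;
        this.payload = payload;
    }

    /** Writes type (1 byte), channel (2), size (4), payload, then the end marker (1). */
    void writeTo(DataOutputStream out) throws IOException {
        out.writeByte(type);
        out.writeShort(channel);
        out.writeInt(payload.length);
        out.write(payload);
        out.writeByte(FRAME_END);
    }

    /** Reads one frame back and checks the end marker. */
    static SimpleFrame readFrom(DataInputStream in) throws IOException {
        int type = in.readUnsignedByte();
        int channel = in.readUnsignedShort();
        int size = in.readInt();
        if (size < 0) {
            throw new IOException("Negative payload size: " + size);
        }
        byte[] payload = new byte[size];
        in.readFully(payload);
        int end = in.readUnsignedByte();
        if (end != FRAME_END) {
            throw new IOException("Bad frame end marker: " + end);
        }
        return new SimpleFrame(type, channel, payload);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Type 3 is a content body frame in AMQP 0-9-1.
        new SimpleFrame(3, 1, "hello".getBytes("UTF-8")).writeTo(new DataOutputStream(buffer));
        SimpleFrame back = readFrom(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(back.type + " " + back.channel + " " + new String(back.payload, "UTF-8"));
    }
}
```

The fixed 8 bytes of overhead per frame (7 before the payload, 1 after) is also what limits how much body data fits into a single frame once the connection has negotiated a maximum frame size.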

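So after the FrameHandler is created above, the following code runs. It simply passes the frameHandler and the other data to the RecoveryAwareAMQConnection constructor to create the connection, which is why the Connection attributes introduced earlier include frameHandler: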

RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);



public void start()
            throws IOException, TimeoutException {
        this._running = true;
        // Make sure that the first thing we do is to send the header,
        // which should cause any socket errors to show up for us, rather
        // than risking them pop out in the MainLoop
        AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
            new AMQChannel.SimpleBlockingRpcContinuation();
        // We enqueue an RPC continuation here without sending an RPC
        // request, since the protocol specifies that after sending
        // the version negotiation header, the client (connection
        // initiator) is to wait for a connection.start method to
        // arrive.
        try {
            // The following two lines are akin to AMQChannel's
            // transmit() method for this pseudo-RPC.
        } catch (IOException ioe) {
            throw ioe;
        AMQP.Connection.Start connStart;
        AMQP.Connection.Tune connTune = null;
        try {
            // server -> client: Start
            connStart =
                    (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();

            _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

            Version serverVersion =
                    new Version(connStart.getVersionMajor(),

            if (!Version.checkVersion(clientVersion, serverVersion)) {
                throw new ProtocolVersionMismatchException(clientVersion,

            String[] mechanisms = connStart.getMechanisms().toString().split(" ");
            SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
            if (sm == null) {
                throw new IOException("No compatible authentication mechanism found - " +
                                              "server offered [" + connStart.getMechanisms() + "]");

            String username = credentialsProvider.getUsername();
            String password = credentialsProvider.getPassword();

            if (credentialsProvider.getTimeBeforeExpiration() != null) {
                if (this.credentialsRefreshService == null) {
                    throw new IllegalStateException("Credentials can expire, a credentials refresh service should be set");
                if (this.credentialsRefreshService.isApproachingExpiration(credentialsProvider.getTimeBeforeExpiration())) {
                    username = credentialsProvider.getUsername();
                    password = credentialsProvider.getPassword();

            LongString challenge = null;
            LongString response = sm.handleChallenge(null, username, password);

            do {
                Method method = (challenge == null)
                                        ? new AMQP.Connection.StartOk.Builder()
                                        : new AMQP.Connection.SecureOk.Builder().response(response).build();

                try {
                // client -> server: StartOk; server -> client: Tune
                    Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
                    if (serverResponse instanceof AMQP.Connection.Tune) {
                        connTune = (AMQP.Connection.Tune) serverResponse;
                    } else {
                        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                        response = sm.handleChallenge(challenge, username, password);
                } catch (ShutdownSignalException e) {
                    Method shutdownMethod = e.getReason();
                    if (shutdownMethod instanceof AMQP.Connection.Close) {
                        AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
                        if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                            throw new AuthenticationFailureException(shutdownClose.getReplyText());
                    throw new PossibleAuthenticationFailureException(e);
            } while (connTune == null);
        } catch (TimeoutException te) {
            throw te;
        } catch (ShutdownSignalException sse) {
            throw AMQChannel.wrap(sse);
        } catch(IOException ioe) {
            throw ioe;

        try {
            int negotiatedChannelMax =

            int channelMax = ConnectionFactory.ensureUnsignedShort(negotiatedChannelMax);

            if (channelMax != negotiatedChannelMax) {
                LOGGER.warn("Channel max must be between 0 and {}, value has been set to {} instead of {}",
                        MAX_UNSIGNED_SHORT, channelMax, negotiatedChannelMax);

            _channelManager = instantiateChannelManager(channelMax, threadFactory);

            int frameMax =
            this._frameMax = frameMax;

            int negotiatedHeartbeat =

            int heartbeat = ConnectionFactory.ensureUnsignedShort(negotiatedHeartbeat);

            if (heartbeat != negotiatedHeartbeat) {
                LOGGER.warn("Heartbeat must be between 0 and {}, value has been set to {} instead of {}",
                        MAX_UNSIGNED_SHORT, heartbeat, negotiatedHeartbeat);

            // client -> server: TuneOk; client -> server: Open
            _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
            _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
        } catch (IOException ioe) {
            throw ioe;
        } catch (ShutdownSignalException sse) {
            throw AMQChannel.wrap(sse);

        if (this.credentialsProvider.getTimeBeforeExpiration() != null) {
            String registrationId = this.credentialsRefreshService.register(credentialsProvider, () -> {
                // return false if connection is closed, so refresh service can get rid of this registration
                if (!isOpen()) {
                    return false;
                if (this._inConnectionNegotiation) {
                    // this should not happen
                    return true;
                String refreshedPassword = credentialsProvider.getPassword();

                UpdateSecretExtension.UpdateSecret updateSecret = new UpdateSecretExtension.UpdateSecret(
                        LongStringHelper.asLongString(refreshedPassword), "Refresh scheduled by client"
                try {
                } catch (ShutdownSignalException e) {
                    LOGGER.warn("Error while trying to update secret: {}. Connection has been closed.", e.getMessage());
                    return false;
                return true;

            addShutdownListener(sse -> this.credentialsRefreshService.unregister(this.credentialsProvider, registrationId));

        // We can now respond to errors having finished tailoring the connection
        this._inConnectionNegotiation = false;
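
The start() method above is easier to follow with the overall message order in mind. The sketch below lists the AMQP 0-9-1 connection handshake as plain strings and builds the 8-byte protocol header that sendHeader() is responsible for putting on the wire ("AMQP" followed by 0, 0, 9, 1). `HandshakeSketch` and its methods are hypothetical names used only for this illustration; the number of Secure/SecureOk rounds depends on the SASL mechanism and is passed in as a parameter here.

```java
import java.util.ArrayList;
import java.util.List;

final class HandshakeSketch {
    /** The protocol header for AMQP 0-9-1: the literal "AMQP" followed by 0, 0, 9, 1. */
    static byte[] protocolHeader() {
        return new byte[] {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
    }

    /** Lists the handshake messages in order; challengeRounds is the number of SASL challenges. */
    static List<String> handshakeSteps(int challengeRounds) {
        List<String> steps = new ArrayList<>();
        steps.add("client -> server: protocol header");
        steps.add("server -> client: Connection.Start");
        steps.add("client -> server: Connection.StartOk");
        for (int i = 0; i < challengeRounds; i++) {
            steps.add("server -> client: Connection.Secure");
            steps.add("client -> server: Connection.SecureOk");
        }
        steps.add("server -> client: Connection.Tune");
        steps.add("client -> server: Connection.TuneOk");
        steps.add("client -> server: Connection.Open");
        steps.add("server -> client: Connection.OpenOk");
        return steps;
    }

    public static void main(String[] args) {
        for (String step : handshakeSteps(0)) {
            System.out.println(step);
        }
    }
}
```

With a mechanism that needs no extra challenge, the do/while loop in start() runs exactly once: StartOk goes out and Tune comes straight back.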



public class AMQCommand implements Command {
        /** The assembler for this command - synchronised on - contains all the state */
        private final CommandAssembler assembler;


public interface Command {
     * Retrieves the {@link Method} held within this Command. Downcast to
     * concrete (implementation-specific!) subclasses as necessary.
     * @return the command's method.
    Method getMethod();

     * Retrieves the ContentHeader subclass instance held as part of this Command, if any.
     * Downcast to one of the inner classes of AMQP,
     * for instance {@link AMQP.BasicProperties}, as appropriate.
     * @return the Command's {@link ContentHeader}, or null if none
    ContentHeader getContentHeader();

     * Retrieves the body byte array that travelled as part of this
     * Command, if any.
     * @return the Command's content body, or null if none
    byte[] getContentBody();


 * AMQP 0-9-1-specific implementation of {@link Command} which accumulates
 * method, header and body from a series of frames, unless these are
 * supplied at construction time.
 * <h2>Concurrency</h2>
 * This class is thread-safe.
public class AMQCommand implements Command {

    /** The assembler for this command - synchronised on - contains all the state */
    private final CommandAssembler assembler;

    /** Public API - {@inheritDoc} */
    public Method getMethod() {
        return this.assembler.getMethod();

    /** Public API - {@inheritDoc} */
    public AMQContentHeader getContentHeader() {
        return this.assembler.getContentHeader();
    /** Public API - {@inheritDoc} */
    public byte[] getContentBody() {
        return this.assembler.getContentBody();
    public boolean handleFrame(Frame f) throws IOException {
        return this.assembler.handleFrame(f);

     * Sends this command down the named channel on the channel's
     * connection, possibly in multiple frames.
     * @param channel the channel on which to transmit the command
     * @throws IOException if an error is encountered
    // is already large while the frame size is capped, so the data has to be sent in several frames; the code below does exactly that)
    public void transmit(AMQChannel channel) throws IOException {
        int channelNumber = channel.getChannelNumber();
        AMQConnection connection = channel.getConnection();
        synchronized (assembler) {
            Method m = this.assembler.getMethod();
            if (m.hasContent()) {
                byte[] body = this.assembler.getContentBody();
                Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
                int frameMax = connection.getFrameMax();
                boolean cappedFrameMax = frameMax > 0;
                int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
                if (cappedFrameMax && headerFrame.size() > frameMax) {
                    String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
                    throw new IllegalArgumentException(msg);
                for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                    int remaining = body.length - offset;

                    int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                            : bodyPayloadMax;
                    Frame frame = Frame.fromBodyFragment(channelNumber, body,
                            offset, fragmentLength);
            } else {


     * Public API - sends a frame directly to the broker.
    public void writeFrame(Frame f) throws IOException {
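
The body-splitting loop in transmit() can be tried out in isolation. The following sketch mirrors its arithmetic: when the frame size is capped, each body frame carries at most frameMax minus the per-frame overhead, and a frameMax of 0 means "no cap". `BodyFragmenter` is a hypothetical helper, and the overhead of 8 bytes is an assumption based on the AMQP 0-9-1 frame layout (7 bytes before the payload plus the 1-byte end marker).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BodyFragmenter {
    // Assumed per-frame overhead: 7-byte frame header plus 1-byte frame-end marker.
    static final int EMPTY_FRAME_SIZE = 8;

    /** Splits body into chunks that each fit in one frame; frameMax == 0 means unlimited. */
    static List<byte[]> fragment(byte[] body, int frameMax) {
        boolean cappedFrameMax = frameMax > 0;
        int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
        if (cappedFrameMax && bodyPayloadMax <= 0) {
            // Guard added for this sketch so the loop below always makes progress.
            throw new IllegalArgumentException("frameMax too small: " + frameMax);
        }
        List<byte[]> fragments = new ArrayList<>();
        for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
            int remaining = body.length - offset;
            int fragmentLength = Math.min(remaining, bodyPayloadMax);
            fragments.add(Arrays.copyOfRange(body, offset, offset + fragmentLength));
        }
        return fragments;
    }

    public static void main(String[] args) {
        byte[] body = new byte[25];
        // frameMax 18 leaves 10 payload bytes per frame, so 25 bytes need 3 frames.
        System.out.println(fragment(body, 18).size()); // prints 3
        System.out.println(fragment(body, 0).size());  // prints 1
    }
}
```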



// _channel = can be seen here, as follows:
    AMQChannel createChannel0() {
        return new AMQChannel(this, 0) {
            @Override public boolean processAsync(Command c) throws IOException {
                return getConnection().processControlCommand(c);


    public AMQChannel(AMQConnection connection, int channelNumber) {
        this._connection = connection;
        this._channelNumber = channelNumber;
        if(connection.getChannelRpcTimeout() < 0) {
            throw new IllegalArgumentException("Continuation timeout on RPC calls cannot be less than 0");
        this._rpcTimeout = connection.getChannelRpcTimeout();
        this._checkRpcResponseType = connection.willCheckRpcResponseType();
        this._trafficListener = connection.getTrafficListener();





    public void startMainLoop() {
        MainLoop loop = new MainLoop();
        final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
        mainLoopThread = Environment.newThread(threadFactory, loop, name);


public void run() {
            boolean shouldDoFinalShutdown = true;
            try {
                while (_running) {
                    Frame frame = _frameHandler.readFrame();
            } catch (Throwable ex) {
                if (ex instanceof InterruptedException) {
                    // loop has been interrupted during shutdown,
                    // no need to do it again
                    shouldDoFinalShutdown = false;
                } else {
            } finally {
                if (shouldDoFinalShutdown) {
    public static Frame readFrom(DataInputStream is) throws IOException {
        int type;
        int channel;

        try {
            type = is.readUnsignedByte();
        } catch (SocketTimeoutException ste) {
            // System.err.println("Timed out waiting for a frame.");
            return null; // failed
private void readFrame(Frame frame) throws IOException {
        if (frame != null) {
            _missedHeartbeats = 0;
            if (frame.type == AMQP.FRAME_HEARTBEAT) {
                // Ignore it: we've already just reset the heartbeat counter.
            } else {
                if (frame.channel == 0) { // the special channel
                } else {
                    if (isOpen()) {
                        // If we're still _running, but not isOpen(), then we
                        // must be quiescing, which means any inbound frames
                        // for non-zero channels (and any inbound commands on
                        // channel zero that aren't Connection.CloseOk) must
                        // be discarded.
                        ChannelManager cm = _channelManager;
                        if (cm != null) {
                            ChannelN channel;
                            try {
                                channel = cm.getChannel(frame.channel);
                            } catch(UnknownChannelException e) {
                                // this can happen if channel has been closed,
                                // but there was e.g. an in-flight delivery.
                                // just ignoring the frame to avoid closing the whole connection
                                LOGGER.info("Received a frame on an unknown channel, ignoring it");
        } else {
            // Socket timeout waiting for a frame.
            // Maybe missed heartbeat.

    public void handleFrame(Frame frame) throws IOException {
        AMQCommand command = _command;
        if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
            _command = new AMQCommand(); // prepare for the next one
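
handleFrame() above hands each incoming frame to the current command and starts a fresh command once one is complete. The assembling itself can be pictured as a small state machine: a method frame comes first, then (only if the method carries content) a header frame announcing the body size, then body frames until that many bytes have arrived. The sketch below is my own simplified model of that idea; the class, its method names and its states are hypothetical and are not copied from the client's CommandAssembler.

```java
import java.io.ByteArrayOutputStream;

final class CommandAssemblerSketch {
    enum State { EXPECTING_METHOD, EXPECTING_CONTENT_HEADER, EXPECTING_CONTENT_BODY, COMPLETE }

    private State state = State.EXPECTING_METHOD;
    private long remainingBodyBytes;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    /** Handles the method frame; returns true if the command is already complete. */
    boolean onMethod(boolean hasContent) {
        expect(State.EXPECTING_METHOD);
        state = hasContent ? State.EXPECTING_CONTENT_HEADER : State.COMPLETE;
        return isComplete();
    }

    /** Handles the content header frame, which announces the total body size. */
    boolean onHeader(long bodySize) {
        expect(State.EXPECTING_CONTENT_HEADER);
        remainingBodyBytes = bodySize;
        state = bodySize > 0 ? State.EXPECTING_CONTENT_BODY : State.COMPLETE;
        return isComplete();
    }

    /** Handles one body frame; the command completes when all announced bytes have arrived. */
    boolean onBody(byte[] fragment) {
        expect(State.EXPECTING_CONTENT_BODY);
        if (fragment.length > remainingBodyBytes) {
            throw new IllegalStateException("Body is larger than the announced size");
        }
        body.write(fragment, 0, fragment.length);
        remainingBodyBytes -= fragment.length;
        if (remainingBodyBytes == 0) {
            state = State.COMPLETE;
        }
        return isComplete();
    }

    boolean isComplete() {
        return state == State.COMPLETE;
    }

    byte[] getContentBody() {
        return body.toByteArray();
    }

    private void expect(State expected) {
        if (state != expected) {
            throw new IllegalStateException("Expected " + expected + " but was " + state);
        }
    }

    public static void main(String[] args) {
        CommandAssemblerSketch assembler = new CommandAssemblerSketch();
        assembler.onMethod(true);                       // e.g. a Basic.Deliver with content
        assembler.onHeader(5);                          // body of 5 bytes announced
        assembler.onBody(new byte[] {1, 2, 3});         // first fragment
        boolean done = assembler.onBody(new byte[] {4, 5});
        System.out.println(done + " " + assembler.getContentBody().length); // prints "true 5"
    }
}
```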


do {
                Method method = (challenge == null)
                                        ? new AMQP.Connection.StartOk.Builder()
                                        : new AMQP.Connection.SecureOk.Builder().response(response).build();

                try {
                    Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
                    if (serverResponse instanceof AMQP.Connection.Tune) {
                        connTune = (AMQP.Connection.Tune) serverResponse;
                    } else {
                        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                        response = sm.handleChallenge(challenge, username, password);
                } catch (ShutdownSignalException e) {
                    Method shutdownMethod = e.getReason();
                    if (shutdownMethod instanceof AMQP.Connection.Close) {
                        AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
                        if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                            throw new AuthenticationFailureException(shutdownClose.getReplyText());
                    throw new PossibleAuthenticationFailureException(e);
            } while (connTune == null);

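If either the client's current default this.requestedChannelMax (which can be specified when the Connection is constructed) or the value offered by the server is 0, the larger of the two is used; if neither is 0, the smaller is used. So the established connection has an unlimited number of channels only when both server and client specify no limit (0):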

   int negotiatedChannelMax =
    private static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0) ?
            Math.max(clientValue, serverValue) :
            Math.min(clientValue, serverValue);
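
A few concrete inputs make the rule easier to remember. The sketch below copies the negotiatedMaxValue logic into a standalone class (the class name `Negotiation` and the sample values are mine) and prints the result for the four interesting combinations.

```java
final class Negotiation {
    /** 0 means "no limit"; if either side says 0 the other side's value wins, else the smaller one. */
    static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0)
                ? Math.max(clientValue, serverValue)
                : Math.min(clientValue, serverValue);
    }

    public static void main(String[] args) {
        System.out.println(negotiatedMaxValue(0, 0));      // prints 0    (both unlimited)
        System.out.println(negotiatedMaxValue(0, 2047));   // prints 2047 (server limit applies)
        System.out.println(negotiatedMaxValue(100, 0));    // prints 100  (client limit applies)
        System.out.println(negotiatedMaxValue(100, 2047)); // prints 100  (the smaller limit wins)
    }
}
```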




Figure: relationship diagram of the connection's attributes (http://54zh.cn/upload/2021/08/vsrr980loqhdcpq2018f2g2eka.png)

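  1. RabbitMQ uses the AMQP protocol for messaging
  2. The AMQP protocol has a specification that must be followed; the Java client wraps it in AMQCommand
  3. Since version 4, the RabbitMQ Java client establishes automatically recoverable connections by default
  4. The client runs a dedicated thread that continuously reads the data sent back by the broker; missed-heartbeat detection is also handled on this thread
  5. While establishing a connection, the client negotiates many connection parameters with the server
  6. The RabbitMQ Java client wraps AMQP in objects, including the frame handler, the AMQP command assembler, Method, and so on
  7. And so on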


One addition is needed here: how remote calls (RPC) work in the RabbitMQ Java client:

    public AMQCommand exnWrappingRpc(Method m)
        throws IOException
        try {
            return privateRpc(m);
        } catch (AlreadyClosedException ace) {
            // Do not wrap it since it means that connection/channel
            // was closed in some action in the past
            throw ace;
        } catch (ShutdownSignalException ex) {
            throw wrap(ex);
private AMQCommand privateRpc(Method m)
        throws IOException, ShutdownSignalException
        SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
        rpc(m, k);
        // At this point, the request method has been sent, and we
        // should wait for the reply to arrive.
        // Calling getReply() on the continuation puts us to sleep
        // until the connection's reader-thread throws the reply over
        // the fence or the RPC times out (if enabled)
        if(_rpcTimeout == NO_RPC_TIMEOUT) {
            return k.getReply();
        } else {
            try {
                return k.getReply(_rpcTimeout);
            } catch (TimeoutException e) {
                throw wrapTimeoutException(m, e);
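
The key idea in privateRpc is that the calling thread parks on a continuation until the connection's reader thread delivers the reply, or until the timeout expires. The sketch below reproduces that hand-off with a CompletableFuture. This is a technique I have swapped in for illustration, not the mechanism the client itself uses, and `BlockingContinuation` is a hypothetical class name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingContinuation<T> {
    private final CompletableFuture<T> reply = new CompletableFuture<>();

    /** Called by the reader thread when the reply for this request arrives. */
    void handleReply(T value) {
        reply.complete(value);
    }

    /** Called by the requesting thread; blocks until the reply arrives or the timeout elapses. */
    T getReply(long timeoutMs) throws TimeoutException, InterruptedException {
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Not reachable in this sketch because the future is never completed exceptionally.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingContinuation<String> k = new BlockingContinuation<>();
        // Simulated reader thread that "receives" the broker's reply.
        Thread reader = new Thread(() -> k.handleReply("Connection.OpenOk"));
        reader.start();
        System.out.println(k.getReply(1000)); // prints "Connection.OpenOk"
    }
}
```

If no reply is delivered in time, getReply throws a TimeoutException, which corresponds to the branch where privateRpc wraps the timeout before rethrowing it.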
