AMQP详细学习笔记

/ 默认分类 / 0 条评论 / 902浏览

AMQP

一.前言

  1. 简介
    全称为Advanced Message Queuing Protocol ,即高级消息队列协议。
    官网

amqp是消息传递的标准协议,遵循该协议的客户端和服务端之间可以进行消息通信,和客户端语言无关,amqp定义了一套消息传递系统的不同模块 以及消息传递的连接规则。

  1. why amqp?
    学习之前,需要先了解为什么要使用这样的高级消息队列协议,他的出现是为了解决哪些问题的呢?
    在分布式系统中,不同的系统之间如果使用socket进行消息通讯,需要解决下面的一些问题:

正式因为,直接使用socket网络传输消息会出现上面的很多问题,所以才出现了amqp这个标准的消息投递协议,它规定了消息的发送和接收端是通过什么样子 的模型进行通信,消息是如何传输,保存,发送,接收等等,并且对消息丢失等情况做了处理,但是amqp只是一个标准,只是告诉了人们应该怎样在发送和接收 方进行消息投递可以解决上面的问题,需要真正使用这样的消息传递模型还是需要使用这个标准协议的实现产品。

A standard is no use without products, and there are is a choice of excellent AMQP technology suppliers.

上面是amqp协议官方给出的一段解释,并且列出了下面这些遵循了amqp协议的消息中间件产品

2.amqp中定义的几个重要的模型

rabbitmq是一种多租户架构软件,多用户的环境下共用相同的系统或程序组件,并且仍可确保各用户间数据的隔离性.

多租户简单来说是指一个单独的实例可以为多个组织服务。多租户技术为共用的数据中心内如何以单一系统架构与服务提供多数客户端相同甚至可定制化的服务,并且仍然可以保障客户的数据隔离。一个支持多租户技术的系统需要在设计上对它的数据和配置进行虚拟分区,从而使系统的每个租户或称组织都能够使用一个单独的系统实例,并且每个租户都可以根据自己的需求对租用的系统实例进行个性化配置。

RabbitMQ is multi-tenant system: connections, exchanges, queues, bindings, user permissions, policies and some other things belong to virtual hosts, logical groups of entities.
也就是说,在rabbitmq中,每一个vhost都是一个隔离的环境,他们的实体模型互不干扰;

Exchange(交换机) Queue(队列) Bindings(绑定关系)

关于交换机的种类,队列和交换机的绑定关系这些都有很多基础教程,不做阐述。

ps:交换机有direct,fanout,topic,header;
direct就是根据消息的路由键和交换机绑定的队列的路由键完全匹配;fanout就是消息广播,会将消息发送到与fanout交换机绑定的所有队列; topic交换机就是按照*,#来进行路由键的模糊匹配,发送到不同的队列中。

3. amqp0­-9-­1 协议

  1. 简单介绍

amqp协议使遵循该高级消息队列协议的客户端和中间件服务端(消息代理)可以进行通信;
amqp属于网络通信协议,网络传输是不可靠的,所以amqp定义消息传输中使用的模型的时候定义了一些保证数据传输可靠性的设置,比如消息确认回调确认,死信队列等;
amqp是一种可编程协议,也就是说,使用amqp协议,客户端可以自由发挥,客户端可以自定义amqp中的模型实体,比如队列,交换机,路由规则,然后按照客户端自定义设置好的规则进行数据传输,但是可编程性也会带来一定的 风险,比如对实体模型操作的冲突等等;应用程序声明它们需要的 AMQP 0-9-1 实体,定义必要的路由方案,并可以在不再使用 AMQP 0-9-1 实体时选择删除它们(这些都可以在声明实体的时候设置属性)

s:比如在声明交换机的时候,如果申明的交换机已经存在,并且属性一致,那么本次声明不起作用,如果交换机不存在则申明成功,如果存在,但是属性不同,那么将抛出一个通道异常406;

由于可编程特性,下面是amqp给出的实体模型的一些可以定义的属性:

1.交换机属性
必须属性:

2.队列属性
必须属性:

  1. amqp协议介绍

amqp高级消息队列协议,客户端和服务端之间的交互使用的是很多定义在规范中的Method来通信的,并且在发送amqp数据包的时候,可以携带很多数据,所以从amqp或者rabbitmq中的官网我们 可以看到这样的文档:

可以看到很多class和method,如果你是一名面向对象语言的程序员,第一次你尝试详细了解amqp的时候一定会产生一些误解,比如这里的Connection,Channel,Basic等等难道都是class类吗,start, start-ok,publish等等难道都是Method方法对象???

这里需要和大家重点解释一下: amqp协议中客户端和服务端通信的操作都是通过Method方法来定义的,每一个Method方法代表一个动作,这些Method方法被分组管理,这里的分组就是Class类,也就是说amqp中的操作是由很多 Method组成,这些Method被分组到不同的Class;

下面是其中的一部分:

完整的数据查看官方文档

所以这里需要重点注意的是,amqp中的Class和Method与面向对象开发语言中类和方法没有任何的关系; (在java客户端中,如果你看过一些源码你可能会说,就有Connection,Tune,com.rabbitmq.client.Method等类,其实这些只是作为一个面向对象语言java对 其协议的描述,使用这样的封装形式设计,来设计开发java客户端,进而然java语言可以使用amqp定义的这一套通信规范来发送和接收解析数据,这和amqp中的通信规范mehod和class没有任何其他关系)

前面说了,只要是遵循了amqp协议的客户端,无论什么语言实现的,都可以和消息中间件服务进行通信,所以数据传输就是遵循的amqp协议。
下面我们写这样一段从客户端建立连接,创建交换机,创建队列,创建绑定关系,发送消息等整个过程,使用wireshark抓取amqp协议的数据包进行查看分析,从底层的协议包就可以看出服务端和客户端之间通信操作就是遵循的上面所说的 method和class来进行的:

下面的代码使用的是rabbitmq官方提供的java客户端来进行连接操作的代码:

import com.rabbitmq.client.*;
import org.springframework.util.SerializationUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author hi@54zh.cn
 * @date 2021/4/9 - 17:35
 */
@RestController
public class TestController {

    @GetMapping("/1")
    public String sendDirectMessage() throws IOException, InterruptedException {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH🇲🇲ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.147");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
        } catch (IOException | TimeoutException e) {
            System.out.println("与rabbitmq创建连接信道失败");
        }
        HashMap<String,Object> mapArg = new HashMap<>();
        mapArg.put("testK","testV");
        channel.exchangeDeclare("testExchange","topic",false,false,false,mapArg);
        channel.queueDeclare("testQueue",false,false,false,null);
        channel.queueBind("testQueue","testExchange","testqueue.#");
        channel.confirmSelect();
        //消息成功发送到mq服务端,即发送成功,不管是否成功路由到某一个队列中
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("confirm回调-消息NACK(消息发送到rabbitmq失败),消息标志:%d,是否为批量确认:%b",deliveryTag,multiple));
            }
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println(String.format("confirm回调-消息ACK(消息成功发送到rabbitmq),消息标志:%d,是否为批量确认:%b",deliveryTag,multiple));
            }
        });
        //消息发送到了mq服务端,但是没有找到相应的队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(String.format("return回调(消息发送到mq,但是没有路由到任何一个队列中)-响应code:%d,响应信息:%s,交换机:%s,路由键:%s,消息体:%s",replyCode,replyText,exchange,routingKey,new String(body)));
            }
        });
        for (int i = 0; i < 2; i++) {
            TimeUnit.MILLISECONDS.sleep(3000);
            System.out.println(i);
            try{
                channel.basicPublish("testExchange","testqueue1.haha",true,null, SerializationUtils.serialize("hello rabbitmq"));
            }catch(Exception e)
            {
                System.out.println(e.getCause());
            }
            System.out.println("===");
        }
        return "ok";
    }
}
  1. 客户端连接前先告诉服务端使用的协议版本

客户端连接前先告诉服务端使用的协议版本

  1. 服务端判断如果版本支持,发送连接建立请求

服务端判断如果版本支持,发送连接建立请求

3.客户端返回连接建立成功的响应
客户端返回连接建立成功的响应

  1. 服务端收到连接建立成功的回应后开始和客户端协商相关参数
    服务端收到连接建立成功的回应后开始和客户端协商相关参数

  2. 客户端收到服务端协商的参数后,简单做了一下合理性判断,然后将最终的值响应给服务端,这里是怎样判断的,在后面的源码分析中我会说明
    客户端收到服务端协商的参数后,简单做了一下合理性判断,然后将最终的值响应给服务端,这里是怎样判断的,在后面的源码分析中我会说明

  3. 后面都是一样的道理,按照amqp协议的规则进行通信,包括连接哪一个visualhost,开启发送验证的回调确认等等;

  4. 声明交换机

    可以看到,我们在代码中指定的交换机的属性都会加入到amqp的数据帧中发送给服务端,包括声明队列,绑定关系,回调确认等等都是一样的道理

  5. 发送消息

9.完整的过程