RabbitMq中共用队列导致的问题和解决办法

/ 默认分类 / 0 条评论 / 818浏览

RabbitMq中共用队列导致的问题和解决办法

1.写在前面

这篇文章所表达的问题就是,当我们测试环境只有一个rmq的时候,如果我们需要在开发环境也使用这个rmq,那么会导致测试的消息被开发环境消费,当然这个问题很好解决,根本原因就是缺少一个开发环境的rmq,或者将开发环境换到一个host,但是我所接手的这个项目中刚好不是这样,所以导致我需要在使用测试环境的时候,关闭开发环境的消费,所以这里可以考虑使用下面集中方法。

2. 解决方法

1. 方法一: 直接在rmq的控制台关闭Connection

这种方法简单粗暴,但是在项目重新构建或者重启之后会使消费连接重新生效

首先在queue的消费者中找到需要关闭的连接,然后进入Connection的tab页找到刚刚的连接点进去,最下方会有一个Force Close关闭连接即可

2. 方法二: 在代码中控制开启和关闭

@RabbitListener注解会申明为一个监听消息消费的容器,这里面有一个id属性标志这个容器,还有一个autoStartup属性设置该容器是否自动启动,springboot整合amqp支持手动启动容器,只需要设置autoStartup属性为false,然后我们可以通过使用单个容器的ID,调用RabbitListenerEndpointRegistry类的getListenerContainer(String id)方法来获得对单个容器的引用,并执行strat方法,启动容器。

/**
 * @author zh@whatsoft.cn
 * @date 2021/2/24 - 16:03
 */
@Api(value = "rabbitmq监听消费控制", tags = {"rabbitmq监听消费控制"})
@RestController
@RequestMapping("/rbmq")
@Slf4j
public class RbmqListenerController {

    @Autowired
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    @GetMapping("/all_listener")
    @ApiOperation(value = "获取所有消费监听", notes = "获取所有消费监听")
    public R<Object> getRbmqListeners() {
        List<MyRbmq> rbmqList = new ArrayList<>();
        Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
        for (MessageListenerContainer listenerContainer : listenerContainers) {
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) listenerContainer;
            ConnectionFactory connectionFactory = container.getConnectionFactory();
            MyRbmq rbmq = new MyRbmq();
            rbmq.setId(container.getListenerId());
            rbmq.setHost(connectionFactory.getHost());
            rbmq.setVisualHost(connectionFactory.getVirtualHost());
            rbmq.setQueueNames(Arrays.asList(container.getQueueNames()));
            rbmq.setActive(container.isActive());
            rbmq.setPort(connectionFactory.getPort());
            rbmqList.add(rbmq);
        }
        return R.ok().data(rbmqList);
    }

    @GetMapping("/listener/{id}")
    @ApiOperation(value = "获取指定id的消费监听容器状态", notes = "获取指定id的消费监听容器状态")
    public R<Object> getRbmqListener(@PathVariable("id") String id) {
        MessageListenerContainer listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(id);
        SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) listenerContainer;
        ConnectionFactory connectionFactory = container.getConnectionFactory();
        MyRbmq rbmq = new MyRbmq();
        rbmq.setId(container.getListenerId());
        rbmq.setHost(connectionFactory.getHost());
        rbmq.setVisualHost(connectionFactory.getVirtualHost());
        rbmq.setQueueNames(Arrays.asList(container.getQueueNames()));
        rbmq.setActive(container.isActive());
        rbmq.setPort(connectionFactory.getPort());
        return R.ok().data(rbmq);
    }

    @GetMapping("/start/{id}")
    @ApiOperation(value = "启动指定id的监听消费容器", notes = "启动指定id的监听消费容器")
    public R<Object> startRbmqListener(@PathVariable("id") String id) {
        MessageListenerContainer listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(id);
        return listenerContainer.isRunning() ? R.fail().data("this container is already running") : run(listenerContainer);
    }

    @GetMapping("/stop/{id}")
    @ApiOperation(value = "关闭指定id的监听消费容器", notes = "关闭指定id的监听消费容器")
    public R<Object> stopRbmqListener(@PathVariable("id") String id) {
        MessageListenerContainer listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(id);
        return listenerContainer.isRunning() ? stop(listenerContainer) : R.fail().data("this container is already closed");
    }


    private R<Object> stop(MessageListenerContainer listenerContainer) {
        try{
            listenerContainer.stop();
            return R.ok();
        }catch(Exception e)
        {
            return R.fail().message(U.allError(e));
        }
    }


    private R<Object> run(MessageListenerContainer listenerContainer) {
        try{
            listenerContainer.start();
            return R.ok();
        }catch(Exception e)
        {
            return R.fail().message(U.allError(e));
        }
    }

}

3.@RabbitListener原理解析

见晖的这篇文章