Kafka、ActiveMQ、RabbitMQ和RocketMQ都有哪些区别?
消息队列有什么用?
通过异步处理提高系统性能(减少响应所需时间)
削峰/限流
降低系统耦合性。
说说 Broker 服务节点、Queue 队列、Exchange 交换器?
Broker:可以看做 RabbitMQ 的服务节点。一般情况下一个 Broker 可以看做一个 RabbitMQ 服务器。
Queue:RabbitMQ 的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃
Exchange Types(交换器类型)
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种
1、fanout
fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
2、direct
direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。
3、topic
前面讲到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
什么是死信队列?如何导致的?
死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
导致的死信的几种原因:
消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
消息 TTL 过期。
队列满了,无法再添加。
消息无法路由:当消息不能被路由到任何队列时,例如,没有匹配的绑定关系或路由键时,消息可以被发送到死信队列。
什么是延迟队列?RabbitMQ 怎么实现延迟队列?
通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。
如何保证消息的可靠性?
消息丢失分为三种情况
生产者发送消息给mq的途中因为网络原因丢失
mq他自己重启了 或者 宕机了 导致丢失
mq给消费者的途中丢失了 ack返回但是后续的处理没有完成
解决办法:
开启生产者的确认机制 保证消息能到消息队列
我们去做持久化 让交换机 队列 消息全都持久化
消费者就是开启auto确认机制 由spring管理
开启消费者重试机制 失败多次我们放到错误交换机
消费者重复消费?
每个消息可以带一个唯一ID标识 然后等我们再插入的时候发现有了 就不去消费了
第二个就是幂等性
消息堆积
增加消费者
消费者开启线程池
使用惰性队列
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:
●先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
●新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
●然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
●接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
●等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
消息顺序
解决方案很简单,单个consumer,单线程消费即可。
RabbitMQ 是如何保证高可用的?
普通集群模式
普通集群模式,就是将 RabbitMQ 实例部署到多台服务器上,多个实例之间协同工作,共享队列和交换机的元数据,并通过内部通信协议来协调消息的传递和管理。
在这种模式下,我们创建的Queue,它的元数据(配置信息)会在集群中的所有实例间进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
当我们消费消息的时候,如果消费者连接到了未保存消息的实例,那么那个实例会通过元数据定位到消息所在的实例,拉取数据过来发送给消费者进行消费。
消息的发送也是一样的,当发送者连接到了一个不保存消息的实例时,也会被转发到保存消息的实例上进行写操作。
这种集群模式下,每一个实例中的元数据是一样的,大家都是完整的数据。但是队列中的消息数据,在不同的实例上保存的是不一样的。这样通过增加实例的方式就可以提升整个集群的消息存储量,以及性能。
这种方式在高可用上有一定的帮助,不至于一个节点挂了就全都挂了。但是也还有缺点,至少这个实例上的数据是没办法被读写了。
镜像模式
顾名思义,就是每一台RabbitMQ都像一个镜像一样,存储的内容都是一样的。这种模式下,Queue的元数据和消息数据不再是单独存储在某个实例上,而是集群中的所有实例上都存储一份。
这样每次在消息写入的时候,就需要在集群中的所有实例上都同步一份,这样即使有一台实例发生故障,剩余的实例也可以正常提供完整的数据和服务。
这种模式下,就保障了RabbitMQ的高可用。
RabbitMQ如何实现消费端限流
什么是消费端限流,这是一种保护消费者的手段,假如说,现在是业务高峰期了,消息有大量堆积,导致MQ消费者需要不断地进行消息消费,很容易被打挂,甚至重启之后还是会被大量消息涌入,继续被打挂。
为了解决这个问题,RabbitMQ提供了basicQos的方式来实现消费端限流。我们可以在消费者端指定最大的未确认消息数,当达到这个限制时,RabbitMQ将不再推送新的消息给消费者,直到有一些消息得到确认。
想要实现这个功能,首先需要把自动提交关闭。
channel.basicConsume(queueName, false, consumer);/**
限流设置:
prefetchSize:每条消息大小的设置,0是无限制
prefetchCount:标识每次推送多少条消息
global:false标识channel级别的 true:标识消费者级别的
*/
channel.basicQos(0,10,false);
如以上配置,可以实现消费者在处理完一条消息后,才会获取下一条消息。
然后再在消费者处理完一条消息之后,手动发送确认消息给到RabbitMQ,这样就可以拉取下一条消息了:
1
channel.basicAck(deliveryTag, false); // 发送确认
评论区