侧边栏壁纸
博主头像
Haenu的Blog 博主等级

坚持学习,慢慢进步!

  • 累计撰写 35 篇文章
  • 累计创建 10 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

消息队列笔记

Haenu
2024-06-24 / 0 评论 / 0 点赞 / 66 阅读 / 0 字

Kafka、ActiveMQ、RabbitMQ和RocketMQ都有哪些区别?


优先级队列

延迟队列

死信队列

重试队列

消费模式

事务消息

Kafka

不支持

不支持,可以间接实现延迟队列



不直接支持,可以通过消费者逻辑来实现重试机制。

主要是拉模式。

支持事务,但限于消息生产。

RocketMQ

支持

直接支持延迟队列,可以设定消息的延迟时间。


支持

支持重试队列,可以自动或手动将消息重新发送。

支持推和拉两种模式。

支持事务消息。

RabbitMQ

支持

支持延迟队列,可以通过插件或者消息TTL和死信交换来实现。

支持

可以实现重试机制,但需要通过消息属性和额外配置来手动设置。

主要是推模式,但也可以实现拉模式。

支持基本的消息事务。

ActiveMQ

支持

支持

支持

支持重试机制,可以配置消息重发策略。

支持推和拉两种模式。

支持事务消息。

消息队列有什么用?

  1. 通过异步处理提高系统性能(减少响应所需时间)

  2. 削峰/限流

  3. 降低系统耦合性。

说说 Broker 服务节点、Queue 队列、Exchange 交换器?

  • Broker:可以看做 RabbitMQ 的服务节点。一般情况下一个 Broker 可以看做一个 RabbitMQ 服务器。

  • Queue:RabbitMQ 的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。

  • Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃

Exchange Types(交换器类型)

RabbitMQ 常用的 Exchange Type 有 fanoutdirecttopicheaders 这四种

1、fanout

fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。

2、direct

direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。

3、topic

前面讲到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

  • RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;

  • BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;

  • BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

什么是死信队列?如何导致的?

死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

导致的死信的几种原因

  • 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。

  • 消息 TTL 过期。

  • 队列满了,无法再添加。

  • 消息无法路由:当消息不能被路由到任何队列时,例如,没有匹配的绑定关系或路由键时,消息可以被发送到死信队列。

什么是延迟队列?RabbitMQ 怎么实现延迟队列?

  • 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。

  • 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。

如何保证消息的可靠性?

消息丢失分为三种情况

  1. 生产者发送消息给mq的途中因为网络原因丢失

  2. mq他自己重启了 或者 宕机了 导致丢失

  3. mq给消费者的途中丢失了 ack返回但是后续的处理没有完成

解决办法:

  1. 开启生产者的确认机制 保证消息能到消息队列

  2. 我们去做持久化 让交换机 队列 消息全都持久化

  3. 消费者就是开启auto确认机制 由spring管理

  4. 开启消费者重试机制 失败多次我们放到错误交换机

消费者重复消费?

每个消息可以带一个唯一ID标识 然后等我们再插入的时候发现有了 就不去消费了

第二个就是幂等性

唯一索引

利用数据库的唯一索引约束,解决了在insert场景时幂等问题。 业务生成全局唯一的值。 通常使用唯一主键索引。

优点:实现很简单

乐观锁

乐观锁解决了计算赋值型的修改场景 示例:UPDATE user SET point = point + 20, version = version + 1 WHERE userid=1 AND version=1

缺点:操作业务前,需要先查询出当前的version版本。

token

使用token + Redis

缺点:业务请求每次请求,都会有额外的请求

去重表

把唯一主键插入去重表,再进行业务操作,且他们在同一个事务中。这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避免了幂等问题。

消息堆积

  1. 增加消费者

  2. 消费者开启线程池

  3. 使用惰性队列

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:
●先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
●新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
●然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,
消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
●接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
●等快速消费完积压数据之后,
得恢复原先部署的架构重新用原先的 consumer 机器来消费消息。

消息顺序

解决方案很简单,单个consumer,单线程消费即可。

RabbitMQ 是如何保证高可用的?

普通集群模式

普通集群模式,就是将 RabbitMQ 实例部署到多台服务器上,多个实例之间协同工作,共享队列和交换机的元数据,并通过内部通信协议来协调消息的传递和管理。

在这种模式下,我们创建的Queue,它的元数据(配置信息)会在集群中的所有实例间进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。




当我们消费消息的时候,如果消费者连接到了未保存消息的实例,那么那个实例会通过元数据定位到消息所在的实例,拉取数据过来发送给消费者进行消费。

消息的发送也是一样的,当发送者连接到了一个不保存消息的实例时,也会被转发到保存消息的实例上进行写操作。

这种集群模式下,每一个实例中的元数据是一样的,大家都是完整的数据。但是队列中的消息数据,在不同的实例上保存的是不一样的。这样通过增加实例的方式就可以提升整个集群的消息存储量,以及性能。

这种方式在高可用上有一定的帮助,不至于一个节点挂了就全都挂了。但是也还有缺点,至少这个实例上的数据是没办法被读写了。

镜像模式

顾名思义,就是每一台RabbitMQ都像一个镜像一样,存储的内容都是一样的。这种模式下,Queue的元数据和消息数据不再是单独存储在某个实例上,而是集群中的所有实例上都存储一份。

这样每次在消息写入的时候,就需要在集群中的所有实例上都同步一份,这样即使有一台实例发生故障,剩余的实例也可以正常提供完整的数据和服务。




这种模式下,就保障了RabbitMQ的高可用。

RabbitMQ如何实现消费端限流

什么是消费端限流,这是一种保护消费者的手段,假如说,现在是业务高峰期了,消息有大量堆积,导致MQ消费者需要不断地进行消息消费,很容易被打挂,甚至重启之后还是会被大量消息涌入,继续被打挂。

为了解决这个问题,RabbitMQ提供了basicQos的方式来实现消费端限流。我们可以在消费者端指定最大的未确认消息数,当达到这个限制时,RabbitMQ将不再推送新的消息给消费者,直到有一些消息得到确认。

想要实现这个功能,首先需要把自动提交关闭。

channel.basicConsume(queueName, false, consumer);/**

  • 限流设置:

  • prefetchSize:每条消息大小的设置,0是无限制

  • prefetchCount:标识每次推送多少条消息

  • global:false标识channel级别的  true:标识消费者级别的
    */
    channel.basicQos(0,10,false);

如以上配置,可以实现消费者在处理完一条消息后,才会获取下一条消息。

然后再在消费者处理完一条消息之后,手动发送确认消息给到RabbitMQ,这样就可以拉取下一条消息了:

1

channel.basicAck(deliveryTag, false); // 发送确认

0

评论区