延迟消息队列设计

标签: | 发表时间:2021-08-15 22:38 | 作者:
出处:https://juejin.cn

由于Kafka不支持延迟消息,而目前公司技术栈中消息中间件使用的是Kafka,业务方希望使用RocketMQ满足延迟消息场景,但如果仅仅只是需要延迟消息功能而引入多一套消息中间件,这会增加运维与维护成本。在此背景下,我们希望通过扩展Kafka客户端提供延迟消息的支持。

本篇将介绍四种延迟消息实现方案的原理,以及分析其优缺点。

方案一:时间轮算法

每个生产者持有一个时间轮延迟消息队列,消息保存在内存中。

  • slot = (当前时间戳 - 时间轮启动时间) % slot总数;
  • round = (当前时间戳 - 时间轮启动时间) / slot总数;
  • 延时消息链表: 按round排序。

截屏2021-08-14 20.48.25.png

缺点分析:

  • 消耗内存资源严重,只适用于延迟时间短(如一分钟内)的场景;
  • 容易丢失消息。

方案二:单轮时间轮算法+文件存储(方案一的改进版)

使用单轮的时间轮算法,单轮的slot数量满足max delay = slot count,并让每个slot指向一个文件。

截屏2021-08-14 21.13.21.png

缺点分析:

  • 如果服务容器化部署,重新构建后也会导致时间轮文件丢失,无法保证消息一致性。

方案三:多级分区+自动降级

按等级划分多个分区,根据剩余延时(期望发送时间-当前时间)将消息降级到指定分区,直到降级到真实topic。

截屏2021-08-14 21.03.19.png

缺点分析:

  • 需要经过多次发送-订阅,如果按照图中的等级划分,那么一个延迟2小时的消息至少要经过8次订阅、9次发送才最终发送到目标topic;
  • 由于同一个等级中每个消息的延时不同,如果要确保消息延迟准确,就可能导致一条消息不止需要经过8次订阅、9次发送才最终发送到目标topic,这次数可能会翻好几倍。

比如都是30~60分钟的等级,如果目前队列中的几条消息按顺序延迟值分别为: 50、40、36、56,为了不影响后面的延时消息,前面每个消息都必须要消费,然后重新写回同等级分区。因此,最坏的情况下,延时高的消息可能需要经过上百次发送-订阅才能完成。

方案四:多级延迟,不支持任意时间精度的延迟消息(方案三的改进版)

参考RocketMQ支持延迟消息设计,不支持任意时间精度的延迟消息,只支持特定级别的延迟消息,将消息延迟等级分为1s、5s、10s 、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h,共18个级别,只创建一个有18个分区的延时topic,每个分区对应不同延时等级。

截屏2021-08-14 20.48.59.png

举例:

  • 当发送延迟5秒消息时,将消息发送到order-topic.delay的第二个分区;
  • 当发送延迟1分钟消息时,将消息发送到order-topic.delay的第五个分区;
  • 当发送延迟1小时消息时,将消息发送到order-topic.delay的第17个分区;

优点:

  • 保证了每个分区中的消息都是时间顺序的,只需要顺序消费每个分区,将已经达到发送时间的消息转发到真实topic即可;
  • 如果消息未到达发送时间,则不需要提交offset,因为相同分区上的offset之后的消息也必定是未到发送时间的。

我们最终采用的是方案四,在实现上,我们为每个进程启动一个KafkaConsumer,使用正则表达式订阅以'.delay'结尾的topic,以此减少线程资源的消耗。在将消息发送到延迟topic时,将延迟等级作为消息key,而将原消息key存储在消息头,等发送到实际topic时再从延迟消息的消息头获取real key以及real topic。

相关 [延迟 消息队列 设计] 推荐:

延迟消息队列设计

- -
由于Kafka不支持延迟消息,而目前公司技术栈中消息中间件使用的是Kafka,业务方希望使用RocketMQ满足延迟消息场景,但如果仅仅只是需要延迟消息功能而引入多一套消息中间件,这会增加运维与维护成本. 在此背景下,我们希望通过扩展Kafka客户端提供延迟消息的支持. 本篇将介绍四种延迟消息实现方案的原理,以及分析其优缺点.

消息队列设计精要

- - 美团点评技术团队
消息队列已经逐渐成为企业IT系统内部通信的核心手段. 它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一. 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等.

快速的消息队列 SquirrelMQ

- Le - 开源中国社区最新软件
SquirrelMQ是一个快速的消息队列.   SquirrelMQ VS Redis 入队列: SquirrelMQ:100万条数据,105S,内存使用84MB. Redis:100万条数据,156S,内存使用127MB.   出队列:   SquirrelMQ:100万条数据,230S. Redis:100万条数据,163S.

Feed消息队列架构分析

- - Tim[后端技术]
最近一两年,大部分系统的数据流由基于日志的离线处理方式转变成实时的流式处理方式,并逐渐形成几种通用的使用方式,以下介绍微博的消息队列体系. 当前的主要消息队列分成如图3部分. 1、feed信息流主流程处理,图中中间的流程,通过相关MQ worker将数据写入cache、Redis及MySQL,以便用户浏览信息流.

redis作为消息队列的使用

- - ITeye博客
在redis支持的数据结构中,有一个是集合list. 对List的操作常见的有lpush  lrange等. 在这种常见的操作时,我们是把集合当做典型意义上的‘集合’来使用的. 往往容易被忽视的是List作为“队列”的使用情况. 反编译redis的jar包,会发现:.  pop意为“弹”,是队列里的取出元素.

高可用消息队列框架ZBUS

- - 企业架构 - ITeye博客
我们在日常开发中可以需要用到消息队列 当然我们完全可以自己写一个生产者-消费者框架 但是高可用性、实时性已经大量数据堆积时候就显得问题捉襟见肘了下面推荐的框架在我时间项目中和测试中都是非常不错那么他是什么框架呢.    zbus git地址. http://git.oschina.net/rushmore/zbus ZBUS=MQ+RPC 服务总线 1)支持消息队列, 发布订阅, RPC, 交易系统队列适配 2)亿级消息堆积能力、支持HA高可用 3)无依赖单个Jar包 ~300K 4)丰富的API--JAVA/C/C++/C#/Python/Node.JS多语言接入,支持HTTP等协议长连接入.

[转]消息队列的两种模式

- -
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. 点对点与发布订阅最初是由JMS定义的. 这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅).

深入浅出 消息队列 ActiveMQ

- - 编程语言 - ITeye博客
ActiveMQ 是Apache出品,最流行的、功能强大的. 即时通讯和集成模式的开源服务器. ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现. 提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能.

java多线程消息队列的实现

- - 编程语言 - ITeye博客
2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行. 3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中. if(size==0){ //队列缓存池没有消息,等待. if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理.

你可能并不需要消息队列

- - Java译站
我是一个极简主义者,我不喜欢过早或者没必要地让软件复杂化. 而往软件系统中添加组件就是严重增加复杂性的一种做法. 消息队列是一个能让你获得容错性,分布式,解耦等架构能力的系统. 或许消息列队在你的应用中有不少适用的场景. 你可以看下这篇关于消息队列优点的 文章,看看到底有哪些合适的场景. 但可不要因为说"能解耦那太好了”就轻易使用它.