Kafka重复消费和丢失数据研究 | Zollty's Blog

标签: | 发表时间:2018-07-22 19:06 | 作者:
出处:http://blog.zollty.com


Kafka重复消费原因

底层根本原因:已经消费了数据,但是offset没提交。

原因1:强行kill线程,导致消费后的数据,offset没有提交。

原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。例如:

try {

consumer.unsubscribe();

} catch (Exception e) {

}

try {

consumer.close();

} catch (Exception e) {

}

上面代码会导致部分offset没提交,下次启动时会重复消费。


原因3(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到 消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么 就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费


Kafka Consumer丢失数据原因

猜测:设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。



解决方案:记录offset和恢复offset的方案

 

理论上记录offset,下一个group consumer可以接着记录的offset位置继续消费。

offset记录方案:

每次消费时更新每个topic+partition位置的offset在内存中,

Map<key, value>,key=topic+'-'+partition,value=offset

当调用关闭consumer线程时,把上面Map的offset数据记录到 文件中*(分布式集群可能要记录到redis中)。

 

下一次启动consumer,需要读取上一次的offset信息,方法是 以当前的topic+partition为key,从上次的Map中去寻找offset。

然后使用consumer.seek()方法指定到上次的offset位置。

 

说明:

1、该方案针对单台服务器比较简单,直接把offset记录到本地文件中即可,但是对于多台服务器集群,offset也要记录到同一个地方,并且需要做去重处理。

      如果线上程序是由多台服务器组成的集群,是否可以用一台服务器来支撑?应该可以,只是消费慢一点,没多大影响。


2、如何保证接着offset消费的数据正确性

为了确保consumer消费的数据一定是接着上一次consumer消费的数据,

consumer消费时,记录第一次取出的数据,将其offset和上次consumer最后消费的offset进行对比,如果相同则继续消费。如果不同,则停止消费,检查原因。




相关 [kafka 消费 数据] 推荐:

Kafka重复消费和丢失数据研究 | Zollty's Blog

- -
底层根本原因:已经消费了数据,但是offset没提交. 原因1:强行kill线程,导致消费后的数据,offset没有提交. 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费.

分享一些 Kafka 消费数据的小经验

- - crossoverJie's Blog
之前写过一篇 《从源码分析如何优雅的使用 Kafka 生产者》 ,有生产者自然也就有消费者. 建议对 Kakfa 还比较陌生的朋友可以先看看. 就我的使用经验来说,大部分情况都是处于数据下游的消费者角色. 也用 Kafka 消费过日均过亿的消息(不得不佩服 Kakfa 的设计),本文将借助我使用 Kakfa 消费数据的经验来聊聊如何高效的消费数据.

高速数据总线kafka介绍

- - 数据库 - ITeye博客
在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转. 有没有一个系统可以同时搞定在线应用(消息)和离线应用(数据文件,日志). 2、降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka承担高速数据总线的作用.

kafka数据可靠性深度解读

- - CSDN博客推荐文章
Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用. 目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka集成. Kafka凭借着自身的优势,越来越受到互联网企业的青睐,唯品会也采用Kafka作为其内部核心消息引擎之一.

Avoiding Data Loss - 避免Kafka数据丢失

- -
If for some reason the producer cannot deliver messages that have been consumed and committed by the consumer, it is possible for a MirrorMaker process to lose data..

kafka消费者客户端 - sowhat1943 - 博客园

- -
消费者与消费者组之间的关系. 每一个消费者都隶属于某一个消费者组,一个消费者组可以包含一个或多个消费者,每一条消息只会被消费者组中的某一个消费者所消费. 不同消费者组之间消息的消费是互不干扰的. 消费者组出现主要是出于两个目的:. (1) 使整体的消费能力具备横向的伸缩性. 可以适当增加消费者组中消费者的数量,来提高整体的消费能力.

大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

- - 行业应用 - ITeye博客
大数据我们都知道hadoop,但并不都是hadoop.我们该如何构建大数据库项目. 对于离线处理,hadoop还是比较适合的,但是对于实时性比较强的,数据量比较大的,我们可以采用Storm,那么Storm和什么技术搭配,才能够做一个适合自己的项目. 可以带着下面问题来阅读本文章:. 1.一个好的项目架构应该具备什么特点.

Kafka实战-数据持久化 - 哥不是小萝莉

- - 博客园_首页
今天进入Kafka实战的最后一个环节,那就是Kafka实战的结果的数据持久化.   一般,我们在进行实时计算,将结果统计处理后,需要将结果进行输出,供前端工程师去展示我们统计的结果(所说的报表). 结果的存储,这里我们选择的是Redis+MySQL进行存储,下面用一张图来展示这个持久化的流程,如下图所示:.

实用 | 从Apache Kafka到Apache Spark安全读取数据

- - IT瘾-bigdata
随着在CDH平台上物联网(IoT)使用案例的不断增加,针对这些工作负载的安全性显得至关重要. 本篇博文对如何以安全的方式在Spark中使用来自Kafka的数据,以及针对物联网(IoT)使用案例的两个关键组件进行了说明. Cloudera Distribution of Apache Kafka 2.0.0版本(基于Apache Kafka 0.9.0)引入了一种新型的Kafka消费者API,可以允许消费者从安全的Kafka集群中读取数据.

记一次kafka数据丢失问题的排查 - CSDN博客

- -
数据丢失为大事,针对数据丢失的问题我们排查结果如下. 第二:是在什么地方丢失的数据,是否是YDB的问题.     数据丢失是在导入阶段,数据并没有写入到Kafka里面,所以YDB也就不会从Kafka里面消费到缺失的数据,数据丢失与延云YDB无关.     1.测试数据会一共创建365个分区,每个分区均是9亿数据,如果最终每个分区还是9亿(多一条少一条均不行),则数据完整.