spark streaming 从kafka 拉数据如何保证数据不丢失 | sunbiaobiao

标签: | 发表时间:2018-01-10 17:46 | 作者:
出处:http://coolplayer.net

spark streaming 从kafka 拉数据如何保证数据不丢失

为什么使用 direct 方式

Direct Approach VS Receiver-based Approach

  • 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了。这个在Receiver-based Approach 就比较麻烦,你需要通过spark.streaming.blockInterval等参数来调整。
  • 数据默认就被分布到了多个Executor上。Receiver-based Approach 你需要做特定的处理,才能让 Receiver分不到多个Executor上。
  • Receiver-based Approach 的方式,一旦你的Batch Processing 被delay了,或者被delay了很多个batch,那估计你的Spark Streaming程序离奔溃也就不远了。 Direct Approach (No Receivers) 则完全不会存在类似问题。就算你delay了很多个batch time,你内存中的数据只有这次处理的。

  • 能保证exact once 语意, 图解参考

spark streaming 里面应该怎么做

  • spark 可以使用 Direct 和 Receiver-based 两种方式从kafka中取数据,显然我们应该选用Direct 方式
  • 在处理过程中应该考虑如何 recovery, 这样我们需要把每个batch中的分区消费位置持久化存储在hdfs上, 为了重启的时候可以从上次断掉的位置继续消费

实现思路如下

我们可以 override StreamingListener 的onBatchCompleted函数, 在每个 batch 处理完的时候保存一下当前处理的kafka 的 offset, offset 数组信息可以从InputInfoTracker中获取,

重启的时候可以从持久化目录里面获取最后消费的分区消费位置数组, 然后设置一下 DirectKafkaInputDStream 的 currentOffsets 字段, 就可以做到从上次断掉的位置继续消费

关键代码如下

            
streamingContext.addStreamingListener(new BatchStreamingListener(this))
            
class BatchStreamingListener(runtime: SparkStreamingRuntime) extends StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
operator.directKafkaRecoverSource.saveJobSate(time)
}
}
            
val info = ssc.scheduler.inputInfoTracker.getInfo(time)
info(jobName).metadata("offsets")

这里确保每一批数据消费完了之后,就持久化kafka中的分区offset 数组, 可以从 inputInfoTracker 获取每个batch 中处理的offset数组,写成一个文件,下面就是根据参数中设置的hdfs持久化路径上传到hdfs

            
val field = classOf[DirectKafkaInputDStream[_, _, _, _, _]].getDeclaredField("currentOffsets")
field.setAccessible(true)
field.set(dkid, state)

下面就简单了,在下次重启spark streaming 的时候遍历hdfs持久化路径中最末尾的文件, 找到其中保存的kafka offset 数组, 然后设置 DirectKafkaInputDStream 中的currentOffsets 字段, 然后就可以做到从上次断掉的位置继续消费

测试

下面模拟一个spark streaming 从kafka中消费数据的场景, 由于spark streming 中是direct 的方式, offset的数组没有打入 zookeeper , 所以kafka 中自带的监控工具是生效的, 这里有一个比较 trick的方法, 你可以使用 bin/kafka-console-consumer.sh 工具消费同一个topic, 这样的话你就可以从kafka 的监控工具中看到消息写到哪个位置了, 然后再观察你spark 里面持久化的位置数组, 然后就可以确认是从上次断掉的位置继续消费还是从最新的位置消费

相关 [spark streaming kafka] 推荐:

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark Streaming vs. Kafka Stream 哪个更适合你

- - IT瘾-bigdata
作者:Mahesh Chand Kandpal. 译者注:本文介绍了两大常用的流式处理框架,Spark Streaming和Kafka Stream,并对他们各自的特点做了详细说明,以帮助读者在不同的场景下对框架进行选择. 流式处理的需求每天都在增加,仅仅对大量的数据进行处理是不够的. 数据必须快速地得到处理,以便企业能够实时地对不断变化的业务环境做出反应.

spark streaming 从kafka 拉数据如何保证数据不丢失 | sunbiaobiao

- -
spark streaming 从kafka 拉数据如何保证数据不丢失. 为什么使用 direct 方式. 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了. 这个在Receiver-based Approach 就比较麻烦,你需要通过spark.streaming.blockInterval等参数来调整.

Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统

- -
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统. 2015 年 7 月 27 日发布. 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.

[原]Spark Streaming原理简析

- - 张包峰的博客
StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如. 这样从socket接收文本数据. 这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里.

Spark整合Kafka小项目

- - 互联网 - ITeye博客
SparkStreaming与kafka整合小项目实践含所有代码带详细注释. 总流程:自制日志生成器生成含数据日志,使用kafkaAppender直接发送到kafka,SparkStreaming从kafka消费日志,并流式处理将结果发送到kafka另一个topic,Java后台从kafka消费日志分析结果,实现秒级大数据实时分析展示.

Spark Streaming 自定义接收器

- - zzm
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等). 这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据. 本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解.

Spark Streaming + Elasticsearch构建App异常监控平台

- - 美团点评技术团队
本文已发表在《程序员》杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App、到应用商店怒斥开发者等方式来表达不满. 但开发者也同样感到头疼,因为崩溃可能意味着用户流失、营收下滑. 为了降低崩溃率,进而提升App质量,App开发团队需要实时地监控App异常. 一旦发现严重问题,及时进行热修复,从而把损失降到最低.

Spark Streaming 1.6 流式状态管理分析 - 简书

- -
Spark 1.6发布后,官方声称流式状态管理有10倍性能提升. 这篇文章会详细介绍Spark Streaming里新的流式状态管理. 在流式计算中,数据是持续不断来的,有时候我们要对一些数据做跨周期(Duration)的统计,这个时候就不得不维护状态了. 而状态管理对Spark 的 RDD模型是个挑战,因为在spark里,任何数据集都需要通过RDD来呈现,而RDD 的定义是一个不变的分布式集合.