spark streaming 从kafka 拉数据如何保证数据不丢失 | sunbiaobiao

标签: | 发表时间:2018-01-10 17:46 | 作者:
出处:http://coolplayer.net

spark streaming 从kafka 拉数据如何保证数据不丢失

为什么使用 direct 方式

Direct Approach VS Receiver-based Approach

  • 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了。这个在Receiver-based Approach 就比较麻烦,你需要通过spark.streaming.blockInterval等参数来调整。
  • 数据默认就被分布到了多个Executor上。Receiver-based Approach 你需要做特定的处理,才能让 Receiver分不到多个Executor上。
  • Receiver-based Approach 的方式,一旦你的Batch Processing 被delay了,或者被delay了很多个batch,那估计你的Spark Streaming程序离奔溃也就不远了。 Direct Approach (No Receivers) 则完全不会存在类似问题。就算你delay了很多个batch time,你内存中的数据只有这次处理的。

  • 能保证exact once 语意, 图解参考

spark streaming 里面应该怎么做

  • spark 可以使用 Direct 和 Receiver-based 两种方式从kafka中取数据,显然我们应该选用Direct 方式
  • 在处理过程中应该考虑如何 recovery, 这样我们需要把每个batch中的分区消费位置持久化存储在hdfs上, 为了重启的时候可以从上次断掉的位置继续消费

实现思路如下

我们可以 override StreamingListener 的onBatchCompleted函数, 在每个 batch 处理完的时候保存一下当前处理的kafka 的 offset, offset 数组信息可以从InputInfoTracker中获取,

重启的时候可以从持久化目录里面获取最后消费的分区消费位置数组, 然后设置一下 DirectKafkaInputDStream 的 currentOffsets 字段, 就可以做到从上次断掉的位置继续消费

关键代码如下

            
streamingContext.addStreamingListener(new BatchStreamingListener(this))
            
class BatchStreamingListener(runtime: SparkStreamingRuntime) extends StreamingListener {
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
operator.directKafkaRecoverSource.saveJobSate(time)
}
}
            
val info = ssc.scheduler.inputInfoTracker.getInfo(time)
info(jobName).metadata("offsets")

这里确保每一批数据消费完了之后,就持久化kafka中的分区offset 数组, 可以从 inputInfoTracker 获取每个batch 中处理的offset数组,写成一个文件,下面就是根据参数中设置的hdfs持久化路径上传到hdfs

            
val field = classOf[DirectKafkaInputDStream[_, _, _, _, _]].getDeclaredField("currentOffsets")
field.setAccessible(true)
field.set(dkid, state)

下面就简单了,在下次重启spark streaming 的时候遍历hdfs持久化路径中最末尾的文件, 找到其中保存的kafka offset 数组, 然后设置 DirectKafkaInputDStream 中的currentOffsets 字段, 然后就可以做到从上次断掉的位置继续消费

测试

下面模拟一个spark streaming 从kafka中消费数据的场景, 由于spark streming 中是direct 的方式, offset的数组没有打入 zookeeper , 所以kafka 中自带的监控工具是生效的, 这里有一个比较 trick的方法, 你可以使用 bin/kafka-console-consumer.sh 工具消费同一个topic, 这样的话你就可以从kafka 的监控工具中看到消息写到哪个位置了, 然后再观察你spark 里面持久化的位置数组, 然后就可以确认是从上次断掉的位置继续消费还是从最新的位置消费

相关 [spark streaming kafka] 推荐:

Spark Streaming 与 Kafka 整合的改进 | SmartSi

- -
Apache Kafka 正在迅速成为最受欢迎的开源流处理平台之一. 我们在 Spark Streaming 中也看到了同样的趋势. 因此,在 Apache Spark 1.3 中,我们专注于对 Spark Streaming 与 Kafka 集成进行重大改进. 为 Kafka 新增了 Direct API - 这允许每个 Kafka 记录在发生故障时只处理一次,并且不使用  Write Ahead Logs.

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark Streaming vs. Kafka Stream 哪个更适合你

- - IT瘾-bigdata
作者:Mahesh Chand Kandpal. 译者注:本文介绍了两大常用的流式处理框架,Spark Streaming和Kafka Stream,并对他们各自的特点做了详细说明,以帮助读者在不同的场景下对框架进行选择. 流式处理的需求每天都在增加,仅仅对大量的数据进行处理是不够的. 数据必须快速地得到处理,以便企业能够实时地对不断变化的业务环境做出反应.

实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重

- - lxw的大数据田地
本文想记录和表达的东西挺多的,一时想不到什么好的标题,所以就用上面的关键字作为标题了. 在实时流式计算中,最重要的是在任何情况下,消息不重复、不丢失,即Exactly-once. 本文以Kafka–>Spark Streaming–>Redis为例,一方面说明一下如何做到Exactly-once,另一方面说明一下我是如何计算实时去重指标的.

spark streaming 从kafka 拉数据如何保证数据不丢失 | sunbiaobiao

- -
spark streaming 从kafka 拉数据如何保证数据不丢失. 为什么使用 direct 方式. 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了. 这个在Receiver-based Approach 就比较麻烦,你需要通过spark.streaming.blockInterval等参数来调整.

Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统

- -
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统. 2015 年 7 月 27 日发布. 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.

将 Spark Streaming + Kafka direct 的 offset 存入Zookeeper并重用-Spark-about云开发-活到老 学到老

- -
使用Direct API时为什么需要见offset保存到Zookeeper中. 如何将offset存入到Zookeeper中. 如何解决Zookeeper中offset过期问题. 实现将offset存入Zookeeper. 在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细的可以参考 .

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式 - CSDN博客

- -
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了. 一、基于Receiver的方式. 这种方式使用Receiver来获取数据.

[原]Spark Streaming原理简析

- - 张包峰的博客
StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如. 这样从socket接收文本数据. 这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里.