将 Spark Streaming + Kafka direct 的 offset 存入Zookeeper并重用-Spark-about云开发-活到老 学到老

标签: | 发表时间:2018-05-17 15:48 | 作者:
出处:http://www.aboutyun.com
问题导读:
1. 使用Direct API时为什么需要见offset保存到Zookeeper中?
2. 如何将offset存入到Zookeeper中?
3. 如何解决Zookeeper中offset过期问题?


实现将offset存入Zookeeper

在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细的可以参考  Spark Streaming + Kafka Integration Guide ,但是第二种使用方式中  kafka 的 offset 是保存在 checkpoint 中的,如果程序重启的话,会丢失一部分数据,可以参考    Spark & Kafka - Achieving zero data-loss 。

本文主要讲在使用第二种消费方式(Direct Approach)的情况下,如何将 kafka 中的 offset 保存到 zookeeper 中,以及如何从 zookeeper 中读取已存在的 offset。

大致思想就是,在初始化 kafka stream 的时候,查看 zookeeper 中是否保存有 offset,有就从该 offset 进行读取,没有就从最新/旧进行读取。在消费 kafka 数据的同时,将每个 partition 的 offset 保存到 zookeeper 中进行备份,具体实现参考下面代码
[Scala] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
val topic : String = "topic_name"  //消费的 topic 名字
     val topics : Set[String] = Set(topic)                    //创建 stream 时使用的 topic 名字集合
  
     val topicDirs = new ZKGroupTopicDirs( "test_spark_streaming_group" , topic)  //创建一个 ZKGroupTopicDirs 对象,对保存
     val zkTopicPath = s "${topicDirs.consumerOffsetDir}"          获取 zookeeper 中的路径,这里会变成 /consumers/test _ spark _ streaming _ group/offsets/topic _ name
  
     val zkClient = new ZkClient( "10.4.232.77:2181" )          //zookeeper 的host 和 ip,创建一个 client
     val children = zkClient.countChildren(s "${topicDirs.consumerOffsetDir}" )    //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)
  
     var kafkaStream : InputDStream[(String, String)] = null 
     var fromOffsets : Map[TopicAndPartition, Long] = Map()  //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
  
     if (children > 0 ) {  //如果保存过 offset,这里更好的做法,还应该和  kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误
         for (i <- 0 untilchildren) {
           val partitionOffset = zkClient.readData[String](s "${topicDirs.consumerOffsetDir}/${i}" )
           val tp = TopicAndPartition(topic, i)
           fromOffsets + = (tp -> partitionOffset.toLong)  //将不同 partition 对应的 offset 增加到 fromOffsets 中
           logInfo( "@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@" )
         }
  
         val messageHandler = (mmd : MessageAndMetadata[String, String]) = > (mmd.topic, mmd.message())  //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
         kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
     }
     else {
         kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics) //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset
     }
  
     var offsetRanges = Array[OffsetRange]()
     kafkaStream.transform{ rdd = >
       offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset
       rdd
     }.map(msg = > Utils.msgDecode(msg)).foreachRDD { rdd = >
       for (o <- offsetRanges) {
         val zkPath = s "${topicDirs.consumerOffsetDir}/${o.partition}"
         ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)  //将该 partition 的 offset 保存到 zookeeper
         logInfo(s "@@@@@@ topic  ${o.topic}  partition ${o.partition}  fromoffset ${o.fromOffset}  untiloffset ${o.untilOffset} #######" )
       }
  
       rdd.foreachPartition(
         message = > {
           while (message.hasNext) {
             logInfo(s "@^_^@   [" + message.next() + "] @^_^@" )
           }
         }
       )
     }
使用上面的代码,我们可以做到 Spark Streaming 程序从 Kafka 中读取数据是不丢失


解决Zookeeper中保存的offset过期问题

上一篇文章中,我们讲了如何在将 offset 保存在 zk 中,以及进行重用,但是程序中有个小问题“如果程序停了很长很长一段后再启动,zk 中保存的 offset 已经过期了,那会怎样呢?”本文将解决这个问题

如果 kafka 上的 offset 已经过期,那么就会报 OffsetOutOfRange 的异常,因为之前保存在 zk 的 offset 已经 topic 中找不到了。所以我们需要在 从 zk 找到 offset 的这种情况下增加一个判断条件,如果 zk 中保存的 offset 小于当前 kafka topic 中最小的 offset,则设置为 kafka topic 中最小的 offset。假设我们上次保存在 zk 中的 offset 值为 123(某一个 partition),然后程序停了一周,现在 kafka topic 的最小 offset 变成了 200,那么用前文的代码,就会得到 OffsetOutOfRange 的异常,因为 123 对应的数据已经找不到了。下面我们给出,如何获取 <topic, parition> 的最小 offset,这样我们就可以进行对比了
[Scala] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
11
val partitionOffset = zkClient.readData[String](s "${topicDirs.consumerOffsetDir}/${i}" )
val tp = TopicAndPartition(topic, i)
  
val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1 )))
val consumerMin = new SimpleConsumer( "broker_host" , 9092 , 10000 , 10000 , "getMinOffset" //注意这里的 broker_host,因为这里会导致查询不到,解决方法在下面
val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
var nextOffset = partitionOffset.toLong
if (curOffsets.length > 0 && nextOffset < curOffsets.head) {  // 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择
   nextOffset = curOffsets.head
}
fromOffsets + = (tp -> nextOffset) //设置正确的 offset,这里将 nextOffset 设置为 0(0 只是一个特殊值),可以观察到 offset 过期的想想

但是上面的代码有一定的问题,因为我们从 kafka 上获取 offset 的时候,需要寻找对应的 leader,从 leader 来获取 offset,而不是 broker,不然可能得到的 curOffsets 会是空的(表示获取不到)。下面的代码就是获取不同 partition 的 leader 相关代码
[Scala] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
val topic _ name = "topic_name"     //topic_name 表示我们希望获取的 topic 名字
val topic 2 = List(topic _ name)      
val req = new TopicMetadataRequest(topic 2 , 0 )
val getLeaderConsumer = new SimpleConsumer( "broker_host" , 9092 , 10000 , 10000 , "OffsetLookup" // 第一个参数是 kafka broker 的host,第二个是 port
val res = getLeaderConsumer.send(req)
val topicMetaOption = res.topicsMetadata.headOption
val partitions = topicMetaOption match {
   case Some(tm) = >
     tm.partitionsMetadata.map(pm = > (pm.partitionId, pm.leader.get.host)).toMap[Int, String]  // 将结果转化为 partition -> leader 的映射关系
   case None = >
     Map[Int, String]()
}

上面的代码能够得到所有 partition 的 leader 地址,然后将 leader 地址替换掉上面第一份代码中的 broker_list 即可。
到此,在 spark streaming 中将 kafka 的 offset 保存到 zk,并重用的大部分情况都覆盖到了

相关 [spark streaming kafka] 推荐:

Spark Streaming 与 Kafka 整合的改进 | SmartSi

- -
Apache Kafka 正在迅速成为最受欢迎的开源流处理平台之一. 我们在 Spark Streaming 中也看到了同样的趋势. 因此,在 Apache Spark 1.3 中,我们专注于对 Spark Streaming 与 Kafka 集成进行重大改进. 为 Kafka 新增了 Direct API - 这允许每个 Kafka 记录在发生故障时只处理一次,并且不使用  Write Ahead Logs.

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark Streaming vs. Kafka Stream 哪个更适合你

- - IT瘾-bigdata
作者:Mahesh Chand Kandpal. 译者注:本文介绍了两大常用的流式处理框架,Spark Streaming和Kafka Stream,并对他们各自的特点做了详细说明,以帮助读者在不同的场景下对框架进行选择. 流式处理的需求每天都在增加,仅仅对大量的数据进行处理是不够的. 数据必须快速地得到处理,以便企业能够实时地对不断变化的业务环境做出反应.

实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重

- - lxw的大数据田地
本文想记录和表达的东西挺多的,一时想不到什么好的标题,所以就用上面的关键字作为标题了. 在实时流式计算中,最重要的是在任何情况下,消息不重复、不丢失,即Exactly-once. 本文以Kafka–>Spark Streaming–>Redis为例,一方面说明一下如何做到Exactly-once,另一方面说明一下我是如何计算实时去重指标的.

spark streaming 从kafka 拉数据如何保证数据不丢失 | sunbiaobiao

- -
spark streaming 从kafka 拉数据如何保证数据不丢失. 为什么使用 direct 方式. 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了. 这个在Receiver-based Approach 就比较麻烦,你需要通过spark.streaming.blockInterval等参数来调整.

Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统

- -
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统. 2015 年 7 月 27 日发布. 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.

将 Spark Streaming + Kafka direct 的 offset 存入Zookeeper并重用-Spark-about云开发-活到老 学到老

- -
使用Direct API时为什么需要见offset保存到Zookeeper中. 如何将offset存入到Zookeeper中. 如何解决Zookeeper中offset过期问题. 实现将offset存入Zookeeper. 在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细的可以参考 .

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式 - CSDN博客

- -
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了. 一、基于Receiver的方式. 这种方式使用Receiver来获取数据.

[原]Spark Streaming原理简析

- - 张包峰的博客
StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如. 这样从socket接收文本数据. 这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里.