val topic : String = "topic_name" //消费的 topic 名字
val topics : Set[String] = Set(topic) //创建 stream 时使用的 topic 名字集合
val topicDirs = new ZKGroupTopicDirs( "test_spark_streaming_group" , topic) //创建一个 ZKGroupTopicDirs 对象,对保存
val zkTopicPath = s "${topicDirs.consumerOffsetDir}" 获取 zookeeper 中的路径,这里会变成 /consumers/test _ spark _ streaming _ group/offsets/topic _ name
val zkClient = new ZkClient( "10.4.232.77:2181" ) //zookeeper 的host 和 ip,创建一个 client
val children = zkClient.countChildren(s "${topicDirs.consumerOffsetDir}" ) //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)
var kafkaStream : InputDStream[(String, String)] = null
var fromOffsets : Map[TopicAndPartition, Long] = Map() //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
if (children > 0 ) { //如果保存过 offset,这里更好的做法,还应该和 kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误
for (i <- 0 untilchildren) {
val partitionOffset = zkClient.readData[String](s "${topicDirs.consumerOffsetDir}/${i}" )
val tp = TopicAndPartition(topic, i)
fromOffsets + = (tp -> partitionOffset.toLong) //将不同 partition 对应的 offset 增加到 fromOffsets 中
logInfo( "@@@@@@ topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@" )
}
val messageHandler = (mmd : MessageAndMetadata[String, String]) = > (mmd.topic, mmd.message()) //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
}
else {
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics) //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset
}
var offsetRanges = Array[OffsetRange]()
kafkaStream.transform{ rdd = >
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset
rdd
}.map(msg = > Utils.msgDecode(msg)).foreachRDD { rdd = >
for (o <- offsetRanges) {
val zkPath = s "${topicDirs.consumerOffsetDir}/${o.partition}"
ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString) //将该 partition 的 offset 保存到 zookeeper
logInfo(s "@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######" )
}
rdd.foreachPartition(
message = > {
while (message.hasNext) {
logInfo(s "@^_^@ [" + message.next() + "] @^_^@" )
}
}
)
}