Spark Streaming 与 Kafka 整合的改进 | SmartSi

标签: | 发表时间:2018-04-13 10:23 | 作者:
出处:http://smartsi.club

Apache Kafka 正在迅速成为最受欢迎的开源流处理平台之一。我们在 Spark Streaming 中也看到了同样的趋势。因此,在 Apache Spark 1.3 中,我们专注于对 Spark Streaming 与 Kafka 集成进行重大改进。主要增加如下:

  • 为 Kafka 新增了 Direct API - 这允许每个 Kafka 记录在发生故障时只处理一次,并且不使用  Write Ahead Logs。这使得 Spark Streaming + Kafka 流水线更高效,同时提供更强大的容错保证。
  • 为 Kafka 新增了 Python API - 这样你就可以在 Python 中处理 Kafka 数据。

在本文中,我们将更详细地讨论这些改进。

1. Direct API

Spark Streaming 自成立以来一直支持 Kafka,Spark Streaming 与 Kafka 在生产环境中的很多地方一起使用。但是,Spark 社区要求更好的容错保证和更强的可靠性语义。为了满足这一需求,Spark 1.2 引入了  Write Ahead Logs(WAL)。它可以确保在发生故障时从任何可靠的数据源(即Flume,Kafka和Kinesis等事务源)接收的数据不会丢失(即至少一次语义)。即使对于像 plain-old 套接字这样的不可靠(即非事务性)数据源,它也可以最大限度地减少数据的丢失。

然而,对于允许从数据流中的任意位置重放数据流的数据源(例如 Kafka),我们可以实现更强大的容错语义,因为这些数据源让 Spark Streaming 可以更好地控制数据流的消费。Spark 1.3 引入了 Direct API 概念,即使不使用 Write Ahead Logs 也可以实现 exactly-once 语义。让我们来看看集成 Apache Kafka 的 Spark Direct API 的细节。

2. 我们是如何构建它?

从高层次的角度看,之前的 Kafka 集成与  Write Ahead Logs(WAL)一起工作如下:

(1) 运行在 Spark workers/executors 上的 Kafka Receivers 连续不断地从 Kafka 中读取数据,这用到了 Kafka 高级消费者API。

(2) 接收到的数据存储在 Spark 的 worker/executor的内存上,同时写入到 WAL(拷贝到HDFS)上。Kafka Receiver 只有在数据保存到日志后才会更新 Zookeeper中的 Kafka 偏移量。

(3) 接收到的数据及其WAL存储位置信息也可靠地存储。在出现故障时,这些信息用于从故障中恢复,重新读取数据并继续处理。

虽然这种方法可以确保从 Kafka 接收的数据不会丢失,但是在失败的时候,某些记录仍然有可能会被多次被处理(即 at-least-once 语义)。这种情况在一些接收到的数据被可靠地保存到 WAL 中,但是在更新 Zookeeper 中相应的 Kafka 偏移量之前失败时会发生(译者注:即已经保存到WAL,但是还没有来得及更新 Zookeeper 中的 Kafka 偏移量)。从而导致了不一致的情况 - Spark Streaming 认为数据已被接收,但 Kafka 认为数据还未成功发送,因为 Zookeeper 中的偏移未更新。因此,在系统从故障中恢复后,Kafka 会再一次发送数据。

出现这种不一致的原因是两个系统无法对描述已发送内容的信息进行原子更新。为了避免这种情况,只需要一个系统来维护已发送或接收的内容的一致性视图。此外,这个系统需要有从故障中恢复时重放数据流的一切控制权。因此,我们决定所有消费的偏移量信息只保存在 Spark Streaming 中,这些信息可以使用 Kafka 的  Simple Consumer API 根据故障需要重放任意偏移量的数据来从故障中恢复。

为了构建这个系统,新的 Direct Kafka API 采用与 Receivers 和 WAL 完全不同的方法。与使用 Receivers 连续接收数据并将其存储在 WAL 中不同,我们只需在给出每个批次开始时要使用的偏移量范围。之后,在执行每个批次的作业时,将从 Kafka 中读取与偏移量范围对应的数据进行处理(与读取HDFS文件的方式类似)。这些偏移量也能可靠地保存()并用于重新计算数据以从故障中恢复。

请注意,Spark Streaming 可以在失败以后重新读取和处理来自 Kafka 的流片段以从故障中恢复。但是,由于 RDD 转换的 exactly-once 语义,最终重新计算的结果与在没有失败的结果完全相同。

因此,Direct API 消除了对 Kafka 的 WAL 和 Receivers 的依赖,同时确保每个 Kafka 记录都被 Spark Streaming 有效地接收一次。这允许我们用端到端的 exactly-once 语义将 Spark Streaming 与 Kafka 进行整合。总的来说,它使得这样的流处理流水线更加容错,高效并且更易于使用。

3. 如何来使用

新的API相比之前的更加容易使用:

// Define the Kafka parameters, broker list must be specified     
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")

// Define which topics to read from
val topics = Set("sometopic", "anothertopic")

// Create the direct stream with the Kafka parameters and topics
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)

由于这种 Direct API 没有使用 Receivers,因此你不必担心如何创建多个输入 DStream 来创建多个 Receivers。你也不需要考虑每个 Receiver 消费的 Kafka partition 的数量。每个 Kafka partition 将自动的并行读取。此外,每个 Kafka partition 与 RDD partition 一一对应,从而简化了并行模型。

除了新的流处理API之外,我们还引入了 KafkaUtils.createRDD(),它可用于在 Kafka 数据上运行批处理作业。

// Define the offset ranges to read in the batch job     
val offsetRanges = Array(

OffsetRange("some-topic", 0, 110, 220),
OffsetRange("some-topic", 1, 100, 313),
OffsetRange("another-topic", 0, 456, 789)

)

// Create the RDD based on the offset ranges
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sparkContext, kafkaParams, offsetRanges)

如果你想了解更多关于API和它如何实现的细节,请看下面的内容:

4. Python 中的Kafka API

在 Spark 1.2 中,添加了 Spark Streaming 的基本 Python API,因此开发人员可以使用 Python 编写分布式流处理应用程序。在 Spark 1.3 中,扩展了 Python API 来包含Kafka。借此,在 Python 中使用 Kafka 编写流处理应用程序变得轻而易举。这是一个示例代码。

kafkaStream = KafkaUtils.createStream(streamingContext,     

"zookeeper-server:2181", "consumer-group", {"some-topic": 1})

lines = kafkaStream.map(lambda x: x[1])

查看完整的 示例和  python文档。运行该示例的说明可以在 Kafka 集成指南中找到。请注意,对于使用 Kafka API 运行示例或任何 python 应用程序,你必须将 Kafka Maven 依赖关系添加到路径中。这可以在 Spark 1.3 中轻松完成,因为你可以直接将 Maven 依赖关系添加到 spark-submit (推荐的方式来启动Spark应用程序)。

原文: https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html

相关 [spark streaming kafka] 推荐:

Spark Streaming 与 Kafka 整合的改进 | SmartSi

- -
Apache Kafka 正在迅速成为最受欢迎的开源流处理平台之一. 我们在 Spark Streaming 中也看到了同样的趋势. 因此,在 Apache Spark 1.3 中,我们专注于对 Spark Streaming 与 Kafka 集成进行重大改进. 为 Kafka 新增了 Direct API - 这允许每个 Kafka 记录在发生故障时只处理一次,并且不使用  Write Ahead Logs.

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark Streaming vs. Kafka Stream 哪个更适合你

- - IT瘾-bigdata
作者:Mahesh Chand Kandpal. 译者注:本文介绍了两大常用的流式处理框架,Spark Streaming和Kafka Stream,并对他们各自的特点做了详细说明,以帮助读者在不同的场景下对框架进行选择. 流式处理的需求每天都在增加,仅仅对大量的数据进行处理是不够的. 数据必须快速地得到处理,以便企业能够实时地对不断变化的业务环境做出反应.

实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重

- - lxw的大数据田地
本文想记录和表达的东西挺多的,一时想不到什么好的标题,所以就用上面的关键字作为标题了. 在实时流式计算中,最重要的是在任何情况下,消息不重复、不丢失,即Exactly-once. 本文以Kafka–>Spark Streaming–>Redis为例,一方面说明一下如何做到Exactly-once,另一方面说明一下我是如何计算实时去重指标的.

【翻译】Spark Streaming 管理 Kafka Offsets 的方式探讨 - 简书

- -
Cloudera Engineering Blog 翻译:Offset Management For Apache Kafka With Apache Spark Streaming. Spark Streaming 应用从Kafka中获取信息是一种常见的场景. 从Kafka中读取持续不断的数据将有很多优势,例如性能好、速度快.

spark streaming 从kafka 拉数据如何保证数据不丢失 | sunbiaobiao

- -
spark streaming 从kafka 拉数据如何保证数据不丢失. 为什么使用 direct 方式. 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了. 这个在Receiver-based Approach 就比较麻烦,你需要通过spark.streaming.blockInterval等参数来调整.

Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统

- -
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统. 2015 年 7 月 27 日发布. 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.

将 Spark Streaming + Kafka direct 的 offset 存入Zookeeper并重用-Spark-about云开发-活到老 学到老

- -
使用Direct API时为什么需要见offset保存到Zookeeper中. 如何将offset存入到Zookeeper中. 如何解决Zookeeper中offset过期问题. 实现将offset存入Zookeeper. 在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细的可以参考 .

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式 - CSDN博客

- -
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了. 一、基于Receiver的方式. 这种方式使用Receiver来获取数据.