[原]Spark Streaming原理简析

标签: | 发表时间:2015-03-19 23:23 | 作者:zbf8441372
出处:http://blog.csdn.net/zbf8441372

执行流程

数据的接收

StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor。

实例化之后,首先,要指定一个接收数据的方式,如

  val lines = ssc.socketTextStream("localhost", 9999)

这样从socket接收文本数据。这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里。

ReceiverInputDStream有一个需要子类实现的方法

  def getReceiver(): Receiver[T]

子类实现这个方法,worker节点调用后能得到 Receiver,使得数据接收的工作能分布到worker上。

如果是local跑,由于 Receiver接收数据在本地,所以在启动streaming application的时候,要注意分配的core数目要大于 Receiver数目,才能腾出cpu做计算任务的调度。

Receiver需要子类实现

  def onStart()
def onStop()

来定义一个数据接收器的初始化、接收到数据后如何存、如何在结束的时候释放资源。

Receiver提供了一系列 store()接口,如 store(ByteBuffer)store(Iterator)等等。这些store接口是实现好了的,会由worker节点上初始化的 ReceiverSupervisor来完成这些存储功能。 ReceiverSupervisor还会对 Receiver做监控,如监控是否启动了、是否停止了、是否要重启、汇报error等等。

ReceiverSupervisor的存储接口的实现,借助的是 BlockManager,数据会以RDD的形式被存放,根据 StorageLevel选择不同存放策略。默认是序列化后存内存,放不下的话写磁盘(executor)。被计算出来的RDD中间结果,默认存放策略是序列化后只存内存。

ReceiverSupervisor在做putBlock操作的时候,会首先借助 BlockManager存好数据,然后往 ReceiverTracker发送一个 AddBlock的消息。 ReceiverTracker内部的 ReceivedBlockTracker用于维护一个receiver接收到的所有block信息,即 BlockInfo,所以 AddBlock会把信息存放在 ReceivedBlockTracker里。未来需要计算的时候, ReceiverTracker根据streamId,从 ReceivedBlockTracker取出对应的block列表。

RateLimiter帮助控制 Receiver速度, spark.streaming.receiver.maxRate参数。

数据源方面,普通的数据源为file, socket, akka, RDDs。高级数据源为Twitter, Kafka, Flume等。开发者也可以自己定制数据源。

任务调度

JobScheduler在context里初始化。当context start的时候,触发scheduler的start。

scheduler的start触发了 ReceiverTrackerJobGenerator的start。这两个类是任务调度的重点。前者在worker上启动 Receiver接收数据,并且暴露接口能够根据streamId获得对应的一批Block地址。后者基于数据和时间来生成任务描述。

JobScheduler内含一个线程池,用于调度任务执行。 spark.streaming.concurrentJobs可以控制job并发度,默认是1,即它只能一个一个提job。

job来自 JobGenerator生成的 JobSetJobGenerator根据时间,生成job并且执行cp。

JobGenerator的生成job逻辑:
- 调用 ReceiverTrackerallocateBlocksToBatch方法,为本批数据分配好block,即准备好数据
- 间接调用 DStreamgenerateJob(time)方法,制造可执行的RDD

DStream切分RDD和生成可执行的RDD,即 getOrCompute(time)
- 如果这个时间点的RDD已经生成好了,那么从内存hashmap里拿出来,否则下一步
- 如果时间是批次间隔的整数倍,则下一步,否则这个时间点不切
- 调用DStream的子类的 compute方法,得到RDD。可能是一个RDD,也可以是个RDD列表
- 对每个RDD,调用persist方法,制定默认的存储策略。如果时间点合适,同时调用RDD的 checkpoint方法,制定好cp策略
- 得到这些RDD后,调用 SparkContext.runJob(rdd, emptyFunction)。把这整个变成一个function,生成 Job类。未来会在executor上触发其 runJob

JobGenerator成功生成job后,调用 JobScheduler.submitJobSet(JobSet)JobScheduler会使用线程池提交JobSet中的所有job。该方法调用结束后, JobGenerator发送一个 DoCheckpoint的消息, 注意这里的cp是driver端元数据的cp,而不是RDD本身的cp。如果time合适,会触发cp操作,内部的 CheckpointWriter类会完成 write(streamingContext, time)

JobScheduler提交job的线程里,触发了job的run()方法,同时,job跑完后, JobScheduler处理 JobCompleted(job)。如果job跑成功了,调用 JobSethandleJobCompletion(Job),做些计时和数数工作,如果整个JobSet完成了,调用 JobGeneratoronBatchCompletion(time)方法, JobGenerator接着会做 clearMetadata的工作,然后 JobScheduler打印输出;如果job跑失败了, JobScheduler汇报error,最后会在context里抛异常。

更多说明

特殊操作

  1. transform:可以与外部RDD交互,比如做维表的join

  2. updateStateByKey:生成 StateDStream,比如做增量计算。 WordCount例子

    • 每一批都需要与增量RDD进行一次cogroup之后,然后执行update function。两个RDD做cogroup过程有些开销:RDD[K, V]和RDD[K, U]合成RDD[K, List[V], List[U]],List[U]一般size是1,理解为oldvalue,即RDD[K, batchValueList, Option[oldValue]]。然后update function处理完,变成RDD[K, newValue]。
    • 批与批之间严格有序,即增量合并操作,是有序的,批之间没发并发
    • 增量RDD的分区数可以开大,即这步增量的计算可以调大并发
  3. window:batch size,window length, sliding interval三个参数组成的滑窗操作。把多个批次的RDD合并成一个UnionRDD进行计算。

  4. foreachRDD: 这个操作是一个输出操作,比较特殊。

    /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   */
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
  }

DStream.foreachRDD()操作使开发者可以直接控制RDD的计算逻辑,而不是通过 DStream映射过去。所以借助这个方法,可以实现MLlib, Spark SQL与Streaming的集合,如: 结合Spark SQL、DataFrame做Wordcount

Cache

如果是window操作,默认接收的数据都persist在内存里。

如果是flume, kafka源头,默认接收的数据replicate成两份存起来。

Checkpoint

与state有关的流计算,计算出来的结果RDD,会被cp到HDFS上,原文如下:

Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depends on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increase in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

cp的时间间隔也可以设定,可以多批做一次cp。

cp的操作是同步的。

简单的不带state操作的流任务,可以不开启cp。

driver端的metadata也有cp策略。driver cp的时候是将整个 StreamingContext对象写到了可靠存储里。

全文完 :)

作者:zbf8441372 发表于2015/3/19 15:23:05 原文链接
阅读:556 评论:0 查看评论

相关 [spark streaming 原理] 推荐:

[原]Spark Streaming原理简析

- - 张包峰的博客
StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如. 这样从socket接收文本数据. 这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里.

Spark Streaming 调优实践

- - IT瘾-dev
分享嘉宾:肖力涛 拼多多 资深算法工程师. 注:欢迎转载,转载请注明出处. 在使用 Spark 和 Spark Streaming 时,当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能. 有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源.

Spark Streaming 自定义接收器

- - zzm
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等). 这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据. 本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解.

Spark Streaming 与 Kafka 整合的改进 | SmartSi

- -
Apache Kafka 正在迅速成为最受欢迎的开源流处理平台之一. 我们在 Spark Streaming 中也看到了同样的趋势. 因此,在 Apache Spark 1.3 中,我们专注于对 Spark Streaming 与 Kafka 集成进行重大改进. 为 Kafka 新增了 Direct API - 这允许每个 Kafka 记录在发生故障时只处理一次,并且不使用  Write Ahead Logs.

GitHub - allwefantasy/streamingpro: Build Spark Batch/Streaming/MLlib Application by SQL

- -
StreamingPro 中文文档. 应用模式:写json配置文件,StreamingPro启动后执行该文件,可以作为批处理或者流式程序. 服务模式:启动一个StreamingPro Server作为常驻程序,然后通过http接口发送MLSQL脚本进行交互. 我们强烈推荐使用第二种模式,第一种模式现在已经不太更新了,现在迅速迭代的是第二种模式,并且第二种模式可以构建AI平台.

Spark Streaming 数据限流简述

- - IT瘾-dev
  Spark Streaming对实时数据流进行分析处理,源源不断的从数据源接收数据切割成一个个时间间隔进行处理;.   流处理与批处理有明显区别,批处理中的数据有明显的边界、数据规模已知;而流处理数据流并没有边界,也未知数据规模;.   由于流处理的数据流特征,使之数据流具有不可预测性,而且数据处理的速率还与硬件、网络等资源有关,在这种情况下如不对源源不断进来的数据流速率进行限制,那当Spark节点故障、网络故障或数据处理吞吐量下来时还有数据不断流进来,那将有可能将出现OOM进而导致Spark Streaming程序崩溃;.

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark Streaming + Elasticsearch构建App异常监控平台

- - 美团点评技术团队
本文已发表在《程序员》杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App、到应用商店怒斥开发者等方式来表达不满. 但开发者也同样感到头疼,因为崩溃可能意味着用户流失、营收下滑. 为了降低崩溃率,进而提升App质量,App开发团队需要实时地监控App异常. 一旦发现严重问题,及时进行热修复,从而把损失降到最低.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark Streaming vs. Kafka Stream 哪个更适合你

- - IT瘾-bigdata
作者:Mahesh Chand Kandpal. 译者注:本文介绍了两大常用的流式处理框架,Spark Streaming和Kafka Stream,并对他们各自的特点做了详细说明,以帮助读者在不同的场景下对框架进行选择. 流式处理的需求每天都在增加,仅仅对大量的数据进行处理是不够的. 数据必须快速地得到处理,以便企业能够实时地对不断变化的业务环境做出反应.