Spark Streaming 数据限流简述

标签: dev | 发表时间:2020-01-24 00:00 | 作者:
出处:http://itindex.net/relian

  Spark Streaming对实时数据流进行分析处理,源源不断的从数据源接收数据切割成一个个时间间隔进行处理;
  流处理与批处理有明显区别,批处理中的数据有明显的边界、数据规模已知;而流处理数据流并没有边界,也未知数据规模;
  由于流处理的数据流特征,使之数据流具有不可预测性,而且数据处理的速率还与硬件、网络等资源有关,在这种情况下如不对源源不断进来的数据流速率进行限制,那当Spark节点故障、网络故障或数据处理吞吐量下来时还有数据不断流进来,那将有可能将出现OOM进而导致Spark Streaming程序崩溃;
  在Spark Streaming中不同的数据源采用不同的限速策略,但无论是Socket数据源的限流策略还是Kafka数据源的限流策略其速率(rate)的计算都是使用PIDController算法进行计算而得来;
  下面从源码的角度分别介绍Socket数据源与Kafka数据源的限流处理。

速率限制的计算与更新

  Spark Streaming的流处理其实是基于微批处理(MicroBatch)的,也就是说将数据流按某比较小的时间间隔将数据切割成为一段段微批数据进行处理;

                                      添加监听器


  StreamingContext调用Start()启动的时候会将速率控制器(rateController)添加到StreamingListener监听器中;
  当每批次处理完成时将触发监听器(RateController),使用该批处理的处理结束时间、处理延迟时间、调度延迟时间、记录行数调用PIDRateEstimator传入PID算法中(PID Controller)计算出该批次的速率(rate)并更新速率限制(rateLimit)与发布该限制速率;

   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {    
val elements = batchCompleted.batchInfo.streamIdToInputInfo

for {
processingEnd <- batchCompleted.batchInfo.processingEndTime
workDelay <- batchCompleted.batchInfo.processingDelay
waitDelay <- batchCompleted.batchInfo.schedulingDelay
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
rateLimit.set(s.toLong)
publish(getLatestRate())
}
}

Socket数据源限流

  批次的限制速率上面已经算出,这里说的是接收Socket过来的数据时的数据限流;
  SocketInputStream类receive方法接收到数据后将数据存入 BlockGenerator的Buffer中,在写入Buffer前调用限流器 (RateLimiter)对写入数据进行限流;
  RateLimiter限流器使用了Google开源的 Guava中内置的RateLimiter限流器,该类只是对Guava限流器的简单封装;
  在Spark Streaming中可通过使用两个参数配置初始速率与最大速率spark.streaming.receiver.maxRate、spark.streaming.backpressure.initialRate;亦可配置PIDController算法相关的四个参数值;
  RateLimiter限流器是基于令牌桶的算法基本原理比较简单,以一个恒定的速率生成令牌放入令牌桶中,桶满则停止,处理请求时需要从令牌桶中取出令牌,当桶中无令牌可取时阻塞等待,此算法用于确保系统不被洪峰击垮。

   private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)    
/**
* Push a single data item into the buffer.
*/
def addData(data: Any): Unit = {
if (state == Active) {
//调用限流器等待
waitToPush()
synchronized {
if (state == Active) {
currentBuffer += data
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}

def waitToPush() {
//限流器申请令牌
rateLimiter.acquire()
}

Guava库中RateLimiter限流器基本使用:

   //创建限流器,每秒产生令牌数1    
RateLimiter rateLimiter=RateLimiter.create(1);
for (int i = 0; i < 10; i++) {
//获得一个令牌,未申请到令牌则阻塞等待
double waitTime = rateLimiter.acquire();
System.out.println(String.format("id:%d time:%d waitTime:%f",i,System.currentTimeMillis(),waitTime));
}

Kafka数据源限流的实现

  在Spark Streaming Kafka包拉取Kafka数据会进行如下动作:
  1、取Kafka中最新偏移量、分区
  2、通过rateController限制每个分区可拉取的最大消息数
  3、在DirectKafkaInputDStream中创建KafkaRDD,在其中调用相关对象拉取数据

  通过如上步骤也可用看出,只要限制了Kafka某个分区的偏移量(offset)范围也就可限制从Kafka拉取的消息数量,从而达到限流的目的,Spark streaming kafka也是通过此实现的;

计算每个分区速率限制,有如下步骤:
  1、通过seekToEnd获取最新可用偏移量与当前偏移量对比获得当前所有分区延迟偏移量
  单个分区偏移量延迟=最新偏移量记录-当前偏移量记录
  2、获取配置项中每个分区最大速率 (spark.streaming.kafka.maxRatePerPartition),背压率计算,计算每个分区背压率计算公式为:
  单个分区背压率=单个分区偏移量延迟/所有分区总延迟*速率限制
  速率限制(rateLimit):为通过PIDController动态计算得来

  如有配置每个分区最大速率则取配置项最大速率与背压率两者中的最小值,未配置则取背压率作为每个分区速率限制;

  3、将批次间隔(batchDuration)*每个分区速率限制=每个分区最大消息数
  4、取当前分区偏移量+分区最大消息数 与 最新偏移量两者当中最小的,由此来控制拉取消息速率;

  如当前偏移量+分区最大消息数 大于 最新偏移量则取 最新偏移量否则取 当前偏移量+分区最大消息数作为拉取Kafka数据的Offset范围;

   // 限制每个分区最大消息数    
protected def clamp(
offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {

maxMessagesPerPartition(offsets).map { mmp =>
mmp.map { case (tp, messages) =>
val uo = offsets(tp)
tp -> Math.min(currentOffsets(tp) + messages, uo)
}
}.getOrElse(offsets)
}

  不管是Kafka数据源还是Socket数据源Spark Streaming中都使用了PIDController算法用于计算其速率限制值,两者的差别也只是因为两种数据源的获取方式数据特征而决定的。Socket数据源使用了Guava RateLimiter、而Kafka数据源自己实现了基于Offsets的限流;
  以上所介绍的框架版本为:Spark Streaming 版本为2.3.2与spark-streaming-kafka-0-10_2.11;

参考资料: 

http://kafka.apache.org
http://spark.apache.org

相关 [spark streaming 数据] 推荐:

Spark Streaming 数据限流简述

- - IT瘾-dev
  Spark Streaming对实时数据流进行分析处理,源源不断的从数据源接收数据切割成一个个时间间隔进行处理;.   流处理与批处理有明显区别,批处理中的数据有明显的边界、数据规模已知;而流处理数据流并没有边界,也未知数据规模;.   由于流处理的数据流特征,使之数据流具有不可预测性,而且数据处理的速率还与硬件、网络等资源有关,在这种情况下如不对源源不断进来的数据流速率进行限制,那当Spark节点故障、网络故障或数据处理吞吐量下来时还有数据不断流进来,那将有可能将出现OOM进而导致Spark Streaming程序崩溃;.

spark streaming 从kafka 拉数据如何保证数据不丢失 | sunbiaobiao

- -
spark streaming 从kafka 拉数据如何保证数据不丢失. 为什么使用 direct 方式. 因为按需拉数据,所以不存在缓冲区,就不用担心缓冲区把内存撑爆了. 这个在Receiver-based Approach 就比较麻烦,你需要通过spark.streaming.blockInterval等参数来调整.

[原]Spark Streaming原理简析

- - 张包峰的博客
StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如. 这样从socket接收文本数据. 这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里.

Spark Streaming 调优实践

- - IT瘾-dev
分享嘉宾:肖力涛 拼多多 资深算法工程师. 注:欢迎转载,转载请注明出处. 在使用 Spark 和 Spark Streaming 时,当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能. 有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源.

Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统

- -
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统. 2015 年 7 月 27 日发布. 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.

Spark Streaming 自定义接收器

- - zzm
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等). 这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据. 本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解.

Spark Streaming 与 Kafka 整合的改进 | SmartSi

- -
Apache Kafka 正在迅速成为最受欢迎的开源流处理平台之一. 我们在 Spark Streaming 中也看到了同样的趋势. 因此,在 Apache Spark 1.3 中,我们专注于对 Spark Streaming 与 Kafka 集成进行重大改进. 为 Kafka 新增了 Direct API - 这允许每个 Kafka 记录在发生故障时只处理一次,并且不使用  Write Ahead Logs.

GitHub - allwefantasy/streamingpro: Build Spark Batch/Streaming/MLlib Application by SQL

- -
StreamingPro 中文文档. 应用模式:写json配置文件,StreamingPro启动后执行该文件,可以作为批处理或者流式程序. 服务模式:启动一个StreamingPro Server作为常驻程序,然后通过http接口发送MLSQL脚本进行交互. 我们强烈推荐使用第二种模式,第一种模式现在已经不太更新了,现在迅速迭代的是第二种模式,并且第二种模式可以构建AI平台.

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式 - CSDN博客

- -
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了. 一、基于Receiver的方式. 这种方式使用Receiver来获取数据.

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.