Spark Streaming 自定义接收器

标签: spark streaming 定义 | 发表时间:2017-05-23 16:01 | 作者:
出处:http://m635674608.iteye.com
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等)。这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据。本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解。需要注意的是,自定义接收器可以用Scala或者Java实现。
 
实现自定义Receiver
 
自定义接收器必须继承自抽象类Receiver,实现两个方法
onStart():接收数据。
onStop():停止接收数据。
 
onStart()方法和onStop()方法不能无限期的阻塞。通常,onStart()方法将会启动负责接收数据的线程,而onStop()方法负责确保这些接收数据的线程是停止的。接收收据的线程可以使用isStopped()方法来检查它们是否应该停止接收数据。
 
 
一旦接收到数据,通过调用store(data)方法(Receiver类中提供的方法)数据被存储在spark中。store()方法有很多种,如允许一次只接收一条数据(record-at-a-time)或者全部object/序列化字节的collection。注意,store()类型影响了接收器(receiver)的可靠性和容错性语义。这是稍后讨论的更详细的。
 
在接收线程中的任何异常都应该被捕获并处理,以避免接收器的无声故障。restart(<exception>)将会通过异步调用onStop()方法,延迟一段时间后调用onStop()方法重启接收器(receiver)。stop(<exception>)将会调用onStop()方法终止接收器。同时,reporterror(<error>)向驱动程序报告错误信息而不停止或者重启接收器,这些错误信息可以在UI上或者日志中看见。
 
 
以下是接收一个套接字上的文本流的自定义接收器。它以文本流中的“\”分隔线分割,并将它们储存在spark中。如果接收线程有任何连接错误或接收错误,则接收器将重新启动。
 
[java]  view plain  copy
 
  1. public class JavaCustomReceiver extends Receiver<String> {  
  2.   
  3.   String host = null;  
  4.   int port = -1;  
  5.   
  6.   public JavaCustomReceiver(String host_ , int port_) {  
  7.     super(StorageLevel.MEMORY_AND_DISK_2());  
  8.     host = host_;  
  9.     port = port_;  
  10.   }  
  11.   
  12.   public void onStart() {  
  13.     // Start the thread that receives data over a connection  
  14.     new Thread()  {  
  15.       @Override public void run() {  
  16.         receive();  
  17.       }  
  18.     }.start();  
  19.   }  
  20.   
  21.   public void onStop() {  
  22.     // There is nothing much to do as the thread calling receive()  
  23.     // is designed to stop by itself if isStopped() returns false  
  24.   }  
  25.   
  26.   /** Create a socket connection and receive data until receiver is stopped */  
  27.   private void receive() {  
  28.     Socket socket = null;  
  29.     String userInput = null;  
  30.   
  31.     try {  
  32.       // connect to the server  
  33.       socket = new Socket(host, port);  
  34.   
  35.       BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));  
  36.   
  37.       // Until stopped or connection broken continue reading  
  38.       while (!isStopped() && (userInput = reader.readLine()) != null) {  
  39.         System.out.println("Received data '" + userInput + "'");  
  40.         store(userInput);  
  41.       }  
  42.       reader.close();  
  43.       socket.close();  
  44.   
  45.       // Restart in an attempt to connect again when server is active again  
  46.       restart("Trying to connect again");  
  47.     } catch(ConnectException ce) {  
  48.       // restart if could not connect to server  
  49.       restart("Could not connect", ce);  
  50.     } catch(Throwable t) {  
  51.       // restart if there is any other error  
  52.       restart("Error receiving data", t);  
  53.     }  
  54.   }  
  55. }  
 

在spark streaming中使用自定义的接收器
 
自定义接收器可用于在Spark Streaming应用中,通过使用streamingContext.receiverStream(自定义接收器的实例)。如下所示,创建一个输入Dstream
 
[java]  view plain  copy
 
  1. // Assuming ssc is the JavaStreamingContext  
  2. JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));  
  3. JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });  
  4. ...  
 
接收器的可靠性
 
正如在spark streaming编程指南中讨论的那样,基于接收器的可靠性和容错语义,有两种类型的接收器:
 
1 可靠的接收器:对于可靠的消息来源,允许发送的数据被确认,一个可靠的接收器正确地确认数据被接收器接收同时被可靠地存储在spark中。通常,实现可靠的接收器需仔细考量消息确认的语义。
 
 
2 不可靠的接收器:不可靠的接收器不向数据源发送确认信息。它可用于不支持确认机制的数据源,或者那些可靠的数据源但是我们不需要其使用复杂的确认机制。
 
为了实现可靠的接收器,必须要使用 store(multiple-records) 取存储数据。这种类型的store()是一种阻塞调用,只有在所有给定的记录被储存在spark里之后才返回。如果接收器的配置存储级别使用复制(默认启用),则复制完成后这个调用返回。因此,它确保了数据被可靠的存储,和接收器可以现在正确地确认消息;在复制数据的中间过程中接收器失败了,这将确保没有数据丢失(缓冲数据没有被确认,数据源将会重新发送)。
 
不可靠的接收器没有实现这种逻辑,它可以简单地从数据源接收记录并使用store(single-record)将它们插入(one-at-a-time )。它没有store(multiple-records)那样的可靠保证,其优点如下:
 
  • 系统考虑到将数据转化为适当大小的块;
  • 系统考虑到控制接收速率,如果速率限制已指定;
  • 不可靠接收器比可靠接收器更加容易实现;
原文地址:https://spark.apache.org/docs/latest/streaming-custom-receivers.html

http://blog.csdn.net/ouyang111222/article/details/50414621



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [spark streaming 定义] 推荐:

Spark Streaming 自定义接收器

- - zzm
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等). 这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据. 本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解.

[原]Spark Streaming原理简析

- - 张包峰的博客
StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如. 这样从socket接收文本数据. 这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里.

Spark Streaming 调优实践

- - IT瘾-dev
分享嘉宾:肖力涛 拼多多 资深算法工程师. 注:欢迎转载,转载请注明出处. 在使用 Spark 和 Spark Streaming 时,当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能. 有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源.

Spark Streaming 与 Kafka 整合的改进 | SmartSi

- -
Apache Kafka 正在迅速成为最受欢迎的开源流处理平台之一. 我们在 Spark Streaming 中也看到了同样的趋势. 因此,在 Apache Spark 1.3 中,我们专注于对 Spark Streaming 与 Kafka 集成进行重大改进. 为 Kafka 新增了 Direct API - 这允许每个 Kafka 记录在发生故障时只处理一次,并且不使用  Write Ahead Logs.

GitHub - allwefantasy/streamingpro: Build Spark Batch/Streaming/MLlib Application by SQL

- -
StreamingPro 中文文档. 应用模式:写json配置文件,StreamingPro启动后执行该文件,可以作为批处理或者流式程序. 服务模式:启动一个StreamingPro Server作为常驻程序,然后通过http接口发送MLSQL脚本进行交互. 我们强烈推荐使用第二种模式,第一种模式现在已经不太更新了,现在迅速迭代的是第二种模式,并且第二种模式可以构建AI平台.

Spark Streaming 数据限流简述

- - IT瘾-dev
  Spark Streaming对实时数据流进行分析处理,源源不断的从数据源接收数据切割成一个个时间间隔进行处理;.   流处理与批处理有明显区别,批处理中的数据有明显的边界、数据规模已知;而流处理数据流并没有边界,也未知数据规模;.   由于流处理的数据流特征,使之数据流具有不可预测性,而且数据处理的速率还与硬件、网络等资源有关,在这种情况下如不对源源不断进来的数据流速率进行限制,那当Spark节点故障、网络故障或数据处理吞吐量下来时还有数据不断流进来,那将有可能将出现OOM进而导致Spark Streaming程序崩溃;.

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark Streaming + Elasticsearch构建App异常监控平台

- - 美团点评技术团队
本文已发表在《程序员》杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App、到应用商店怒斥开发者等方式来表达不满. 但开发者也同样感到头疼,因为崩溃可能意味着用户流失、营收下滑. 为了降低崩溃率,进而提升App质量,App开发团队需要实时地监控App异常. 一旦发现严重问题,及时进行热修复,从而把损失降到最低.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark Streaming vs. Kafka Stream 哪个更适合你

- - IT瘾-bigdata
作者:Mahesh Chand Kandpal. 译者注:本文介绍了两大常用的流式处理框架,Spark Streaming和Kafka Stream,并对他们各自的特点做了详细说明,以帮助读者在不同的场景下对框架进行选择. 流式处理的需求每天都在增加,仅仅对大量的数据进行处理是不够的. 数据必须快速地得到处理,以便企业能够实时地对不断变化的业务环境做出反应.