Spark Streaming 自定义接收器

标签: spark streaming 定义 | 发表时间:2017-05-23 08:01 | 作者:
出处:http://m635674608.iteye.com
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等)。这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据。本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解。需要注意的是,自定义接收器可以用Scala或者Java实现。
 
实现自定义Receiver
 
自定义接收器必须继承自抽象类Receiver,实现两个方法
onStart():接收数据。
onStop():停止接收数据。
 
onStart()方法和onStop()方法不能无限期的阻塞。通常,onStart()方法将会启动负责接收数据的线程,而onStop()方法负责确保这些接收数据的线程是停止的。接收收据的线程可以使用isStopped()方法来检查它们是否应该停止接收数据。
 
 
一旦接收到数据,通过调用store(data)方法(Receiver类中提供的方法)数据被存储在spark中。store()方法有很多种,如允许一次只接收一条数据(record-at-a-time)或者全部object/序列化字节的collection。注意,store()类型影响了接收器(receiver)的可靠性和容错性语义。这是稍后讨论的更详细的。
 
在接收线程中的任何异常都应该被捕获并处理,以避免接收器的无声故障。restart(<exception>)将会通过异步调用onStop()方法,延迟一段时间后调用onStop()方法重启接收器(receiver)。stop(<exception>)将会调用onStop()方法终止接收器。同时,reporterror(<error>)向驱动程序报告错误信息而不停止或者重启接收器,这些错误信息可以在UI上或者日志中看见。
 
 
以下是接收一个套接字上的文本流的自定义接收器。它以文本流中的“\”分隔线分割,并将它们储存在spark中。如果接收线程有任何连接错误或接收错误,则接收器将重新启动。
 
[java]  view plain  copy
 
  1. public class JavaCustomReceiver extends Receiver<String> {  
  2.   
  3.   String host = null;  
  4.   int port = -1;  
  5.   
  6.   public JavaCustomReceiver(String host_ , int port_) {  
  7.     super(StorageLevel.MEMORY_AND_DISK_2());  
  8.     host = host_;  
  9.     port = port_;  
  10.   }  
  11.   
  12.   public void onStart() {  
  13.     // Start the thread that receives data over a connection  
  14.     new Thread()  {  
  15.       @Override public void run() {  
  16.         receive();  
  17.       }  
  18.     }.start();  
  19.   }  
  20.   
  21.   public void onStop() {  
  22.     // There is nothing much to do as the thread calling receive()  
  23.     // is designed to stop by itself if isStopped() returns false  
  24.   }  
  25.   
  26.   /** Create a socket connection and receive data until receiver is stopped */  
  27.   private void receive() {  
  28.     Socket socket = null;  
  29.     String userInput = null;  
  30.   
  31.     try {  
  32.       // connect to the server  
  33.       socket = new Socket(host, port);  
  34.   
  35.       BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));  
  36.   
  37.       // Until stopped or connection broken continue reading  
  38.       while (!isStopped() && (userInput = reader.readLine()) != null) {  
  39.         System.out.println("Received data '" + userInput + "'");  
  40.         store(userInput);  
  41.       }  
  42.       reader.close();  
  43.       socket.close();  
  44.   
  45.       // Restart in an attempt to connect again when server is active again  
  46.       restart("Trying to connect again");  
  47.     } catch(ConnectException ce) {  
  48.       // restart if could not connect to server  
  49.       restart("Could not connect", ce);  
  50.     } catch(Throwable t) {  
  51.       // restart if there is any other error  
  52.       restart("Error receiving data", t);  
  53.     }  
  54.   }  
  55. }  
 

在spark streaming中使用自定义的接收器
 
自定义接收器可用于在Spark Streaming应用中,通过使用streamingContext.receiverStream(自定义接收器的实例)。如下所示,创建一个输入Dstream
 
[java]  view plain  copy
 
  1. // Assuming ssc is the JavaStreamingContext  
  2. JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));  
  3. JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });  
  4. ...  
 
接收器的可靠性
 
正如在spark streaming编程指南中讨论的那样,基于接收器的可靠性和容错语义,有两种类型的接收器:
 
1 可靠的接收器:对于可靠的消息来源,允许发送的数据被确认,一个可靠的接收器正确地确认数据被接收器接收同时被可靠地存储在spark中。通常,实现可靠的接收器需仔细考量消息确认的语义。
 
 
2 不可靠的接收器:不可靠的接收器不向数据源发送确认信息。它可用于不支持确认机制的数据源,或者那些可靠的数据源但是我们不需要其使用复杂的确认机制。
 
为了实现可靠的接收器,必须要使用 store(multiple-records) 取存储数据。这种类型的store()是一种阻塞调用,只有在所有给定的记录被储存在spark里之后才返回。如果接收器的配置存储级别使用复制(默认启用),则复制完成后这个调用返回。因此,它确保了数据被可靠的存储,和接收器可以现在正确地确认消息;在复制数据的中间过程中接收器失败了,这将确保没有数据丢失(缓冲数据没有被确认,数据源将会重新发送)。
 
不可靠的接收器没有实现这种逻辑,它可以简单地从数据源接收记录并使用store(single-record)将它们插入(one-at-a-time )。它没有store(multiple-records)那样的可靠保证,其优点如下:
 
  • 系统考虑到将数据转化为适当大小的块;
  • 系统考虑到控制接收速率,如果速率限制已指定;
  • 不可靠接收器比可靠接收器更加容易实现;
原文地址:https://spark.apache.org/docs/latest/streaming-custom-receivers.html

http://blog.csdn.net/ouyang111222/article/details/50414621



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [spark streaming 定义] 推荐:

Spark Streaming 自定义接收器

- - zzm
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等). 这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据. 本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解.

[原]Spark Streaming原理简析

- - 张包峰的博客
StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如. 这样从socket接收文本数据. 这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里.

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark Streaming + Elasticsearch构建App异常监控平台

- - 美团点评技术团队
本文已发表在《程序员》杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App、到应用商店怒斥开发者等方式来表达不满. 但开发者也同样感到头疼,因为崩溃可能意味着用户流失、营收下滑. 为了降低崩溃率,进而提升App质量,App开发团队需要实时地监控App异常. 一旦发现严重问题,及时进行热修复,从而把损失降到最低.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark Streaming vs. Kafka Stream 哪个更适合你

- - IT瘾-bigdata
作者:Mahesh Chand Kandpal. 译者注:本文介绍了两大常用的流式处理框架,Spark Streaming和Kafka Stream,并对他们各自的特点做了详细说明,以帮助读者在不同的场景下对框架进行选择. 流式处理的需求每天都在增加,仅仅对大量的数据进行处理是不够的. 数据必须快速地得到处理,以便企业能够实时地对不断变化的业务环境做出反应.

Spark Streaming 1.6 流式状态管理分析 - 简书

- -
Spark 1.6发布后,官方声称流式状态管理有10倍性能提升. 这篇文章会详细介绍Spark Streaming里新的流式状态管理. 在流式计算中,数据是持续不断来的,有时候我们要对一些数据做跨周期(Duration)的统计,这个时候就不得不维护状态了. 而状态管理对Spark 的 RDD模型是个挑战,因为在spark里,任何数据集都需要通过RDD来呈现,而RDD 的定义是一个不变的分布式集合.

Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统

- -
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统. 2015 年 7 月 27 日发布. 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.