GitHub - allwefantasy/streamingpro: Build Spark Batch/Streaming/MLlib Application by SQL

标签: | 发表时间:2018-07-31 15:03 | 作者:
出处:https://github.com

StreamingPro 中文文档

  1. 五分钟快速上手和体验
  2. Five Minute Quick Tutorial

应用模式和服务模式

  1. 应用模式:写json配置文件,StreamingPro启动后执行该文件,可以作为批处理或者流式程序。
  2. 服务模式:启动一个StreamingPro Server作为常驻程序,然后通过http接口发送MLSQL脚本进行交互。

我们强烈推荐使用第二种模式,第一种模式现在已经不太更新了,现在迅速迭代的是第二种模式,并且第二种模式可以构建AI平台。 为了避免编译的麻烦,你可以直接使用 release版本

对于确实需要使用json配置文件的,我们也提供了batch.mlsql脚本,可以让你使用mlsql语法,例如(v1.1.2开始具有这个功能):

{"mlsql": {"desc":"测试","strategy":"spark","algorithm": [],"ref": [],"compositor": [
      {"name":"batch.mlsql","params": [
          {"sql": ["select 'a' as a as table1;","save overwrite table1 as parquet.`/tmp/kk`;"]
          }
        ]
      }
    ],"configParams": {
    }
  }
}

编译

高级编程

使用MLSQL做机器学习

部署模型API服务

MLSQL常用功能

使用配置完成Spark编程

  1. Spark 批处理
  2. Spark Streaming
  3. Structured Streaming
  4. StreamingPro对机器学习的支持

概览:

  1. 概述
  2. 项目模块说明
  3. 编译
  4. 相关概念
  5. StreamingPro的一些参数

周边工具

  1. StreamingPro Manager
  2. StreamingPro json文件编辑器支持

实验

  1. flink支持

其他文档

概述

StreamingPro 支持以Spark,Flink等作为底层分布式计算引擎,通过一套统一的配置文件完成批处理,流式计算,Rest服务的开发。 特点有:

  1. 使用Json描述文件完成流式,批处理的开发,不用写代码。
  2. 支持SQL Server,支持XSQL/MLSQL(重点),完成批处理,机器学习,即席查询等功能。
  3. 标准化输入输出,支持UDF函数注册,支持自定义模块开发
  4. 支持Web化管理Spark应用的启动,监控

如果更细节好处有:

  1. 跨版本:StreamingPro可以让你不用任何变更就可以轻易的运行在spark 1.6/2.1/2.2上。
  2. 新语法:提供了新的DSl查询语法/Json配置语法
  3. 程序的管理工具:提供web界面启动/监控 Spark 程序
  4. 功能增强:2.1之后Structured Streaming 不支持kafka 0.8/0.9 ,Structured,此外还有比如spark streaming 支持offset 保存等
  5. 简化Spark SQL Server搭建成本:提供rest接口/thrift 接口,支持spark sql server 的负载均衡,自动将driver 注册到zookeeper上
  6. 探索更多的吧

项目模块说明

模块名 描述 备注
streamingpro-commons 一些基础工具类
streamingpro-spark-common Spark有多个版本,所以可以共享一些基础的东西
streamingpro-flink streamingpro对flink的支持
streamingpro-spark streamingpro对spark 1.6.x的支持
streamingpro-mlsql streamingpro对spark 2.x的支持(主项目)
streamingpro-api streamingpro把底层的spark API暴露出来,方便用户灵活处理问题
streamingpro-manager 通过该模块,可以很方便的通过web界面启动,管理,监控 spark相关的应用
streamingpro-dls 自定义connect,load,select,save,train,register等语法,便于用类似sql的方式做批处理任务,机器学习等

相关概念

如果你使用StreamingPro,那么所有的工作都是在编辑一个Json配置文件。通常一个处理流程,会包含三个概念:

  1. 多个输入
  2. 多个连续/并行的数据处理
  3. 多个输出

StreamingPro会通过'compositor'的概念来描述他们,你可以理解为一个处理单元。一个典型的输入compositor如下:

    {
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/hdfsfile/abc.txt",
            "format": "json",
            "outputTable": "test"

          },
           {
              "path": "file:///tmp/parquet/",
              "format": "parquet",
              "outputTable": "test2"

            }
        ]
}

batch.sources就是一个compositor的名字。 这个compositor 把一个本地磁盘的文件映射成了一张表,并且告知系统,abc.txt里的内容 是json格式的。这样,我们在后续的compositor模块就可以使用这个 test表名了。通常,StreamingPro希望整个处理流程, 也就是不同的compositor都采用表来进行衔接。

StreamingPro不仅仅能做批处理,还能做流式,流式支持Spark Streaming,Structured Streaming。依然以输入compositor为例,假设 我们使用的是Structured Streaming,则可以如下配置。

    {
        "name": "ss.sources",
        "params": [
          {
            "format": "kafka9",
            "outputTable": "test",
            "kafka.bootstrap.servers": "127.0.0.1:9092",
            "topics": "test",
            "path": "-"
          },
          {
            "format": "com.databricks.spark.csv",
            "outputTable": "sample",
            "header": "true",
            "path": "/Users/allwefantasy/streamingpro/sample.csv"
          }
        ]
      }

第一个表示我们对接的数据源是kafka 0.9,我们把Kafka的数据映射成表test。 因为我们可能还需要一些元数据,比如ip和城市的映射关系, 所以我们还可以配置一些其他的非流式的数据源,我们这里配置了一个smaple.csv文件,并且命名为表sample。

如果你使用的是kafka >= 1.0,则 topics 参数需要换成'subscribe',并且使用时可能需要对内容做下转换,类似:

    select CAST(key AS STRING) as k, CAST(value AS STRING) as v from test

启动时,你需要把-streaming.platform 设置为 ss

如果我们的输入输出都是Hive的话,可能就不需要batch.sources/batch.outputs 等组件了,通常一个batch.sql就够了。比如:

    "without-sources-job": {
    "desc": "-",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select * from hiveTable",
            "outputTableName": "puarquetTable"
          }
        ]
      },
      {
        "name": "batch.outputs",
        "params": [
          {
            "format": "parquet",
            "inputTableName": "puarquetTable",
            "path": "/tmp/wow",
            "mode": "Overwrite"
          }
        ]
      }
    ],
    "configParams": {
    }
  }

在批处理里,batch.sources/batch.outputs 都是可有可无的,但是对于流式程序,stream.sources/stream.outputs/ss.sources/ss.outputs 则是必须的。

StreamingPro的一些参数

Property Name Default Meaning
streaming.name (none) required 等价于 spark.app.name
streaming.master (none) required 等价于 spark.master
streaming.duration 10 seconds spark streaming 周期,默认单位为秒
streaming.rest true/false,default is false 是否提供http接口
streaming.spark.service true/false,default is false 开启该选项时,streaming.platform必须为spark. 该选项会保证spark实例不会退出
streaming.platform spark/spark_streaming/ss/flink,default is spark 基于什么平台跑
streaming.checkpoint (none) spark streaming checkpoint 目录
streaming.kafka.offsetPath (none) kafka的偏移量保存目录。如果没有设置,会保存在内存中
streaming.driver.port 9003 配置streaming.rest使用,streaming.rest为true,你可以设置一个http端口
streaming.spark.hadoop.* (none) hadoop configuration,eg. -streaming.spark.hadoop.fs.defaultFS hdfs://name:8020
streaming.job.file.path (none) 配置文件路径,默认从hdfs加载
streaming.jobs (none) json配置文件里的job名称,按逗号分隔。如果没有配置该参数,默认运行所有job
streaming.zk.servers (none) 如果把spark作为一个server,那么streamingpro会把driver地址注册到zookeeper上
streaming.zk.conf_root_dir (none) 配置streaming.zk.servers使用
streaming.enableHiveSupport false 是否支持Hive
streaming.thrift false 是否thrift server
streaming.sql.source.[name].[参数] (none) batch/ss/stream.sources 中,你可以替换里面的任何一个参数
streaming.sql.out.[name].[参数] (none) batch/ss/stream.outputs 中,你可以替换里面的任何一个参数
streaming.sql.params.[param-name] (none) batch/ss/stream.sql中,你是可以写表达式的,比如 select * from :table, 之后你可以通过命令行传递该table参数

后面三个参数值得进一步说明:

假设我们定义了两个数据源,firstSource,secondSource,描述如下:

    {
        "name": "batch.sources",
        "params": [
          {
            "name":"firstSource",
            "path": "file:///tmp/sample_article.txt",
            "format": "com.databricks.spark.csv",
            "outputTable": "article",
            "header":true
          },
          {
              "name":"secondSource",
              "path": "file:///tmp/sample_article2.txt",
              "format": "com.databricks.spark.csv",
              "outputTable": "article2",
              "header":true
            }
        ]
      }

我们希望path不是固定的,而是启动时候决定的,这个时候,我们可以在启动脚本中使用-streaming.sql.source.[name].[参数] 来完成这个需求。 比如:

    -streaming.sql.source.firstSource.path  file:///tmp/wow.txt

这个时候,streamingpro启动的时候会动态将path 替换成你要的。包括outputTable等都是可以替换的。

有时候我们需要定时执行一个任务,而sql语句也是动态变化的,具体如下:

    {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select * from test where hp_time=:today",
            "outputTableName": "finalOutputTable"
          }
        ]
      },

这个时候我们在启动streamingpro的时候,通过参数:

    -streaming.sql.params.today  "2017"

动态替换 sql语句里的:today

相关 [github allwefantasy streamingpro] 推荐:

GitHub - allwefantasy/streamingpro: Build Spark Batch/Streaming/MLlib Application by SQL

- -
StreamingPro 中文文档. 应用模式:写json配置文件,StreamingPro启动后执行该文件,可以作为批处理或者流式程序. 服务模式:启动一个StreamingPro Server作为常驻程序,然后通过http接口发送MLSQL脚本进行交互. 我们强烈推荐使用第二种模式,第一种模式现在已经不太更新了,现在迅速迭代的是第二种模式,并且第二种模式可以构建AI平台.

Home · JohnLangford/vowpal_wabbit Wiki · GitHub

- -
There are two ways to have a fast learning algorithm: (a) start with a slow algorithm and speed it up, or (b) build an intrinsically fast learning algorithm.

git和github简介(上)

- linyehui - 没做完,没准备好
在此贴上本人在Web标准化交流会6月25日北京站的主题分享. 在线PPT:http://jinjiang.github.com/slides/learning-git/. PPT源码:https://github.com/Jinjiang/slides/tree/gh-pages/learning-git.

Github使用指南(转)

- - CSDN博客推荐文章
来自:https://github.com/neuola/neuola-legacy/wiki/github%E4%BD%BF%E7%94%A8%E6%8C%87%E5%8D%97. 如果你只是想了解 github 的使用,请跳到 Github 简介一节. 作为程序员大军之一,想必大家有这样的经历吧.

github 上的好东西

- - 收集分享互联网资源
基于HTML5的专业级图像处理开源引擎.

Windows 下 使用TortoiseGit GitHub

- - CSDN博客研发管理推荐文章
TortoiseGit依赖msysgit,首先下载: http://code.google.com/p/msysgit/downloads/detail?name=msysGit-fullinstall-1.8.1.2-preview20130201.exe&can=2&q=. 再下载TortoiseGit: http://code.google.com/p/tortoisegit/wiki/Download?tm=2.

一个 GitHub Trending 小工具

- - IT瘾-dev
Github Trending基本上是我每天都会浏览的网页,上面会及时发布一些GIthub上比较有潜力的项目,或者说每日Star数增量排行榜. 不过由于Github Trending经常会实时更新,即使你访问得再勤,难免还是会错过一些你感兴趣的项目,为此不少人都想出了自己的解决办法,例如. josephyzhou,他的 github-trending项目得到了众多人的青睐,我仔细阅读了他的源码 (Go),发现实现也较为简单, 就用Python 重写了一下,发现代码少了好多,详见 我的 github-trending.

blong/clickhouse .md at master · xingxing9688/blong · GitHub

- -
https://clickhouse.yandex/tutorial.html快速搭建集群参考. https://clickhouse.yandex/reference_en.html官网文档. https://habrahabr.ru/company/smi2/blog/317682/关于集群配置参考.

Github 用户数突破一百万

- pipitu - Solidot
51开源社区 写道 "Gibhub用户数已突破一百万,Github官方博客发布了庆祝图片. 目前已有1,004,771用户,在Github托管了超过2,836,210 个git库. Git是一个分布式的版本控制系统,最初由Linus Torvalds编写,用作Linux内核代码的管理. 推出后,Git在其它项目中也取得了很大成功,尤其是在Ruby社区中.

Linux源代码托管在Github上

- wang - Solidot
在kernel.org服务器遭入侵之后,Linux作者Linus Torvalds于9月3日在Github上创建了帐号,将内核源代码托管到了Github. 他在邮件列表上解释说,由于master.kernel.org下线,他建立了一个临时的源代码托管仓库.