使用NiFi简单的从网站获取数据写入到mysql数据库
最近稍微研究了下etl工具nifi,它是Apache下面的一个项目,是用作数据流处理的,具体的就不多做介绍了,网上也有很多介绍,官网是 https://nifi.apache.org/docs.html,我在这里做了一个从中移物联网onenet平台抽取数据到mysql的功能,没有做数据过滤以及中文转码乱码的情况处理,这2个地方还不清楚怎么做,如果会的同学也帮忙指导我一下,我们接下来看看整个的设计过程。
首先我们在官网上面下载nifi,我这里使用的版本是1.9.2,解压nifi后,我们就可以进入该文件夹下面的bin目录,点击run-nifi.bat启动nifi,可能nifi启动毕竟慢的原因,我关闭后重新启动一两次才能打开nifi的网页,默认端口是8080,可以在conf文件夹下面的nifi.properties修改端口号nifi.web.http.port。
进入到nifi主页后,我们可以看到如下图片
我们选择第一个图标,按住鼠标拖动该图标到空白页即可,显示出如下图片
在搜索框里面输入GetHttp,然后双击它就可以了,然后在空白页就会出现
双击或者右键点击configure选项,我们可以在settings->name下自定义名称,我们这里主要介绍下properties页签下的配置,其他的页签配置可以参考官网介绍,在properties页签下我们首先得定义从哪里取数据即url属性,在配置url属性之前我们可以可以在nifi配置一个公共的url地址属性,在空白处右键点击鼠标可以出现如下所示的内容
选择variables属性,进入后如下所示
添加如上所示的属性,现在回到GetHttp的properties页签上,在url属性上输入以下值
filename属性就是以后的flowfile名称,同时定义一个自定义属性api-key,用于参数的传递,接下来需要注意的是必须定义一个SSL Context Service,点击该属性的value值,如果没有定义过该属性值,需要我们自己新创建一个SSL Context Service,如下所示
点击create new service后进入如下界面
根据情况选择其中一个即可,通过SSL Context Service最右边的箭头符号进入到创建好的value值后,我们可以在properties页签中定义keystore和truststore2种方式的加密方式,我这里选择了keystore的方式,可以参考 https://www.cnblogs.com/aiaitie/p/9525564.html这个文章或者百度下keystore查询,生成后的引用方式
设置正确后,可以看到有个“雷电”符号,点击它表示启用就可以完成该SSL Context Service的应用了,因为从onenet接收过来的数据是
{ "errno" : 0, "data" : { "private" : true, "protocol" : "DTU", "create_time" : "2019-07-16 10:25:48", "act_time" : "2019-07-16 17:02:09", "online" : true, "id" : "设备id", "auth_info" : "设备编号", "last_ct" : "2019-07-30 08:27:27", "datastreams" : [ { "create_time" : "2019-07-16 17:02:11", "uuid" : "ae14d1f5-b789-4ea7-a894-a13b447283bd", "id" : "xxx" }, ], "title" : "设备名称", "desc" : "power monitor device" }, "error" : "succ" }
我们需要将数据进行json平展,所以引入了FlattenJson Processor进行json数据平展,如下所示
默认情况下分隔符是“.”,平展后的数据是
{ "errno" : 0, "data.private" : true, "data.protocol" : "DTU", "data.create_time" : "2019-07-16 10:25:48", "data.act_time" : "2019-07-16 17:02:09", "data.online" : true, "data.id" : "设备id", "data.auth_info" : "设备编号", "data.last_ct" : "2019-07-30 08:27:27", "data.datastreams" : [ { "create_time" : "2019-07-16 17:02:11", "uuid" : "ae14d1f5-b789-4ea7-a894-a13b447283bd", "id" : "xxx" }], "data.title" : "鍙板彉2", "data.desc" : "power monitor device", "error" : "succ" }
因为在mysql是不能使用“.”来数据插入操作的,而nifi是根据这些字段值进行数据对比和传值的,因此还需要把“.”替换为“_”来进行操作,因此我们还需要引入replaceText processor来进行替换操作,如下所示
替换完成后接下来我们就可以将json转换为sql进行插入操作了,这个时候我们使用ConvertJSONToSQL processor进行数据的转换,在properties页签中我们需要定义JDBC Connection pool,点击value值,如果没有则新增一个,进入到JDBC Connection pool中,在DBCPConnectionPool的properties页签中我们定义需要的的相关属性,如下所示
这里需要注意了,因为nifi没有导入mysql的相关jar包,需要我们自己去引入,把我们需要的mysql jar包放入到lib文件夹下面即可,就可以正常的进行使用了,配置好以后同样通过“雷电”符号进行启用配置,当然如果只有ConvertJSONToSQL processor还是不能把数据写入到mysql数据库中的,我们还要引用putsql processor才行,配置如下
整体上我们的nifi配置如下所示
数据库结果如下:
这里还有个奇怪的是做update操作的时候,它会将data_id字段修改为DATAID的sql语句,如下所示
UPDATE device SET data_title = ?, data_desc = ? WHERE DATAID = ?
也不是很清楚这个出现的原因,会的同学麻烦指导下。
已有 0 人发表留言,猛击->> 这里<<-参与讨论
ITeye推荐