java 调用kettle api实现数据同步

标签: java kettle api | 发表时间:2015-04-18 16:54 | 作者:zhangzhen881024
出处:http://www.iteye.com

数据库:   

kettle 日志表

CREATE TABLE `t_lzfx_data_log` (
  `ID` bigint(20) NOT NULL AUTO_INCREMENT,
  `ID_BATCH` int(11) DEFAULT '0',
  `CHANNEL_ID` varchar(255) DEFAULT NULL,
  `TRANSNAME` varchar(255) DEFAULT NULL,
  `STEPNAME` varchar(200) DEFAULT NULL,
  `STEP_COPY` int(11) DEFAULT NULL,
  `LINES_READ` int(11) DEFAULT NULL,
  `LINES_WRITTEN` int(11) DEFAULT NULL,
  `LINES_UPDATED` int(11) DEFAULT NULL,
  `LINES_INPUT` int(11) DEFAULT NULL,
  `LINES_OUTPUT` int(11) DEFAULT NULL,
  `LINES_REJECTED` int(11) DEFAULT NULL,
  `ERRORS` int(11) DEFAULT NULL,
  `LOG_FIELD` blob,
  `LOG_DATE` datetime DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 数据表:

CREATE TABLE `syonline` (
  `ID` varchar(36) NOT NULL,
  `CREATEDATETIME` datetime DEFAULT NULL,
  `IP` varchar(100) DEFAULT NULL,
  `LOGINNAME` varchar(100) DEFAULT NULL,
  `TYPE` varchar(1) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


CREATE TABLE `t_lzfx_base_syonline` (
  `ID` varchar(36) NOT NULL,
  `CREATEDATETIME` datetime DEFAULT NULL,
  `IP` varchar(100) DEFAULT NULL,
  `LOGINNAME` varchar(100) DEFAULT NULL,
  `TYPE` varchar(1) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

JAVA 代码

 /**
	  * 两个库中的表名
	  */
	 public static String bjdt_tablename = "t_lzfx_base_syonline";
	 public static String kettle_tablename = "syonline";
	 public static String kettle_log = "t_lzfx_data_log";
	 
	/**
	 * 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)
	 */
	 public static final String[] databasesXML = {
	        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
	          "<connection>" +
	            "<name>bjdt</name>" +
	            "<server>127.0.0.1</server>" +
	            "<type>MYSQL</type>" +
	            "<access>Native</access>" + 
	            "<database>zjdata</database>" +
	            "<port>3306</port>" +
	            "<username>root</username>" +
	            "<password>root</password>" +
	          "</connection>",
	          "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
	          "<connection>" +
	            "<name>kettle</name>" +
	            "<server>127.0.0.1</server>" +
	            "<type>MYSQL</type>" +
	            "<access>Native</access>" + 
	            "<database>kettledb</database>" +
	            "<port>3306</port>" +
	            "<username>root</username>" +
	            "<password>root</password>" +
	          "</connection>"
	    };	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			KettleEnvironment.init();
			transDemo = new KettleDeleteTest();
			System.out.println("************start to generate my own transformation***********");
			
			TransMeta transMeta = new TransMeta();
			//设置转化的名称 
			transMeta.setName("转换名称");
			
			//添加转换的数据库连接
	        for (int i=0;i<databasesXML.length;i++){
	            DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
	            transMeta.addDatabase(databaseMeta);
	        }
	        VariableSpace space = new Variables();  
	        //将step日志数据库配置名加入到变量集中  
	        space.setVariable("kettle_log","bjdt");
	        space.initializeVariablesFrom(null);  
	        StepLogTable stepLogTable = StepLogTable.getDefault(space,transMeta);
	        //StepLogTable使用的数据库连接名(上面配置的变量名)。  
	        stepLogTable.setConnectionName("bjdt");
	        //设置Step日志的表名  
	        stepLogTable.setTableName(kettle_log); 
	        //设置TransMeta的StepLogTable  
	        transMeta.setStepLogTable(stepLogTable); 
			
			//******************************************************************
			//第一个表输入步骤(原表数据输入)
			TableInputMeta oldTableInput = new TableInputMeta();
			DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt");
			oldTableInput.setDatabaseMeta(database_bjdt);
			String old_select_sql = "SELECT ID,IP,CREATEDATETIME,LOGINNAME,TYPE FROM "+bjdt_tablename;
			oldTableInput.setSQL(old_select_sql);
			
			//添加TableInputMeta到转换中
			StepMeta oldTableInputMetaStep = new StepMeta("INPUTTABLE_"+bjdt_tablename,oldTableInput);
			//给步骤添加在spoon工具中的显示位置
			transMeta.addStep(oldTableInputMetaStep);
			//*****************************************************************
			//第二个表输入步骤(原表数据输入)
			TableInputMeta newTableInput = new TableInputMeta();
			//给表输入添加一个DatabaseMeta连接数据库
			DatabaseMeta database_kettle = transMeta.findDatabase("kettle");
			newTableInput.setDatabaseMeta(database_kettle);
			String new_select_sql = "SELECT ID,IP,CREATEDATETIME,LOGINNAME,TYPE FROM "+kettle_tablename;
			newTableInput.setSQL(new_select_sql);
			
			//添加TableInputMeta到转换中
			StepMeta newTableInputMetaStep = new StepMeta("INPUTTABLE_"+kettle_tablename,newTableInput);
			//给步骤添加在spoon工具中的显示位置
			transMeta.addStep(newTableInputMetaStep);
			//******************************************************************
			
			//******************************************************************
			//第三个步骤合并
			MergeRowsMeta mergeRowsMeta = new MergeRowsMeta();
///设置合并步骤的新旧数据源
			StepIOMetaInterface stepIOMeta = mergeRowsMeta.getStepIOMeta();
			stepIOMeta.getInfoStreams().get(0).setStepMeta(newTableInputMetaStep);
			stepIOMeta.getInfoStreams().get(1).setStepMeta(oldTableInputMetaStep);
			mergeRowsMeta.setFlagField("bz"); //设置标志字段
			mergeRowsMeta.setKeyFields(new String[]{"ID"});
			mergeRowsMeta.setValueFields(new String[]{"IP","CREATEDATETIME","LOGINNAME","TYPE"});
			StepMeta mergeStepMeta = new StepMeta("合并记录", mergeRowsMeta);
			transMeta.addStep(mergeStepMeta);
			//******************************************************************
			
			//******************************************************************
			//添加HOP把两个输入和合并的步骤关联
			transMeta.addTransHop(new TransHopMeta(oldTableInputMetaStep, mergeStepMeta));
			transMeta.addTransHop(new TransHopMeta(newTableInputMetaStep, mergeStepMeta));
			//******************************************************************
			
			//******************************************************************
			//第四个步骤同步数据
			SynchronizeAfterMergeMeta synchronizeAfterMergeMeta = new SynchronizeAfterMergeMeta();
			synchronizeAfterMergeMeta.setCommitSize(10000); //设置事务提交数量
			synchronizeAfterMergeMeta.setDatabaseMeta(database_kettle); //目标数据源
			synchronizeAfterMergeMeta.setSchemaName("");//数据表schema
			synchronizeAfterMergeMeta.setTableName(kettle_tablename); //数据表名称
			synchronizeAfterMergeMeta.setUseBatchUpdate(true); //设置批量更新
			//设置用来查询的关键字
			synchronizeAfterMergeMeta.setKeyLookup(new String[]{"ID"}); //设置用来查询的关键字
			synchronizeAfterMergeMeta.setKeyStream(new String[]{"ID"}); //设置流输入的字段
			synchronizeAfterMergeMeta.setKeyStream2(new String[]{""});//一定要加上
			synchronizeAfterMergeMeta.setKeyCondition(new String[]{"="}); //设置操作符
			//设置要更新的字段
			String[] updatelookup = {"ID","IP","CREATEDATETIME","LOGINNAME","TYPE"} ;
	 		String [] updateStream = {"ID","IP","CREATEDATETIME","LOGINNAME","TYPE"};
	 		Boolean[] updateOrNot = {false,true,true,true,true};
	 		synchronizeAfterMergeMeta.setUpdateLookup(updatelookup);
	 		synchronizeAfterMergeMeta.setUpdateStream(updateStream);
	 		synchronizeAfterMergeMeta.setUpdate(updateOrNot);
	 		
	 		//设置高级属性(操作)
	 		synchronizeAfterMergeMeta.setOperationOrderField("bz"); //设置操作标志字段名
	 		synchronizeAfterMergeMeta.setOrderInsert("new");
	 		synchronizeAfterMergeMeta.setOrderUpdate("changed");
	 		synchronizeAfterMergeMeta.setOrderDelete("deleted");
	 		StepMeta synStepMeta = new StepMeta("数据同步", synchronizeAfterMergeMeta);
	 		transMeta.addStep(synStepMeta);
			//******************************************************************
			
	 		//******************************************************************
			//添加HOP把合并和数据同步的步骤关联
			transMeta.addTransHop(new TransHopMeta(mergeStepMeta,synStepMeta));
			//******************************************************************
			
			String transXml = transMeta.getXML();
			System.out.println("transXml:"+transXml);
			
		    Trans trans = new Trans(transMeta);

		    trans.execute(null); // You can pass arguments instead of null.
		    trans.waitUntilFinished();
		    if ( trans.getErrors() > 0 )
		    {
		      throw new RuntimeException( "There were errors during transformation execution." );
		    }
		    System.out.println("***********the end************");
		} catch (Exception e) {
			e.printStackTrace();
			return;
		}
		
	}

 执行结果

2015/04/18 17:04:18 - 转换名称 - 为了转换解除补丁开始  [转换名称]
2015/04/18 17:04:20 - 合并记录.0 - 行号50000
2015/04/18 17:04:21 - INPUTTABLE_syonline.0 - linenr 50000
2015/04/18 17:04:21 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 50000
2015/04/18 17:04:21 - 合并记录.0 - 行号100000
2015/04/18 17:04:21 - 合并记录.0 - 行号150000
2015/04/18 17:04:21 - INPUTTABLE_syonline.0 - linenr 100000
2015/04/18 17:04:21 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 100000
2015/04/18 17:04:21 - 合并记录.0 - 行号200000
2015/04/18 17:04:21 - 合并记录.0 - 行号250000
2015/04/18 17:04:22 - INPUTTABLE_syonline.0 - linenr 150000
2015/04/18 17:04:22 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 150000
2015/04/18 17:04:22 - 合并记录.0 - 行号300000
2015/04/18 17:04:22 - 合并记录.0 - 行号350000
2015/04/18 17:04:22 - INPUTTABLE_syonline.0 - linenr 200000
2015/04/18 17:04:22 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 200000
2015/04/18 17:04:22 - 合并记录.0 - 行号400000
2015/04/18 17:04:23 - 合并记录.0 - 行号450000
2015/04/18 17:04:23 - INPUTTABLE_syonline.0 - linenr 250000
2015/04/18 17:04:23 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 250000
2015/04/18 17:04:23 - 合并记录.0 - 行号500000
2015/04/18 17:04:23 - 合并记录.0 - 行号550000
2015/04/18 17:04:23 - INPUTTABLE_syonline.0 - linenr 300000
2015/04/18 17:04:24 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 300000
2015/04/18 17:04:24 - 合并记录.0 - 行号600000
2015/04/18 17:04:24 - 合并记录.0 - 行号650000
2015/04/18 17:04:24 - INPUTTABLE_syonline.0 - linenr 350000
2015/04/18 17:04:24 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 350000
2015/04/18 17:04:24 - 合并记录.0 - 行号700000
2015/04/18 17:04:24 - 合并记录.0 - 行号750000
2015/04/18 17:04:25 - INPUTTABLE_syonline.0 - linenr 400000
2015/04/18 17:04:25 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 400000
2015/04/18 17:04:25 - 合并记录.0 - 行号800000
2015/04/18 17:04:25 - 合并记录.0 - 行号850000
2015/04/18 17:04:25 - INPUTTABLE_syonline.0 - linenr 450000
2015/04/18 17:04:26 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 450000
2015/04/18 17:04:26 - 合并记录.0 - 行号900000
2015/04/18 17:04:26 - 合并记录.0 - 行号950000
2015/04/18 17:04:26 - INPUTTABLE_syonline.0 - linenr 500000
2015/04/18 17:04:26 - INPUTTABLE_syonline.0 - Finished reading query, closing connection.
2015/04/18 17:04:26 - INPUTTABLE_syonline.0 - 完成处理 (I=500184, O=0, R=0, W=500184, U=0, E=0
2015/04/18 17:04:26 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 500000
2015/04/18 17:04:26 - INPUTTABLE_t_lzfx_base_syonline.0 - Finished reading query, closing connection.
2015/04/18 17:04:26 - 合并记录.0 - 行号1000000
2015/04/18 17:04:26 - INPUTTABLE_t_lzfx_base_syonline.0 - 完成处理 (I=500184, O=0, R=0, W=500184, U=0, E=0
2015/04/18 17:04:26 - 合并记录.0 - 完成处理 (I=0, O=0, R=1000368, W=500184, U=0, E=0
2015/04/18 17:04:26 - 数据同步.0 - 完成处理 (I=0, O=0, R=500184, W=500184, U=0, E=0

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [java kettle api] 推荐:

java 调用kettle api实现数据同步

- - 开源软件 - ITeye博客
* 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml). transMeta.setName("转换名称");. //将step日志数据库配置名加入到变量集中. //StepLogTable使用的数据库连接名(上面配置的变量名). //设置Step日志的表名.

java调用kettle api 操作日志写入到数据库表

- - 开源软件 - ITeye博客
//将step日志数据库配置名加入到变量集中. //StepLogTable使用的数据库连接名(上面配置的变量名). //设置Step日志的表名. //设置TransMeta的StepLogTable. 已有 0 人发表留言,猛击->> 这里<<-参与讨论. —软件人才免语言低担保 赴美带薪读研.

Java API 设计清单 « 友好的API

- - 东西
在设计Java API的时候总是有很多不同的规范和考量. 与任何复杂的事物一样,这项工作往往就是在考验我们思考的缜密程度. 就像飞行员起飞前的检查清单,这张清单将帮助软件设计者在设计Java API的过程中回忆起那些明确的或者不明确的规范. 本文也可以看作为“ API设计指南”这篇文章的附录. 我们还准备了一些前后比对的例子来展示这个列表如何帮助你理清设计需求,找出错误,识别糟糕的设计实践以及如何寻找改进的时机.

Java的日期API真烂

- - 四火的唠叨
文章系本人原创,转载请保持完整性并注明出自 《四火的唠叨》. 记得在我刚学Java的时候,真是搞不清楚Date和Calendar这两个类,后来我渐渐知道,原来不能全怪我啊,Java日期API之烂是公认的(不妨参见 这篇文章,Tiago Fernandez做过一个投票,就是要选举最烂的Java API,结果Java日期API排行第二,仅次于臭名远扬的EJB2,嘿嘿).

rabbitmq java client api详解

- - 五四陈科学院
以下内容由 [五四陈科学院]提供. AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbitmq-server进程内有Exchange,定义了这个消息的发送类型. Queue是进程内的逻辑队列,有多个,有名字.

前Sun CEO:Android无需获Java API许可

- - 业界
北京时间4月27日,据国外媒体 CNET报道,前Sun首席执行官Jonathan Schwartz周四在甲骨文诉谷歌侵权案中作证时称,Java API不应被视为专利或受Sun的保护,只要谷歌不使用Java这个词. Schwartz解释开放软件的性质时表示:“这些都是开放的API,我们想让更多的人使用,我们希望建起最大的帐篷,邀请尽可能多的人来.

Java规则引擎与其API(JSR-94)

- - 行业应用 - ITeye博客
本文对Java规则引擎与其API(JSR-94)及相关实现做了较详细的介绍,对其体系结构和API应用有较详尽的描述,并指出Java规则引擎,规则语言,JSR-94的相互关系,以及JSR-94的不足之处和展望. 复杂企业级项目的开发以及其中随外部条件不断变化的业务规则(business logic),迫切需要分离商业决策者的商业决策逻辑和应用开发者的技术决策,并把这些商业决策放在中心数据库或其他统一的地方,让它们能在运行时(即商 务时间)可以动态地管理和修改从而提供软件系统的柔性和适应性.

elasticsearch java API------批量添加索引

- - 行业应用 - ITeye博客
elasticsearch java API------批量添加索引.         person.setName("张三" + i);  .         person.setSex("男");  .         String index = "user"; // 相当于数据库名  .         String type = "tb_person"; // 相当于表名  .

Spring Boot 中使用 Java API 调用 lucene

- - SegmentFault 最新的文章
Lucene是apache软件基金会4 jakarta项目组的一个子项目,是一个开放源代码的全文检索引擎工具包,但它不是一个完整的全文检索引擎,而是一个全文检索引擎的架构,提供了完整的查询引擎和索引引擎,部分文本分析引擎(英文与德文两种西方语言). Lucene的目的是为软件开发人员提供一个简单易用的工具包,以方便的在目标系统中实现全文检索的功能,或者是以此为基础建立起完整的全文检索引擎.

Kettle 创建 Transformation

- - CSDN博客推荐文章
1.第一步,先准备数据和工具. 安装好mysql以及客户端工具.   `status` int(11) NOT NULL COMMENT '对内= 1 ,对外= 2',. 以上我们建了四个表,客户customer,账户account,交易记录trade,交易明细表trade_detail. 用下面的sql查询一下得到每个客户下每个账户的交易明细.