ETL工具Pentaho Kettle的transformation和job集成
- - CSDN博客推荐文章Kettle是一款国外开源的etl工具,纯java编写,数据抽取高效稳定(数据迁移工具). Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制. 2.1.
transformation实现解析.
Kettle是一款国外开源的etl工具,纯java编写,数据抽取高效稳定(数据迁移工具)。Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。
// 初始化Kettle环境,加载配置
KettleEnvironment. init();
//文件路径及文件名
String filename=”/foo/bar/trans.ktr”;
//解析transformation文件
TransMeta transmeta = new TransMeta(filename);
//加载transformation
Trans trans = new Trans(transmeta);
//在独立线程中执行transformation,“null”可以用参数集替代
trans.execute( null);
//等待transformation执行完毕
trans.waitUntilFinished();
//获取执行结果
Result result=trans.getResult();
// 初始化Kettle环境,加载配置
KettleEnvironment. init();
//文件路径及文件名
String filename=”/foo/bar/jobn.kjb”;
//解析job文件,不使用资源库
JobMeta jobmeta= new JobMeta(filename, null, null);
//加载job
Job job= new Job( null, jobmeta);
//在独立线程中执行job
job.start();
//等待job执行完毕
job.waitUntilFinished();
//获取执行结果
Result result=job.getResult();
// 初始化Kettle环境,加载配置
KettleEnvironment. init();
//资源库类型插件初始化
PluginRegistry.init();
//资源库对象实例化
Repository repository= null;
RepositoriesMeta repositoriesMeta = new RepositoriesMeta();
//读取资源库
repositoriesMeta.readData();
//遍历资源库
/*
for ( int i = 0; i < repositoriesMeta.nrRepositories(); i++ ) {
RepositoryMeta rinfo = repositoriesMeta.getRepository( i );
System. out.println( "#"+ ( i + 1 ) + " : " + rinfo.getName() + " [" + rinfo.getDescription() + "] id=" + rinfo.getId() );
}
*/
//根据资源库名称“1.0”查找资源库
RepositoryMeta repositoryMeta = repositoriesMeta.findRepository( "1.0" );
// 获取PluginRegistry实例
PluginRegistry registry = PluginRegistry. getInstance();
//加载资源库
repository = registry.loadClass(RepositoryPluginType. class,repositoryMeta, Repository. class);
//资源库初始化
repository.init(repositoryMeta);
//获取资源库路径
RepositoryDirectoryInterface directory = repository.loadRepositoryDirectoryTree();
// JobMeta实例化
JobMeta jobmeta = new JobMeta();
//解析资源库的job文件
jobmeta = repository.loadJob("job2", directory, null, null);
//job实例化
Job job = null;
//加载资源库job
job = new Job(repository, jobmeta);
//在独立线程中执行job
job.start();
//等待job执行完毕
job.waitUntilFinished();
//获取执行结果
Result result=job.getResult();
日志输出准备:
// FileLoggingEventListener实例化
FileLoggingEventListener fileLoggingEventListener= null;
//tran.log文件追加日志,true表示追加,false表示不追加
fileLoggingEventListener= new FileLoggingEventListener( "tran.log", true );
运行结果日志输出:
//获取结果集
Map<String,ResultFile> map=result.getResultFiles();
//遍历运行结果,并输出日志文件
for(String key:map.keySet()){
//获取ResultFile对象
ResultFile rf=map.get(key);
//创建日志通道
LogChannelInterface log = new LogChannel( "运行结果" );
//输出日志到日志文件
log.logBasic(rf.getFile().getName().toString());