spring boot与spring batch、postgres及elasticsearch整合

标签: spring boot spring | 发表时间:2018-02-23 16:42 | 作者:543726916
出处:http://www.iteye.com

spring batch elasticsearch

1.介绍

当系统有大量数据需要从数据库导入elasticsearch时,使用sping batch可以提高导入的效率。这篇文章使用spring batch将数据从postgres导入elasticsearch。

2.示例

2.1 pom.xml

本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3

   <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>

    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>


</project>
2.2 实体类及repository
   package com.hfcsbc.esetl.domain;

import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
    @Id
    private Long id;
    private String name;
    @OneToOne
    private Address address;
}
   package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;
import javax.persistence.Id;

/**
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
    @Id
    private Long id;
    private String name;
}
   package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}
   package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
2.3 配置elasticsearchItemWriter
   package com.hfcsbc.esetl.itemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    private EsPersonRepository personRepository;

    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        this.personRepository = personRepository;
    }

    @Override
    public void beforeWrite(List<? extends Person> items) {

    }

    @Override
    public void afterWrite(List<? extends Person> items) {

    }

    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {

    }

    @Override
    public void beforeStep(StepExecution stepExecution) {

    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    @Override
    public void write(List<? extends Person> items) throws Exception {
        personRepository.saveAll(items);
    }
}
2.5 配置spring batch需要的配置
   package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Autowired
    private EsPersonRepository personRepository;

    @Bean
    public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){
        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>();
        String sqlQuery = "select * from person";
        try {
            JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>();
            queryProvider.setSqlQuery(sqlQuery);
            queryProvider.setEntityClass(Person.class);
            queryProvider.afterPropertiesSet();
            reader.setEntityManagerFactory(entityManagerFactory);
            reader.setPageSize(10000);
            reader.setQueryProvider(queryProvider);
            reader.afterPropertiesSet();
            reader.setSaveState(true);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return reader;
    }

    @Bean
    public ElasticsearchItemWriter itemWriter(){
        return new ElasticsearchItemWriter(personRepository);
    }

    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader itemReader,
                     ItemWriter itemWriter){
        return stepBuilderFactory
                .get("step1")
                .chunk(10000)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step){
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource
     * @param dataSource
     * @param manager
     * @return
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        try {
            return jobRepositoryFactoryBean.getObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}
2.5配置数据库及es的连接地址
   spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme

  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update

  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2

spring.batch.initialize-schema: always
2.6 配置入口类
   package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}




源码地址: https://github.com/SuperPeng0903/es-etl



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [spring boot spring] 推荐:

spring boot与spring batch、postgres及elasticsearch整合

- - 互联网 - ITeye博客
当系统有大量数据需要从数据库导入elasticsearch时,使用sping batch可以提高导入的效率. 这篇文章使用spring batch将数据从postgres导入elasticsearch. 本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3.

Spring boot传统部署

- - 企业架构 - ITeye博客
使用spring boot很方便,一个jar包就可以启动了,因为它里面内嵌了tomcat等服务器. 但是spring boot也提供了部署到独立服务器的方法. 如果你看文档的话,从jar转换为war包很简单,pom.xml的配置修改略去不讲. 只看source的修改,很简单,只要一个配置类,继承自SpringBootServletInitializer, 并覆盖configure方法.

值得使用的Spring Boot

- - ImportNew
2013年12月12日,Spring发布了4.0版本. 这个本来只是作为Java平台上的控制反转容器的库,经过将近10年的发展已经成为了一个巨无霸产品. 不过其依靠良好的分层设计,每个功能模块都能保持较好的独立性,是Java平台不可多得的好用的开源应用程序框架. Spring的4.0版本可以说是一个重大的更新,其全面支持Java8,并且对Groovy语言也有良好的支持.

Spring Boot配置多个DataSource

- - 廖雪峰的官方网站
使用Spring Boot时,默认情况下,配置 DataSource非常容易. Spring Boot会自动为我们配置好一个 DataSource. 如果在 application.yml中指定了 spring.datasource的相关配置,Spring Boot就会使用该配置创建一个 DataSource.

Spring boot executable jar/war 原理

- - ImportNew
spring boot里其实不仅可以直接以 Java -jar demo.jar的方式启动,还可以把jar/war变为一个可以执行的脚本来启动,比如./demo.jar. 把这个executable jar/war 链接到/etc/init.d下面,还可以变为Linux下的一个service. 只要在spring boot maven plugin里配置:.

Spring Boot Starter是什么?

- - 技术,永无止境
在工作中我们经常能看到各种各样的springboot starter,如spring-cloud-netflix、spring-cloud-alibaba等等. 这些starter究竟有什么作用呢. 在了解这些starter之前,我们需要先大概知道Spring MVC与Spring Boot的关系.

SPRING BOOT OAUTH2 + KEYCLOAK - service to service call

- - BlogJava-首页技术区
employee-service调用department-service,如果要按OAUTH2.0流程,只需要提供client-id和client-secrect即可. 在KEYCLOAK中引入service-account,即配置该employee-service时,取消standard-flow,同时激活service-account.

Spring Boot使用redis做数据缓存

- - ITeye博客
SysUser.class)); //请注意这里. 3 redis服务器配置. /** *此处的dao操作使用的是spring data jpa,使用@Cacheable可以在任意方法上,*比如@Service或者@Controller的方法上 */ public interface SysUserRepo1 extends CustomRepository {.

spring boot应用启动原理分析

- - ImportNew
在spring boot里,很吸引人的一个特性是可以直接把应用打包成为一个jar/war,然后这个jar/war是可以直接启动的,不需要另外配置一个Web Server. 如果之前没有使用过spring boot可以通过下面的demo来感受下. 下面以这个工程为例,演示如何启动Spring boot项目:.

Apache Shiro和Spring boot的结合使用

- - 企业架构 - ITeye博客
实际上在Spring boot里用Spring Security最合适,毕竟是自家东西,最重要的一点是Spring Security里自带有csrf filter,防止csrf攻击,shiro里就没有. 但是Spring Security有点太复杂,custmize起来比较费力,不如shiro来的简单.