Spring Boot与Spring Batch、PostgreSQL及Elasticsearch整合
- - 互联网 - ITeye博客
当系统有大量数据需要从数据库导入Elasticsearch时，使用Spring Batch可以提高导入的效率。这篇文章使用Spring Batch将数据从PostgreSQL导入Elasticsearch。本文使用Spring Data Jest连接ES（也可以使用Spring Data Elasticsearch连接ES），ES版本为5.5.3。
当系统有大量数据需要从数据库导入Elasticsearch时，使用Spring Batch可以提高导入的效率。这篇文章使用Spring Batch将数据从PostgreSQL导入Elasticsearch。
本文使用Spring Data Jest连接ES（也可以使用Spring Data Elasticsearch连接ES），ES版本为5.5.3。
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for es-etl: a Spring Boot application that copies data from PostgreSQL into Elasticsearch with Spring Batch. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hfcsbc.estl</groupId>
<artifactId>es-etl</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>es-etl</name>
<description>Demo project for Spring Boot</description>
<!-- NOTE(review): 2.0.0.M7 is a Spring Boot milestone (pre-release), resolved through the spring-milestones repository declared below. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.M7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- JPA access to the PostgreSQL source database. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- PostgreSQL JDBC driver; no version given here, so it is managed by the Spring Boot parent. -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<!-- Spring Batch with its Spring Boot auto-configuration. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Spring Data Elasticsearch repositories on top of the Jest HTTP client (connection configured via spring.data.jest.* properties). -->
<dependency>
<groupId>com.github.vanroy</groupId>
<artifactId>spring-boot-starter-data-jest</artifactId>
<version>3.0.0.RELEASE</version>
</dependency>
<!-- Jest client pinned explicitly. NOTE(review): presumably chosen to match the Elasticsearch 5.x server; confirm compatibility with the starter above. -->
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<version>5.3.2</version>
</dependency>
<!-- Lombok generates accessors for the @Data entities; version managed by the parent. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Packages the application as an executable Spring Boot jar. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<!-- Spring snapshot/milestone repositories: presumably required because the milestone parent and its managed artifacts are not published to Maven Central. -->
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<!-- Same repositories for resolving Maven plugins (e.g. spring-boot-maven-plugin at the milestone version). -->
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
package com.hfcsbc.esetl.domain;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;
/**
* Person record that is mapped both as a JPA entity (read from PostgreSQL) and as an
* Elasticsearch document in index {@code person}, type {@code person}. The same object
* can therefore be loaded by JPA and written to Elasticsearch unchanged.
* <p>
* The index settings request 1 shard, 0 replicas and {@code refreshInterval = "-1"}
* (automatic refresh disabled). NOTE(review): with refresh disabled, indexed documents
* are presumably not visible to searches until the index is refreshed or its refresh
* interval is reset. Confirm that this is intended to remain after the bulk import.
* <p>
* Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by
* Lombok's {@code @Data}.
*
* Created by pengchao on 2018/2/23
*/
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
// Primary key (JPA @Id).
@Id
private Long id;
private String name;
// One-to-one association; JPA's default fetch type for @OneToOne is EAGER, so the
// address is loaded together with each person.
@OneToOne
private Address address;
}
package com.hfcsbc.esetl.domain;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
/**
* Address entity, referenced by {@code Person} through a one-to-one association.
* Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by
* Lombok's {@code @Data}.
*
* Created by pengchao on 2018/2/23
*/
@Entity
@Data
public class Address {
// Primary key (JPA @Id).
@Id
private Long id;
private String name;
}
package com.hfcsbc.esetl.repository.jpa;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* Spring Data JPA repository for {@link Person} entities with {@code Long} ids.
* It declares no methods of its own; all operations are inherited from {@link JpaRepository}.
*
* Created by pengchao on 2018/2/23
*/
public interface PersonRepository extends JpaRepository<Person, Long> {
}
package com.hfcsbc.esetl.repository.es;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
* Spring Data Elasticsearch repository for {@link Person} documents with {@code Long} ids.
* It declares no methods of its own; all operations (e.g. {@code saveAll}) are inherited
* from {@link ElasticsearchRepository}.
*
* Created by pengchao on 2018/2/23
*/
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}
package com.hfcsbc.esetl.itemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
 * Spring Batch {@link ItemWriter} that indexes each chunk of {@link Person} entities into
 * Elasticsearch through {@link EsPersonRepository#saveAll}.
 * <p>
 * The class also implements {@link ItemWriteListener} and {@link StepExecutionListener}.
 * Those callbacks are currently no-ops and serve only as extension points.
 *
 * Created by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

    /** Repository that every chunk is saved through. */
    private final EsPersonRepository personRepository;

    /**
     * Creates a writer that saves chunks through the given repository.
     *
     * @param personRepository Elasticsearch repository used for indexing; must not be {@code null}
     * @throws NullPointerException if {@code personRepository} is {@code null}
     */
    public ElasticsearchItemWriter(EsPersonRepository personRepository) {
        // Fail at construction time rather than on the first chunk.
        this.personRepository = java.util.Objects.requireNonNull(personRepository, "personRepository");
    }

    /** No-op hook invoked before a chunk is written. */
    @Override
    public void beforeWrite(List<? extends Person> items) {
        // intentionally empty
    }

    /** No-op hook invoked after a chunk has been written. */
    @Override
    public void afterWrite(List<? extends Person> items) {
        // intentionally empty
    }

    /** No-op hook invoked when writing a chunk fails; the exception still propagates to the step. */
    @Override
    public void onWriteError(Exception exception, List<? extends Person> items) {
        // intentionally empty
    }

    /** No-op hook invoked before the step starts. */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        // intentionally empty
    }

    /**
     * No-op hook invoked after the step finishes.
     *
     * @return {@code null}, which leaves the step's exit status unchanged
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }

    /**
     * Indexes one chunk of persons into Elasticsearch.
     * <p>
     * A {@code null} or empty chunk is skipped, so no bulk save with nothing to index is issued.
     *
     * @param items the chunk to index
     * @throws Exception if the repository fails to save the chunk
     */
    @Override
    public void write(List<? extends Person> items) throws Exception {
        if (items == null || items.isEmpty()) {
            return;
        }
        personRepository.saveAll(items);
    }
}
package com.hfcsbc.esetl.config;
import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
/**
 * Spring Batch configuration for the PostgreSQL to Elasticsearch import job.
 * <p>
 * A single chunk-oriented step ({@code step1}) reads {@link Person} rows page by page with a
 * {@link JpaPagingItemReader} and writes each chunk to Elasticsearch with
 * {@link ElasticsearchItemWriter}. The job ({@code importJob}) runs that step once per launch.
 *
 * Created by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    /** Rows fetched per page by the reader; also used as the step's commit interval. */
    private static final int CHUNK_SIZE = 10000;

    /**
     * Native query used by the paging reader.
     * <p>
     * The ORDER BY is required because the reader issues one LIMIT/OFFSET query per page.
     * Without a unique sort order, PostgreSQL may return rows in a different order for each
     * page, so rows could be skipped or read twice.
     */
    private static final String PERSON_QUERY = "select * from person order by id";

    @Autowired
    private EsPersonRepository personRepository;

    /**
     * Paging reader that loads {@link Person} entities through JPA using {@link #PERSON_QUERY}.
     *
     * @param entityManagerFactory JPA factory the reader uses to open its own entity manager
     * @return a fully initialised reader; restart state is saved in the execution context
     * @throws IllegalStateException if the query provider or reader fails validation
     */
    @Bean
    public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory) {
        JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<>();
        queryProvider.setSqlQuery(PERSON_QUERY);
        queryProvider.setEntityClass(Person.class);

        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setPageSize(CHUNK_SIZE);
        reader.setQueryProvider(queryProvider);
        reader.setSaveState(true);
        try {
            queryProvider.afterPropertiesSet();
            reader.afterPropertiesSet();
        } catch (Exception e) {
            // Fail fast instead of publishing a half-initialised reader bean.
            throw new IllegalStateException("Failed to initialise the Person item reader", e);
        }
        return reader;
    }

    /**
     * Writer that indexes each chunk into Elasticsearch.
     *
     * @return a writer backed by the injected {@link EsPersonRepository}
     */
    @Bean
    public ElasticsearchItemWriter itemWriter() {
        return new ElasticsearchItemWriter(personRepository);
    }

    /**
     * Chunk-oriented step that reads {@link Person}s and writes them unchanged (no processor).
     *
     * @param stepBuilderFactory factory provided by {@code @EnableBatchProcessing}
     * @param itemReader         source of persons
     * @param itemWriter         sink for persons
     * @return the {@code step1} step with a commit interval of {@link #CHUNK_SIZE}
     */
    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader<Person> itemReader,
                     ItemWriter<Person> itemWriter) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(CHUNK_SIZE)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    /**
     * The import job. {@link RunIdIncrementer} adds a fresh {@code run.id} parameter on each
     * launch, so the job can be run repeatedly.
     *
     * @param jobBuilderFactory factory provided by {@code @EnableBatchProcessing}
     * @param step              the single step to execute
     * @return the {@code importJob} job
     */
    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * Job repository in which Spring Batch stores its execution metadata. Spring Batch creates
     * the tables it needs at runtime; this places them in {@code dataSource}.
     *
     * @param dataSource database that holds the Spring Batch metadata tables
     * @param manager    transaction manager used for repository operations
     * @return the configured {@link JobRepository}
     * @throws IllegalStateException if the repository cannot be created (rather than returning
     *                               {@code null}, which would leave the context without a usable bean)
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        try {
            // The factory bean is not container-managed here, so initialise it explicitly.
            jobRepositoryFactoryBean.afterPropertiesSet();
            return jobRepositoryFactoryBean.getObject();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to create the Spring Batch JobRepository", e);
        }
    }
}
# Application configuration for the PostgreSQL -> Elasticsearch import.
# The keys must be nested under "spring:"; a flattened mapping would bind none of them.
spring:
  # NOTE(review): the pom declares no Redis dependency, so this setting appears unused.
  redis:
    host: 192.168.1.222
  data:
    # Jest client connection to Elasticsearch (spring-boot-starter-data-jest).
    jest:
      uri: http://192.168.1.222:9200
      # NOTE(review): credentials are stored in plain text; consider externalising them.
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    # NOTE(review): Spring Boot 2 no longer binds pool settings directly under spring.datasource.
    # With the default HikariCP pool this key is ignored; the Hikari equivalent is
    # spring.datasource.hikari.maximum-pool-size. Check that the step and reader transactions
    # fit in the pool before lowering it to 2.
    max-active: 2
  batch:
    # Always create the Spring Batch metadata tables on startup.
    initialize-schema: always
package com.hfcsbc.esetl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
 * Spring Boot entry point for the PostgreSQL to Elasticsearch import application.
 * <p>
 * Spring Boot's own Elasticsearch auto-configuration is excluded. The Elasticsearch connection
 * is presumably supplied by the Jest starter instead.
 * <p>
 * Repository scanning is split per store: Elasticsearch repositories only in
 * {@code repository.es} and JPA repositories only in {@code repository.jpa}. If Elasticsearch
 * scanned the shared parent package, it would also pick up the JPA {@code PersonRepository},
 * because Spring Data's multi-store detection matches on the domain type's {@code @Document}
 * annotation, which {@code Person} carries.
 */
@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository.es")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    /**
     * Starts the Spring application context. Spring Boot's batch auto-configuration presumably
     * launches the configured job at startup; confirm the {@code spring.batch.job.*} settings.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}