当前位置: 移动技术网 > IT编程>开发语言>Java > 基于Spring Batch向Elasticsearch批量导入数据示例

基于Spring Batch向Elasticsearch批量导入数据示例

2019年07月19日  | 移动技术网IT编程  | 我要评论

1.介绍

当系统有大量数据需要从数据库导入 Elasticsearch 时,使用 Spring Batch 可以提高导入的效率。Spring Batch 使用 ItemReader 分页读取数据,使用 ItemWriter 批量写入数据。由于 Spring Batch 没有提供 Elasticsearch 的 ItemWriter 和 ItemReader,本示例自定义了一个 ElasticsearchItemWriter(以及 ElasticsearchItemReader),用于批量导入。

2.示例

2.1 pom.xml

本文使用 Spring Data Jest 连接 ES(也可以使用 Spring Data Elasticsearch 连接 ES),ES 版本为 5.5.3。

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
  xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!-- Maven build descriptor for the es-etl demo: a Spring Boot application packaged as a jar. -->
  <modelversion>4.0.0</modelversion>

  <!-- Coordinates of this project. -->
  <groupid>com.hfcsbc.estl</groupid>
  <artifactid>es-etl</artifactid>
  <version>0.0.1-snapshot</version>
  <packaging>jar</packaging>

  <name>es-etl</name>
  <description>demo project for spring boot</description>

  <!-- Spring Boot parent POM. The version is a milestone build; presumably this is why the
       spring-milestones repositories are declared further down. -->
  <parent>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-parent</artifactid>
    <version>2.0.0.m7</version>
    <relativepath/> <!-- lookup parent from repository -->
  </parent>

  <!-- Source/report encoding and the Java language level. -->
  <properties>
    <project.build.sourceencoding>utf-8</project.build.sourceencoding>
    <project.reporting.outputencoding>utf-8</project.reporting.outputencoding>
    <java.version>1.8</java.version>
  </properties>

  <!-- Dependencies without an explicit version rely on version management inherited from the parent. -->
  <dependencies>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter</artifactid>
    </dependency>

    <!-- JPA access to the relational source database. -->
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-data-jpa</artifactid>
    </dependency>

    <!-- PostgreSQL JDBC driver. -->
    <dependency>
      <groupid>org.postgresql</groupid>
      <artifactid>postgresql</artifactid>
    </dependency>

    <!-- Spring Batch. -->
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-batch</artifactid>
    </dependency>

    <!-- Spring Data Jest starter, used by this example to connect to Elasticsearch. -->
    <dependency>
      <groupid>com.github.vanroy</groupid>
      <artifactid>spring-boot-starter-data-jest</artifactid>
      <version>3.0.0.release</version>
    </dependency>

    <!-- Jest client library, pinned explicitly. -->
    <dependency>
      <groupid>io.searchbox</groupid>
      <artifactid>jest</artifactid>
      <version>5.3.2</version>
    </dependency>

    <!-- Lombok; the entity classes use its @data annotation. -->
    <dependency>
      <groupid>org.projectlombok</groupid>
      <artifactid>lombok</artifactid>
    </dependency>

    <!-- Test-scoped Spring Boot testing support. -->
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-test</artifactid>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Spring Boot Maven plugin. -->
      <plugin>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-maven-plugin</artifactid>
      </plugin>
    </plugins>
  </build>

  <!-- Extra repositories for dependencies: Spring snapshots (snapshots enabled) and
       Spring milestones (snapshots disabled). -->
  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>spring snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>spring milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>

  </repositories>

  <!-- The same two repositories, declared again for resolving build plugins. -->
  <pluginrepositories>
    <pluginrepository>
      <id>spring-snapshots</id>
      <name>spring snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </pluginrepository>
    <pluginrepository>
      <id>spring-milestones</id>
      <name>spring milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginrepository>
  </pluginrepositories>
</project>

2.2 实体类及repository

package com.hfcsbc.esetl.domain;
import lombok.data;
import org.springframework.data.elasticsearch.annotations.document;
import org.springframework.data.elasticsearch.annotations.field;
import org.springframework.data.elasticsearch.annotations.fieldtype;
import javax.persistence.entity;
import javax.persistence.id;
import javax.persistence.onetoone;

/**
 * Person record that is both a JPA entity (the import source) and an Elasticsearch
 * document (the import target): it is mapped to index {@code "person"}, type
 * {@code "person"}, with one shard and no replicas.
 *
 * <p>Accessors are not written by hand; the class relies on Lombok's {@code @data}.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
// NOTE(review): refreshinterval = "-1" presumably turns off periodic index refresh while
// bulk loading — confirm against the Elasticsearch index settings documentation.
@document(indexname = "person", type = "person", shards = 1, replicas = 0, refreshinterval = "-1")
@entity
@data
public class person {
  /** Identifier; marked with {@code @id}. */
  @id
  private long id;
  /** Person name. */
  private string name;
  /** Associated address: a one-to-one relation in JPA, declared as a nested field in the index mapping. */
  @onetoone
  @field(type = fieldtype.nested)
  private address address;
}

package com.hfcsbc.esetl.domain;
import lombok.data;
import javax.persistence.entity;
import javax.persistence.id;
/**
 * JPA entity for an address, referenced from {@code person} through its
 * {@code address} field.
 *
 * <p>Accessors are not written by hand; the class relies on Lombok's {@code @data}.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
@entity
@data
public class address {
  /** Identifier; marked with {@code @id}. */
  @id
  private long id;
  /** Address name. */
  private string name;
}

package com.hfcsbc.esetl.repository.jpa;
import com.hfcsbc.esetl.domain.person;
import org.springframework.data.jpa.repository.jparepository;
/**
 * Spring Data JPA repository for {@code person} entities keyed by {@code long}.
 * Declares no methods of its own; everything comes from {@code jparepository}.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
public interface personrepository extends jparepository<person, long> {
}

package com.hfcsbc.esetl.repository.es;
import com.hfcsbc.esetl.domain.person;
import org.springframework.data.elasticsearch.repository.elasticsearchrepository;
/**
 * Spring Data Elasticsearch repository for {@code person} documents keyed by {@code long}.
 * Declares no methods of its own; everything comes from {@code elasticsearchrepository}.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
public interface espersonrepository extends elasticsearchrepository<person, long> {
}

2.3 配置elasticsearchitemwriter

package com.hfcsbc.esetl.itemwriter;
import com.hfcsbc.esetl.repository.es.espersonrepository;
import com.hfcsbc.esetl.domain.person;
import org.springframework.batch.core.exitstatus;
import org.springframework.batch.core.itemwritelistener;
import org.springframework.batch.core.stepexecution;
import org.springframework.batch.core.stepexecutionlistener;
import org.springframework.batch.item.itemwriter;
import java.util.list;
/**
 * {@code itemwriter} that hands every chunk of {@code person} items to the
 * Elasticsearch repository supplied at construction time.
 *
 * <p>The class also implements the write-listener and step-listener callbacks,
 * all of them as no-ops.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
public class elasticsearchitemwriter implements itemwriter<person>, itemwritelistener<person>, stepexecutionlistener {

  /** Repository used to store the items. */
  private espersonrepository personrepository;

  /**
   * @param personrepository repository that receives each chunk
   */
  public elasticsearchitemwriter(espersonrepository personrepository) {
    this.personrepository = personrepository;
  }

  // ---------------------------------------------------------------- itemwriter

  /**
   * Stores the whole chunk with a single repository call.
   *
   * @param items the chunk to store
   * @throws exception declared by the {@code itemwriter} contract
   */
  @override
  public void write(list<? extends person> items) throws exception {
    // Original author's note: saveAll in the implementing class
    // AbstractElasticsearchRepository calls ElasticsearchOperations.bulkIndex(queries),
    // i.e. the chunk is indexed in bulk.
    personrepository.saveall(items);
  }

  // ------------------------------------------------- itemwritelistener (no-ops)

  /** Intentionally empty. */
  @override
  public void beforewrite(list<? extends person> items) {
    // nothing to do before a chunk is written
  }

  /** Intentionally empty. */
  @override
  public void afterwrite(list<? extends person> items) {
    // nothing to do after a chunk is written
  }

  /** Intentionally empty: a failed write is not handled here. */
  @override
  public void onwriteerror(exception exception, list<? extends person> items) {
    // nothing to do on write errors
  }

  // --------------------------------------------- stepexecutionlistener (no-ops)

  /** Intentionally empty. */
  @override
  public void beforestep(stepexecution stepexecution) {
    // nothing to do before the step starts
  }

  /**
   * @return always {@code null}; this listener supplies no exit status of its own
   */
  @override
  public exitstatus afterstep(stepexecution stepexecution) {
    return null;
  }
}

2.4 配置 ElasticsearchItemReader(本示例未使用,仅供参考)

package com.hfcsbc.esetl.itemreader;
import org.springframework.batch.item.data.abstractpaginateddataitemreader;
import org.springframework.beans.factory.initializingbean;
import org.springframework.data.elasticsearch.core.elasticsearchoperations;
import org.springframework.data.elasticsearch.core.query.searchquery;
import java.util.iterator;
/**
 * create by pengchao on 2018/2/24
 */
public class elasticsearchitemreader<person> extends abstractpaginateddataitemreader<person> implements initializingbean {

  private final elasticsearchoperations elasticsearchoperations;

  private final searchquery query;

  private final class<? extends person> targettype;

  public elasticsearchitemreader(elasticsearchoperations elasticsearchoperations, searchquery query, class<? extends person> targettype) {
    this.elasticsearchoperations = elasticsearchoperations;
    this.query = query;
    this.targettype = targettype;
  }

  @override
  protected iterator<person> dopageread() {
    return (iterator<person>)elasticsearchoperations.queryforlist(query, targettype).iterator();
  }

  @override
  public void afterpropertiesset() throws exception {
  }
}

2.5 Spring Batch 所需的配置

package com.hfcsbc.esetl.config;
import com.hfcsbc.esetl.itemwriter.elasticsearchitemwriter;
import com.hfcsbc.esetl.repository.es.espersonrepository;
import com.hfcsbc.esetl.domain.person;
import org.springframework.batch.core.job;
import org.springframework.batch.core.step;
import org.springframework.batch.core.configuration.annotation.enablebatchprocessing;
import org.springframework.batch.core.configuration.annotation.jobbuilderfactory;
import org.springframework.batch.core.configuration.annotation.stepbuilderfactory;
import org.springframework.batch.core.launch.support.runidincrementer;
import org.springframework.batch.core.repository.jobrepository;
import org.springframework.batch.core.repository.support.jobrepositoryfactorybean;
import org.springframework.batch.item.itemreader;
import org.springframework.batch.item.itemwriter;
import org.springframework.batch.item.database.jpapagingitemreader;
import org.springframework.batch.item.database.orm.jpanativequeryprovider;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.transaction.platformtransactionmanager;
import javax.persistence.entitymanagerfactory;
import javax.sql.datasource;
/**
 * Spring Batch configuration of the import job: {@code person} rows are read page by
 * page from the relational database through JPA and written chunk by chunk to
 * Elasticsearch.
 *
 * <p>Bean creation fails fast: a reader or job repository that cannot be set up raises
 * an exception instead of being returned half-configured or as {@code null}.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
@configuration
@enablebatchprocessing
public class batchconfig {
  /** Elasticsearch repository handed to the item writer. */
  @autowired
  private espersonrepository personrepository;

  /**
   * Builds the paging reader over the {@code person} table.
   *
   * @param entitymanagerfactory JPA entity manager factory used to run the query
   * @return reader producing {@code person} entities, 10000 per page
   * @throws illegalstateexception if the query provider or the reader cannot be initialised
   */
  @bean
  public itemreader<person> orderitemreader(entitymanagerfactory entitymanagerfactory){
    // Pages are fetched with separate queries, so the rows need a stable order;
    // without one the database may repeat or skip rows between pages.
    string sqlquery = "select * from person order by id";
    try {
      jpanativequeryprovider<person> queryprovider = new jpanativequeryprovider<>();
      queryprovider.setsqlquery(sqlquery);
      queryprovider.setentityclass(person.class);
      queryprovider.afterpropertiesset();

      jpapagingitemreader<person> reader = new jpapagingitemreader<>();
      reader.setentitymanagerfactory(entitymanagerfactory);
      reader.setpagesize(10000);
      reader.setqueryprovider(queryprovider);
      reader.setsavestate(true);
      // Validate only after every property has been set.
      reader.afterpropertiesset();
      return reader;
    } catch (exception e) {
      throw new illegalstateexception("failed to configure the person item reader", e);
    }
  }

  /**
   * @return writer that stores each chunk through the Elasticsearch repository
   */
  @bean
  public elasticsearchitemwriter itemwriter(){
    return new elasticsearchitemwriter(personrepository);
  }

  /**
   * Defines the single chunk-oriented step: read {@code person} items and write them
   * in chunks of 10000.
   *
   * @param stepbuilderfactory factory for step builders
   * @param itemreader source of {@code person} items
   * @param itemwriter sink for {@code person} items
   * @return the configured step, named {@code "step1"}
   */
  @bean
  public step step(stepbuilderfactory stepbuilderfactory,
           itemreader<person> itemreader,
           itemwriter<person> itemwriter){
    return stepbuilderfactory
        .get("step1")
        .<person, person>chunk(10000)
        .reader(itemreader)
        .writer(itemwriter)
        .build();
  }

  /**
   * Defines the job {@code "importjob"}, consisting of the given step, with a run-id
   * incrementer attached.
   *
   * @param jobbuilderfactory factory for job builders
   * @param step the step to execute
   * @return the configured job
   */
  @bean
  public job job(jobbuilderfactory jobbuilderfactory, step step){
    return jobbuilderfactory
        .get("importjob")
        .incrementer(new runidincrementer())
        .flow(step)
        .end()
        .build();
  }

  /**
   * Spring Batch creates some tables of its own when it runs; this bean specifies where
   * they live: the given datasource.
   *
   * @param datasource database holding the Spring Batch tables
   * @param manager transaction manager used by the job repository
   * @return the job repository, configured for database type {@code "postgres"}
   * @throws illegalstateexception if the job repository cannot be created
   */
  @bean
  public jobrepository jobrepository(datasource datasource, platformtransactionmanager manager){
    jobrepositoryfactorybean jobrepositoryfactorybean = new jobrepositoryfactorybean();
    jobrepositoryfactorybean.setdatasource(datasource);
    jobrepositoryfactorybean.settransactionmanager(manager);
    jobrepositoryfactorybean.setdatabasetype("postgres");
    try {
      return jobrepositoryfactorybean.getobject();
    } catch (exception e) {
      // A null bean here would only fail later and far from the cause.
      throw new illegalstateexception("failed to create the spring batch job repository", e);
    }
  }
}

2.6 配置数据库及 ES 的连接地址

# Application settings: Elasticsearch (Jest) endpoint, JPA/Hibernate options and the
# PostgreSQL datasource, followed by the Spring Batch schema switch.
spring:
 # NOTE(review): nothing else in this example uses Redis — confirm this entry is needed.
 redis:
  host: 192.168.1.222
 data:
  # Elasticsearch HTTP endpoint and credentials for the Jest client.
  # NOTE(review): credentials are stored in plain text; consider supplying them from outside this file.
  jest:
   uri: http://192.168.1.222:9200
   username: elastic
   password: changeme

 # JPA/Hibernate: PostgreSQL, SQL statements logged, schema handling set to "update".
 jpa:
  database: postgresql
  show-sql: true
  hibernate:
   ddl-auto: update

 # Source database connection.
 # NOTE(review): credentials are stored in plain text; consider supplying them from outside this file.
 datasource:
  platform: postgres
  url: jdbc:postgresql://192.168.1.222:5433/person
  username: hfcb
  password: hfcb
  driver-class-name: org.postgresql.driver
  # NOTE(review): verify that max-active is a property the configured connection pool actually binds.
  max-active: 2

# Always initialise the Spring Batch schema on startup.
spring.batch.initialize-schema: always

2.7 配置入口类

package com.hfcsbc.esetl;

import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchautoconfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.elasticsearchdataautoconfiguration;
import org.springframework.data.elasticsearch.repository.config.enableelasticsearchrepositories;
import org.springframework.data.jpa.repository.config.enablejparepositories;

/**
 * Application entry point.
 *
 * <p>Spring Boot's own Elasticsearch auto-configurations are excluded (presumably because
 * the Jest starter provides the Elasticsearch wiring — confirm). Repository scanning is
 * split by store: Elasticsearch repositories are looked up only in the {@code .es}
 * sub-package and JPA repositories only in the {@code .jpa} sub-package.
 */
@springbootapplication(exclude = {elasticsearchautoconfiguration.class, elasticsearchdataautoconfiguration.class})
// Restricted to the .es sub-package: scanning the parent package would also cover the JPA
// repository, whose domain class carries @document and could therefore be claimed by the
// Elasticsearch module as well.
@enableelasticsearchrepositories(basepackages = "com.hfcsbc.esetl.repository.es")
@enablejparepositories(basepackages = "com.hfcsbc.esetl.repository.jpa")
public class esetlapplication {

  /**
   * Starts the Spring Boot application.
   *
   * @param args command-line arguments, passed through to Spring Boot
   */
  public static void main(string[] args) {
    springapplication.run(esetlapplication.class, args);
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网