当前位置: 移动技术网 > IT编程>开发语言>Java > Spring Batch 读 10 万条记录,写到 MongoDB

Spring Batch 读 10 万条记录,写到 MongoDB

2020年05月11日  | 移动技术网IT编程  | 我要评论

实践内容

从 mariadb 一张表内读 10 万条记录,经处理后写到 mongodb 。

具体实现

1、新建 spring boot 应用,依赖如下:

        <!-- web 应用 -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-web</artifactid>
            <exclusions>
                <exclusion>
                    <groupid>org.springframework.boot</groupid>
                    <artifactid>spring-boot-starter-logging</artifactid>
                </exclusion>
                <exclusion>
                    <groupid>org.springframework.boot</groupid>
                    <artifactid>spring-boot-starter-tomcat</artifactid>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- web 容器 undertow -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-undertow</artifactid>
        </dependency>

        <!-- 日志 log4j2 -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-log4j2</artifactid>
        </dependency>

        <!-- mongodb -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-data-mongodb</artifactid>
        </dependency>

        <!-- brantch -->
        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-batch</artifactid>
        </dependency>

        <!-- mariadb 驱动 -->
        <dependency>
            <groupid>org.mariadb.jdbc</groupid>
            <artifactid>mariadb-java-client</artifactid>
            <version>2.0.2</version>
        </dependency>

        <!-- lombok 代码简化 -->
        <dependency>
            <groupid>org.projectlombok</groupid>
            <artifactid>lombok</artifactid>
            <version>1.16.14</version>
        </dependency>

2、创建一张表,并生成 10 万条数据

drop table people if exists;

create table people  (
    id bigint identity not null primary key,
    first_name varchar(20),
    last_name varchar(20)
);

3、创建 person 类

@data
public class person {
    private long id;
    private string lastname;
    private string firstname;
}

4、创建一个中间处理器 personitemprocessor

import org.springframework.batch.item.itemprocessor;

@log4j2
public class personitemprocessor implements itemprocessor<person, person> {

    @override
    public person process(final person person) throws exception {
        final string firstname = person.getfirstname().touppercase();
        final string lastname = person.getlastname().touppercase();

        final person transformedperson = new person(firstname, lastname);

        log.info("converting (" + person + ") into (" + transformedperson + ")");

        return transformedperson;
    }

}

5、创建 personmapper,用户数据库映射

public class personmapper implements rowmapper {

    private static final string id_column = "id";
    private static final string nickname_column = "first_name";
    private static final string email_column = "last_name";


    @override
    public object maprow(resultset resultset, int i) throws sqlexception {
        person user = new person();
        person.setid(resultset.getlong(id_column));
        person.setnickname(resultset.getstring(nickname_column));
        person.setemail(resultset.getstring(email_column));
        return person;
    }
}

6、创建任务完成的监听 jobcompletionnotificationlistener

@log4j2
@component
public class jobcompletionnotificationlistener extends jobexecutionlistenersupport {

    @override
    public void afterjob(jobexecution jobexecution) {
        if(jobexecution.getstatus() == batchstatus.completed) {
            log.info("!!! job finished! time to verify the results");
        }
    }
}

7、构建批处理任务 batchconfiguration

@configuration
@enablebatchprocessing
public class batchconfiguration {

    @autowired
    public jobbuilderfactory jobbuilderfactory;

    @autowired
    public stepbuilderfactory stepbuilderfactory;

    @autowired
    public datasource datasource;
    
    @autowired
    public mongotemplate mongotemplate;

    @bean
    public jdbccursoritemreader<person> reader(){
        jdbccursoritemreader<person> itemreader = new jdbccursoritemreader<person>();
        itemreader.setdatasource(datasource);
        itemreader.setsql("select id, nickname, email from people");
        itemreader.setrowmapper(new personmapper());
        return itemreader;
    }

    @bean
    public personitemprocessor processor() {
        return new personitemprocessor();
    }
    
    @bean
    mongoitemwriter<person> writer(){
        mongoitemwriter<person> itemwriter = new mongoitemwriter<person>();
        itemwriter.settemplate(mongotemplate);
        itemwriter.setcollection("branch");
        return itemwriter;
    }

    @bean
    public step step() {
        return stepbuilderfactory.get("step")
                .<person, person> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @bean
    public job importuserjob(jobcompletionnotificationlistener listener) {
        return jobbuilderfactory.get("importuserjob")
                .incrementer(new runidincrementer())
                .listener(listener)
                .flow(step())
                .end()
                .build();
    }

}

任务处理结果

0出错,耗时 2 分钟左右,测试机 mac

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网