Spring Batch サンプルコード (Java/Gradle)
[履歴] [最終更新] (2017/07/20 22:35:34)
最近の投稿
注目の記事

概要

こちらのページで環境構築した Spring Boot でバッチ処理アプリケーションを作成します。内部的に Spring Batch を利用します。CSV ファイルを読み込んで、文字列加工して、MySQL DB に出力するバッチ処理です。

公式ドキュメント

プロジェクト構成

.
|-- build.gradle
|-- gradle
|   `-- wrapper
|       |-- gradle-wrapper.jar
|       `-- gradle-wrapper.properties
|-- gradlew
|-- gradlew.bat
`-- src
    `-- main
        |-- java
        |   `-- hello
        |       |-- Application.java
        |       |-- BatchConfiguration.java
        |       |-- JobCompletionNotificationListener.java
        |       |-- Person.java
        |       `-- PersonItemProcessor.java
        `-- resources
            |-- application.properties
            |-- sample-data.csv
            `-- schema-all.sql

build.gradle

設定内容についてはこちらをご参照ください。今回は Spring Batch を直接利用する場合は想定しておらず、あくまでも、Spring Batch を内部的に利用した Spring Boot アプリケーションです。

buildscript {
    ext {
        springBootVersion = '1.5.3.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

jar {
    baseName = 'gs-batch-processing'
    version =  '0.1.0'
}

repositories {
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    // Spring Batch を内部的に利用します。
    // 今回は Web アプリケーションではないため `spring-boot-starter-web` は不要です。
    compile('org.springframework.boot:spring-boot-starter-batch')

    // http://search.maven.org/#artifactdetails|mysql|mysql-connector-java|6.0.6|jar
    compile('mysql:mysql-connector-java:6.0.6')
}

リソースファイル

sample-data.csv

今回のサンプル入力となる CSV ファイルです。

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

application.properties

出力先 MySQL DB への接続情報です。Spring Boot では、こちらのページに記載の優先順位でプロパティを読み込みます。今回は特に、「15. Application properties packaged inside your jar (JAR 内のアプリケーション設定ファイル)」を利用していることになります。ちなみに、「14. Application properties outside of your packaged jar (JAR 外のアプリケーション設定ファイル)」の方が優先順位が高いため、実行時に設定を上書きできます。設定項目の一覧はこちらです。

spring.datasource.url=jdbc:mysql://localhost:3306/mydb
spring.datasource.username=myuser
spring.datasource.password=myuser
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

application.yml を利用することもできます。

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/mydb
    username: myuser
    password: myuser
    driver-class-name: com.mysql.jdbc.Driver

schema-all.sql

出力先 MySQL テーブルの DDL です。こちらのページに記載されているとおり、schema-all.sql というファイル名の SQL がアプリケーション起動時に実行されます。より柔軟かつ高度な初期化処理が必要な場合は Flyway と連携するように設定します

DROP TABLE IF EXISTS people;
CREATE TABLE people (
    person_id BIGINT NOT NULL AUTO_INCREMENT,
    first_name VARCHAR(20),
    last_name VARCHAR(20),
    PRIMARY KEY (person_id)
);

プロパティ値をメンバ変数に設定 (参考)

先程記載した優先順位に関するページで紹介されている @Value アノテーションを利用すると application.properties などで設定したプロパティをメンバ変数に設定して利用できます。Lombok@Value と区別して利用します。

package hello;

import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;

@RestController
public class HelloController {

    @Value("${spring.datasource.url}")
    private String datasourceUrl;

    @RequestMapping("/")
    public String index() {
        System.out.println(datasourceUrl);
        return "Greetings from Spring Boot! " + datasourceUrl;
    }
}

Java ソースコード

Person.java

mydb.people テーブル内の 1 レコードに対応するクラスです。

package hello;

public class Person {
    private String firstName;
    private String lastName;

    public Person() {
    }

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    @Override
    public String toString() {
        return "firstName: " + firstName + ", lastName: " + lastName;
    }
}

PersonItemProcessor.java

一連バッチ処理「Read → Process → Write」における Process 処理のためのクラスです。Spring Batch が提供する ItemProcessor インターフェースを実装します。

package hello;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    @Override
    public Person process(final Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();

        final Person transformedPerson = new Person(firstName, lastName);

        log.info("Converting (" + person + ") into (" + transformedPerson + ")");

        return transformedPerson;
    }
}

Application.java

Spring Boot のエントリーポイントとなるクラスです。アノテーションの意味については、こちらをご参照ください。

package hello;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
    }
}

JobCompletionNotificationListener.java

必須ではありませんが、以下のように記述することで、バッチ処理が完了した後に実行する処理を設定できます。spring-jdbc を利用して DB からデータを SELECT しています。

package hello;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");

            List<Person> results = jdbcTemplate.query("SELECT first_name, last_name FROM people", new RowMapper<Person>() {
                @Override
                public Person mapRow(ResultSet rs, int row) throws SQLException {
                    return new Person(rs.getString(1), rs.getString(2));
                }
            });

            for (Person person : results) {
                log.info("Found <" + person + "> in the database.");
            }
        }
    }
}

BatchConfiguration.java

バッチ処理を設定するクラスです。

package hello;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    // reader/processor/writer
    @Bean
    public FlatFileItemReader<Person> reader() {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        reader.setResource(new ClassPathResource("sample-data.csv"));
        reader.setLineMapper(new DefaultLineMapper<Person>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames(new String[] { "firstName", "lastName" });
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer() {
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
        writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)");
        writer.setDataSource(dataSource);
        return writer;
    }

    // job/step
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1())
                .end()
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Person, Person> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
}
項目 概要
@Configuration @Bean 定義を行うクラスに設定します。
@EnableBatchProcessing Spring Batch のバッチ処理を定義するクラスに設定します。
@Autowired 他のクラスの Bean 定義で生成されたシングルトンを関連付けて利用できるようにします。
@Bean @Configuration が設定されたクラスのメソッドに対して設定できます。@Bean 設定されたメソッドはインスタンスを返します。これは設定値をもつシングルトンとして、アプリケーション全体で利用できます。

アプリケーション実行

./gradlew build && java -jar build/libs/gs-batch-processing-0.1.0.jar
(or ./gradlew bootRun)

実行結果は以下のとおりです。すべて大文字化されて、レコードとして格納されました。

mysql> show tables from mydb;
+------------------------------+
| Tables_in_mydb               |
+------------------------------+
| BATCH_JOB_EXECUTION          |
| BATCH_JOB_EXECUTION_CONTEXT  |
| BATCH_JOB_EXECUTION_PARAMS   |
| BATCH_JOB_EXECUTION_SEQ      |
| BATCH_JOB_INSTANCE           |
| BATCH_JOB_SEQ                |
| BATCH_STEP_EXECUTION         |
| BATCH_STEP_EXECUTION_CONTEXT |
| BATCH_STEP_EXECUTION_SEQ     |
| people                       |  ←結果
+------------------------------+
10 rows in set (0.00 sec)

mysql> select * from mydb.people;
+-----------+------------+-----------+
| person_id | first_name | last_name |
+-----------+------------+-----------+
|         1 | JILL       | DOE       |
|         2 | JOE        | DOE       |
|         3 | JUSTIN     | DOE       |
|         4 | JANE       | DOE       |
|         5 | JOHN       | DOE       |
+-----------+------------+-----------+
5 rows in set (0.00 sec)
関連ページ
    概要 Flyway は DB マイグレーションを実現するためのツールです。主に Java を対象としています。Rails におけるマイグレーション機能のようなものです。基本的な使い方をまとめます。 公式ドキュメント Get Started Download Command-line Maven Gradle
    概要 こちらのページでは、Java のソースコードにハードコーディングしたユーザーとパスワードの情報をもとに、Spring Security でログインフォーム認証を行いました。本ページではユーザー認証を LDAP サーバーからの情報をもとに行います。 Spring LDAP が提供する LDAP クライアントを