본문 바로가기
잡다구리

스프링 배치 완벽 가이드 - 4. 잡과 스텝 이해하기

by Growing! 2021. 5. 2.

잡 소개하기

이 책에서 잡의 정의: "처음부터 끝까지 독립적으로 실행할 수 있는, 고유하고, 순서가 있는 스텝의 목록이다."

  • 고유하다(unique):
    • 잡은 자바나 XML을 사용하여 구성하며, 재사용이 가능하다.
    • 하나의 구성으로 필요한 횟수 만큼 잡을 실행할 수 있다.
    • 잡을 여러 번 실행하려고 동일한 잡을 여러 번 정의할 필요가 없다.
  • 순서가 있는 스텝의 목록이다(ordered list of steps):
    • 잡에서 스텝의 순서는 중요하다.
    • 모든 스텝이 논리적인 순서로 실행될 수 있도록 잡을 구성한다.
  • 처음부터 끝까지 실행 가능하다(Can be executed from start to finish):
    • 잡은 외부와 상호작용 없이 실행할 수 있는 일련의 스텝이다.
  • 독립적으로(independently):
    • 각 배치 잡은 외부 의존성에 영향을 받지 않고 실행할 수 있어야 한다.
    • 의존성을 가질 수 없다는 것을 의미하는 것은 아니며, 의존성을 관리할 수 있어야 한다.

잡의 생명주기를 따라가보기

  • 잡이 실행되면 생명주기대로 진행한다.
  • 잡의 실행은 잡 러너에서 시작된다. 잡 러너는 잡 이름과 전달된 파라미터로 잡을 실행시킨다.
  • 스프링 배치는 두 가지 잡 러너를 제공한다:
    • CommandLineJobRunner: 스크립트나 CLI에서 잡을 실행할 때 사용. 스프링을 기동시키고 전달받은 파라미터로 잡을 실행한다.
    • JobRegistryBackgroundJobRunner: 스프링이 이미 기동되어 있는 상황에서 잡을 실행할 때 사용.
  • 스프링 부트는 JobLauncherCommandLineRunner를 제공한다.
    • 기본적으로 기동시에 ApplicationContext에 정의된 Job 타입의 모든 빈을 찾아서 실행한다.
    • 실행할 잡을 별도로 명시할 수 있다.
  • 프레임워크를 실행할 때 실제 진입점은 잡 러너가 아니라 org.springframework.batch.core.launch.JobLauncher 인터페이스의 구현체이다. 스프링 배치는 구현체를 하나만 제공하는데, org.springframework.batch.core.launch.support.SimpleJobLauncher 이다. 이 구현체는 CommandLineJobRunner와 JobLauncherCommandLineRunner에서 내부적으로 사용한다.
  • SimpleJobLauncher는 잡을 실행할 때 코어 스프링의 TaskExecutor 인터페이스를 사용한다.
    • TaskExecutor를 구성하는 여러 가지 방법이 있으며, SyncTaskExecutor를 사용한면 JobLauncher와 동일한 스레드에서 잡이 실행된다. 별도의 스레드에서 잡을 실행하는 방식도 있다.

Job, JobInstance, JobExecution 사이의 관계

  • JobInstance:
    • 배치 잡이 실행되면 JobInstance가 생성된다.
    • 잡의 논리적인 실행을 나타내며 "잡 이름" + "잡 파라미터"로 식별된다.
    • 성공적으로 완료된 JobExecution이 있다면 완료된 것으로 간주된다.
    • JobInstance는 한 번 성공적으로 완료되면 다시 실행시킬 수 없다.
    • BATCH_JOB_INSTANCE 테이블에 저장된다.
  • JobExecution:
    • 잡 실행의 실제 시도를 의미한다.
    • JobInstance가 한 번에 실행 완료되었다면 JobExecution은 하나만 존재한다.
    • 첫 번째 실행이 오류 상태로 종료되었다면 해당 JobInstance(잡 이름 + 잡 파라미터)를 실행하려고 시도할 때 마다 새로운 JobExecution이 생성된다.
    • BATCH_JOB_EXECUTION 테이블에 저장된다.
    • JobExecution의 상태는 BATCH_JOB_EXECUTION_CONTEXT 테이블에 저장된다. 잡을 재시작할 때 이 정보를 이용하여 올바른 지점에서 다시 시작할 수 있다.

잡 구성하기

잡의 기본 구성

MySQL 구성

-- root 로 실행
CREATE DATABASE spring_batch;

USE mysql;

INSERT INTO db (HOST, Db, USER, Select_priv, Insert_priv, Update_priv, Delete_priv, Create_priv, Drop_priv, Index_priv, Alter_priv)
VALUES ('%', 'spring_batch', 'spring_batch', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y');
INSERT INTO db (HOST, Db, USER, Select_priv, Insert_priv, Update_priv, Delete_priv, Create_priv, Drop_priv, Index_priv, Alter_priv)
VALUES ('localhost', 'spring_batch', 'spring_batch', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y');

FLUSH PRIVILEGES;

DataSource 구성

# application.yaml
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/spring_batch
    username: spring_batch
    password: spring_batch
  batch:
    initialize-schema: always  # 배치 스키마를 자동으로 생성한다

HelloWorldJob

@EnableBatchProcessing
@SpringBootApplication
public class HelloWorldJob {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("basicJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((contribution, chunkContext) -> {
                    log.info("Hello, World!");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloWorldJob.class, args);
    }

}

(코드 상세 설명은 p.98~99)

잡 파라미터

  • JobInstance는 잡 이름 + 식별 파라미터로 식별된다.
  • 동일한 식별 파라미터를 사용해서 동일한 잡을 두 번 이상 실행할 수 없다.
    • JobInstanceAlreadyCompleteException이 발생한다.
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-02 19:58:59.983 ERROR 8078 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813)
	at ...
	at my.batch.ch04.helloworld.HelloWorldJob.main(HelloWorldJob.java:42)
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={foo=bar}.  If you want to run this job again, change the parameters.
	at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:136)
	...
  • 잡 러너는 잡 실행에 필요한 JobParameters 객체를 생성해 JobInstance에 전달한다.
  • 스프링 부트의 JobLauncherCommandLineRunner에 파라미터를 전달하는 방법은 key=value 쌍을 전달하면 된다.
java -jar demo.jar name=Michael
  • 잡 러너는 JobParamets 인스턴스를 생성하여 전달된 파라미터를 담는다.
  • 스프링 부트의 프로퍼티나 시스템 프로퍼티와도 다르므로 "--" prefix나 "-D" 아규먼트를 사용해서 전달할 수 없다.
  • JobParameters는 java.util.Map<String, JobParameter> 객체의 래퍼이다.
  • 스프링 배치는 파라미터의 타입을 변환하는 기능을 제공하며, 변환된 타입에 맞는 JobParameter의 접근자를 제공한다.
  • 타입 변환 기능을 사용하려면 파라미터 이름 뒤에 괄호를 쓰고 그 안에 파라미터의 타입을 명시하면 된다. 타입의 이름은 모두 소문자여야 한다.
    • java -jar demo.jar executionDate(date)=2020/12/27
  • 지원하는 파라미터 타입은 String, Double, Long, java.util.Date 이다.
  • 잡 파라미터를 확인하려면 BATCH_JOB_EXECUTION_PARAMS 테이블을 조회해 보면 된다.
  • 파라미터는 JobInstance를 식별하는데 사용할 수 있으며, 식별에 사용하지 않을 파라미터인지 여부를 지정할 수 있다.
  • 파라미터에 접두사 "-"를 사용하면 식별 파라미터로 사용되지 않는다.
    • java -jar demo.jar executionDate(date)=2020/12/27 -name=Michael

잡 파라미터에 접근하기

파라미터에 접근하려는 위치에 따라 몇 가지 옵션이 있다.

  • ChunkContext:
    • 태스크릿에서는 execute 메서드의 두 번째 파라미터는 ChunkContext 인스턴스다. 실행 시점의 잡 상태 및 처리 중인 청크와 관련된 정보를 갖고 있다. 청크 정보는 스텝 및 잡 정보도 갖고 있으며, JobParameters가 포함된 StepContext의 참조가 있다.
// ChunkContext를 사용해 JobParameters에 접근하기
@Bean
public Step step1() {
    Tasklet tasklet = (contribution, chunkContext) -> {
        String name = (String) chunkContext.getStepContext()
                      .getJobParameters() // Map<String, Object>
                      .get("name");
        log.info("Hello, World!, {}", name);
        return RepeatStatus.FINISHED;
    };

    return stepBuilderFactory.get("step1")
            .tasklet(tasklet)
            .build();
}
  • Late Binding:
    • 스텝이나 잡을 제외한 프레임워크 내 특정 부분에 파라미터를 전달하는 가장 쉬운 방법은 스프링의 DI를 사용하는 것이다.
    • 참조하려는 Bean이 StepScope나 JobScope 이어야 한다.
    • StepScope를 지정하면 해당 bean이 스텝의 실행범위에 들어갈 때 까지 빈 생성을 지연시킨다. JobScope도 마찬가지이다. 이렇게 함으로써 잡 파라미터를 빈 생성 시점에 주입할 수 있다.
// Late binding을 사용하여 SPEL로 참조하기
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet(helloWorldTasklet(null))
            .build();
}

@StepScope // StepScope or JobScope를 가져야 한다.
@Bean
public Tasklet helloWorldTasklet(@Value("#{jobParameters['name']}") String name) {
    return (contribution, chunkContext) -> {
        log.info("Hello, World!, {}", name);
        return RepeatStatus.FINISHED;
    };
}

잡 파라미터 유효성 검증하기

  • JobParametersValidator 인터페이스를 구현하고 해당 구현체를 잡에 구성하면 된다.
    • validate 메소드의 반환 타입이 void 이므로, JobParametersInvalidException이 발생하지 않았다면 유효성 검증을 통과했다고 판단한다.
interface JobParameterValidator {

    void validate(JobParameters parameters) throws JobParametersInvalidException;

}
  • 필수/선택 파라미터가 누락되었는지 확인할 수 있는 DefaultJobParametersValidator를 제공한다.
    • 존재 여부를 제외한 다른 유효성 검증을 수행하지는 않는다.
  • JobBuilder에서 validator() 메소드로 1개의 validator를 지정할 수 있다.
  • 두 개의 유효성 검증기를 모두 사용하고 싶다면 CompositeJobParametersValidator를 사용하여 여러 개의 validator를 구성할 수 있다.

잡 파라미터 증가시키기

  • 주어진 식별 파라미터의 집합으로는 잡을 단 한 번만 실행할 수 있다는 제약이 있다. JobParametersIncrementer를 사용하여 이를 우회할 수 있다.
  • JobParametersIncrementer는 잡에서 사용할 파라미터를 고유하게 생성할 수 있도록 한다. 매 실행 시마다 파라미터를 증가시키거나 타임스탬프를 추가할 수도 있다.
  • 스프링 배치 프레임워크는 이 인터페이스의 구현체인 RunIdIncrementer를 제공한다.
  • default로 파라미터 이름이 "run.id"인 long 타입 파라미터의 값을 증가시킨다. 파라미터의 이름은 setKey(String) 메소드로 변경할 수 있다.
@Bean
public Job job() {
    return jobBuilderFactory.get("basicJob_Incrementing_Parameter")
            .start(step1())
            .incrementer(new RunIdIncrementer())
            .build();
}

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet(helloWorldTasklet(null, 0))
            .build();
}

@StepScope
@Bean
public Tasklet helloWorldTasklet(
        @Value("#{jobParameters['name']}") String name,
        @Value("#{jobParameters['run.id']}") long runId) {
    return (contribution, chunkContext) -> {
        log.info("Hello, World!, {} [run.id: {}]", name, runId);
        return RepeatStatus.FINISHED;
    };
}
  • 잡 실행 시마다 타임스탬프를 사용하려면 JobParametersIncrementer를 직접 구현해야 한다.
public class DailyJobTimestamp implements JobParametersIncrementer {
    @Override
    public JobParameters getNext(JobParameters parameters) {
        return new JobParametersBuilder(parameters)
                .addDate("currentDate", new Date())
                .toJobParameters();
    }
}
@Bean
public Job job() {
    return jobBuilderFactory.get("basicJob_Timestamp_Parameter")
            .start(step1())
            .incrementer(new DailyJobTimestamp())
            .build();
}

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet(helloWorldTasklet(null, null))
            .build();
}

@StepScope
@Bean
public Tasklet helloWorldTasklet(
        @Value("#{jobParameters['name']}") String name,
        @Value("#{jobParameters['currentDate']}") Date currentDate) {
    return (contribution, chunkContext) -> {
        log.info("Hello, World!, {} [currentDate: {}]", name, currentDate.getTime());
        return RepeatStatus.FINISHED;
    };
}

잡 리스너 적용하기

  • 스프링 배치에는 생명주기가 잘 정의되어 있으며, 여러 시점에 로직을 추가할 수 있는 기능을 제공한다.
  • JobExecutionListener:
    • 잡 실행과 관련한 리스너 인터페이스
    • beforeJob, afterJob의 콜백 메소드를 제공하며, 잡의 생명주기에서 가장 먼저 실행되거나 가장 나중에 실행된다.
    • 유즈 케이스:
      • 알림(Notifications): 잡의 시작이나 종료를 다른 시스템에 알림
      • 초기화(Initialization): beforeJob에서 잡 실행 전에 준비 로직을 수행하기 알맞다.
      • 정리(Cleanup): afterJob에서 정리 작업을 수행하기 알맞다. 정리 작업은 잡의 성공/실패에 영향을 미치지 않아야 하지만 반드시 실행되어야 한다.
  • 잡 리스너를 작성하는 두 가지 방법이 있다.
    • 하나는 JobExecutionListener 인터페이스를 구현하는 방법이며
    • 다른 하나는 @BeforeJob/@AfterJob 애너테이션을 사용하는 방법이다.
  • JobExecutionListener 인터페이스:
    • void beforeJob(JobExecution): 잡이 실행되기 전에 호출된다.
    • void afterJob(JobExecution): 잡이 실행된 후에 호출된다. 잡이 성공했는지 실패했는지 여부와 상관 없이 호출된다. 따라서 잡의 종료 상태에 따라 적절한 로직을 수행할 수 있다.
    • 리스너 구현체는 JobBuilder.listener(JobExecutionListener) 메소드로 지정하면 된다.
/**
 * Job의 생명주기에서 특정 시점에 호출되는 콜백을 제공한다.
 * 스레드 세이프에 주의한다면 상태를 가져도 된다.
 */
public interface JobExecutionListener {

    /**
     * 잡이 실행되기 직전의 콜백
     *
     * @param jobExecution the current {@link JobExecution}
     */
    void beforeJob(JobExecution jobExecution);

    /**
     * 잡이 실행된 직후의 콜백.
     * 잡이 성공했거나 실패했거나 모두 호출된다.
     * 상태에 따라 특정한 로직을 수행하려면 다음과 같이 하면 된다.
     * "if (jobExecution.getStatus() == BatchStatus.X)"
     *
     * @param jobExecution the current {@link JobExecution}
     */
    void afterJob(JobExecution jobExecution);

}
  • 잡 리스너를 위한 @BeforeJob과 @AfterJob 애너테이션이 제공된다. 메소드에 지정한다.
    • 애너테이션이 적용된 리스너를 잡에 주입하려면 JobListenerFactoryBean으로 래핑한 후 JobBuilder.listener(JobExecutionListener) 메소드로 지정하면 된다.
  • 리스너는 잡의 특정 시점에 로직을 실행할 수 있는 유용한 도구이다. step, reader, writer 등 여러 컴포넌트에 사용할 수 있다.

ExecutionContext

  • 배치는 태생적으로 stateful 하다.
    • 배치는 현재 어떤 스텝이 실행되고 있는지, 해당 스텝에서 처리한 레코드 개수 등에 대한 상태 정보가 필요하다.
    • 이러한 상태 정보들은 진행중 일때 뿐 아니라, 실패 후 재시작할 때에도 중요하다.
  • JobExcution은 잡의 실행 시도를 나타낸다. JobExecution이 잡이나 스텝을 진행하는 동안, 잡의 상태는 변하게 된다.
  • 잡의 상태는 JobExecution의 ExecutionContext에 저장된다.
  • ExecutionContext는 배치에서의 세션(HttpSession)이라고 할 수 있다.
  • ExecutionContext는 key-value의 저장소이다.
  • ExecutionContext는 JobRepository에 저장되므로 데이터는 안전하다.
  • 잡은 여러 개의 ExcutionContext를 가지게 된다. JobExecution은 하나의 ExecutionContext를 가지며, 각 StepExecution도 하나씩의 ExecutionContext를 가진다.
  • 이런 구조는 적절한 수준(개별 스텝용 데이터 or 잡 전체용 글로벌 데이터)으로 데이터의 범위(scope)를 가지게 한다.

JobExecution은 하나의 ExecutionContext를 가진다. StepExecution은 ExecutionContext를 하나씩 가진다.

ExecutionContext 조작 및 저장

  • ExecutionContext는 JobExecution이나 StepExecution의 일부분이며, 여기에서 가져올 수 있다.
Tasklet tasklet = (contribution, chunkContext) -> {
    String name = (String) chunkContext
            .getStepContext()
            .getJobParameters()
            .get("name");

    // ChunkContext -> StepContext -> StepExecution -> JobExecution -> (Job) ExecutionContext
    ExecutionContext jobExecutionContext = chunkContext
            .getStepContext()
            .getStepExecution()
            .getJobExecution()
            .getExecutionContext();

    jobExecutionContext.put("user.name", name);

    log.info("Hello, World!, {}", name);
    return RepeatStatus.FINISHED;
};

 

Job의 ExecutionContext에 저장된 key-value

  • 잡의 ExecutionContext를 얻기 위해서는 약간의 순회가 필요하다.
    • ChunkContext -> StepContext -> StepExecution -> JobExecution -> (Job) ExecutionContext
  • StepContext에는 getJobExecutionContext() 메소드가 있는데 잡의 읽기전용 ExecutionContext를 반환한다. 이렇게 얻은 ExecutionContext에 대한 변경사항은 저장되지 않는다.
  • 스텝의 ExecutionContext를 얻는 방법도 비슷하며, StepExecution에서 ExecutionContext를 가져올 수 있다.
Tasklet tasklet = (contribution, chunkContext) -> {
    String name = (String) chunkContext
            .getStepContext()
            .getJobParameters()
            .get("name");

    // ChunkContext -> StepContext -> StepExecution -> (Step) ExecutionContext
    ExecutionContext stepExecutionContext = chunkContext
            .getStepContext()
            .getStepExecution()
            .getExecutionContext();

    stepExecutionContext.put("user.name", name);

    log.info("Hello, World!, {}", name);
    return RepeatStatus.FINISHED;
};

 

Step의 ExecutionContext에 저장된 key-value

  • JobExecution의 ExecutionContext에 데이터(key-value)를 저장하는 다른 방법은 StepExecution의 ExecutionContext에 있는 key를 JobExecution의 ExecutionContext로 승격(Step scope -> Job scope)하는 것이다.
    • 특정 스텝이 성공했을 때 잡 레벨에서 공유하고자 하는 데이터를 이 방법을 통해 저장할 수 있다.
    • p.125~126 예제 참조

스텝 알아보기

  • 스텝은 잡의 빌딩 블록이다.
  • 독립적이고, 순차적인 배치 프로세서이다.
  • 스텝은 단위 작업에 필요한 모든 것을 포함한다.
  • 스텝은 자체적으로 입력을 처리하고, 입력된 데이터를 가공할 수 있으며, 출력을 처리한다.
  • 트랜잭션은 스텝 내에 포함된다.
  • 스텝들은 독립적일 수 있도록 설계되었으며, 개발자가 필요에 따라 자유롭게 잡을 구성할 수 있다.

태스크릿 vs. 청크 처리

  • 배치 처리는 일반적으로 데이터를 처리한다.
  • 배치 잡의 작업은 두 가지의 모델이 있으며, 스프링 배치는 두 모델을 모두 지원한다.
    • 하나의 명령을 실행하여 처리 - 디렉토리를 정리하는 쉘 스크립트, 데이터를 삭제하는 단일 SQL 등
    • 대량의 데이터 집합을 이터레이션 하면서 처리 - 한 번에 하나의 레코드(or 아이템)을 읽고, 어떤 로직을 수행하고, 어떤 데이터 저장소에 기록
  • 태스크릿(Tasklet) - 첫 번째 모델
    • 개발자는 Tasklet.execute 메소드에 코드 블럭을 작성한다.
    • Tasklet.execute 메소드는 트랜잭션 내에서 실행된다.
    • Tasklet.execute 메소드는 RepeatStatus.FINISHED를 반환할 때 까지 반복적으로 실행된다.
  • 청크(Chunk) 기반 처리 - 두 번째 모델
    • 주요 컴포넌트는 ItemReader, ItemProcessor, ItemWriter 이다. ItemProcessor는 필수가 아니다.
    • 이런 컴포넌트를 사용하여 레코드의 청크(묶음) 단위로 처리한다.
    • 각 청크마다 자체 트랜잭션을 가진다.
    • 처리가 실패하면 스프링 배치는 마지막에 성공한 트랜잭션의 다음 청크 부터 재시작할 수 있다.
    • ItemReader는 청크 단위로 처리할 레코드(아이템)를 하나씩 메모리에 읽어온다.
    • ItemProcessor는 메모리상의 아이템을 하나씩 처리한다.
    • 청크에 포함되어 처리된 모든 아이템들은 ItemWriter에 한 번에 전달되어 기록된다. 한 번에 전달되기 때문에 물리적인 I/O가 최적화될 수 있다. 

청크 기반 처리

스텝 구성 (Step Configuration)

  • 개발자는 스텝을 어떻게 전이시킬지 구성하며, 잡은 구성된 대로 스텝들을 전이시킨다.
  • 스프링 배치는 상태 머신(state machine)과 유사한 개념을 가진다.
  • 스텝의 유형은 Tasklet Step과 Chunk-based Step 두 가지가 있다.

Tasklet Step

  • 태스크릿을 만드는 방법은 두 가지가 있다.
    • 개발자가 실행될 로직을 POJO로 구현하고,  스프링 배치의 MethodInvokingTaskletAdapter로 해당 코드를 태스크릿으로 실행 하도록 할 수 있다.
    • Tasklet 인터페이스를 구현하여 만들 수 있다.
      • execute 메소드에 로직을 구현하고, RepeatStatus 객체를 반환한다. 스프링 배치는 처리가 끝난 후 RepeatStatus를 보고 어떻게 진행해야 할지 판단한다.
      • Tasklet 인터페이스는 함수형 인터페이스이므로 람다로 구현할 수 있다.
/**
 * 스텝에서의 처리를 위한 전략 인터페이스
 */
public interface Tasklet {

    /**
     * StepContribution의 형태로 현재 컨텍스트가 전달된다.
     * 필요한 처리를 수행하면 되며, 트랜잭션 내에서 실행된다.
     * 처리가 완전히 끝났다면 {RepeatStatus.FINISHED}를 반환한다.
     * 처리가 완전히 끝나지 않았다면 {RepeatStatus.CONTINUABLE}를 반환한다.
     * 실패시 exception을 던진다.
     * 
     * @param contribution 변경가능한 상태 정보를 가진다. 
     *                     변경이 발생하면 되돌려져서 현재의 step execution을 갱신한다.
     * @param chunkContext execute 메소드가 여러(CONTINUABLE) 번 호출되는 동안에 공유할 속성들을 담고 있다.
     *                     재시작(restart)하는 경우에는 공유되지 않는다.
     * @return {RepeatStatus} 처리가 계속되어야 하는지를 나타낸다.
     *                     null을 반환하면 {RepeatStatus.FINISHED}로 간주된다.
     *
     * @throws 실행하는 동안 Exception이 발생하면 던진다.
     */
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
        throws Exception;

}
  • 태스크릿을 람다로 구현하는 예:
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet((contribution, chunkContext) -> {
                log.info("Hello, World!");
                return RepeatStatus.FINISHED;
            })
            .build();
}
  • 처리를 완료하면 RepeatStatus 객체를 반환해야 한다. RepeatStatus.CONTINUABLE과 RepeatStatus.FINISHED를 반환할 수 있다.
  • RepeatStatus.CONTINUABLE을 반환하면 스프링 배치가 Tasklet을 다시 실행시킨다. 어떤 조건이 만족될 때 까지 특정 태스크릿을 반복해서 실행하려는 경우에 유용하다.
  • RepeatStatus.FINISHED를 반환하면 스프링 배치는 Tasklet이 끝났다고 판단하고 다음 단계로 진행하게 된다.

그 밖의 다른 유형의 태스크릿들

  • Tasklet 인터페이스를 구현하는 것 외에도 스프링 배치가 기본적으로 제공하는 3가지 구현체를 사용할 수 있다.
    • CallableTaskletAdapter
    • MethodInvokingTaskletAdapter
    • SystemCommandTasklet

CallableTaskletAdapter

  • Callable<RepeatStaus> 인터페이스의 구현체를 사용할 수 있도록 해주는 어댑터이다.
  • CallableTaskletAdapter의 인스턴스를 만든 후 Callable의 구현체의 인스턴스를 setCallable(Callable<RepeatStatus>) 메소드로 지정해 주면 된다.
  • 스텝이 실행되는 스레드와는 다른 스레드에서 실행된다. => 라고 되어 있지만 실제로 실행해 보면 동일 스레드를 사용한다.
@Bean
public Step callableStep() {
    return stepBuilderFactory.get("callableStep")
            .tasklet(tasklet())
            .build();
}

@Bean
public Tasklet tasklet() {
    CallableTaskletAdapter callableTaskletAdapter = new CallableTaskletAdapter();
    callableTaskletAdapter.setCallable(callableObject());
    return callableTaskletAdapter;
}

@Bean
public Callable<RepeatStatus> callableObject() {
    return () -> {
        log.info("다른 스레드에서 실행됩니다."); // 동일 스레드에서 실행됨
        return RepeatStatus.FINISHED;
    };
}

/*
실행 결과
[main] o.s.batch.core.job.SimpleStepHandler  : Executing step: [callableStep]
[main] m.b.c.callablestep.CallableTaskletJob : 다른 스레드에서 실행됩니다.
[main] o.s.batch.core.step.AbstractStep      : Step: [callableStep] executed in 14ms
*/

MethodInvokingTaskletAdapter

  • 어떤 서비스의 메소드가 잡에서 실행하려는 로직을 이미 가지고 있다면, Tasklet 구현체를 만들지 않고 해당 메소드를 바로 호출할 수 있게 해준다.
  • 메소드명을 하드 코딩 해야 하기 때문에 실제로 사용하기에는 안정성이 낮아진다.
// Batch Job
@Bean
public Step methodInvokingStep() {
    return stepBuilderFactory.get("methodInvokingStep")
               .tasklet(methodInvokingTasklet())
               .build();
}

@Bean
public Tasklet methodInvokingTasklet() {
    MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();

    adapter.setTargetObject(service());
    adapter.setTargetMethod("serviceMethod");         // method name
    //adapter.setArguments(new String[] { message }); // arguments

    return adapter;
}

@Bean
public CustomService service() {
    return new CustomService();
}

// CustomService
public class CustomService {

    public void serviceMethod() {
        System.out.println("Service method was called");
    }

}

SystemCommandTasklet

  • 시스템 명령을 실행할 때 사용한다.
  • 시스템 명령은 비동기로 실행되며, 타임아웃을 지정한다.
  • 다양한 속성을 지정할 수 있다. 책 p.142를 참조한다.
@Bean
public Step systemCommandStep() {
    return this.stepBuilderFactory.get("systemCommandStep")
            .tasklet(systemCommandTasklet())
            .build();
}

@Bean
public Tasklet systemCommandTasklet() {
    SystemCommandTasklet tasklet = new SystemCommandTasklet();

    tasklet.setCommand("rm -rf /tmp.txt");
    tasklet.setTimeout(5000);
    tasklet.setInterruptOnCancel(true);

    return tasklet;
}

Chunk-Based Step

  • 청크의 크기는 commit interval에 의해 정의된다.
    • commit interval이 50 이라면
    • 50개의 아이템을 하나씩 읽고(read)
    • 50개의 아이템을 하나씩 처리하고(process)
    • 50개의 아이템을 한 번에 기록한다(write).
@Slf4j
@RequiredArgsConstructor
@EnableBatchProcessing
@SpringBootApplication
public class ChunkBasedJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job chunkBasedJob() {
        return jobBuilderFactory.get("chunkBasedJob")
                .start(chunkBasedStep())
                .build();
    }

    @Bean
    public Step chunkBasedStep() {
        return stepBuilderFactory.get("chunkBasedStep")
                .<String, String>chunk(5)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ItemReader<String> itemReader() {
        AtomicInteger counter = new AtomicInteger(0);
        return () -> counter.get() < 12 ? String.valueOf(counter.incrementAndGet()) : null;
    }

    @Bean
    public ItemProcessor<String, String> itemProcessor() {
        return item -> "[" + item + "]";
    }

    @Bean
    public ItemWriter<String> itemWriter() {
        return items -> {
            log.warn("itemWriter.write(items)");
            items.forEach(item -> log.info("item: {}", item));
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(ChunkBasedJobConfig.class, args);
    }

}

/*
실행결과
m.b.ch04.helloworld.ChunkBasedJobConfig  : Started ChunkBasedJobConfig in 1.103 seconds (JVM running for 1.564)
o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=chunkBasedJob]] launched with the following parameters: []
o.s.batch.core.job.SimpleStepHandler     : Executing step: [chunkBasedStep]
m.b.ch04.helloworld.ChunkBasedJobConfig  : itemWriter.write(items)  <= 첫 번째 chunk
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [1]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [2]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [3]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [4]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [5]
m.b.ch04.helloworld.ChunkBasedJobConfig  : itemWriter.write(items)  <= 두 번째 chunk
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [6]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [7]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [8]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [9]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [10]
m.b.ch04.helloworld.ChunkBasedJobConfig  : itemWriter.write(items)  <= 세 번째 chunk
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [11]
m.b.ch04.helloworld.ChunkBasedJobConfig  : item: [12]
o.s.batch.core.step.AbstractStep         : Step: [chunkBasedStep] executed in 13ms
o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=chunkBasedJob]] completed with the following parameters: [] and the following status: [COMPLETED] in 21ms
*/
  • chunk(commitInterval) 메소드를 사용해서 청크 기반 스텝을 사용하도록 지정한다.
  • 청크 크기(commit interval)을 지정한다. 이 예제에서 5로 지정했는데, 아이템이 5개씩 처리되면 커밋한다. 즉, 청크는 트랜잭션의 단위와 같다.
  • 청크 기반 스텝은 reader와 wirter가 필요하며 processor는 선택적이다.
  • 청크 크기(commit interval) 에 지정된 수 만큼 read/process가 되어야만 write가 된다. 그 전까지는 write가 발생하지 않는다.
  • 청크 크기(commit interval) 만큼의 아이템을 처리하던 도중에 에러가 발생하면 스프링 배치는 현재 청크(==트랜잭션)을 롤백하고 잡을 실패로 처리한다.
  • 청크 크기를 1로 지정하면 1개의 아이템을 read하고 process하고 write 한다(+commit 발생).
  • 청크 크기는 write의 단위가 되며, write 성능에 많은 영향을 준다. 어느 정도 크게 설정해야 좋은 성능을 낼 수 있다.

청크 크기 구성하기

  • 청크의 크기는 두 가지 방식으로 설정할 수 있다:
    • 정적 커밋 개수로 설정 - chunk(commitInterval) 메소드로 지정
    • CompletionPolicy 구현체를 사용하여 지정 - 크기가 동일하지 않은 청크를 처리해야 하는 경우에 사용하며, 몇 가지 구현체를 기본 제공하고 있다.
      • SimpleCompletionPolicy - 고정 갯수를 지정하고, 이 갯수에 도달하면 청크가 완료된 것으로 간주한다. chunk(commitInterval) 메소드를 사용하는 것과 동일하다.
      • TimeoutTerminationPolicy - 지정한 시간을 넘어가면 청크가 완료된 것으로 간주한다.
      • CompositeCompletionPolicy - 여러 개의 CompletionPolicy를 포함하여 하나라도 만족하면 청크가 완료된 것으로 간주한다.
    • CompletionPolicy 사용 예는 p.148~153을 참조한다.

스텝 리스너

  • 스텝과 청크 레벨에서의 이벤트를 처리할 수 있는 리스너가 제공된다.
  • 리스너는 컴포넌트의 동작 직전에 전처리를 수행하거나 컴포넌트의 동작 직후에 결과를 평가하거나 오류 처리, 정리 작업등을 수행하는데 사용된다.
  • StepExecutionListener는 스텝의 시작과 끝에서 특정 로직을 실행할 수 있게 한다.
    • beforeStep:void, afterStep:ExitStatus 를 메소드 혹은 애너테이션으로 제공한다.
    • afterStep은 ExitStatus를 반환하는데, 스텝이 반환한 ExistStatus를 잡에 전달하기 전에 수정할 수 있게 해준다.
  • ChunkListener는 청크의 시작과 끝에서 특정 로직을 실행할 수 있게 한다.
    • beforeChunk:void, afterChunk:void 를 메소드 혹은 애너테이션으로 제공한다.
public class LoggingStepStartStopListener {

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName() + " has begun!");
    }

    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName() + " has ended!");

        return stepExecution.getExitStatus();
    }
}

@Bean
public Step chunkStep() {
    return this.stepBuilderFactory.get("chunkStep")
            .<String, String>chunk(1000)
            .reader(itemReader())
            .writer(itemWriter())
            .listener(new LoggingStepStartStopListener())
            .build();
}

스텝 플로우

  • 지금까지 살펴본 잡은 여러 개의 스텝을 순서대로 하나씩 실행했었다.
  • 스프링 배치는 스텝의 흐름을 결정할 수 있도록 하는 여러 가지 방법을 제공한다.

조건 로직

  • 여러 개의 스텝을 순서대로 실행하려면, StepBuilder의 start(Step) 메소드를 시작으로 next(Step) 메소드를 필요한 만큼 이어서 정의하면 순서대로 실행된다.
@Bean
public Job job() {
    return jobBuilderFactory.get("basicJob")
            .start(step1())
            .next(step2())
            .next(step3())
            .build();
}

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet((contribution, chunkContext) -> {
                log.info("Step1");
                return RepeatStatus.FINISHED;
            })
            .build();
}

@Bean public Step step2() { ... }
@Bean public Step step3() { ... }
  • 스텝의 exit status를 기반으로 어떤 스텝으로 전이(transition)할지 결정할 수도 있다.
  • on(exitStatusPattern) 메소드는 스프링 배치가 스텝의 ExitStatus를 평가해 어떤 스텝을 수행할지 결정할 수 있도록 해준다.
  • ExitStatus는 클래스이며, 실제로는 문자열을 나타낸다. 와일드 카드를 활용한 패턴을 사용할 수 있다.
    • '*'는 0개 이상의 문자를 일치 - "C*"은 "C", "COMPLETE", "CORRECT"와 일치한다.
    • '?'는 1개의 문자를 일치 - "?AT"은 "CAT", "KAT"과 일치하며, "THAT"과는 일치하지 않는다.
  • 여러 개의 ExitStatus 중에 가장 제한적인 패턴부터 덜 제한적인 패턴 순서로 적용된다.
@Bean
public Job job() {
    return jobBuilderFactory.get("basicJob")
            // firstStep의 exit status가 "FAILED"이면 failureStep으로 진행한다
            .start(firstStep())
                .on("FAILED")
                .to(failureStep())
            // firstStep의 exit status가 "FAILED"가 아니면 successStep으로 진행한다
            .from(firstStep())
                .on("*")
                .to(successStep())
            .end()
            .build();
}

@Bean
public Step firstStep() {
    return stepBuilderFactory.get("firstStep")
            .tasklet((contribution, chunkContext) -> {
                log.info("firstStep");
                //return RepeatStatus.FINISHED;
                throw new RuntimeException("This is a failure"); // exit status: FAILED
            })
            .build();
}

@Bean
public Step successStep() {
    return stepBuilderFactory.get("successStep")
            .tasklet((contribution, chunkContext) -> {
                log.info("Success!");
                return RepeatStatus.FINISHED;
            })
            .build();
}

@Bean
public Step failureStep() {
    return stepBuilderFactory.get("failureStep")
            .tasklet((contribution, chunkContext) -> {
                log.info("Failure!");
                return RepeatStatus.FINISHED;
            })
            .build();
}
  • ExitStatus만으로 분기를 판단하기 어려운 요구사항은 JobExecutionDecider 인터페이스를 구현하여 처리할 수 있다.
    • decide 메소드에는 JobExecution과 StepExecution 객체가 전달되므로, 필요한 정보를 꺼내서 판단할 수 있다.
    • 다음 스텝을 결정하여 FlowExecutionStatus 객체로 반환한다.
public interface JobExecutionDecider {

    FlowExecutionStatus decide(
            JobExecution jobExecution, @Nullable StepExecution stepExecution
    );

}
  • JobExecutionDecider 사용 예:
public class RandomDecider implements JobExecutionDecider {

    private Random random = new Random();

    public FlowExecutionStatus decide(JobExecution jobExecution,
                                      StepExecution stepExecution) {
        if (random.nextBoolean()) {
            return new FlowExecutionStatus(FlowExecutionStatus.COMPLETED.getName());
        } else {
            return new FlowExecutionStatus("FAILED");
        }
    }
}

public class ConditionalJob {
    ...

    @Bean
    public Job job() {
        return jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .next(decider())
                .from(decider())
                    .on("FAILED").to(failureStep())
                .from(decider())
                    .on("*").to(successStep())
                .end()
                .build();
    }

잡 종료하기

  • JobInstance는 성공적으로 완료되면 다시 실행할 수 없다. JobIntance는 잡 이름과 잡 파라미터의 조합으로 식별된다.
  • 스프링 배치에서는 세 가지 상태(BatchStatus enum)로 잡을 종료할 수 있다. 예제 코드는 p.164~170을 참조한다.
    • Completed:
      • 잡이 성공적으로 종료됨.
      • 동일한 파라미터를 사용해 잡을 다시 실행할 수 없다.
      • StepBuilder의 .on(exitStatus).end() 를 사용하여 ExitStatus가 지정한 exitStatus와 일치하면 Completed로 잡을 종료할 수 있다.
    • Failed:
      • 잡이 성공적으로 완료되지 않았음.
      • 동일한 파라미터를 사용해 잡을 다시 실행할 수 있다.
      • StepBuilder의 .on(exitStatus).fail() 를 사용하여 ExitStatus가 지정한 exitStatus와 일치하면 Failed로 잡을 종료할 수 있다. 
    • Stopped:
      • 중지 상태
      • 동일한 파라미터를 사용해 잡을 재시작할 수 있다. 잡은 중단했던 위치의 스텝부터 실행된다.
      • 스텝 사이에 사람의 개입이나 검사/처리가 필요한 상황에 유용하다.
      • StepBuilder의 .on(exitStatus).stopAndRestart(step) 를 사용하여 ExitStatus가 지정한 exitStatus와 일치하면 Stopped로 잡을 종료할 수 있다. 잡을 재시작하면 지정한 step에서 부터 실행된다.
  • BatchStatus는 StepExecution이나 JobExecution에 보관되어 있다가 JobRepository에 저장된다. 저장되는 시점에 스텝/청크/잡에서 반환되는 ExitStatus를 평가하여 BatchStatus를 결정한다.

플로우 외부화하기

  • 스텝의 정의를 추출해서 재사용 가능한 컴포넌트 형태로 만들 수 있다.
  • 자세한 내용은 p.170~179 참조
 
 
 

댓글