잡 소개하기
이 책에서 잡의 정의: "처음부터 끝까지 독립적으로 실행할 수 있는, 고유하고, 순서가 있는 스텝의 목록이다."
- 고유하다(unique):
- 잡은 자바나 XML을 사용하여 구성하며, 재사용이 가능하다.
- 하나의 구성으로 필요한 횟수 만큼 잡을 실행할 수 있다.
- 잡을 여러 번 실행하려고 동일한 잡을 여러 번 정의할 필요가 없다.
- 순서가 있는 스텝의 목록이다(ordered list of steps):
- 잡에서 스텝의 순서는 중요하다.
- 모든 스텝이 논리적인 순서로 실행될 수 있도록 잡을 구성한다.
- 처음부터 끝까지 실행 가능하다(Can be executed from start to finish):
- 잡은 외부와 상호작용 없이 실행할 수 있는 일련의 스텝이다.
- 독립적으로(independently):
- 각 배치 잡은 외부 의존성에 영향을 받지 않고 실행할 수 있어야 한다.
- 의존성을 가질 수 없다는 것을 의미하는 것은 아니며, 의존성을 관리할 수 있어야 한다.
잡의 생명주기를 따라가보기
- 잡이 실행되면 생명주기대로 진행한다.
- 잡의 실행은 잡 러너에서 시작된다. 잡 러너는 잡 이름과 전달된 파라미터로 잡을 실행시킨다.
- 스프링 배치는 두 가지 잡 러너를 제공한다:
- CommandLineJobRunner: 스크립트나 CLI에서 잡을 실행할 때 사용. 스프링을 기동시키고 전달받은 파라미터로 잡을 실행한다.
- JobRegistryBackgroundJobRunner: 스프링이 이미 기동되어 있는 상황에서 잡을 실행할 때 사용.
- 스프링 부트는 JobLauncherCommandLineRunner를 제공한다.
- 기본적으로 기동시에 ApplicationContext에 정의된 Job 타입의 모든 빈을 찾아서 실행한다.
- 실행할 잡을 별도로 명시할 수 있다.
- 프레임워크를 실행할 때 실제 진입점은 잡 러너가 아니라 org.springframework.batch.core.launch.JobLauncher 인터페이스의 구현체이다. 스프링 배치는 구현체를 하나만 제공하는데, org.springframework.batch.core.launch.support.SimpleJobLauncher 이다. 이 구현체는 CommandLineJobRunner와 JobLauncherCommandLineRunner에서 내부적으로 사용한다.
- SimpleJobLauncher는 잡을 실행할 때 코어 스프링의 TaskExecutor 인터페이스를 사용한다.
- TaskExecutor를 구성하는 여러 가지 방법이 있으며, SyncTaskExecutor를 사용한면 JobLauncher와 동일한 스레드에서 잡이 실행된다. 별도의 스레드에서 잡을 실행하는 방식도 있다.
- JobInstance:
- 배치 잡이 실행되면 JobInstance가 생성된다.
- 잡의 논리적인 실행을 나타내며 "잡 이름" + "잡 파라미터"로 식별된다.
- 성공적으로 완료된 JobExecution이 있다면 완료된 것으로 간주된다.
- JobInstance는 한 번 성공적으로 완료되면 다시 실행시킬 수 없다.
- BATCH_JOB_INSTANCE 테이블에 저장된다.
- JobExecution:
- 잡 실행의 실제 시도를 의미한다.
- JobInstance가 한 번에 실행 완료되었다면 JobExecution은 하나만 존재한다.
- 첫 번째 실행이 오류 상태로 종료되었다면 해당 JobInstance(잡 이름 + 잡 파라미터)를 실행하려고 시도할 때 마다 새로운 JobExecution이 생성된다.
- BATCH_JOB_EXECUTION 테이블에 저장된다.
- JobExecution의 상태는 BATCH_JOB_EXECUTION_CONTEXT 테이블에 저장된다. 잡을 재시작할 때 이 정보를 이용하여 올바른 지점에서 다시 시작할 수 있다.
잡 구성하기
잡의 기본 구성
MySQL 구성
-- root 로 실행
CREATE DATABASE spring_batch;
USE mysql;
INSERT INTO db (HOST, Db, USER, Select_priv, Insert_priv, Update_priv, Delete_priv, Create_priv, Drop_priv, Index_priv, Alter_priv)
VALUES ('%', 'spring_batch', 'spring_batch', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y');
INSERT INTO db (HOST, Db, USER, Select_priv, Insert_priv, Update_priv, Delete_priv, Create_priv, Drop_priv, Index_priv, Alter_priv)
VALUES ('localhost', 'spring_batch', 'spring_batch', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y');
FLUSH PRIVILEGES;
DataSource 구성
# application.yaml
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/spring_batch
username: spring_batch
password: spring_batch
batch:
initialize-schema: always # 배치 스키마를 자동으로 생성한다
HelloWorldJob
@EnableBatchProcessing
@SpringBootApplication
public class HelloWorldJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() {
return jobBuilderFactory.get("basicJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
log.info("Hello, World!");
return RepeatStatus.FINISHED;
})
.build();
}
public static void main(String[] args) {
SpringApplication.run(HelloWorldJob.class, args);
}
}
(코드 상세 설명은 p.98~99)
잡 파라미터
- JobInstance는 잡 이름 + 식별 파라미터로 식별된다.
- 동일한 식별 파라미터를 사용해서 동일한 잡을 두 번 이상 실행할 수 없다.
- JobInstanceAlreadyCompleteException이 발생한다.
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-02 19:58:59.983 ERROR 8078 --- [ main] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalStateException: Failed to execute ApplicationRunner
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813)
at ...
at my.batch.ch04.helloworld.HelloWorldJob.main(HelloWorldJob.java:42)
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={foo=bar}. If you want to run this job again, change the parameters.
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:136)
...
- 잡 러너는 잡 실행에 필요한 JobParameters 객체를 생성해 JobInstance에 전달한다.
- 스프링 부트의 JobLauncherCommandLineRunner에 파라미터를 전달하는 방법은 key=value 쌍을 전달하면 된다.
java -jar demo.jar name=Michael
- 잡 러너는 JobParamets 인스턴스를 생성하여 전달된 파라미터를 담는다.
- 스프링 부트의 프로퍼티나 시스템 프로퍼티와도 다르므로 "--" prefix나 "-D" 아규먼트를 사용해서 전달할 수 없다.
- JobParameters는 java.util.Map<String, JobParameter> 객체의 래퍼이다.
- 스프링 배치는 파라미터의 타입을 변환하는 기능을 제공하며, 변환된 타입에 맞는 JobParameter의 접근자를 제공한다.
- 타입 변환 기능을 사용하려면 파라미터 이름 뒤에 괄호를 쓰고 그 안에 파라미터의 타입을 명시하면 된다. 타입의 이름은 모두 소문자여야 한다.
- java -jar demo.jar executionDate(date)=2020/12/27
- 지원하는 파라미터 타입은 String, Double, Long, java.util.Date 이다.
- 잡 파라미터를 확인하려면 BATCH_JOB_EXECUTION_PARAMS 테이블을 조회해 보면 된다.
- 파라미터는 JobInstance를 식별하는데 사용할 수 있으며, 식별에 사용하지 않을 파라미터인지 여부를 지정할 수 있다.
- 파라미터에 접두사 "-"를 사용하면 식별 파라미터로 사용되지 않는다.
- java -jar demo.jar executionDate(date)=2020/12/27 -name=Michael
잡 파라미터에 접근하기
파라미터에 접근하려는 위치에 따라 몇 가지 옵션이 있다.
- ChunkContext:
- 태스크릿에서는 execute 메서드의 두 번째 파라미터는 ChunkContext 인스턴스다. 실행 시점의 잡 상태 및 처리 중인 청크와 관련된 정보를 갖고 있다. 청크 정보는 스텝 및 잡 정보도 갖고 있으며, JobParameters가 포함된 StepContext의 참조가 있다.
// ChunkContext를 사용해 JobParameters에 접근하기
@Bean
public Step step1() {
Tasklet tasklet = (contribution, chunkContext) -> {
String name = (String) chunkContext.getStepContext()
.getJobParameters() // Map<String, Object>
.get("name");
log.info("Hello, World!, {}", name);
return RepeatStatus.FINISHED;
};
return stepBuilderFactory.get("step1")
.tasklet(tasklet)
.build();
}
- Late Binding:
- 스텝이나 잡을 제외한 프레임워크 내 특정 부분에 파라미터를 전달하는 가장 쉬운 방법은 스프링의 DI를 사용하는 것이다.
- 참조하려는 Bean이 StepScope나 JobScope 이어야 한다.
- StepScope를 지정하면 해당 bean이 스텝의 실행범위에 들어갈 때 까지 빈 생성을 지연시킨다. JobScope도 마찬가지이다. 이렇게 함으로써 잡 파라미터를 빈 생성 시점에 주입할 수 있다.
// Late binding을 사용하여 SPEL로 참조하기
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(helloWorldTasklet(null))
.build();
}
@StepScope // StepScope or JobScope를 가져야 한다.
@Bean
public Tasklet helloWorldTasklet(@Value("#{jobParameters['name']}") String name) {
return (contribution, chunkContext) -> {
log.info("Hello, World!, {}", name);
return RepeatStatus.FINISHED;
};
}
잡 파라미터 유효성 검증하기
- JobParametersValidator 인터페이스를 구현하고 해당 구현체를 잡에 구성하면 된다.
- validate 메소드의 반환 타입이 void 이므로, JobParametersInvalidException이 발생하지 않았다면 유효성 검증을 통과했다고 판단한다.
interface JobParameterValidator {
void validate(JobParameters parameters) throws JobParametersInvalidException;
}
- 필수/선택 파라미터가 누락되었는지 확인할 수 있는 DefaultJobParametersValidator를 제공한다.
- 존재 여부를 제외한 다른 유효성 검증을 수행하지는 않는다.
- JobBuilder에서 validator() 메소드로 1개의 validator를 지정할 수 있다.
- 두 개의 유효성 검증기를 모두 사용하고 싶다면 CompositeJobParametersValidator를 사용하여 여러 개의 validator를 구성할 수 있다.
잡 파라미터 증가시키기
- 주어진 식별 파라미터의 집합으로는 잡을 단 한 번만 실행할 수 있다는 제약이 있다. JobParametersIncrementer를 사용하여 이를 우회할 수 있다.
- JobParametersIncrementer는 잡에서 사용할 파라미터를 고유하게 생성할 수 있도록 한다. 매 실행 시마다 파라미터를 증가시키거나 타임스탬프를 추가할 수도 있다.
- 스프링 배치 프레임워크는 이 인터페이스의 구현체인 RunIdIncrementer를 제공한다.
- default로 파라미터 이름이 "run.id"인 long 타입 파라미터의 값을 증가시킨다. 파라미터의 이름은 setKey(String) 메소드로 변경할 수 있다.
@Bean
public Job job() {
return jobBuilderFactory.get("basicJob_Incrementing_Parameter")
.start(step1())
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(helloWorldTasklet(null, 0))
.build();
}
@StepScope
@Bean
public Tasklet helloWorldTasklet(
@Value("#{jobParameters['name']}") String name,
@Value("#{jobParameters['run.id']}") long runId) {
return (contribution, chunkContext) -> {
log.info("Hello, World!, {} [run.id: {}]", name, runId);
return RepeatStatus.FINISHED;
};
}
- 잡 실행 시마다 타임스탬프를 사용하려면 JobParametersIncrementer를 직접 구현해야 한다.
public class DailyJobTimestamp implements JobParametersIncrementer {
@Override
public JobParameters getNext(JobParameters parameters) {
return new JobParametersBuilder(parameters)
.addDate("currentDate", new Date())
.toJobParameters();
}
}
@Bean
public Job job() {
return jobBuilderFactory.get("basicJob_Timestamp_Parameter")
.start(step1())
.incrementer(new DailyJobTimestamp())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(helloWorldTasklet(null, null))
.build();
}
@StepScope
@Bean
public Tasklet helloWorldTasklet(
@Value("#{jobParameters['name']}") String name,
@Value("#{jobParameters['currentDate']}") Date currentDate) {
return (contribution, chunkContext) -> {
log.info("Hello, World!, {} [currentDate: {}]", name, currentDate.getTime());
return RepeatStatus.FINISHED;
};
}
잡 리스너 적용하기
- 스프링 배치에는 생명주기가 잘 정의되어 있으며, 여러 시점에 로직을 추가할 수 있는 기능을 제공한다.
- JobExecutionListener:
- 잡 실행과 관련한 리스너 인터페이스
- beforeJob, afterJob의 콜백 메소드를 제공하며, 잡의 생명주기에서 가장 먼저 실행되거나 가장 나중에 실행된다.
- 유즈 케이스:
- 알림(Notifications): 잡의 시작이나 종료를 다른 시스템에 알림
- 초기화(Initialization): beforeJob에서 잡 실행 전에 준비 로직을 수행하기 알맞다.
- 정리(Cleanup): afterJob에서 정리 작업을 수행하기 알맞다. 정리 작업은 잡의 성공/실패에 영향을 미치지 않아야 하지만 반드시 실행되어야 한다.
- 잡 리스너를 작성하는 두 가지 방법이 있다.
- 하나는 JobExecutionListener 인터페이스를 구현하는 방법이며
- 다른 하나는 @BeforeJob/@AfterJob 애너테이션을 사용하는 방법이다.
- JobExecutionListener 인터페이스:
- void beforeJob(JobExecution): 잡이 실행되기 전에 호출된다.
- void afterJob(JobExecution): 잡이 실행된 후에 호출된다. 잡이 성공했는지 실패했는지 여부와 상관 없이 호출된다. 따라서 잡의 종료 상태에 따라 적절한 로직을 수행할 수 있다.
- 리스너 구현체는 JobBuilder.listener(JobExecutionListener) 메소드로 지정하면 된다.
/**
* Job의 생명주기에서 특정 시점에 호출되는 콜백을 제공한다.
* 스레드 세이프에 주의한다면 상태를 가져도 된다.
*/
public interface JobExecutionListener {
/**
* 잡이 실행되기 직전의 콜백
*
* @param jobExecution the current {@link JobExecution}
*/
void beforeJob(JobExecution jobExecution);
/**
* 잡이 실행된 직후의 콜백.
* 잡이 성공했거나 실패했거나 모두 호출된다.
* 상태에 따라 특정한 로직을 수행하려면 다음과 같이 하면 된다.
* "if (jobExecution.getStatus() == BatchStatus.X)"
*
* @param jobExecution the current {@link JobExecution}
*/
void afterJob(JobExecution jobExecution);
}
- 잡 리스너를 위한 @BeforeJob과 @AfterJob 애너테이션이 제공된다. 메소드에 지정한다.
- 애너테이션이 적용된 리스너를 잡에 주입하려면 JobListenerFactoryBean으로 래핑한 후 JobBuilder.listener(JobExecutionListener) 메소드로 지정하면 된다.
- 리스너는 잡의 특정 시점에 로직을 실행할 수 있는 유용한 도구이다. step, reader, writer 등 여러 컴포넌트에 사용할 수 있다.
ExecutionContext
- 배치는 태생적으로 stateful 하다.
- 배치는 현재 어떤 스텝이 실행되고 있는지, 해당 스텝에서 처리한 레코드 개수 등에 대한 상태 정보가 필요하다.
- 이러한 상태 정보들은 진행중 일때 뿐 아니라, 실패 후 재시작할 때에도 중요하다.
- JobExcution은 잡의 실행 시도를 나타낸다. JobExecution이 잡이나 스텝을 진행하는 동안, 잡의 상태는 변하게 된다.
- 잡의 상태는 JobExecution의 ExecutionContext에 저장된다.
- ExecutionContext는 배치에서의 세션(HttpSession)이라고 할 수 있다.
- ExecutionContext는 key-value의 저장소이다.
- ExecutionContext는 JobRepository에 저장되므로 데이터는 안전하다.
- 잡은 여러 개의 ExcutionContext를 가지게 된다. JobExecution은 하나의 ExecutionContext를 가지며, 각 StepExecution도 하나씩의 ExecutionContext를 가진다.
- 이런 구조는 적절한 수준(개별 스텝용 데이터 or 잡 전체용 글로벌 데이터)으로 데이터의 범위(scope)를 가지게 한다.
ExecutionContext 조작 및 저장
- ExecutionContext는 JobExecution이나 StepExecution의 일부분이며, 여기에서 가져올 수 있다.
Tasklet tasklet = (contribution, chunkContext) -> {
String name = (String) chunkContext
.getStepContext()
.getJobParameters()
.get("name");
// ChunkContext -> StepContext -> StepExecution -> JobExecution -> (Job) ExecutionContext
ExecutionContext jobExecutionContext = chunkContext
.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext();
jobExecutionContext.put("user.name", name);
log.info("Hello, World!, {}", name);
return RepeatStatus.FINISHED;
};
- 잡의 ExecutionContext를 얻기 위해서는 약간의 순회가 필요하다.
- ChunkContext -> StepContext -> StepExecution -> JobExecution -> (Job) ExecutionContext
- StepContext에는 getJobExecutionContext() 메소드가 있는데 잡의 읽기전용 ExecutionContext를 반환한다. 이렇게 얻은 ExecutionContext에 대한 변경사항은 저장되지 않는다.
- 스텝의 ExecutionContext를 얻는 방법도 비슷하며, StepExecution에서 ExecutionContext를 가져올 수 있다.
Tasklet tasklet = (contribution, chunkContext) -> {
String name = (String) chunkContext
.getStepContext()
.getJobParameters()
.get("name");
// ChunkContext -> StepContext -> StepExecution -> (Step) ExecutionContext
ExecutionContext stepExecutionContext = chunkContext
.getStepContext()
.getStepExecution()
.getExecutionContext();
stepExecutionContext.put("user.name", name);
log.info("Hello, World!, {}", name);
return RepeatStatus.FINISHED;
};
- JobExecution의 ExecutionContext에 데이터(key-value)를 저장하는 다른 방법은 StepExecution의 ExecutionContext에 있는 key를 JobExecution의 ExecutionContext로 승격(Step scope -> Job scope)하는 것이다.
- 특정 스텝이 성공했을 때 잡 레벨에서 공유하고자 하는 데이터를 이 방법을 통해 저장할 수 있다.
- p.125~126 예제 참조
스텝 알아보기
- 스텝은 잡의 빌딩 블록이다.
- 독립적이고, 순차적인 배치 프로세서이다.
- 스텝은 단위 작업에 필요한 모든 것을 포함한다.
- 스텝은 자체적으로 입력을 처리하고, 입력된 데이터를 가공할 수 있으며, 출력을 처리한다.
- 트랜잭션은 스텝 내에 포함된다.
- 스텝들은 독립적일 수 있도록 설계되었으며, 개발자가 필요에 따라 자유롭게 잡을 구성할 수 있다.
태스크릿 vs. 청크 처리
- 배치 처리는 일반적으로 데이터를 처리한다.
- 배치 잡의 작업은 두 가지의 모델이 있으며, 스프링 배치는 두 모델을 모두 지원한다.
- 하나의 명령을 실행하여 처리 - 디렉토리를 정리하는 쉘 스크립트, 데이터를 삭제하는 단일 SQL 등
- 대량의 데이터 집합을 이터레이션 하면서 처리 - 한 번에 하나의 레코드(or 아이템)을 읽고, 어떤 로직을 수행하고, 어떤 데이터 저장소에 기록
- 태스크릿(Tasklet) - 첫 번째 모델
- 개발자는 Tasklet.execute 메소드에 코드 블럭을 작성한다.
- Tasklet.execute 메소드는 트랜잭션 내에서 실행된다.
- Tasklet.execute 메소드는 RepeatStatus.FINISHED를 반환할 때 까지 반복적으로 실행된다.
- 청크(Chunk) 기반 처리 - 두 번째 모델
- 주요 컴포넌트는 ItemReader, ItemProcessor, ItemWriter 이다. ItemProcessor는 필수가 아니다.
- 이런 컴포넌트를 사용하여 레코드의 청크(묶음) 단위로 처리한다.
- 각 청크마다 자체 트랜잭션을 가진다.
- 처리가 실패하면 스프링 배치는 마지막에 성공한 트랜잭션의 다음 청크 부터 재시작할 수 있다.
- ItemReader는 청크 단위로 처리할 레코드(아이템)를 하나씩 메모리에 읽어온다.
- ItemProcessor는 메모리상의 아이템을 하나씩 처리한다.
- 청크에 포함되어 처리된 모든 아이템들은 ItemWriter에 한 번에 전달되어 기록된다. 한 번에 전달되기 때문에 물리적인 I/O가 최적화될 수 있다.
스텝 구성 (Step Configuration)
- 개발자는 스텝을 어떻게 전이시킬지 구성하며, 잡은 구성된 대로 스텝들을 전이시킨다.
- 스프링 배치는 상태 머신(state machine)과 유사한 개념을 가진다.
- 스텝의 유형은 Tasklet Step과 Chunk-based Step 두 가지가 있다.
Tasklet Step
- 태스크릿을 만드는 방법은 두 가지가 있다.
- 개발자가 실행될 로직을 POJO로 구현하고, 스프링 배치의 MethodInvokingTaskletAdapter로 해당 코드를 태스크릿으로 실행 하도록 할 수 있다.
- Tasklet 인터페이스를 구현하여 만들 수 있다.
- execute 메소드에 로직을 구현하고, RepeatStatus 객체를 반환한다. 스프링 배치는 처리가 끝난 후 RepeatStatus를 보고 어떻게 진행해야 할지 판단한다.
- Tasklet 인터페이스는 함수형 인터페이스이므로 람다로 구현할 수 있다.
/**
* 스텝에서의 처리를 위한 전략 인터페이스
*/
public interface Tasklet {
/**
* StepContribution의 형태로 현재 컨텍스트가 전달된다.
* 필요한 처리를 수행하면 되며, 트랜잭션 내에서 실행된다.
* 처리가 완전히 끝났다면 {RepeatStatus.FINISHED}를 반환한다.
* 처리가 완전히 끝나지 않았다면 {RepeatStatus.CONTINUABLE}를 반환한다.
* 실패시 exception을 던진다.
*
* @param contribution 변경가능한 상태 정보를 가진다.
* 변경이 발생하면 되돌려져서 현재의 step execution을 갱신한다.
* @param chunkContext execute 메소드가 여러(CONTINUABLE) 번 호출되는 동안에 공유할 속성들을 담고 있다.
* 재시작(restart)하는 경우에는 공유되지 않는다.
* @return {RepeatStatus} 처리가 계속되어야 하는지를 나타낸다.
* null을 반환하면 {RepeatStatus.FINISHED}로 간주된다.
*
* @throws 실행하는 동안 Exception이 발생하면 던진다.
*/
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
throws Exception;
}
- 태스크릿을 람다로 구현하는 예:
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
log.info("Hello, World!");
return RepeatStatus.FINISHED;
})
.build();
}
- 처리를 완료하면 RepeatStatus 객체를 반환해야 한다. RepeatStatus.CONTINUABLE과 RepeatStatus.FINISHED를 반환할 수 있다.
- RepeatStatus.CONTINUABLE을 반환하면 스프링 배치가 Tasklet을 다시 실행시킨다. 어떤 조건이 만족될 때 까지 특정 태스크릿을 반복해서 실행하려는 경우에 유용하다.
- RepeatStatus.FINISHED를 반환하면 스프링 배치는 Tasklet이 끝났다고 판단하고 다음 단계로 진행하게 된다.
그 밖의 다른 유형의 태스크릿들
- Tasklet 인터페이스를 구현하는 것 외에도 스프링 배치가 기본적으로 제공하는 3가지 구현체를 사용할 수 있다.
- CallableTaskletAdapter
- MethodInvokingTaskletAdapter
- SystemCommandTasklet
CallableTaskletAdapter
- Callable<RepeatStaus> 인터페이스의 구현체를 사용할 수 있도록 해주는 어댑터이다.
- CallableTaskletAdapter의 인스턴스를 만든 후 Callable의 구현체의 인스턴스를 setCallable(Callable<RepeatStatus>) 메소드로 지정해 주면 된다.
- 스텝이 실행되는 스레드와는 다른 스레드에서 실행된다. => 라고 되어 있지만 실제로 실행해 보면 동일 스레드를 사용한다.
@Bean
public Step callableStep() {
return stepBuilderFactory.get("callableStep")
.tasklet(tasklet())
.build();
}
@Bean
public Tasklet tasklet() {
CallableTaskletAdapter callableTaskletAdapter = new CallableTaskletAdapter();
callableTaskletAdapter.setCallable(callableObject());
return callableTaskletAdapter;
}
@Bean
public Callable<RepeatStatus> callableObject() {
return () -> {
log.info("다른 스레드에서 실행됩니다."); // 동일 스레드에서 실행됨
return RepeatStatus.FINISHED;
};
}
/*
실행 결과
[main] o.s.batch.core.job.SimpleStepHandler : Executing step: [callableStep]
[main] m.b.c.callablestep.CallableTaskletJob : 다른 스레드에서 실행됩니다.
[main] o.s.batch.core.step.AbstractStep : Step: [callableStep] executed in 14ms
*/
MethodInvokingTaskletAdapter
- 어떤 서비스의 메소드가 잡에서 실행하려는 로직을 이미 가지고 있다면, Tasklet 구현체를 만들지 않고 해당 메소드를 바로 호출할 수 있게 해준다.
- 메소드명을 하드 코딩 해야 하기 때문에 실제로 사용하기에는 안정성이 낮아진다.
// Batch Job
@Bean
public Step methodInvokingStep() {
return stepBuilderFactory.get("methodInvokingStep")
.tasklet(methodInvokingTasklet())
.build();
}
@Bean
public Tasklet methodInvokingTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(service());
adapter.setTargetMethod("serviceMethod"); // method name
//adapter.setArguments(new String[] { message }); // arguments
return adapter;
}
@Bean
public CustomService service() {
return new CustomService();
}
// CustomService
public class CustomService {
public void serviceMethod() {
System.out.println("Service method was called");
}
}
SystemCommandTasklet
- 시스템 명령을 실행할 때 사용한다.
- 시스템 명령은 비동기로 실행되며, 타임아웃을 지정한다.
- 다양한 속성을 지정할 수 있다. 책 p.142를 참조한다.
@Bean
public Step systemCommandStep() {
return this.stepBuilderFactory.get("systemCommandStep")
.tasklet(systemCommandTasklet())
.build();
}
@Bean
public Tasklet systemCommandTasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("rm -rf /tmp.txt");
tasklet.setTimeout(5000);
tasklet.setInterruptOnCancel(true);
return tasklet;
}
Chunk-Based Step
- 청크의 크기는 commit interval에 의해 정의된다.
- commit interval이 50 이라면
- 50개의 아이템을 하나씩 읽고(read)
- 50개의 아이템을 하나씩 처리하고(process)
- 50개의 아이템을 한 번에 기록한다(write).
@Slf4j
@RequiredArgsConstructor
@EnableBatchProcessing
@SpringBootApplication
public class ChunkBasedJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job chunkBasedJob() {
return jobBuilderFactory.get("chunkBasedJob")
.start(chunkBasedStep())
.build();
}
@Bean
public Step chunkBasedStep() {
return stepBuilderFactory.get("chunkBasedStep")
.<String, String>chunk(5)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public ItemReader<String> itemReader() {
AtomicInteger counter = new AtomicInteger(0);
return () -> counter.get() < 12 ? String.valueOf(counter.incrementAndGet()) : null;
}
@Bean
public ItemProcessor<String, String> itemProcessor() {
return item -> "[" + item + "]";
}
@Bean
public ItemWriter<String> itemWriter() {
return items -> {
log.warn("itemWriter.write(items)");
items.forEach(item -> log.info("item: {}", item));
};
}
public static void main(String[] args) {
SpringApplication.run(ChunkBasedJobConfig.class, args);
}
}
/*
실행결과
m.b.ch04.helloworld.ChunkBasedJobConfig : Started ChunkBasedJobConfig in 1.103 seconds (JVM running for 1.564)
o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=chunkBasedJob]] launched with the following parameters: []
o.s.batch.core.job.SimpleStepHandler : Executing step: [chunkBasedStep]
m.b.ch04.helloworld.ChunkBasedJobConfig : itemWriter.write(items) <= 첫 번째 chunk
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [1]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [2]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [3]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [4]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [5]
m.b.ch04.helloworld.ChunkBasedJobConfig : itemWriter.write(items) <= 두 번째 chunk
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [6]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [7]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [8]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [9]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [10]
m.b.ch04.helloworld.ChunkBasedJobConfig : itemWriter.write(items) <= 세 번째 chunk
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [11]
m.b.ch04.helloworld.ChunkBasedJobConfig : item: [12]
o.s.batch.core.step.AbstractStep : Step: [chunkBasedStep] executed in 13ms
o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=chunkBasedJob]] completed with the following parameters: [] and the following status: [COMPLETED] in 21ms
*/
- chunk(commitInterval) 메소드를 사용해서 청크 기반 스텝을 사용하도록 지정한다.
- 청크 크기(commit interval)을 지정한다. 이 예제에서 5로 지정했는데, 아이템이 5개씩 처리되면 커밋한다. 즉, 청크는 트랜잭션의 단위와 같다.
- 청크 기반 스텝은 reader와 wirter가 필요하며 processor는 선택적이다.
- 청크 크기(commit interval) 에 지정된 수 만큼 read/process가 되어야만 write가 된다. 그 전까지는 write가 발생하지 않는다.
- 청크 크기(commit interval) 만큼의 아이템을 처리하던 도중에 에러가 발생하면 스프링 배치는 현재 청크(==트랜잭션)을 롤백하고 잡을 실패로 처리한다.
- 청크 크기를 1로 지정하면 1개의 아이템을 read하고 process하고 write 한다(+commit 발생).
- 청크 크기는 write의 단위가 되며, write 성능에 많은 영향을 준다. 어느 정도 크게 설정해야 좋은 성능을 낼 수 있다.
청크 크기 구성하기
- 청크의 크기는 두 가지 방식으로 설정할 수 있다:
- 정적 커밋 개수로 설정 - chunk(commitInterval) 메소드로 지정
- CompletionPolicy 구현체를 사용하여 지정 - 크기가 동일하지 않은 청크를 처리해야 하는 경우에 사용하며, 몇 가지 구현체를 기본 제공하고 있다.
- SimpleCompletionPolicy - 고정 갯수를 지정하고, 이 갯수에 도달하면 청크가 완료된 것으로 간주한다. chunk(commitInterval) 메소드를 사용하는 것과 동일하다.
- TimeoutTerminationPolicy - 지정한 시간을 넘어가면 청크가 완료된 것으로 간주한다.
- CompositeCompletionPolicy - 여러 개의 CompletionPolicy를 포함하여 하나라도 만족하면 청크가 완료된 것으로 간주한다.
- CompletionPolicy 사용 예는 p.148~153을 참조한다.
스텝 리스너
- 스텝과 청크 레벨에서의 이벤트를 처리할 수 있는 리스너가 제공된다.
- 리스너는 컴포넌트의 동작 직전에 전처리를 수행하거나 컴포넌트의 동작 직후에 결과를 평가하거나 오류 처리, 정리 작업등을 수행하는데 사용된다.
- StepExecutionListener는 스텝의 시작과 끝에서 특정 로직을 실행할 수 있게 한다.
- beforeStep:void, afterStep:ExitStatus 를 메소드 혹은 애너테이션으로 제공한다.
- afterStep은 ExitStatus를 반환하는데, 스텝이 반환한 ExistStatus를 잡에 전달하기 전에 수정할 수 있게 해준다.
- ChunkListener는 청크의 시작과 끝에서 특정 로직을 실행할 수 있게 한다.
- beforeChunk:void, afterChunk:void 를 메소드 혹은 애너테이션으로 제공한다.
public class LoggingStepStartStopListener {
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " has begun!");
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " has ended!");
return stepExecution.getExitStatus();
}
}
@Bean
public Step chunkStep() {
return this.stepBuilderFactory.get("chunkStep")
.<String, String>chunk(1000)
.reader(itemReader())
.writer(itemWriter())
.listener(new LoggingStepStartStopListener())
.build();
}
스텝 플로우
- 지금까지 살펴본 잡은 여러 개의 스텝을 순서대로 하나씩 실행했었다.
- 스프링 배치는 스텝의 흐름을 결정할 수 있도록 하는 여러 가지 방법을 제공한다.
조건 로직
- 여러 개의 스텝을 순서대로 실행하려면, StepBuilder의 start(Step) 메소드를 시작으로 next(Step) 메소드를 필요한 만큼 이어서 정의하면 순서대로 실행된다.
@Bean
public Job job() {
return jobBuilderFactory.get("basicJob")
.start(step1())
.next(step2())
.next(step3())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
log.info("Step1");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean public Step step2() { ... }
@Bean public Step step3() { ... }
- 스텝의 exit status를 기반으로 어떤 스텝으로 전이(transition)할지 결정할 수도 있다.
- on(exitStatusPattern) 메소드는 스프링 배치가 스텝의 ExitStatus를 평가해 어떤 스텝을 수행할지 결정할 수 있도록 해준다.
- ExitStatus는 클래스이며, 실제로는 문자열을 나타낸다. 와일드 카드를 활용한 패턴을 사용할 수 있다.
- '*'는 0개 이상의 문자를 일치 - "C*"은 "C", "COMPLETE", "CORRECT"와 일치한다.
- '?'는 1개의 문자를 일치 - "?AT"은 "CAT", "KAT"과 일치하며, "THAT"과는 일치하지 않는다.
- 여러 개의 ExitStatus 중에 가장 제한적인 패턴부터 덜 제한적인 패턴 순서로 적용된다.
@Bean
public Job job() {
return jobBuilderFactory.get("basicJob")
// firstStep의 exit status가 "FAILED"이면 failureStep으로 진행한다
.start(firstStep())
.on("FAILED")
.to(failureStep())
// firstStep의 exit status가 "FAILED"가 아니면 successStep으로 진행한다
.from(firstStep())
.on("*")
.to(successStep())
.end()
.build();
}
@Bean
public Step firstStep() {
return stepBuilderFactory.get("firstStep")
.tasklet((contribution, chunkContext) -> {
log.info("firstStep");
//return RepeatStatus.FINISHED;
throw new RuntimeException("This is a failure"); // exit status: FAILED
})
.build();
}
@Bean
public Step successStep() {
return stepBuilderFactory.get("successStep")
.tasklet((contribution, chunkContext) -> {
log.info("Success!");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step failureStep() {
return stepBuilderFactory.get("failureStep")
.tasklet((contribution, chunkContext) -> {
log.info("Failure!");
return RepeatStatus.FINISHED;
})
.build();
}
- ExitStatus만으로 분기를 판단하기 어려운 요구사항은 JobExecutionDecider 인터페이스를 구현하여 처리할 수 있다.
- decide 메소드에는 JobExecution과 StepExecution 객체가 전달되므로, 필요한 정보를 꺼내서 판단할 수 있다.
- 다음 스텝을 결정하여 FlowExecutionStatus 객체로 반환한다.
public interface JobExecutionDecider {
FlowExecutionStatus decide(
JobExecution jobExecution, @Nullable StepExecution stepExecution
);
}
- JobExecutionDecider 사용 예:
public class RandomDecider implements JobExecutionDecider {
private Random random = new Random();
public FlowExecutionStatus decide(JobExecution jobExecution,
StepExecution stepExecution) {
if (random.nextBoolean()) {
return new FlowExecutionStatus(FlowExecutionStatus.COMPLETED.getName());
} else {
return new FlowExecutionStatus("FAILED");
}
}
}
public class ConditionalJob {
...
@Bean
public Job job() {
return jobBuilderFactory.get("conditionalJob")
.start(firstStep())
.next(decider())
.from(decider())
.on("FAILED").to(failureStep())
.from(decider())
.on("*").to(successStep())
.end()
.build();
}
잡 종료하기
- JobInstance는 성공적으로 완료되면 다시 실행할 수 없다. JobIntance는 잡 이름과 잡 파라미터의 조합으로 식별된다.
- 스프링 배치에서는 세 가지 상태(BatchStatus enum)로 잡을 종료할 수 있다. 예제 코드는 p.164~170을 참조한다.
- Completed:
- 잡이 성공적으로 종료됨.
- 동일한 파라미터를 사용해 잡을 다시 실행할 수 없다.
- StepBuilder의 .on(exitStatus).end() 를 사용하여 ExitStatus가 지정한 exitStatus와 일치하면 Completed로 잡을 종료할 수 있다.
- Failed:
- 잡이 성공적으로 완료되지 않았음.
- 동일한 파라미터를 사용해 잡을 다시 실행할 수 있다.
- StepBuilder의 .on(exitStatus).fail() 를 사용하여 ExitStatus가 지정한 exitStatus와 일치하면 Failed로 잡을 종료할 수 있다.
- Stopped:
- 중지 상태
- 동일한 파라미터를 사용해 잡을 재시작할 수 있다. 잡은 중단했던 위치의 스텝부터 실행된다.
- 스텝 사이에 사람의 개입이나 검사/처리가 필요한 상황에 유용하다.
- StepBuilder의 .on(exitStatus).stopAndRestart(step) 를 사용하여 ExitStatus가 지정한 exitStatus와 일치하면 Stopped로 잡을 종료할 수 있다. 잡을 재시작하면 지정한 step에서 부터 실행된다.
- Completed:
- BatchStatus는 StepExecution이나 JobExecution에 보관되어 있다가 JobRepository에 저장된다. 저장되는 시점에 스텝/청크/잡에서 반환되는 ExitStatus를 평가하여 BatchStatus를 결정한다.
플로우 외부화하기
- 스텝의 정의를 추출해서 재사용 가능한 컴포넌트 형태로 만들 수 있다.
- 자세한 내용은 p.170~179 참조
'잡다구리' 카테고리의 다른 글
Redis Cluster 구성 테스트 (0) | 2022.08.12 |
---|---|
redis docker (0) | 2021.08.24 |
날짜와 시간 (0) | 2021.07.31 |
스프링 배치 완벽 가이드 - 6. 잡 실행하기 (0) | 2021.05.09 |
스프링 배치 완벽 가이드 - 5. JobRepository와 메타데이터 (0) | 2021.05.06 |
스프링 배치 완벽 가이드 - 3. 예제 잡 애플리케이션 (0) | 2021.05.02 |
스프링 배치 완벽 가이드 - 2. 스프링 배치 (0) | 2021.05.01 |
7장 마이크로서비스 쿼리 구현 (Implementing queries in a microservice architecture) (0) | 2021.04.25 |
댓글