티스토리 뷰

 

앞서 Spring Batch의 다음과 같은 샘플들을 구현해보았다. 

이번에는 Spring Batch의 기능 중 Decider 라는 컨텐츠에 대해 다룰 계획이다. Decider는 말 그대로 결정지어주는 역할을 한다. 예를들면 이전 Step이 성공을 했으면 다음 Step을 진행하고 아니면 진행하지 않는 Batch의 if else 와 같은 역할을 한다고 보면 된다. 만든 Batch에 대해 이와 같은 요구사항은 많이 있을텐데 오늘 설명할 Decider를 활용해서 하나의 Job 안에서 Step을 분기해 일을 처리하는 방법이 있을수도 있고 아니면 Batch Job 단위로 여러개를 만들고 이것을 스케쥴러(eg. Jenkins)에서 분기를 해주는 방법이 있을수도 있다. 스케쥴러단에서 Job을 분기하는건 이 글을 참조하도록 하자. 


샘플은 2개의 Step이 있는데 첫번째 Step이 성공하면 두번째 Step을 실행시키고, 첫번째 Step이 실패한다면 두번째 Step을 실행시키지 않는 샘플이다. 

 

SampleDecider.java (전체 소스)

package com.example.batchprocessing;

import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SampleDecider {
    
    private static final Logger log = LoggerFactory.getLogger(SampleDecider.class);
    
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job sampleDeciderJob(JobExecutionDecider decider, Step firstStep, Step secondStep) {
        return jobBuilderFactory.get("sampleDeciderJob")
            .incrementer(new RunIdIncrementer())
            .start(firstStep)
            .next(decider).on("CONTINUE").to(secondStep)
            .end()
            .build();
    }
    
    @Bean
    public Step firstStep() {
        return stepBuilderFactory.get("firstStep")
                .tasklet((contribution, chunkContext) -> {
                    log.info("########### First Step Start #############");
                    Random rand = new Random();
                    int randomNumber = rand.nextInt(2);
                    
                    if(randomNumber == 0) {
                        log.info("########### First Step SUCCESS #############");
                        chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().putString("JOB_STATUS", "CONTINUE");
                    } else {
                        log.info("########### First Step FAILED #############");
                        chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().putString("JOB_STATUS", "FAILED");
                    }
                    return RepeatStatus.FINISHED;
                })
            .build();
    }
    
    @Bean
    public Step secondStep() {
        return stepBuilderFactory.get("secondStep")
                .tasklet((contribution, chunkContext) -> {
                    log.info("########### Second Step Start #############");
                    return RepeatStatus.FINISHED;
                })
            .build();
    }
    
    @Bean
    public JobExecutionDecider decider() {
        return new StepDecider();
    }
    
    public class StepDecider implements JobExecutionDecider{

        private static final String JOB_STATUS="JOB_STATUS";
        private static final String CONTINUE = "CONTINUE";
        private static final String FAILED = "FAILED";
        private static final String COMPLETED = "COMPLETED";
        
        @Override
        public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
            String stepStatus = jobExecution.getExecutionContext().getString(JOB_STATUS);
            FlowExecutionStatus flowExecutionStatus = null;
            if (CONTINUE.equals(stepStatus)) {
                flowExecutionStatus = new FlowExecutionStatus(CONTINUE);
            } else if (FAILED.equals(stepStatus)) {
                flowExecutionStatus = new FlowExecutionStatus(FAILED);
            } else {
                flowExecutionStatus = new FlowExecutionStatus(COMPLETED);
            }
            
            return flowExecutionStatus;
        }
    }
}

이번에도 전체 소스를 일단 보여주고 시작한다. 위에서부터 하나씩 살펴보도록 하자. 


Job

@Bean
public Job sampleDeciderJob(JobExecutionDecider decider, Step firstStep, Step secondStep) {
    return jobBuilderFactory.get("sampleDeciderJob")
        .incrementer(new RunIdIncrementer())
        .start(firstStep)
        .next(decider).on("CONTINUE").to(secondStep)
        .end()
        .build();
}

Job에서는 파라미터로 JobExecutionDecider와 수행할 Step을 사용한다. 다른부분은 다 일반적인 Job 구성과 동일하고 차이점은 바로 JobExecutionDecider가 있는 부분이다. 

 

.start(firstStep)
.next(decider).on("CONTINUE").to(secondStep)

 

바로 이 부분인데 firstStep이 일단 실행을 하고 여기서 후행에 대한 flag를 전달을 해준다. 이 flag를 기준으로 decider가 판단하여 어떤 행위를 할지 결정지어줄 수 있다. on은 조건을 뜻하며 decider에서 나온 결과값과 비교를 하여 일치한다면 to에 해당하는 다음 Step을 실행시킨다. 


Step

@Bean 
public Step firstStep() {
    return stepBuilderFactory.get("firstStep")
            .tasklet((contribution, chunkContext) -> {
                log.info("########### First Step Start #############");
                Random rand = new Random();
                int randomNumber = rand.nextInt(2);
                    
                if(randomNumber == 0) {
                    log.info("########### First Step SUCCESS #############");
                    chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().putString("JOB_STATUS", "CONTINUE");
                } else {
                    log.info("########### First Step FAILED #############");
                    chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().putString("JOB_STATUS", "FAILED");
                }
                return RepeatStatus.FINISHED;
            })
        .build();
}
    
@Bean
public Step secondStep() {
    return stepBuilderFactory.get("secondStep")
            .tasklet((contribution, chunkContext) -> {
                log.info("########### Second Step Start #############");
                return RepeatStatus.FINISHED;
            })
        .build();
}

Step은 두개를 만들었다. 첫번째 Step은 하는일이 랜덤으로 성공과 실패를 만든다. 그래서 성공이면 JOB_STATUS 값으로 CONTINUE를 넣어주고 실패면 FAILED를 넣어준다. 예제를 위해 간단하게 표기했지만 예외사항이 발생하는것에 따라 분기처리를 해주면 된다. 두번째 Step은 그냥 수행되었다는 로그만 찍어주는 Step이다. 일반적인 Step의 모습과 거의 동일하다고 보면 된다. 


Decider

@Bean
public JobExecutionDecider decider() {
    return new StepDecider();
}

public class StepDecider implements JobExecutionDecider{

    private static final String JOB_STATUS="JOB_STATUS";
    private static final String CONTINUE = "CONTINUE";
    private static final String FAILED = "FAILED";
    private static final String COMPLETED = "COMPLETED";
    
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String stepStatus = jobExecution.getExecutionContext().getString(JOB_STATUS);
        FlowExecutionStatus flowExecutionStatus = null;
        if (CONTINUE.equals(stepStatus)) {
            flowExecutionStatus = new FlowExecutionStatus(CONTINUE);
        } else if (FAILED.equals(stepStatus)) {
            flowExecutionStatus = new FlowExecutionStatus(FAILED);
        } else {
            flowExecutionStatus = new FlowExecutionStatus(COMPLETED);
        }
        
        return flowExecutionStatus;
    }
}

Job에서 사용하기 위해 decider를 bean으로 생성한다. 그리고 내부에 StepDecider라는 클래스를 생성하였다. 생성을 할때는 JobExecutionDecider를 impl하여 생성해야 한다. 그럼 decide라는 메소드를 구현해야 하는데 이곳에 내가 분기처리를 해야 할 일에 대해서 작성을 해주면 된다. 필자는 fisrtStep으로부터 받은 JOB_STATUS라는 값을 통해 FlowExecutionStatus를 결정지으려고 한다. FlowExecutionStatus는 말 그대로 다음 실행에 영향을 미치는 값을 뜻한다. 이 값과 Job 구성시 나온 on 조건문이 일치하면 다음 Step을 실행하게 된다. 


테스트

테스트 방법은 간단하다. 랜덤으로 성공 혹은 실패에 대한 값이 나오므로 그냥 이 Job을 실행시켜보자. 

Job: [FlowJob: [name=sampleDeciderJob]] launched with the following parameters: [{requestDate=1621232843647, -spring.profiles.active=local, run.id=1, -spring.output.ansi.enabled=always, -job.name=sampleDeciderJob}]
Executing step: [firstStep]
########### First Step Start #############
########### First Step SUCCESS #############
Step: [firstStep] executed in 10ms
Executing step: [secondStep]
########### Second Step Start #############
Step: [secondStep] executed in 3ms
Job: [FlowJob: [name=sampleDeciderJob]] completed with the following parameters: [{requestDate=1621232843647, -spring.profiles.active=local, run.id=1, -spring.output.ansi.enabled=always, -job.name=sampleDeciderJob}] and the following status: [COMPLETED] in 35ms

첫번째 실행을 해봤다.  
성공해서 JOB_STATUS에 CONTINUE라는 값이 들어갔고 Decider에서도 CONTINUE가 FlowExecutionStatus로 설정이 되었다. 그리고 Job 설정에서는 CONTINUE 라고 되어 있는 경우에 다음 Step을 실행하라고 되어 있어서 Second Step이 정상적으로 실행이 되었다. 

 

Job: [FlowJob: [name=sampleDeciderJob]] launched with the following parameters: [{requestDate=1621234791699, -spring.profiles.active=local, run.id=1, -spring.output.ansi.enabled=always, -job.name=sampleDeciderJob}]
Executing step: [firstStep]
########### First Step Start #############
########### First Step FAILED #############
Step: [firstStep] executed in 12ms
Job: [FlowJob: [name=sampleDeciderJob]] completed with the following parameters: [{requestDate=1621234791699, -spring.profiles.active=local, run.id=1, -spring.output.ansi.enabled=always, -job.name=sampleDeciderJob}] and the following status: [FAILED] in 30ms

N번째 실행을 해봤다. (실패 나올때까지.. 1/2 확률인데 꽤 많이 돌린것 같다.)

JOB_STATUS에 FAILED 라는 값이 들어가고 이 값이 FlowExecutionStatus로 설정이 되었다. 하지만 Job에 기술된것은 CONTINUE 인 경우만 다음 Step을 진행하라고 했으므로 다음 Step은 실행되지 않는다. 

 

아주 간단한 샘플로 Decider에 대해 알아보았다. 

 

끝!

 

댓글
최근에 올라온 글
최근에 달린 댓글
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31