티스토리 뷰

지난 포스팅으로 Chunk 방식Tasklet 방식의 Spring Batch에 대해 알아보았다. 이제는 이 방식을 조금 응용하여 이를 병렬처리하는 방법에 대해 알아보겠다. Chunk와 Tasklet과 마찬가지로 이것도 매우 직관적이라 이해가 쉬울 것이다. 


SampleParallel.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
public class SampleParallel {
    
    private static final Logger log = LoggerFactory.getLogger(SampleParallel.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job sampleParallelJob(Step sampleParallelStep1, Step sampleParallelStep2, Step sampleParallelStep3, Step sampleParallelStep4) {
        Flow flow1 = new FlowBuilder<Flow>("flow1")
                .start(sampleParallelStep1)
                .next(sampleParallelStep4)
                .build();

        Flow flow2 = new FlowBuilder<Flow>("flow2")
                .start(sampleParallelStep2)
                .build();
        
        Flow flow3 = new FlowBuilder<Flow>("flow3")
                .start(sampleParallelStep3)
                .build();

        Flow parallelStepFlow = new FlowBuilder<Flow>("parallelStepFlow")
                .split(new SimpleAsyncTaskExecutor())
                .add(flow1, flow2, flow3)
                .build();

        return jobBuilderFactory.get("sampleParallelJob")
                .start(parallelStepFlow)
                .next(sampleParallelStep1)
                .build()
                .build();
    }
    
    @Bean
    public Step sampleParallelStep1() {
        return stepBuilderFactory.get("sampleParallelStep1")
                .tasklet((contribution, chunkContext) -> {
                    for(int inx = 1 ; inx < 21 ; inx++) {
                        log.info("[step] : " + inx);
                    }
                    return RepeatStatus.FINISHED;
                })
            .build();
    }
    
    @Bean
    public Step sampleParallelStep2() {
        return stepBuilderFactory.get("sampleParallelStep2")
                .tasklet((contribution, chunkContext) -> {
                    for(int inx = 21 ; inx < 41 ; inx++) {
                        log.info("[step] : " + inx);
                    }
                    return RepeatStatus.FINISHED;
                })
            .build();
    }
    
    @Bean
    public Step sampleParallelStep3() {
        return stepBuilderFactory.get("sampleParallelStep3")
                .tasklet((contribution, chunkContext) -> {
                    for(int inx = 41 ; inx < 61 ; inx++) {
                        log.info("[step] : " + inx);
                    }
                    return RepeatStatus.FINISHED;
                })
            .build();
    }
    
    @Bean
    public Step sampleParallelStep4() {
        return stepBuilderFactory.get("sampleParallelStep4")
                .tasklet((contribution, chunkContext) -> {
                    for(int inx = 61 ; inx < 81 ; inx++) {
                        log.info("[step] : " + inx);
                    }
                    return RepeatStatus.FINISHED;
                })
            .build();
    }
}

어차피 이 글을 보는 사람들은 글은 잘 안읽어볼꺼라 샘플 소스부터 통으로 올린다. Tasklet Sample과 거의 흡사한데 여기서는 Flow를 적극적으로 사용하는것을 볼 수 있다. Chunk는 Tasklet을 정의하는 대신 정의해서 사용하면 된다. 병렬처리를 하는데 있어서 다른점은 없다. 그럼 어떻게 Flow를 정의하고 사용하는지 알아보자. 

 

Job의 Flow 정의

@Bean
public Job sampleParallelJob(Step sampleParallelStep1, Step sampleParallelStep2, Step sampleParallelStep3, Step sampleParallelStep4) {
    Flow flow1 = new FlowBuilder<Flow>("flow1")
            .start(sampleParallelStep1)
            .next(sampleParallelStep4)
            .build();

    Flow flow2 = new FlowBuilder<Flow>("flow2")
            .start(sampleParallelStep2)
            .build();
        
    Flow flow3 = new FlowBuilder<Flow>("flow3")
            .start(sampleParallelStep3)
            .build();

    Flow parallelStepFlow = new FlowBuilder<Flow>("parallelStepFlow")
            .split(new SimpleAsyncTaskExecutor())
            .add(flow1, flow2, flow3)
            .build();

    return jobBuilderFactory.get("sampleParallelJob")
            .start(parallelStepFlow)
            .next(sampleParallelStep1)
            .build()
            .build();
}

아래서부터 접근을 해보자.

sampleParallelJob은 최초 parallelStepFlow라는 녀석을 호출한다.

parallelStepFlow는 flow1, flow2, flow3에 대해 add()를 통해 병렬처리를 하겠다고 선언을 했다.

flow1은 step1과 4를 순차처리를 하고 flow2는 step2만, flow3은 step3만 처리를 한다.

주의할점은 step1, step2, step3은 동시에 병렬처리가 되고 이 모든게 끝나야 step1의 후속작업인 step4가 실행이 된다.step4까지는 병렬처리가 안된다는 것이다

이렇게 해서 parallelStepFlow가 모두 끝나면 sampleParallelJob에 정의된대로 다음 작업으로 step1을 다시 실행을 한다. 

아주 직관적으로 정의를 할 수 있어서 한번만 보면 이해가 잘 간다.

 

spring batch parallel

그림으로 그려보면 위와 같은 모습이다. 다음은 이를 실행을 해보자. 

step1은 1~20, 2는 21~40, 3은 41~60, 4는 61~80 의 숫자를 출력하는 예제이다. 

 

좌측 그림은 최초 실행한 step1, step2, step3이 병렬처리되는 모습이다.

가운데 그림은 이 병렬처리가 끝나고 step1의 후속 step인 step4가 실행이 되는 모습이다. 

우측 그림은 parallelStepFlow가 모두 끝나고 이것의 후속 step인 step1이 실행되는 모습니다. 

본인의 Batch 업무에 맞게 이를 잘 조합해서 사용하면 좋은 결과를 얻을수 있을것이다. 

 

끝!

댓글
최근에 올라온 글
최근에 달린 댓글
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31