티스토리 뷰

이전에 Listener 관련 글을 쓰면서 JobListener와 StepListener에 대해 설정과 사용법에 대해 알아보았다. 여기에 추가적으로 ChunkListener에 대해서 알아보도록 하겠다. Job 안에 Step이 있고 Step 안에 Chunk가 있으니 앞서 본 것보다 더 작은 단위라고 이해하면 된다. ChunkListener 외에도 Step 하위에서 실행되는 Listener의 목록은 다음과 같다. 

 

StepListener Type Hierarchy

Spring Batch는 이렇게 아주 작은 단위까지도 세밀하게 Listener를 넣을 수 있고 Control 할 수 있다. 


Chunk 단위로 잘 실행이 되는지 테스트를 하며 이 ChunkListener를 사용해 봤다. 물론 Chunk로 수행하는 Step이어야지 Tasklet은 ChunkListener를 사용하지 못한다. 

 

ChunkListener

@Component
public class SampleChunkListener implements ChunkListener {

    private static final Logger log = LoggerFactory.getLogger(SampleChunkListener.class);

    @Override
    public void beforeChunk(ChunkContext context) {
        StepContext stepContext = context.getStepContext();
        StepExecution stepExecution = stepContext.getStepExecution();
        
        log.info("###### beforeChunk : " + stepExecution.getReadCount());
    }

    @Override
    public void afterChunk(ChunkContext context) {
        StepContext stepContext = context.getStepContext();
        StepExecution stepExecution = stepContext.getStepExecution();
        
        log.info("##### afterChunk : " + stepExecution.getCommitCount());
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        StepContext stepContext = context.getStepContext();
        StepExecution stepExecution = stepContext.getStepExecution();
        
        log.info("##### afterChunkError : " + stepExecution.getRollbackCount());
    }
}

beforeChunk는 말 그대로 Chunk 실행 전에, afterChunk는 Chunk 실행 후에, afterChunkError는 Chunk 수행 중 Error가 발생했을시 작동을 한다. 위와 같이 ChunkContext를 받아서 필요한 값을 꺼내서 logging을 할 수 있다. 필자는 Chunk 실행 전에 현재 몇건을 Read 했는지에 대한 정보, Chunk 실행 후에 몇번 Commit을 했는지에 대한 정보를 출력하려고 한다. 

 

Step 정의

@Bean
public Step sampleChunkStep(FlatFileItemReader<Employee> testReader, SampleStepListener stepListener, SampleChunkListener chunkListener, MyBatisBatchItemWriter<Employee> testWriter) {
    return stepBuilderFactory.get("sampleChunkStep")
        .listener(stepListener)
        .<Employee, Employee> chunk(10)
        .reader(testReader)
        .processor(sampleChunkProcessor())
        .writer(testWriter)
        .listener(chunkListener)
        .build();
}

Chunk를 사용하는 Step을 위와 같이 정의하였다. chunk(10) 이라고 정의가 된 것이 바로 chunk의 단위이다. 10건씩 commit 하겠다는 것이다. 이게 무슨 말이냐면 다음 그림을 보면 이해가 쉬울 것이다. 

 

출처 : https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/en/single_index.html

read하고 process하고 한 결과물이 10개가 되면 그때 write를 하겠다는 것이다. 적은 DB I/O를 통해 빠른 배치 처리를 하는데 목적이 있다. 

 

소스 4번째 줄은 StepListener를, 9번째 줄에서는 처음에 정의한 ChunkListener를 넣어줬다. 예제와 같은 순서로 넣어줘야 하고 순서가 바뀌면 오류가 발생하니 주의하도록 하자. 

 

Test

그리고 Job을 실행시켜보자. 참고로 1000건의 데이터를 처리하는 Job이다. 

[SampleChunkListener.beforeChunk:line21] - ###### beforeChunk : 0
[SampleChunkListener.afterChunk:line29] - ##### afterChunk : 1
[SampleChunkListener.beforeChunk:line21] - ###### beforeChunk : 10
[SampleChunkListener.afterChunk:line29] - ##### afterChunk : 2
[SampleChunkListener.beforeChunk:line21] - ###### beforeChunk : 20
[SampleChunkListener.afterChunk:line29] - ##### afterChunk : 3
[SampleChunkListener.beforeChunk:line21] - ###### beforeChunk : 30
[SampleChunkListener.afterChunk:line29] - ##### afterChunk : 4
[SampleChunkListener.beforeChunk:line21] - ###### beforeChunk : 40
[SampleChunkListener.afterChunk:line29] - ##### afterChunk : 5
...
[SampleChunkListener.beforeChunk:line21] - ###### beforeChunk : 990
[SampleChunkListener.afterChunk:line29] - ##### afterChunk : 100

이렇게 쭉 Chunk 단위로 로깅이 되는것을 확인할 수 있다. 맨 처음 beforeChunk에서는 Chunk 실행 전이니 읽은것이 없고 Chunk가 모두 실행되고 수행되는 afterChunk에서는 1건을 commit 했다는 뜻이다. 이런식으로 1000건의 데이터를 Chunk로 처리하면 총 read는 1000번 하고 commit은 100번을 하는 것이다. commit-interval을 10으로 주었기 때문이다. 이렇게 정상적으로 Chunk가 실행이 되었고 이를 ChunkListener를 통해 logging을 하여 살펴보았다. 

 

끝!

댓글
최근에 올라온 글
최근에 달린 댓글
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31