티스토리 뷰

앞에 Springboot 기반의 Batch에 대해 위와 같은 예제를 통해 알아보았다. 이번에는 Partitioner 라는 녀석이다. 

'Disk 파티션을 나누다' 와 같이 보통 크기가 정해진 무엇인가를 나누는것을 파티셔닝 한다고 한다. 

나눈다는 개념에서 앞서 다룬 Parallel(병렬처리)와 헷갈릴수도 있는데 Parallel여러개의 Step을 병렬로 처리를 하는 것이고 Spring Batch의 Partitioner는 일반적으로 하나의 Step을 여러개의 Thread 단위로 나누어 처리를 하는 것이라고 보면 된다. Partitioner는 대량 데이터를 처리해야 할 Step이 있을 경우 사용한다고 알고 있으면 된다. 즉 하나의 Step에 대해서 성능향상이 필요할 경우 사용한다. 또한 Partitioner는 기존 Step 코드의 변경이 거의 없다. 

 

Partitioner 시퀀스 다이어그램

PartitionStep으로 지정된 Step은 위와 같은 과정을 거쳐 n분할된 작은 Worker Step으로 나뉘어지게 된다. 이 과정에서 필요한게 PartitionHandler이다. 위의 과정을 쉽게 바로 샘플을 통해 알아보도록 하자. 


일단 필자의 환경은 springboot 2.2.2.RELEASE, mybatis-spring-boot-starter 2.1.4, 그리고 DB는 h2를 사용하였다. 

(대용량에 적합한 Partitioner이지만 데이터를 조금만 넣고 테스트를 하였다. )

SamplePartitioner.java

package com.example.batchprocessing;

import java.util.HashMap;
import java.util.Map;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class SamplePartitioner {
    
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job samplePartitionerJob(Step partitionStep) {
        return jobBuilderFactory.get("samplePartitionerJob")
            .incrementer(new RunIdIncrementer())
            .start(partitionStep)
            .build();
    }
    
    @Bean
    public Step partitionStep(Step workerStep, Partitioner partitioner, PartitionHandler partitionHandler, SamplePartitionerListener listener) {
        return stepBuilderFactory.get("partitionStep")
                .listener(listener)
                .partitioner(workerStep.getName(), partitioner)
                .partitionHandler(partitionHandler)
                .build();
    }    
    
    @Bean
    public Step workerStep(MyBatisCursorItemReader<Employee2> testReader1, SampleWorkerStepListener stepListener, FlatFileItemWriter<Employee2> testWriter1 ) {
        return stepBuilderFactory.get("workerStep")
            .listener(stepListener)
            .<Employee2, Employee2> chunk(10)
            .reader(testReader1)
            .processor(testProcessor2())
            .writer(testWriter1)
            .build();
    }
    
    @Bean
    @StepScope
    public MyBatisCursorItemReader<Employee2> myBatisCursorItemReader(SqlSessionFactory sqlSessionFactory,
            @Value("#{stepExecutionContext[gridSize]}") final String gridSize,
            @Value("#{stepExecutionContext[index]}") final String index) { 
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("gridSize", gridSize);
        parameterValues.put("index", index);
        MyBatisCursorItemReader<Employee2> myBatisCursorItemReader = new MyBatisCursorItemReader<>();
        myBatisCursorItemReader.setSqlSessionFactory(sqlSessionFactory);
        myBatisCursorItemReader.setParameterValues(parameterValues);
        myBatisCursorItemReader.setQueryId("employee.getEmployee");
        return myBatisCursorItemReader;
    }
    
    @Bean
    @StepScope
    public EmployeeProcessor testProcessor2() {
        return new EmployeeProcessor();
    }
    
    @Bean
    @StepScope
    public FlatFileItemWriter<Employee2> flatFileItemWriter() {
        FlatFileItemWriter<Employee2> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("batch/result.txt"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(new DelimitedLineAggregator<Employee2>() {
            {
                setDelimiter(",");
                setFieldExtractor(new BeanWrapperFieldExtractor<Employee2>() {
                    {
                        setNames(new String[] { "userId", "userName", "userGender", "departmentCode" });
                    }
                });
            }
        });
        return writer;
    }
    
    @Bean 
    public Partitioner partitioner() {
        return new ParallelPartitioner(); 
    }    
    
    public class ParallelPartitioner implements Partitioner {
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            Map<String, ExecutionContext> result = new HashMap<>(gridSize); 
            for(int index= 0; index < gridSize; index++) {
                ExecutionContext ctx = new ExecutionContext();
                ctx.putInt("gridSize", gridSize);
                ctx.putInt("index", index);
                result.put("partition" + index, ctx);
            }
            return result;
        }
    }
    
    @Bean
    @JobScope
    public PartitionHandler partitionHandler(@Value("#{jobParameters[GridSize]}") int gridSize,
            TaskExecutor taskExecutor, Step workerStep) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setGridSize(gridSize);
        handler.setTaskExecutor(taskExecutor);
        handler.setStep(workerStep);
        try {
            handler.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return handler;
    }
}

일단 통 소스를 붙여 넣었다. 크게 Job, Step 정의, Reader, Processor, Writer 정의, Partitioner 정의 이렇게 세곳으로 나누어서 살펴보도록 하자. 


Job, PartitionStep 정의

@Bean
public Job samplePartitionerJob(Step partitionStep) {
    return jobBuilderFactory.get("samplePartitionerJob")
        .incrementer(new RunIdIncrementer())
        .start(partitionStep)
        .build();
}

@Bean
public Step partitionStep(Step workerStep, Partitioner partitioner, PartitionHandler partitionHandler, SamplePartitionerListener listener) {
    return stepBuilderFactory.get("partitionStep")
            .listener(listener)
            .partitioner(workerStep.getName(), partitioner)
            .partitionHandler(partitionHandler)
            .build();
}    

@Bean
public Step workerStep(MyBatisCursorItemReader<Employee2> testReader1, SampleWorkerStepListener stepListener, FlatFileItemWriter<Employee2> testWriter1 ) {
    return stepBuilderFactory.get("workerStep")
        .listener(stepListener)
        .<Employee2, Employee2> chunk(10)
        .reader(testReader1)
        .processor(testProcessor2())
        .writer(testWriter1)
        .build();
}

일단 가장 기본인 Job, Step 정의이다.

Job은 일반적인 Spring Batch Job과 다르지 않다. Step으로 partitionStep 이라는 Step을 가지고 있다.

Step은 Manager 성격인 partitionStep과 Worker 성격인 workerStep이 있다.

partitionStep은 Step에 대해 관리하는 Step이라고 생각하면 된다. 즉 Step을 partitioner, partitionHandler를 통해 어떻게 나눌지 각을 잡는 역할을 한다고 보면 된다. 

workerStep은 일반적으로 우리가 다루는 실제 Step의 모습과 동일하다. 위에서 얘기한 기존 Step의 모습과 크게 다르지 않다는 것이 바로 이 부분때문에 그렇다는 것이다. 

그리고 partitionStep의 listener와 workerStep의 listener는 각각 정의해서 사용할 수 있다. 각 listener는 logging의 용도로 다음과 같이 정의하였다. 

 

SamplePartitionerListener.java

@Component
public class SamplePartitionerListener extends StepExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(SamplePartitionerListener.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        if(stepExecution.getStatus() == BatchStatus.STARTED) {
            log.info("PartitionStep start! " + stepExecution.getId());
        }
    }
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if(stepExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("PartitionStep end! " + stepExecution.getId());
        }
        return new ExitStatus("stepListener exit");
    }
}

 

SampleWorkerStepListener.java

@Component
public class SampleWorkerStepListener extends StepExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(SampleWorkerStepListener.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        if(stepExecution.getStatus() == BatchStatus.STARTED) {
            log.info("WorkerStep start! " + stepExecution.getId());
        }
    }
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        if(stepExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("WorkerStep end! "   + stepExecution.getId());
        }
        return new ExitStatus("stepListener exit ");
    }
}

 


Reader, Processor, Writer 정의

@Bean
@StepScope
public MyBatisCursorItemReader<Employee2> myBatisCursorItemReader(SqlSessionFactory sqlSessionFactory,
        @Value("#{stepExecutionContext[gridSize]}") final String gridSize,
        @Value("#{stepExecutionContext[index]}") final String index) {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("gridSize", gridSize);
    parameterValues.put("index", index);
    MyBatisCursorItemReader<Employee2> myBatisCursorItemReader = new MyBatisCursorItemReader<>();
    myBatisCursorItemReader.setSqlSessionFactory(sqlSessionFactory);
    myBatisCursorItemReader.setParameterValues(parameterValues);
    myBatisCursorItemReader.setQueryId("employee.getEmployee");
    return myBatisCursorItemReader;
}

@Bean
@StepScope
public EmployeeProcessor testProcessor2() {
    return new EmployeeProcessor();
}

@Bean
@StepScope
public FlatFileItemWriter<Employee2> flatFileItemWriter() {
    FlatFileItemWriter<Employee2> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource("batch/result.txt"));
    writer.setAppendAllowed(true);
    writer.setLineAggregator(new DelimitedLineAggregator<Employee2>() {
        {
            setDelimiter(",");
            setFieldExtractor(new BeanWrapperFieldExtractor<Employee2>() {
                {
                    setNames(new String[] { "userId", "userName", "userGender", "departmentCode" });
                }
            });
        }
    });
    return writer;
}

다음은 Reader, Processor, Writer에 대한 부분이다. 

읽어오는 Reader는 MyBatisCursorItemReader를 선택했다. 변수로는 gridSize와 index라는것을 앞단으로부터 넘겨 받는다. 자세한 설명은 뒷부분에서 하겠다. 이 gridSize라는것은 Partitioner라는것의 핵심이라고 보면 된다. Step을 몇개의 Worker Step으로 나눌지에 대한 값이라고 보면 된다. index는 이와 연관되는 것으로 gridSize로 나눈 것들을 indexing을 하여 온전히 값을 나눌 수 있게 해준다. 

이 gridSize, index를 가지고 employee.getEmployee 라는 쿼리를 수행한다. 

 

실행 쿼리 (employee.getEmployee)

<mapper namespace="employee">
    <select id="getEmployee" resultType="com.example.batchprocessing.Employee2">
    <![CDATA[
        SELECT * FROM (
            SELECT A.USER_ID userId
                , A.USER_NAME AS userName
                , A.USER_GENDER AS userGender
                , A.DEPARTMENT_CODE AS departmentCode
                , ROWNUM AS RN from BATCH_SAMPLE_EMPLOYEE A
                ORDER BY A.USER_ID
        ) WHERE MOD(RN, #{gridSize}) = #{index}
        ]]>
    </select>
</mapper>

gridSize, index는 이런 용도로 사용이 된다. 즉 입력받은 값으로 결과값을 n개로 나누어 처리하기 위함이다. 

 

이렇듯 Reader에서부터 원래 하려던 일을 n등분을 하게 되고 그 후의 Processor, Writer 나눠진 작업들은 각각 n개로 나뉘어져 병렬로 처리된다. Processor와 Writer는 기존과 동일하게 처리되므로 넘어가도록 하겠다. 

 

여기서 주의해야 할점은 자세히 보면 인지했겠지만 모두 @StepScope 가 붙어있다. Step 하위 개념인 Reader, Writer, Processor는 @StepScope를 붙이는것을 명심하자. 


Partitioner 정의

@Bean 
public Partitioner partitioner() {
    return new ParallelPartitioner(); 
}    

public class ParallelPartitioner implements Partitioner {
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>(gridSize); 
        for(int index= 0; index < gridSize; index++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putInt("gridSize", gridSize);
            ctx.putInt("index", index);
            result.put("partition" + index, ctx);
        }
        return result;
    }
}

@Bean
@JobScope
public PartitionHandler partitionHandler(@Value("#{jobParameters[GridSize]}") int gridSize,
        TaskExecutor taskExecutor, Step workerStep) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setGridSize(gridSize);
    handler.setTaskExecutor(taskExecutor);
    handler.setStep(workerStep);
    try {
        handler.afterPropertiesSet();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return handler;
}

그 다음은 Partitioner의 핵심인 Partitioner와 PartitionHandler를 정의하는 부분이다. 

Customizing을 할 여지가 거의 없으므로 공통 메소드로 뽑아도 괜찮을것 같다.

Partitioner에서는 위에서 언급된 값들에 대해 ExcutionContext에 값을 담아 넣는 역할을 한다.

PartitionerHandler는 외부 환경변수로부터 GridSize를 받아서 TaskExecutorPartitionHandler에 담아주는 역할을 한다. 

그리고 PartitionerHandler는 @JobScope로 정의를 해줘야 한다. 

이 정보들은 위에서 나온 partitionStep을 구성할때 사용이 된다. 이렇게 Partitioner에 대한 구성이 끝났다. 


실행 및 결과 확인

GridSize Setting

실행을 하기 위해 Batch Job 이름과 GridSize를 넣어준다. GridSize를 2로 설정했으니 workerStep이 2개가 되어야 한다. 이대로 실행을 해보도록 하자. 

 

[           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=samplePartitionerJob]] launched with the following parameters: [{GridSize=2, requestDate=1620294352697, -spring.profiles.active=local, run.id=1, -spring.output.ansi.enabled=always, -job.name=samplePartitionerJob}]
[           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [partitionStep]
[           main] c.e.b.SamplePartitionerListener          : PartitionStep start! 1
[           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
[         task-2] c.e.b.SampleWorkerStepListener           : WorkerStep start! 3
[         task-1] c.e.b.SampleWorkerStepListener           : WorkerStep start! 2
[         task-1] c.e.b.SampleWorkerStepListener           : WorkerStep end! 2
[         task-2] c.e.b.SampleWorkerStepListener           : WorkerStep end! 3
[         task-2] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition0] executed in 52ms
[         task-1] o.s.batch.core.step.AbstractStep         : Step: [workerStep:partition1] executed in 53ms
[           main] c.e.b.SamplePartitionerListener          : PartitionStep end! 1
[           main] o.s.batch.core.step.AbstractStep         : Step: [partitionStep] executed in 105ms
[           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=samplePartitionerJob]] completed with the following parameters: [{GridSize=2, requestDate=1620294352697, -spring.profiles.active=local, run.id=1, -spring.output.ansi.enabled=always, -job.name=samplePartitionerJob}] and the following status: [COMPLETED] in 117ms
[           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

Job이 실행되고 PartitionStep (Manager) 가 실행이 되고 그 하위의 WorkerStep이 2개가 실행되고 종료된다. 그리고 PartitionStep도 종료가 된다. 그리고 Job도 종료가 된다. 

 

이상 대용량 처리를 할 Step에서 유용하게 사용되는 Partitioner였다. 

 

 

댓글
최근에 올라온 글
최근에 달린 댓글
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31