Spring Batch, 제대로 이해하기 (2) - 동작원리

2022. 6. 19. 23:42Spring

 

 

Spring Batch의 동작원리를 이해하고 코드를 Deep Dive하는 것이 본 포스팅의 목표입니다.

 

 

안녕하세요.

이번에 Spring Batch를 사용할 일이 있었는데, 아주 ... 다양한 내용을 배우며 다양한 감정도 배웠어요,,

그래서 이 Spring Batch를 도장깨기 하고자 Spring Batch Deep dive를 해보려고 합니다.

 

이번 포스팅은 Spring Batch 시리즈의 두 번째 편으로 동작원리를 다룹니다.

 

 

-------------------  📌 Spring Batch Series 📌 -------------------

Spring Batch, 제대로 이해하기 (1) - 개념 이해

✏️ Spring Batch, 제대로 이해하기 (2) - 동작 원리

Spring Batch, 제대로 이해하기 (3) - 데이터 처리

Spring Batch, 제대로 이해하기 (4) - 데이터 처리 활용

Spring Batch, 제대로 이해하기 (5)

- MultiResourceItemReader

 

----------------------------------------------------------------


 

 

@EnableBatchProcessing

@EnableBatchProcessing는 스프링 부트에서 배치를 실행하도록 적용하는 어노테이션입니다.

배치 인프라스트럭처를 부트스트랩합니다.

 

이 어노테이션은 스프링 배치가 제공하는 두 개의 빌더를 자동와이어링합니다.  (하나는 잡 빌드, 하나는 스텝을 빌드입니다.)

즉 @EnableBatchProcessing 를 적용하면 스프링이 각 빌더를 주입하게 만들면 잡이 실행되는 것입니다.

 

 

📌 DataSource

해당 어노테이션을 적용한 다음, 추가적인 요구 작업으로는 JobRepository와 PlatformTransactionManager의 사용에서 필요한 DataSource를 정의해야 합니다. JobRepository과 PlatformTransactionManager는 필요에 따라 데이터 소스를 사용할 수 있기 때문에 스프링 부트가 Classpath에 정의된 데이터 소스 정보를 가지고 HSQLDB를 사용합니다. 

 

 

📌 boolean modular

@EnableBatchProcessing 에서 modular (boolean) 값을 설정할 수 있는데, 여러 Job을 등록할 때 모듈화를 위해 사용할 수 있습니다. default값은 false 이며, 활성화를 위해서는 @EnableBatchProcessing(modular = true) 로 활성화시킵니다.

 

내부적으로는 @EnableBatchProcessing 어노테이션은 BatchConfigurationSelector를 @Import하는데 Batch Configuration을 선택하는 클래스입니다. 기본적으로 SimpleBatchConfiguration 클래스를 로드하며, modular가 활성화되어 있으면 ModularBatchConfiguration를 로드합니다. ModularBatchConfiguration는 여러 Job Configuration들과 Job들이 등록되는 Child Application Context들을 따로 생성해서 포함시킵니다. 

덕분에 Job 마다 정의된 Bean 이름이 충돌되지 않게 되죠.

기본적으로 JobRegistrar에는 AutomaticJobRegistrar라는 빈이 주입됩니다.

 

 

📌 Initialize

BatchConfiguration에서는 부트스트랩 과정으로 아래와 같은 빈을 정의합니다.

 

✔️ JobRepository: 실행 중인 잡의 상태를 기록하는 게 사용

✔️ JobLauncher: 잡을 구동하는데 사용

✔️ JobExplorer: JobRepository를 사용해 읽기 전용 작업을 수행하는데 사용

✔️ JobRegistry: 특정한 런터 구현체를 사용할 때 잡을 찾는 용도로 사용

✔️ PlatformTransactionManager: 잡 진행 과정에서 트랜잭션을 다루는데 사용

✔️ JobBuilderFactory: 잡을 생성하는 빌더 

✔️ StepBuilderFactory: 스텝을 생성하는 빌더 

 

 

 

📌 JobLauncherApplicationRunner

다음으로는 정의된 JobLauncherApplicationRunner을 실행합니다.

JobLauncher를 실행하면서 정의된 job에 인자로 넘어온 Argument를 JobParameter로 생성하여 넘겨줍니다.

 

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

protected void execute(Job job, JobParameters jobParameters) {

  JobParameters parameters = getNextJobParameters(job, jobParameters);
  JobExecution execution = this.jobLauncher.run(job, parameters);
}

 

보시다시피, 등록된 JobLauncher의 run을 실행합니다.

 

 

JobLauncher

다음으로, JobLauncher의 기본 구현체인 SimpleJobLauncher가 실행됩니다.

JobRepository에서 마지막으로 실행된 JobExecution정보를 가져오고 해당 Job과 JobParameter의 유효성을 체크합니다.

재시작 가능한지, 실행가능한 상태인지, 등록된 Step 상태 등을 체크합니다.

 

 

JobRepository에 새로운 JobExecution을 생성한 후, TaskExecutor에 Runnable을 등록해 하나의 job을 실행시킵니다.

이 때, Job.execute하면서 JobExecution를 넘겨줍니다.

기본적으로 SyncTaskExecutor을 통해 실행됩니다.

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

public JobExecution run(final Job job, final JobParameters jobParameters) {
    final JobExecution jobExecution;
    
    JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
    
    // lastExecution 상태 체크 & 상태 받아오기(BatchStatus) & 모든 Step 상태 체크
    
    job.getJobParametersValidator().validate(jobParameters);
        
    jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

    taskExecutor.execute(new Runnable() {
        job.execute(jobExecution);
    }
}

 

이 때 Job의 상태를 확인하는데 하나라도 실행중이거나 STOPPING 상태면 이미 실행됐다면 이미 실행됐다는 오류를 주고,

UNKOWN 상태면 롤백 안됐다고 오류를 줍니다.

 

 

Job

이제 Job이 실행됩니다.

기본적으로 Job의 추상클래스인 AbstractJob가 호출되고, 그 다음으로 기본 구현체인 SimpleJob이 실행됩니다.

AbstractJob의 구현체로는 FlowJob, SimpleJob 가 있습니다.

 

 

📌 AbstractJob

> Spring Docs

 

AbstractJob에서 등록된 JobListener가 있을 때 beforeJob을 수행합니다.

그 이후, SimpleJob의 execute가 실행됩니다.

마지막으로 Job이 모두 실행을 마쳤을 때, listener에서 afterJob이 수행됩니다.

이 때, JobListener afterJob은 finally 구문에 있기 때문에 job에서 예외가 발생해도 실행됩니다. 

 

그 이후, repository에서 상태를 업데이트 합니다.

 

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

public final void execute(JobExecution execution) {
  try {
    jobParametersValidator.validate(execution.getJobParameters());
    
    if (execution.getStatus() != BatchStatus.STOPPING) {
      listener.beforeJob(execution);
      doExecute(execution);
    }
  } catch (...) {
    // STOPPED or FAILED 로 상태 변경
    
  } finally {
    listener.afterJob(execution);
    jobRepository.update(execution);
  }
}

 

상태 체크에 대해서는, 실행 전 job의 status가 STOPPING이라면 실행오류 (BatchStatus.STOPPED, ExitStatus.COMPLETED)를 하고 정상 종료하고 예외 발생 시, Job BatchStatus를 갱신 (STOPPED, FAILED, NOOP)합니다.

 

 

📌 SimpleJob

실제적인 Job의 역할로는 모든 StepExecution을 가져와서 실행시킵니다. 

정확히는 Step 실행을 StepHandler에게 넘깁니다.

마지막으로 Step의 마지막 상태를 Job에게 넘겨줍니다.

 

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

@Override
protected void doExecute(JobExecution execution) throws ... {

    StepExecution stepExecution = null;
    for (Step step : steps) {
        stepExecution = handleStep(step, execution);
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            break; // Terminate the job if a step fails
        }
    }

    // Update the job status to be the same as the last step
    if (stepExecution != null) 
    {
        if (logger.isDebugEnabled()) {
            logger.debug("Upgrading JobExecution status: " + stepExecution);
        }
        execution.upgradeStatus(stepExecution.getStatus());
        execution.setExitStatus(stepExecution.getExitStatus());
    }
}

 

 

 

Step

Job 구현체에서 불러진 StepHandler의 handleStep이 먼저 실행됩니다.

이후, AbstractStep이 실행된 후 AbstractStep의 구현체가 실행되는데요.

구현체에는 DecisionStep, DelegateStep, FlowStep, JobStep, PartitionStep, TaskletStep 등이 있습니다.

기본적으로 생성되는 TaskletStep을 살펴보겠습니다.

 

 

📌 StepHandler

> Spring Docs

StepHandler의 구현체로는 JsrStepHandler, SimpleStepHandler 가 있으며, 기본적으로 SimpleStepHandler를 불러옵니다.

 

✔️ SimpleStepHandler

SimpleStepHandler는 JobInstance를 가져오고, 해당 Step의 마지막 상태를 JobRepository로부터 가져옵니다.

만약, Step이 실행가능한 상태라면 StepExecution을 생성하고 재시작 상태라면 ExecutionContext를 업데이트 합니다.

Step 실행 전 JobRepository에 StepExecution를 추가한 후 Step을 실행합니다.

실행 후에 ExecutionContext을 업데이트한 후 해당 StepExecution을 반환합니다.

 

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

public StepExecution handleStep(Step step, JobExecution execution) throws ... {
  JobInstance jobInstance = execution.getJobInstance();
  StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
  
  if (shouldStart(lastStepExecution, execution, step)) {
    currentStepExecution = execution.createStepExecution(step.getName());
    jobRepository.add(currentStepExecution);
    
    step.execute(currentStepExecution);
    
    jobRepository.updateExecutionContext(execution);
  }
  
  return currentStepExecution;
}

 

보시면, step.execute(...)로 Step을 실행합니다.

 

 

📌 AbstractStep

> Spring Docs

아래 doExecute() 부분이 Step 구현체를 실행하는 코드입니다.

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

public final void execute(StepExecution stepExecution) throws ... {

  stepExecution.setStatus(BatchStatus.STARTED);
  ExitStatus exitStatus = ExitStatus.EXECUTING;
  
  try { 
    getCompositeListener().beforeStep(stepExecution);
    doExecute(stepExecution);
    exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
    stepExecution.upgradeStatus(BatchStatus.COMPLETED);
  } catch (Throwable e) {
  
    stepExecution.upgradeStatus(determineBatchStatus(e));
    
  } finallly {
    getCompositeListener().afterStep(stepExecution);
    getJobRepository().updateExecutionContext(stepExecution);
  }
}

 

JobRepository에 BatchStatus를 STARTED로 변경하고, StepExecution을 업데이트합니다.

AbstractJob과 동일하게 등록된 StepListener가 있을 때 beforeStep을 수행합니다.

수행 후 정상 완료되었으면 BatchStatus를 COMPLETED로, ExitStatus를 COMPLETE로 변경합니다.

 

마지막으로 Listener의 afterStep을 수행합니다.

Job과 마찬가지로 finally 내에 Listener의 afterStep을 수행합니다.

모든 Step작업이 실패든 성공이든(finally 구문) 마지막에 JobRepository의 업데이트 과정을 거칩니다. 

예외 발생 시, step BatchStatus를 갱신 (STOPPED)합니다.

만약, JobRepository의 작업 수행시 오류가 발생하면 (BatchStatus.UNKNOWN, ExitStatus.UNKNOWN)으로 설정 후 종료합니다.

 

 

AbstractStep을 구현하는 클래스는 DecisionStepDelegateStepFlowStepJobStepPartitionStepTaskletStep 등이 있는데, 이번 포스팅에는 기본적으로 Chunk기반의 ItemReader, ItemProcessor,ItemWriter를 설정할 때 생성되는 TaskletStep을 살펴보겠습니다.

 

 

📌 TaskletStep

기본적으로 Chunk기반 Step을 작성하면 TaskletStep 클래스를 불러옵니다.

해당 스텝에서는 트랜잭션 단위로 tasklet을 실행합니다.

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

protected void doExecute(StepExecution stepExecution) throws Exception {
  stream.update(stepExecution.getExecutionContext());
  getJobRepository().updateExecutionContext(stepExecution);
  
  stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
  
    @Override
    public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) ... {
      StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
               
      result = new TransactionTemplate(transactionManager, transactionAttribute)
                .execute(new ChunkTransactionCallback(chunkContext, semaphore));
      chunkListener.afterChunk(chunkContext);
      
    }
  }
}

 

Transaction.execute를 하게 되면 결국 ChunkTransactionCallback을 통해 아래와 같은 코드가 실행됩니다.

 

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

StepContribution contribution = stepExecution.createStepContribution();

chunkListener.beforeChunk(chunkContext);
result = tasklet.execute(contribution, chunkContext);

stepExecution.apply(contribution);
stream.update(stepExecution.getExecutionContext());

getJobRepository().update(stepExecution);

 

코드를 요약하면 Chunk 단위로 실행한 후 다시 Step에 상태를 올려보냅니다.

여기서 tasklet을 execute하는 코드가 있는데, 해당되는 대표적인 클래스가 ChunkOrientedTasklet입니다.

 

 

 

ChunkOrientedTasklet

Tasklet의 구현체로는 StoppableTasklet, ChunkOrientedTasklet, SystemCommandTasklet 등이 있는데, 이번에는 ChunkOrientedTasklet을 살펴보겠습니다.

ChunkOrientedTasklet는 ChunkProcess와 ChunkProvider를 속성값으로 갖고 있습니다.

 

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

  Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
  inputs = chunkProvider.provide(contribution);
  
  chunkProcessor.process(contribution, inputs);
  chunkProvider.postProcess(contribution, inputs); // do nothing
}

 

가장 먼저 chunkProvider의 provide가 실행되고, chunkProcessor의 process가 실행됩니다.

chunkProvider의 provide는 Reader작업을 하고,

chunkProcessor의 process는 Writer작업을 합니다.

 

순서대로 살펴보겠습니다.

 

 

 

📌 SimpleChunkProvider

> ChunkProvider Spring Docs

아래와 같이 Chunk 단위로 아이템을 읽어오는 것을 확인할 수 있습니다.

 

[핵심 코드 요약]  아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.

public Chunk<I> provide(final StepContribution contribution) throws Exception {
  final Chunk<I> inputs = new Chunk<>();
  repeatOperations.iterate(new RepeatCallback() {
    @Override
    public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
      item = read(contribution, inputs);
      return inputs;
	}
  }
}

// read(...)는 .doRead()를 호출
protected final I doRead() throws Exception {
  listener.beforeRead();
  I item = itemReader.read();
  return item;
}

 

 

📌 SimpleChunkProcessor

> ChunkProcessor Spring Docs

마지막으로, ChunkProcessr는 Writer 작업을 실행합니다.

코드를 보면 실제로 write를 진행하는데, 그 전에 등록된 ItemProcessor를 Chunk단위로 실행하는 것을 확인할 수 있습니다.

 

[핵심 코드 요약] (아래 코드는 주로 에러 코드나 상태 업데이트와 같은 대부분의 코드가 생략되었습니다.)

public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
  Chunk<O> outputs = transform(contribution, inputs);

  // chunk 단위의 processor 실행 모두 마친 후 writer 실행
  write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
}

// chunk 단위로 processor 실행
protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception {
  for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
    RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {
      doProcess(item);
    }
  }
}

protected final O doProcess(I item) throws Exception {
  listener.beforeProcess(item);
  O result = itemProcessor.process(item);
  listener.afterProcess(item, result);
  
  return result;
}

// writer
protected final void doWrite(List<O> items) throws Exception {
  listener.beforeWrite(items);
  writeItems(items);            // itemWriter.write(items);
  doAfterWrite(items);          // listener.afterWrite(items);
}

 

 

 

그럼 이렇게 모든 실행 작업을 뜯어보았습니다.

재미있게 하긴 했는데, 생각보다 오래걸리네요 🥲

 

 

이상으로 Spring Batch에 대한 동작 원리를 다뤘습니다.

오타나 잘못된 내용을 댓글로 남겨주세요!

감사합니다 ☺️