spring batch execution context 예외 처리하기

업무 중 batch job을 작성하다 아래와 같은 메시지가 발생하였고 이것을 해결한 내용 및 원리를 정리합니다.

Failed to serialize object of type: class org.springframework.batch.item.ExecutionContext

Execution Context

배치 처리는 job 혹은 step별 상태를 가지고 있어야 합니다. 저장된 상태를 통해 commit등의 transaction처리를 하게 됩니다. 다행인 점은 사용자가 직접 처리하는 것이 아니라 spring이 알아서 자동화해준다는 점 입니다. Executioncontext가 그 역할을 처리하며 내부적으로 key, value 형태로 되어 있습니다. ExecutionContext는 단순히 spring 내부에서 그 과정을 제어하는 수준을 넘어서 global하게 영역별 데이터를 사용자가 직접 공유도 할 수 있도록 도와줍니다. 하기 코드는 tasklet에서 Executioncontext에 접근하는 예제입니다.

data class Person(
    val name: String,
    val age: Int
)

class FirstTasklet : Tasklet {
    override fun execute(stepContribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
        val jobContext = chunkContext.stepContext.stepExecution.executionContext
        jobContext.put("key", Person("hahava", 31))
        return RepeatStatus.FINISHED
    }
}

언급한 바와 같이 Executioncontext는 내부가 ConcurrentHashmap으로 구현되어 있기에 get메서드를 호출하여 데이터를 안전하게 가져올 수 있습니다.

class SecondTasklet : Tasklet {
    override fun execute(stepContribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
        val jobContext = chunkContext.stepContext.stepExecution.jobExecution.executionContext
        println(jobContext.get("key"))

        return RepeatStatus.FINISHED
    }
}

그러나 회사에서 작업할 때는 예외가 발생하였습니다.

Failed to serialize object of type: class org.springframework.batch.item.ExecutionContext

DefaultBatchConfigurer

@EnableBatchProcessing을 통해 간편하게 batch를 사용할 수 있습니다. 내부적으로 DefaultBatchConfigurer사용하며 서비스나 경우에 맞춰 특정 설정 등을 override하여 구현합니다. 일부 시스템에선 데이터베이스를 지정하지 않고 batch를 작업하기 위해 아래와 같이 setDatasource를 override하여 공백으로 지정합니다.

@SpringBootApplication
@EnableBatchProcessing
@Configuration
class DemoApplication : DefaultBatchConfigurer() {
    override fun setDataSource(dataSource: DataSource) {
    }
}

문제는 datasource를 null로 처리했을 경우 initialize에서 기대하는 동작이 일부 달라진다는 점입니다.

@PostConstruct
public void initialize() {
  try {
    if(dataSource == null) {
      logger.warn("No datasource was provided...using a Map based JobRepository");

      if(getTransactionManager() == null) {
        logger.warn("No transaction manager was provided, using a ResourcelessTransactionManager");
        this.transactionManager = new ResourcelessTransactionManager();
      }

      MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(getTransactionManager());
      jobRepositoryFactory.afterPropertiesSet();
      this.jobRepository = jobRepositoryFactory.getObject(); // ...(1)

      MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory);
      jobExplorerFactory.afterPropertiesSet();
      this.jobExplorer = jobExplorerFactory.getObject(); 
    } else {
      this.jobRepository = createJobRepository(); 
      this.jobExplorer = createJobExplorer(); // ... (2)
    }

    this.jobLauncher = createJobLauncher();
  } catch (Exception e) {
    throw new BatchConfigurationException(e);
  }
}

(2)와 같이 호출되면 JobExplorerFactoryBean가 생성되고 내부에 ExecutionContextSerializer 구현되어 있어 추가적이 작업을 하지 않아도 되지만, (1)과 같이 호출되면 MapJobExecutionDao 생성되고 org.springframework.util패키지의 SerializationUtils호출 합니다. SerializationUtilsSerializable을 사용하기 때문에 Serializable구현하지 않은 클래스에선 예외가 발생했던 것입니다. 따라서 기본 타입이 아닌 클래스 타입에선 반드시 Serializable 구현해야 정상적으로 context을 사용할 수 있습니다.

full code

@Configuration
class JobConfig(
    private val jobBuilderFactory: JobBuilderFactory,
    private val stepBuilderFactory: StepBuilderFactory
) {
    @Bean
    fun sampleJob(): Job = jobBuilderFactory.get("sampleJob")
        .start(firstStep()).next(secondStep())
        .build()

    @Bean
    fun firstStep(): Step = stepBuilderFactory.get("firstStep")
        .tasklet(firstTasklet())
        .build()

    @Bean
    fun secondStep(): Step = stepBuilderFactory.get("secondStep")
        .tasklet(secondTasklet())
        .build()

    @Bean
    fun firstTasklet() = FirstTasklet()

    @Bean
    fun secondTasklet() = SecondTasklet()
}

class FirstTasklet : Tasklet {
    override fun execute(stepContribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
        val jobContext = chunkContext.stepContext.stepExecution.executionContext
        jobContext.put("key", Person("hahava", 31))
        return RepeatStatus.FINISHED
    }
}

class SecondTasklet : Tasklet {
    override fun execute(stepContribution: StepContribution, chunkContext: ChunkContext): RepeatStatus {
        val jobContext = chunkContext.stepContext.stepExecution.jobExecution.executionContext
        println(jobContext.get("key"))

        return RepeatStatus.FINISHED
    }
}

data class Person(
    val name: String,
    val age: Int
) : Serializable

카테고리:

업데이트:

댓글남기기