Spring-Batch的使用

    技术2022-07-11  96

    在pom.xml 添加:

    <dependency> <groupId>com.croot</groupId> <artifactId>croot_batch_starter</artifactId> </dependency>

    使用示例

    @Configuration public class TaInterface03Config { private static final Logger log = LoggerFactory.getLogger(TaInterface03Config.class); @Autowired private JobBuilderFactory jobBuilderFactory; // 用于构建JOB @Autowired private StepBuilderFactory stepBuilderFactory; // 用于构建Step @Autowired private FileJobListener fileJobListener; @Autowired private TaInterface03Service service; @Autowired private TaInterface03Mapper mapper; @Bean public Job taInterface03Job() { return jobBuilderFactory.get("taInterface03Job").incrementer(new RunIdIncrementer()).listener(fileJobListener) .flow(taInterface03Step()).end().build(); } @Bean public Step taInterface03Step() { return stepBuilderFactory.get("taInterface03Step1").<TaInterface03, TaInterface03>chunk(5) .reader(taInterface03Reader(null)).processor(taInterface03Processor(null)).writer(taInterface03Writer(null)) .faultTolerant().skipLimit(1).skip(IncorrectLineLengthException.class).skip(FlatFileParseException.class) .build(); } @Bean @StepScope public FlatFileItemReader<TaInterface03> taInterface03Reader(@Value("#{jobParameters[param]}") String param) { return FileReaderHelper.flatFileItemReader(TaInterface03.class, param, true); } @Bean @StepScope public ItemProcessor<TaInterface03, TaInterface03> taInterface03Processor(@Value("#{jobParameters[param]}") String param) { return new ItemProcessor<TaInterface03, TaInterface03>() { FileEntity fileEntity = JacksonUtil.readValue(param, FileEntity.class); String taCode = GlobalparaCache.getDictValue(GlobalDictConst.TACODE); @Override public TaInterface03 process(TaInterface03 entity) throws Exception { entity.setFilename(fileEntity.getFileName()); entity.setTacode(taCode); entity.setOptid(fileEntity.getOptAuth() == null ? "" : fileEntity.getOptAuth().getOptid()); entity.setOptname(fileEntity.getOptAuth() == null ? "" : fileEntity.getOptAuth().getOptName()); entity.setInputtime(DateUtil.getCurrentTimeLong()); return entity; } }; } @Bean @StepScope public ItemWriter<TaInterface03> taInterface03Writer(@Value("#{jobParameters[param]}") String param) { return list -> { FileEntity fileEntity = JacksonUtil.readValue(param, FileEntity.class); TaInterface03 interfaceParam = new TaInterface03(); interfaceParam.setFilename(fileEntity.getFileName()); mapper.delete(interfaceParam); List<TaInterface03> dataList = new ArrayList<>(); int i = 1; for (TaInterface03 entity : list) { entity.setId(i++); dataList.add(entity); log.info("write data : {}", entity); } service.saveBatchWithCustomId(dataList); }; } }

    spring batch 介绍

    官方文档:https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/index.html

    Job

    在Spring批处理中,Job是Step实例的容器。它将逻辑上属于一个流程的多个Step组合在一起,并允许为所有Step全局配置属性,例如可重新启动。作业配置包含: 名字 Step 实例的定义和排序 Job是否可重启 定义job

    @Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") .start(playerLoad()) .next(gameLoad()) .next(playerSummarization()) .end() .build();

    JobInstance

    JobInstance是指逻辑作业运行的概念,每个JobInstance可以有多个executions,一个JobInstance只对应于一个特定的Job,标识的JobParameters可以在给定的时间运行。

    JobParameters

    JobParameters对象包含用于启动批处理Job的一组参数。在运行过程中,它们可用于识别或甚至用作参考数据.

    JobExecution

    JobExecution指单次尝试运行Job的技术概念.

    Step

    Step 是封装批处理作业的独立顺序阶段的域对象。因此,每个Job 完全由一个或多个 Step 组成。一个Step包含定义和控制实际批处理所需的全部信息。与Job一样,一个Step具有与唯一的JobExecution关联的单个StepExecution.

    StepExecution

    StepExecution表示执行一个 Step 的单次尝试。每次运行一个 Step 时都会创建一个新的StepExecution,与JobExecution类似。但是,如果某个 Step 因其之前的 Step 失败而无法执行,则不会执行任何执行。 StepExecution仅在其 Step 实际启动时创建。

    @Bean public Step sampleStep(PlatformTransactionManager transactionManager) { return this.stepBuilderFactory.get("sampleStep") .transactionManager(transactionManager) .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .build(); }

    TaskletStep

    如果一个步骤必须包含一个简单的存储过程调用,该怎么办?您可以将该调用实现为ItemReader,并在过程结束后返回null。然而,这样做有点不自然,因为需要一个无操作的ItemWriter。Spring Batch为这个场景提供了TaskletStep。

    @Bean public Step step1() { return this.stepBuilderFactory.get("step1") .tasklet(myTasklet()) .build(); }

    @Bean public Job job() { return this.jobBuilderFactory.get("job") .start(stepA()) .next(stepB()) .next(stepC()) .build(); }

    @Bean public Job job() { return this.jobBuilderFactory.get("job") .start(stepA()) .on("*").to(stepB()) .from(stepA()).on("FAILED").to(stepC()) .end() .build(); }

    JobLauncher

    JobLauncher表示一个简单的接口,用于启动给定的一组JobParameters的一个Job。

    @Controller public class JobLauncherController { @Autowired JobLauncher jobLauncher; @Autowired Job job; @RequestMapping("/jobLauncher.html") public void handle() throws Exception{ jobLauncher.run(job, new JobParameters()); } }
    Processed: 0.013, SQL: 9