在pom.xml 添加:
<dependency>
<groupId>com
.croot
</groupId
>
<artifactId>croot_batch_starter
</artifactId
>
</dependency
>
使用示例
@Configuration
public class TaInterface03Config {
private static final Logger log
= LoggerFactory
.getLogger(TaInterface03Config
.class);
@Autowired
private JobBuilderFactory jobBuilderFactory
;
@Autowired
private StepBuilderFactory stepBuilderFactory
;
@Autowired
private FileJobListener fileJobListener
;
@Autowired
private TaInterface03Service service
;
@Autowired
private TaInterface03Mapper mapper
;
@Bean
public Job
taInterface03Job() {
return jobBuilderFactory
.get("taInterface03Job").incrementer(new RunIdIncrementer()).listener(fileJobListener
)
.flow(taInterface03Step()).end().build();
}
@Bean
public Step
taInterface03Step() {
return stepBuilderFactory
.get("taInterface03Step1").<TaInterface03, TaInterface03>chunk(5)
.reader(taInterface03Reader(null
)).processor(taInterface03Processor(null
)).writer(taInterface03Writer(null
))
.faultTolerant().skipLimit(1).skip(IncorrectLineLengthException
.class).skip(FlatFileParseException
.class)
.build();
}
@Bean
@StepScope
public FlatFileItemReader
<TaInterface03> taInterface03Reader(@Value("#{jobParameters[param]}") String param
) {
return FileReaderHelper
.flatFileItemReader(TaInterface03
.class, param
, true);
}
@Bean
@StepScope
public ItemProcessor
<TaInterface03, TaInterface03>
taInterface03Processor(@Value("#{jobParameters[param]}") String param
) {
return new ItemProcessor<TaInterface03, TaInterface03>() {
FileEntity fileEntity
= JacksonUtil
.readValue(param
, FileEntity
.class);
String taCode
= GlobalparaCache
.getDictValue(GlobalDictConst
.TACODE
);
@Override
public TaInterface03
process(TaInterface03 entity
) throws Exception
{
entity
.setFilename(fileEntity
.getFileName());
entity
.setTacode(taCode
);
entity
.setOptid(fileEntity
.getOptAuth() == null
? "" : fileEntity
.getOptAuth().getOptid());
entity
.setOptname(fileEntity
.getOptAuth() == null
? "" : fileEntity
.getOptAuth().getOptName());
entity
.setInputtime(DateUtil
.getCurrentTimeLong());
return entity
;
}
};
}
@Bean
@StepScope
public ItemWriter
<TaInterface03> taInterface03Writer(@Value("#{jobParameters[param]}") String param
) {
return list
-> {
FileEntity fileEntity
= JacksonUtil
.readValue(param
, FileEntity
.class);
TaInterface03 interfaceParam
= new TaInterface03();
interfaceParam
.setFilename(fileEntity
.getFileName());
mapper
.delete(interfaceParam
);
List
<TaInterface03> dataList
= new ArrayList<>();
int i
= 1;
for (TaInterface03 entity
: list
) {
entity
.setId(i
++);
dataList
.add(entity
);
log
.info("write data : {}", entity
);
}
service
.saveBatchWithCustomId(dataList
);
};
}
}
spring batch 介绍
官方文档:https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/index.html
Job
在Spring批处理中,Job是Step实例的容器。它将逻辑上属于一个流程的多个Step组合在一起,并允许为所有Step全局配置属性,例如可重新启动。作业配置包含: 名字 Step 实例的定义和排序 Job是否可重启 定义job
@Bean
public Job
footballJob() {
return this.jobBuilderFactory
.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
JobInstance
JobInstance是指逻辑作业运行的概念,每个JobInstance可以有多个executions,一个JobInstance只对应于一个特定的Job,标识的JobParameters可以在给定的时间运行。
JobParameters
JobParameters对象包含用于启动批处理Job的一组参数。在运行过程中,它们可用于识别或甚至用作参考数据.
JobExecution
JobExecution指单次尝试运行Job的技术概念.
Step
Step 是封装批处理作业的独立顺序阶段的域对象。因此,每个Job 完全由一个或多个 Step 组成。一个Step包含定义和控制实际批处理所需的全部信息。与Job一样,一个Step具有与唯一的JobExecution关联的单个StepExecution.
StepExecution
StepExecution表示执行一个 Step 的单次尝试。每次运行一个 Step 时都会创建一个新的StepExecution,与JobExecution类似。但是,如果某个 Step 因其之前的 Step 失败而无法执行,则不会执行任何执行。 StepExecution仅在其 Step 实际启动时创建。
@Bean
public Step
sampleStep(PlatformTransactionManager transactionManager
) {
return this.stepBuilderFactory
.get("sampleStep")
.transactionManager(transactionManager
)
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.build();
}
TaskletStep
如果一个步骤必须包含一个简单的存储过程调用,该怎么办?您可以将该调用实现为ItemReader,并在过程结束后返回null。然而,这样做有点不自然,因为需要一个无操作的ItemWriter。Spring Batch为这个场景提供了TaskletStep。
@Bean
public Step
step1() {
return this.stepBuilderFactory
.get("step1")
.tasklet(myTasklet())
.build();
}
@Bean
public Job
job() {
return this.jobBuilderFactory
.get("job")
.start(stepA())
.next(stepB())
.next(stepC())
.build();
}
@Bean
public Job
job() {
return this.jobBuilderFactory
.get("job")
.start(stepA())
.on("*").to(stepB())
.from(stepA()).on("FAILED").to(stepC())
.end()
.build();
}
JobLauncher
JobLauncher表示一个简单的接口,用于启动给定的一组JobParameters的一个Job。
@Controller
public class JobLauncherController {
@Autowired
JobLauncher jobLauncher
;
@Autowired
Job job
;
@RequestMapping("/jobLauncher.html")
public void handle() throws Exception
{
jobLauncher
.run(job
, new JobParameters());
}
}