领导提供新文件进行测试 数据文件名 T_USER_20200630.dat
mawuhui1%@#%0%@#%java%@#% mawuhui2%@#%0%@#%mysql%@#%信号文件名 T_USER_20200630.ctl
columnList=[user_name]%@#%[sex]%@#%[note]%@#% split=%@#% tableName=T_USER rows=2建表语句
create table t_user ( user_name varchar(60) not null, sex int(3) default 1 not null, note varchar(256) null );添加数据文件和信号文件到 E:\project 数据库执行建表语句
仅修改执行方法的入参即可 修改 FileImportApplication.java
package com.xiaoma.fileimport; import com.xiaoma.fileimport.service.BatTaskRun; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class FileImportApplication implements CommandLineRunner { @Autowired BatTaskRun batTaskRun; public static void main(String[] args) { //参数写一个就是使用当前日期,当提供测试文件日期不为当前日期的时候,传入日期为参数即可 //SpringApplication.run(FileImportApplication.class, new String[]{"myfile"}); // 修改入参为 T_USER 即可 SpringApplication.run(FileImportApplication.class, new String[]{"T_USER", "20200630"}); } /** * 文件导入数据库执行入口 * 通过配置根据传入的文件导入编号找到对应的文件名 * * @param args * @throws Exception */ @Override public void run(String... args) throws Exception { String taskNo = args[0]; if (args.length == 2) { String taskDate = args[1]; batTaskRun.execute(taskNo, taskDate); } else { batTaskRun.execute(taskNo); } } }执行FileImportApplication.java 中的main方法,测试通过
领导: 不错,但是现在需求又变了,我们现在的要求是:
myfile文件导入的时候全量导入,表中的数据要与文件中的数据一致,myfileT_USER文件导入的时候增量导入,根据数据文件中USER_NAME 字段,表中可以查到这条数据则更新,不存在则新增这条数据,(要求速度快点一点,数据可能很多,怎么实现我不管);需求1 只需要在导入前添加前置处理,将myfile表清空即可; 需求2, 我是先将给的数据文件导入临时表中T_USER_TEMP中,然后通过存储过程去进行数据更新和插入; 时间紧任务多,先不管三七二十一,代码能跑起来再说; sql 改造
-- 创建临时表 create table t_user_temp as select * from t_user; -- 创建数据导入后置处理存储过程 create procedure updateTUser() begin -- 更新t_user原有的数据 update t_user t1 set t1.note = (select t2.note from t_user_temp t2 where t1.user_name = t2.user_name), t1.sex = (select t2.sex from t_user_temp t2 where t1.user_name = t2.user_name) where t1.user_name in (select user_name from t_user_temp); -- 插入t_user新增的数据 insert into t_user (user_name, sex, note) select user_name, sex, note from t_user_temp where user_name not in (select user_name from t_user); end;添加导入文件的前后处理类(这里暂时自己编写的,后期会优化使用springboot的aop功能) 处理类 Processor.java
package com.xiaoma.fileimport.handler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; /** * @author mawuhui * @since 2020-07-01 10:31 */ @Component public class Processor { @Autowired JdbcTemplate jdbcTemplate; /** * 前置处理器 * * @return */ public boolean preProcessor(String taskNo) { if (taskNo.equals("myfile")) { jdbcTemplate.execute("truncate table " + taskNo); return true; } else if (taskNo.equals("T_USER")) { // 文件导入到临时表中 T_USER_TEMP中 jdbcTemplate.execute("truncate table " + taskNo + "_TEMP"); return true; } return false; } /** * 后置处理器 * * @return */ public boolean postProcessor(String taskNo) { // 无后置处理,直接返回 if (taskNo.equals("myfile")) { return true; } else if (taskNo.equals("T_USER")) { // 后置处理调用存储过程实现文件数据增量导入 jdbcTemplate.execute("call updateTUser"); } return true; } }修改主任务类 BatTaskRun.java
package com.xiaoma.fileimport.service; import com.xiaoma.fileimport.handler.Processor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.List; /** * @author mawuhui * @since 2020-06-30 17:59 */ @Component public class BatTaskRun { @Autowired FileReader reader; @Autowired FileToDataBase fileToDataBase; //modify by mwh 添加处理器 @Autowired Processor processor; public int execute(String taskNo) { // 未传入时间的时候使用当前时间 如 20200630 DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyyMMdd"); String taskDate = LocalDate.now().format(df); return execute(taskNo, taskDate); } public int execute(String taskNo, String taskDate) { // 文件名获取 String fileName = taskNo + "_" + taskDate; // 1. 信号文件读取 List<String> ctlFile = reader.readTxtFile("E:\\project\\" + fileName + ".ctl"); // 2. 数据文件读取 List<String> datFile = reader.readTxtFile("E:\\project\\" + fileName + ".dat"); // 3. 校验行数 /** 信号文件中的行数 */ Integer ctllines = Integer.parseInt(ctlFile.get(3).split("=")[1]); /** 数据文件中的行数 */ Integer datlines = datFile.size(); if (!ctllines.equals(datlines)) { throw new RuntimeException("文件校验出错,文件实际条数:" + datlines + "信号文件条数:" + ctllines); } // modify by mwh 入库前操作 processor.preProcessor(taskNo); // 4. 执行入库 fileToDataBase.execute(ctlFile, datFile); // 入库后操作 processor.postProcessor(taskNo); return 0; } }修改 FileToDataBase.java
package com.xiaoma.fileimport.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; /** * 文件数据解析导入数据库 * * @author mawuhui * @since 2020-06-30 18:21 */ @Component public class FileToDataBase { @Autowired JdbcTemplate jdbcTemplate; /** * @param ctlFile * @param datFile * @return */ public int execute(List<String> ctlFile, List<String> datFile) { String[] sql = buildSql(ctlFile, datFile); jdbcTemplate.batchUpdate(sql); return 0; } /** * @param ctlFile * @param datFile * @return */ public String[] buildSql(List<String> ctlFile, List<String> datFile) { String[] s = new String[datFile.size()]; // 这里tableName 写死了,需要修改 String tableName = ctlFile.get(2).split("=")[1]; StringBuilder builder1 = new StringBuilder(); // modify by mwh 修改TABLENAME if (tableName.equals("T_USER")) { tableName = "T_USER_TEMP"; } builder1.append("insert into ").append(tableName).append("("); String columns = ctlFile.get(0).split("=")[1].replace("%@#%", "").replace("][", ","); builder1.append(columns.substring(1, columns.length() - 1)); builder1.append(") values("); int i = 0; for (String line : datFile) { List<String> temp = Arrays.asList(line.split("%@#%")); String dat = temp.stream().map(item -> "'" + item + "'").collect(Collectors.joining(",")); s[i] = builder1.toString() + dat + ")"; i++; } return s; } }修改T_USER_20200630.dat 数据文件进行测试
mawuhui1%@#%0%@#%PHP%@#% mawuhui3%@#%0%@#%oracle%@#%期望结果: 数据库中T_USER 表中多了一条数据username 为 mawuhui3 的数据; username 为 mawuhui1的数据java 更新为了PHP
执行FileImportApplication.java 中的main 方法后; 连接数据库后可以看到,实现了数据增量导入
自定义处理类,在实际调用前后进行处理,这样以后新增文件,只用修改处理类Processor即可; 实现功能单一原则; 业务不断增加,实际上也就是增加不同的文件进行导入,每次新增文件导入都需要修改Processor类,后期Processor类会出现
if(something){ doSomeThing() }else if(something){ doSomeThing() }else if(something){ doSomeThing() } ...并且为了兼容其他系统,可能提供了数据格式不同的文件进行导入,下一节我们将使用工厂模式第一次对代码进行重构;