不同Oracle数据库之间数据迁移的实现,只要主表的数据发生了变化,那么就要自动同步到从库中
mvn依赖
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.7</version>
</dependency>

数据库连接
/**
 * JDBC connection helper: loads the Oracle driver class once and offers
 * null-tolerant open/close utilities.
 *
 * @author pihao
 */
public class DBTools {

    static {
        // Load the Oracle JDBC driver class when this helper is first used.
        // A missing driver is only reported; later getConnection calls will then fail.
        try {
            Class.forName("oracle.jdbc.driver.OracleDriver");
        } catch (ClassNotFoundException driverMissing) {
            driverMissing.printStackTrace();
        }
    }

    /**
     * Opens a database connection.
     *
     * @param url      JDBC URL
     * @param username database user
     * @param password database password
     * @return the open connection, or {@code null} when the connection could not
     *         be established (the failure is printed to standard error)
     */
    public static Connection getConnection(String url, String username, String password) {
        try {
            return DriverManager.getConnection(url, username, password);
        } catch (SQLException failure) {
            failure.printStackTrace();
            return null;
        }
    }

    /**
     * Closes a {@code Connection}; does nothing when it is {@code null}.
     *
     * @param conn connection to close, may be {@code null}
     * @throws SQLException if a database access error occurs
     */
    public static void close(Connection conn) throws SQLException {
        if (conn == null) {
            return;
        }
        conn.close();
    }

    /**
     * Closes a {@code Connection} without propagating any {@code SQLException};
     * does nothing when it is {@code null}. A failure is printed to standard error.
     *
     * @param conn connection to close, may be {@code null}
     */
    public static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}
// Next section of the original write-up: the core methods that implement the data migration.
@SuppressWarnings("all") public class TableDao { private static final Logger logger =LoggerFactory.getLogger(TableDao.class); private static QueryRunner query = new QueryRunner(); private static String pksSql; static { try { pksSql = Global.getConfig("pksSql"); } catch (Exception e) { e.printStackTrace(); } } //pksSql = select cu.COLUMN_NAME from user_cons_columns cu, user_constraints au where cu.constraint_name = au.constraint_name and au.constraint_type = 'P' and au.table_name = ? /** * 根据表名获取表的主键列名 * @param tableName * @return */ public static List<String> findPrimaryKeysByTableName(String tableName,ConnectionInfo info){ Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); tableName = tableName.toUpperCase(); List<Object> primaryKeys = null; List<String> pks = null; try { primaryKeys = query.query(conn,pksSql,new ColumnListHandler<>("column_name"),tableName); pks = new ArrayList<>(); for (Object key : primaryKeys) { String k = (String) key; pks.add(k.toLowerCase()); } return pks; } catch (Exception e) { logger.error("获取 {} 的主键列失败,sql为:{},原因:{}",tableName,pksSql,e.getMessage()); } finally { DBTools.closeQuietly(conn); } return pks; } /** * 根据表名查找该表的列的信息 * @param tableName * @return */ public static List<ColumnInfo> findColumninfoListByTableName(String tableName,ConnectionInfo info) { Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); String columnsSql = "select * from user_tab_columns where table_name = ?"; List<ColumnInfo> list = null; try { BeanProcessor bean = new GenerousBeanProcessor(); RowProcessor processor = new BasicRowProcessor(bean); list = query.query(conn, columnsSql, new BeanListHandler<ColumnInfo>(ColumnInfo.class,processor), new Object[] { tableName.toUpperCase() }); } catch (SQLException e) { logger.error("获取 {} 的列信息失败,sql为:{},错误:{}",tableName,columnsSql,e.getMessage()); } finally { DBTools.closeQuietly(conn); } return list; } /** * 根据表名查找到该表的所有字段的名称 * 
@param tableName * @return */ public static List<String> findColumesNamesByTableName(String tableName,ConnectionInfo info){ Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); String sql="select * from user_tab_columns where table_name = ?"; tableName = tableName.toUpperCase(); List<Object> columes = null; List<String> cols = null; try { columes = query.query(conn, sql,new ColumnListHandler<>("column_name"),tableName); cols = new ArrayList<>(); for (Object col : columes) { String co = (String) col; cols.add(co.toLowerCase()); } return cols; } catch (Exception e) { logger.error("获取 {} 的字段名称失败,sql为{},错误为:{}",tableName,sql,e.getMessage()); } finally { DBTools.closeQuietly(conn); } return cols; } /** * 根据主键找到一条记录 * @param tableName * @param pks * @param params * @return */ public static Map<String,Object> findOneMapByPrimaryKeys(String tableName,Object[] pks,Object[] params,ConnectionInfo info){ Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); StringBuffer sb = new StringBuffer(); String sql = "select * from "+tableName; sb.append(sql); if(pks!=null && pks.length>0 && pks.length == params.length){ sb.append(" where "); for (int i = 0; i < pks.length; i++) { sb.append( pks[i] +" = ?"); if(i+1<pks.length){ sb.append(" and "); } } } try { sql = sb.toString(); Map<String, Object> map = query.query(conn,sql, new MapHandler(),params); logger.info("查询成功,主键为:{},sql语句:{}, 找到记录:{}",pks,sql,map); return map; } catch (Exception e) { logger.error("查询失败,主键为:{}, sql语句:{},原因:{}",pks,sql,e.getMessage()); } finally { DBTools.closeQuietly(conn); } return null; } /** * 根据表名找到bean的list集合(废弃使用) * @param tableName * @param T * @return * @throws SQLException */ @Deprecated public static <T> List<T> getTableBeanList(String tableName,Class<T> T,ConnectionInfo info) throws SQLException{ Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); String sql = 
"select * from "+tableName; List<T> beanList = new ArrayList<>(); beanList = (List<T>) query.query(conn,sql, new BeanListHandler<>(T)); return beanList; } /** * 根据表名找到该表的所有数据 * @param tableName * @return */ public static List<Map<String,Object>> findMapListByTableName(String tableName,ConnectionInfo info) { Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); String sql = "select * from "+tableName; List<Map<String,Object>> mapList = null; try { mapList = query.query(conn,sql,new MapListHandler()); return mapList; } catch (Exception e) { logger.error("获取表的所有记录失败,sql:{},原因:{}",sql,e.getMessage()); } finally { DBTools.closeQuietly(conn); } return mapList; } /** * 插入记录 * @param tableName 表名 * @param map 字段和参数 */ public static void insert(String tableName,Map<String,Object> map,ConnectionInfo info){ Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); String sql = "insert into "+tableName+"("; StringBuffer sb = new StringBuffer(); sb.append(sql); Set<Entry<String,Object>> entrySet = map.entrySet(); Iterator<Entry<String, Object>> iterator = entrySet.iterator(); String cols = ""; String vals = ""; List<Object> params = new ArrayList<>(); while (iterator.hasNext()) { Entry<String, Object> next = iterator.next(); cols += next.getKey()+","; vals += " ?,"; params.add(next.getValue()); //去除最后的逗号 if(!iterator.hasNext()){ StringBuffer s1 = new StringBuffer(cols); s1 = s1.deleteCharAt(s1.length()-1); s1.append(")"); cols = s1.toString(); StringBuffer s2 = new StringBuffer(vals); s2 = s2.deleteCharAt(s2.length()-1); s2.append(")"); vals = s2.toString(); } } sb.append(cols).append(" values(").append(vals); sql = sb.toString(); //执行 try { query.update(conn,sql,params.toArray()); logger.info("插入数据成功,sql为:{}",sql); logger.debug("参数为:{}",params); } catch (Exception e) { logger.error("插入数据失败,sql为:{}",sql); logger.error("参数为:{}",params); logger.error("失败原因:{}",e.getMessage()); } finally{ 
DBTools.closeQuietly(conn); } } /** * 插入操作,只适用于同一个数据库下(废弃使用) * 例如: insert into tab1 select * from tab2 where ... * @param destTable * @param sourTable * @param pks * @param params */ @Deprecated public static void directInsert(String destTable,String sourTable,String[] pks,Object[] params,ConnectionInfo info) { Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); StringBuffer sb = new StringBuffer(); String sql = "insert into "+ destTable+ " select * from "+ sourTable; sb.append(sql); if(pks!=null && pks.length>0 && pks.length == params.length){ sb.append(" where "); for (int i = 0; i < pks.length; i++) { if(params[i] instanceof Integer || params[i] instanceof Long ||params[i] instanceof Double){ //数字类型 sb.append( pks[i] +" = "+ params[i]); } else if(params[i] instanceof String){ //字符串类型加单引号 '' sb.append( pks[i] +" = "+ "'"+params[i]+"'"); } else{ //其他类型暂时不考虑,当字符串处理 sb.append( pks[i] +" = "+ "'"+params[i]+"'"); } if(i+1<pks.length){ sb.append(" and "); } } } sql = sb.toString(); try { query.update(conn,sql); logger.info("插入目标表:{} 成功,插入的sql语句为:{}",destTable,sql); } catch (Exception e) { logger.error("插入目标表:{} 失败,插入的sql语句为:{}",destTable,sql); logger.error("失败原因:{}",e.getMessage()); } } /** * 删除记录 * @param tableName 表名 * @param pks 主键 * @param params 参数 */ public static void delete(String tableName,Object[] pks,Object[] params,ConnectionInfo info){ if(pks.length == 0){ logger.error("删除指定数据请声明条件"); return; } Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); StringBuffer sb = new StringBuffer(); String sql = "delete from "+tableName+" where "; sb.append(sql); if(pks!=null && pks.length>0 && pks.length == params.length){ for (int i = 0; i < pks.length; i++) { sb.append( pks[i] +" = ?"); if(i+1<pks.length){ sb.append(" and "); } } } sql = sb.toString(); try { query.update(conn, sql,params); logger.info("删除指定记录成功,sql为:{}",sql); logger.debug("参数为:{}",Arrays.toString(params)); } 
catch (Exception e) { logger.error("删除指定记录失败,sql为:{},原因:{}",sql,e.getMessage()); logger.error("参数为:{}",Arrays.toString(params)); } finally{ DBTools.closeQuietly(conn); } } /** * 更新记录 * @param tableName 要更新表的名字 * @param pks 主键 * @param otherCols 除主键外的要更新的列名 * @param params 要修改的字段的值和主键的值,按顺序排列 */ public static void update(String tableName,Object[] pks,Object[] otherCols,Object[] params,ConnectionInfo info){ if(pks.length == 0 || otherCols.length == 0 || pks.length+otherCols.length != params.length){ logger.error("参数不规范: 无主键,无修改的字段,参数个数不对"); return; } Connection conn = DBTools.getConnection(info.getJdbcUrl(), info.getUserName(), info.getPdwrod()); String sql = "update "+tableName+" set "; StringBuffer sb = new StringBuffer(); sb.append(sql); for (int i = 0; i < otherCols.length; i++) { sb.append(otherCols[i] + " = ?"); if(i+1<otherCols.length){ sb.append(","); } } sb.append(" where "); for (int i = 0; i < pks.length; i++) { sb.append(pks[i] + " =?"); if(i+1<pks.length){ sb.append(" and "); } } sql = sb.toString(); //执行 try { query.update(conn,sql,params); logger.info("表名:{},更新成功:",tableName); logger.debug("sql为:{}",sql); logger.debug("参数为:{}",params); } catch (Exception e) { logger.error("表名:{},更新失败; 原因:{}",tableName,e.getMessage()); logger.error("sql为:{}",sql); logger.error("参数为:{}",Arrays.toString(params)); } finally{ DBTools.closeQuietly(conn); } } /** * MD5加密 * @param str 加密字符串 * @return * @throws Exception */ public static String getMD5(String str){ MessageDigest md5 = null; byte[] byteArray = null; try { md5 = MessageDigest.getInstance("MD5"); byteArray = str.getBytes("UTF-8"); } catch (Exception e) { logger.error("获取MD5密文失败!!!"); return "1"; //返回相同的密文,防止数据更新错误 } byte[] md5Bytes = md5.digest(byteArray); StringBuffer sb = new StringBuffer(); for (int i = 0; i < md5Bytes.length; i++) { int val = ((int) md5Bytes[i]) & 0xff; if (val < 16) { sb.append("0"); } sb.append(Integer.toHexString(val)); } return sb.toString(); } /** * 返回为Map,进行比较 * @param masterListMap * 
@param slaveListMap */ public static void updateTbDataByinsert2(List<Map<String,Object>> masterListMap,List<Map<String,Object>> slaveListMap){ } public static void updateTbDataByDelete2(List<Map<String,Object>> masterListMap,List<Map<String,Object>> slaveListMap){ } public static void updateTbDataByUpdate2(List<Map<String,Object>> masterListMap,List<Map<String,Object>> slaveListMap){ } }测试数据库同步
/** * 启动主程序 * * @author zhoucl * */ public class StartupMain { private static Logger logger = LoggerFactory.getLogger(StartupMain.class); private static final String TABLE_MASTER = "TB_BASE_COUNTRY_INFO"; private static final String TABLE_SLAVE = "TB_BASE_COUNTRY_INFO"; private static final String MD5_KEY_NAME = "md5_record_value"; public static void main(String[] args) throws Exception { //取主表所有记录成list hashmap //对记录进行for循环,将所有的数据计算MD5值(规则是所有)A=DDD,DSF=DSF,DF=DSF,SDF,再计算MD5值 方法DigestUtils.md5Hex,然后修改haspmap //增加一个KEY进去,KEY的名字叫MD5_RECORD_VALUE(这个名字定义成常量) //比较两个list,找出需要新增的数据,找出需要修改数据,找出需要删除的记录 //FOR循环 insert 新增的数据 //FOR循环 update 修改数据 //FOR循环 delete 删除的记录 ConnectionInfo slaveConnection = new ConnectionInfo(); slaveConnection.setJdbcUrl("jdbc:oracle:thin:@112.74.163.78:1521:xe"); slaveConnection.setUserName("system"); slaveConnection.setPdwrod("oracle"); ConnectionInfo masterConnection = new ConnectionInfo(); masterConnection.setJdbcUrl("jdbc:oracle:thin:@112.74.163.97:1521:xe"); masterConnection.setUserName("system"); masterConnection.setPdwrod("oracle"); //获取主表的主键 List<String> pksMaster = TableDao.findPrimaryKeysByTableName(TABLE_MASTER,masterConnection); //获取从表的主键 List<String> pksSlave = TableDao.findPrimaryKeysByTableName(TABLE_SLAVE,slaveConnection); //获取主表的所有列名 List<String> colsMaster = TableDao.findColumesNamesByTableName(TABLE_MASTER,masterConnection); //获取从表的所有列名 List<String> colsSlave = TableDao.findColumesNamesByTableName(TABLE_SLAVE,slaveConnection); //除主键外的其他字段的列名 List<String> otherCols = TableDao.findColumesNamesByTableName(TABLE_MASTER,masterConnection); otherCols.removeAll(pksMaster); //判断主键是否相同 boolean isPksSame = isListSame(pksMaster,pksSlave); //判断字段 boolean isColsSame = isListSame(colsMaster,colsSlave); if(!isPksSame){ throw new RuntimeException("{"+TABLE_MASTER+"},{"+TABLE_SLAVE+"} 两张表的主键不一致,请查看"); } if(!isColsSame){ throw new RuntimeException("{"+TABLE_MASTER+"},{"+TABLE_SLAVE+"} 两张表的字段不一致,请查看"); } 
logger.info("========================================调整MAP结构======================================="); //获取主表的所有数据 List<Map<String, Object>> masterMapList = TableDao.findMapListByTableName(TABLE_MASTER,masterConnection); masterMapList = reConstructMap(masterMapList); //获取从表的所有数据 List<Map<String, Object>> slaveMapList = TableDao.findMapListByTableName(TABLE_SLAVE,slaveConnection); slaveMapList = reConstructMap(slaveMapList); logger.info("========================================MAP结构设置完毕======================================="); logger.info("==========================================开始更新========================================="); //待插入的数据集合 List<Map<String,Object>> needInsertMapList = new ArrayList<>(); //待更新的数据集合 List<Map<String,Object>> needUpdateMapList = new ArrayList<>(); //待删除的数据集合 List<Map<String,Object>> needDeleteMapList = new ArrayList<>(); //找到需要插入和更新的数据 findNeedInsertAndUpdateData(masterMapList,slaveMapList,needInsertMapList,needUpdateMapList,pksMaster); //找到需要删除的数据 findNeedDeleteMapList(masterMapList,slaveMapList,needDeleteMapList,pksSlave); //批量新增 batchInsert(needInsertMapList,slaveConnection); //批量删除 batchDelete(needDeleteMapList, pksSlave,slaveConnection); //批量更新 batchUpdate(needUpdateMapList, otherCols, pksSlave,slaveConnection); logger.info("==========================================更新完毕=========================================="); } /** * 批量插入 * @param tableName * @param mapList */ public static void batchInsert(List<Map<String,Object>> mapList,ConnectionInfo conn){ for (int i = 0,size=mapList.size();i < size; i++) { Map<String, Object> map = mapList.get(i); logger.info("正在插入,总的记录数:{},进度:[{}/{}],当前插入数据:{}",size,i+1,size,map); map.remove(MD5_KEY_NAME); TableDao.insert(TABLE_SLAVE, map,conn); } } /** * 批量删除 * @param mapList * @param pks */ public static void batchDelete(List<Map<String,Object>> mapList,List<String> pksSlave,ConnectionInfo conn){ for (int i = 0,size=mapList.size();i < size; i++) { Map<String, Object> map = mapList.get(i); 
logger.info("正在删除,总的记录数:{},进度:[{}/{}],当前删除数据:{}",size,i+1,size,map); map.remove(MD5_KEY_NAME); Object[] slavePkValues = getValueToPks(pksSlave,map); TableDao.delete(TABLE_SLAVE, pksSlave.toArray(), slavePkValues,conn); } } /** * 批量更新 * @param mapList * @param otherCols * @param slavePks */ public static void batchUpdate(List<Map<String,Object>> mapList,List<String> otherCols,List<String> pksSlave,ConnectionInfo conn){ //TableDao.update(tableName, pks, otherCols, params); for (int i = 0,size=mapList.size();i < size; i++) { Map<String, Object> map = mapList.get(i); logger.info("正在更新,总的记录数:{},进度:[{}/{}],当前插入数据:{}",size,i+1,size,map); Object[] pkValues = getValueToPks(pksSlave,map); List<Object> params = new ArrayList<>(); for (int j = 0, num=otherCols.size();j < num; j++) { String col = otherCols.get(j); Object val = map.get(col); params.add(val); } for (int z = 0; z < pkValues.length; z++) { Object obj = pkValues[z]; params.add(obj); } //更新 TableDao.update(TABLE_SLAVE, pksSlave.toArray(), otherCols.toArray(), params.toArray(),conn); } } /** * 找到需要插入和更新的map * @param masterMapList 主表数据集合 * @param slaveMapList 从表数据集合 * @param needInsertMapList 待插入数据集合 * @param needUpdateMapList 待更新数据集合 * @param pksMaster 主键列表 */ public static void findNeedInsertAndUpdateData(List<Map<String,Object>> masterMapList,List<Map<String,Object>> slaveMapList, List<Map<String,Object>> needInsertMapList,List<Map<String,Object>> needUpdateMapList, List<String> pksMaster){ for (int i = 0,size=masterMapList.size(); i < size; i++) { Map<String, Object> masterMap = masterMapList.get(i); String masterMd5 = (String) masterMap.get(MD5_KEY_NAME); Object[] masterPkValues = getValueToPks(pksMaster,masterMap); boolean flag = false; for (int j = 0,num=slaveMapList.size(); j < num; j++) { Map<String, Object> slaveMap = slaveMapList.get(j); Object[] slavePkValues = getValueToPks(pksMaster,slaveMap); if(Arrays.equals(masterPkValues, slavePkValues)){ flag = true; //找到了相同的主键 //判断MD5的值 String slaveMd5 = (String) 
slaveMap.get(MD5_KEY_NAME); if(masterMd5.equals(slaveMd5)){ break; }else{ //放入待更新数据集合 needUpdateMapList.add(masterMap); } } } if(!flag){ //遍历了一轮还没有找到相同的主键的值,放入待新增数据 needInsertMapList.add(masterMap); } } logger.info("==============================待插入数据条数:{}",needInsertMapList.size()); logger.info("==============================待更新数据条数:{}",needUpdateMapList.size()); } /** * 找到需要删除的map * @param masterMapList 主表数据集合 * @param slaveMapList 从表数据集合 * @param needDeleteMapList 待删除数据集合 * @param pksSlave 主键列表 */ public static void findNeedDeleteMapList(List<Map<String,Object>> masterMapList,List<Map<String,Object>> slaveMapList, List<Map<String,Object>> needDeleteMapList,List<String>pksSlave){ for (int i = 0,size=slaveMapList.size();i<size; i++) { Map<String, Object> slaveMap = slaveMapList.get(i); Object[] slavePkValuees = getValueToPks(pksSlave,slaveMap); boolean flag = false; for (int j = 0,num=masterMapList.size(); j < num; j++) { Map<String, Object> masterMap = masterMapList.get(j); Object[] masterPkValues = getValueToPks(pksSlave,masterMap); if(Arrays.equals(slavePkValuees, masterPkValues)){ flag = true; break; } } if(!flag){ //如果主表中根据主键没有找到数据,放入待删除数据 needDeleteMapList.add(slaveMap); } } logger.info("==============================待删除数据条数:{}",needDeleteMapList.size()); } /** * 为每条记录添加md5字段 * @param msaterMapList 主表或从表数据集合 * @return */ public static List<Map<String, Object>> reConstructMap(List<Map<String, Object>> msaterMapList) { for (int i = 0, size =msaterMapList.size() ; i < size; i++) { Map<String, Object> map = msaterMapList.get(i); StringBuffer sb = new StringBuffer(); Set<Entry<String,Object>> entrySet = map.entrySet(); Iterator<Entry<String, Object>> it = entrySet.iterator(); sb.append("{"); while(it.hasNext()){ Entry<String, Object> entry = it.next(); sb.append(entry.getKey()+" = "); sb.append(String.valueOf(entry.getValue())); if(it.hasNext()){ sb.append(", "); } } sb.append("}"); String jsonInfo = sb.toString(); // logger.info("json为:{}",jsonInfo); //对字符串加密 
String md5Hex = DigestUtils.md5Hex(jsonInfo); map.put(MD5_KEY_NAME, md5Hex); // logger.info("map为:{}",map); } return msaterMapList; } /** * 获取Key加载信息 */ public static void printKeyLoadMessage() { StringBuilder sb = new StringBuilder(); sb.append("\r\n======================================================================\r\n"); sb.append("\r\n 欢迎使用小表广播程序 \r\n"); sb.append("\r\n======================================================================\r\n"); System.out.println(sb.toString()); } /** * 判断主键,cols是否相同 * @param pks1 * @param pks2 * @return */ private static boolean isListSame(List<?> pks1,List<?> pks2){ if(pks1 != null && pks2 != null){ if(pks1.containsAll(pks2) && pks2.containsAll(pks1)){ return true; } } logger.warn("list1:{}",pks1); logger.warn("list2:{}",pks2); return false; } /** * 用于获取每条记录主键的值 * @param pks 主键列表 * @param map * @return */ public static Object[] getValueToPks(List<String> pks, Map<String, Object> map) { List<Object> values = new ArrayList<>(); for (String pk : pks) { Object val = map.get(pk); values.add(val); } return values.toArray(); } }