Neo4j offers two ways to add nodes and relationships online:
1. Cypher statement creation: fetch the data from the data source, clean and transform it, generate Cypher statements, and execute them through a Neo4j Session object.
2. LOAD CSV: fetch the data from the data source, clean and transform it, and write it to a local CSV file (note: the file must be generated in the import folder under the Neo4j installation directory), then execute a LOAD CSV statement through a Neo4j Session object to perform the import.

Both approaches are only suitable for small amounts of data; once the volume reaches the tens of thousands, they become unbearably slow.
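For reference, here is a minimal sketch of both online approaches, assuming the official neo4j-java-driver (4.x); the Bolt URI, credentials, label, properties, and CSV file name are placeholders, not taken from the original post:

```java
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Session;
import org.neo4j.driver.Values;

public class OnlineInsertSketch {
    public static void main(String[] args) {
        // Placeholder connection details
        try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                AuthTokens.basic("neo4j", "password"));
             Session session = driver.session()) {

            // Option 1: create one node per Cypher statement
            session.run("MERGE (n:BasicNode {name: $name}) SET n.value = $value",
                    Values.parameters("name", "192.168.1.1", "value", "IP"));

            // Option 2: bulk-load rows from a CSV file placed in Neo4j's import folder
            session.run("LOAD CSV WITH HEADERS FROM 'file:///nodes.csv' AS row "
                    + "MERGE (n:BasicNode {name: row.name}) SET n.value = row.value");
        }
    }
}
```

Both options funnel through the same Session.run call; the rows either travel inside the Cypher text or sit in a file the server reads, and neither path scales to millions of records, which is why the neo4j-admin route below exists.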
Fortunately, Neo4j provides an official import tool, the neo4j-admin script, which lives in the neo4j/bin directory. Below is the workflow I use to import large volumes of data.
First, the flow chart (figure omitted); the steps it describes:

1. Read the initial data from the data source and transform it into node and relationship records.
2. Write the transformed records to CSV files, observing the following format rules:
   - Fields on each line are separated by commas.
   - The files are encoded as UTF-8.
   - Every row must have a unique ID; duplicates are not allowed. For example, in the header `name:ID,value`, `name` is the unique ID.
   - Imported fields default to the string type; non-string fields must declare their type, e.g. `name:ID,serviceType:int`.
   - The header of the relationship file must include these three columns: `:START_ID,:END_ID,:TYPE`.
3. Stop the Neo4j service; this must be done before running the neo4j-admin script.
4. Back up the current data by dumping it to a file: `bin/neo4j-admin dump --database=graph.db --to=/opt/deploy_software/neo4j/import/1593651698080.dump`
5. Once the backup completes, delete the database files: importing CSV files requires an empty target database. The default target is the graph.db database, whose files live at data/databases/graph.db.
6. Import the CSV data: `neo4j-admin import --ignore-extra-columns=true --ignore-missing-nodes=true --ignore-duplicate-nodes=true --nodes:BasicNode /opt/deploy_software/neo4j/import/2020-07-01-nodes-1593651695274.csv --relationships /opt/deploy_software/neo4j/import/2020-07-01-relationship-1593651695275.csv`
7. Restore the file backed up earlier: `bin/neo4j-admin load --database=graph.db --force --from=/opt/deploy_software/neo4j/import/1593651698080.dump`
8. Start Neo4j: `bin/neo4j start`

That completes the bulk import into Neo4j. Sample CSV content follows this list, and then the key code. The code is a bit messy, so please make sure you understand it before adapting it. Note that only the key code is shown!
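To make the header rules concrete, here is what the two CSV files could look like. The rows are invented for illustration; the node header is the one the code below actually uses.

The node file (e.g. 2020-07-01-nodes-*.csv):

```csv
name:ID,value,serviceType:int,hometown
13800000000,PHONE,,
192.168.1.1,IP,,Beijing
user001,ACCOUNT,1,
```

The relationship file (e.g. 2020-07-01-relationship-*.csv):

```csv
:START_ID,:END_ID,:TYPE
13800000000,192.168.1.1,IP
13800000000,user001,ACCOUNT
```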
```java
package com.xxl.job.executor.util;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Map;

/**
 * CSV file helper.
 * @author yangwei
 */
public class CSVFileUtil {

    BufferedWriter csvWrite = null;

    /**
     * Create a CSV file and return a writer for it.
     *
     * @param head       header line (written only when the file is new)
     * @param outPutPath output directory
     * @param filename   file name without the .csv extension
     * @return the writer
     */
    public BufferedWriter createCSVFile(String head, String outPutPath, String filename) {
        try {
            File csvFile = new File(outPutPath + File.separator + filename + ".csv");
            File parent = csvFile.getParentFile();
            if (parent != null && !parent.exists()) {
                parent.mkdirs();
            }
            if (!csvFile.exists()) {
                csvFile.createNewFile();
                csvWrite = new BufferedWriter(
                        new OutputStreamWriter(new FileOutputStream(csvFile), "UTF-8"), 1024);
                // Write the header line
                writeRow(head);
            } else {
                // Open in append mode; the original opened the existing file in
                // truncate mode, which would have wiped its contents without
                // rewriting the header
                csvWrite = new BufferedWriter(
                        new OutputStreamWriter(new FileOutputStream(csvFile, true), "UTF-8"), 1024);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return csvWrite;
    }

    /**
     * Write multiple rows: each map entry becomes one "key,value" line.
     */
    public void writeRow(Map<String, String> dataList) throws IOException {
        for (String row : dataList.keySet()) {
            csvWrite.write(row + "," + dataList.get(row));
            csvWrite.newLine();
        }
    }

    /**
     * Write a single row.
     */
    private void writeRow(String str) throws IOException {
        csvWrite.write(str);
        csvWrite.newLine();
    }
}
```

The remaining fragments belong to the XXL-JOB handler. The class declaration and the start of its execute method (which opens the try block closed near the end) are omitted by the author, as are the declarations of `yesterday`, `countNum`, `filePath`, `dumpFileName`, `logStatisticsMonitorDao` and `session`.

```java
/** Run a shell command and stream its output into the job log. */
private void execCommand(String cmd) throws Exception {
    XxlJobLogger.log("execCommand:" + cmd);
    Runtime rt = Runtime.getRuntime();
    Process proc = rt.exec(cmd, null, null);
    // Note: this reads the process's standard output
    InputStream stdout = proc.getInputStream();
    InputStreamReader isr = new InputStreamReader(stdout, "GBK");
    BufferedReader br = new BufferedReader(isr);
    String line = "";
    while ((line = br.readLine()) != null) {
        XxlJobLogger.log("execCommand:" + line);
    }
}

/**
 * Convert one source record into node rows and relationship rows.
 * Node row values follow the header "name:ID,value,serviceType:int,hometown";
 * relationship keys are ":START_ID,:END_ID" with the value as the ":TYPE".
 */
private void saveNodeList(BasicResourcesInfo basicResourcesInfo, Map<String, String> nodeMap,
        Map<String, String> relationMap) {
    // Pick the first non-empty field as the start node
    String startNode = null;
    if (!StringUtils.isEmpty(basicResourcesInfo.getImei())) {
        if (!nodeMap.containsKey(basicResourcesInfo.getImei())) {
            nodeMap.put(basicResourcesInfo.getImei(), "IMEI,,");
        }
        startNode = basicResourcesInfo.getImei();
    }
    if (!StringUtils.isEmpty(basicResourcesInfo.getIp())) {
        if (!nodeMap.containsKey(basicResourcesInfo.getIp())) {
            nodeMap.put(basicResourcesInfo.getIp(), "IP,," + basicResourcesInfo.getHometown());
        }
        if (!StringUtils.isEmpty(startNode)) {
            if (!relationMap.containsKey(startNode + "," + basicResourcesInfo.getIp())) {
                relationMap.put(startNode + "," + basicResourcesInfo.getIp(), "IP");
            }
        } else {
            startNode = basicResourcesInfo.getIp();
        }
    }
    if (!StringUtils.isEmpty(basicResourcesInfo.getMac())) {
        if (!nodeMap.containsKey(basicResourcesInfo.getMac())) {
            nodeMap.put(basicResourcesInfo.getMac(), "MAC,,");
        }
        if (!StringUtils.isEmpty(startNode)) {
            if (!relationMap.containsKey(startNode + "," + basicResourcesInfo.getMac())) {
                relationMap.put(startNode + "," + basicResourcesInfo.getMac(), "MAC");
            }
        } else {
            startNode = basicResourcesInfo.getMac();
        }
    }
    if (!StringUtils.isEmpty(basicResourcesInfo.getImsi())) {
        if (!nodeMap.containsKey(basicResourcesInfo.getImsi())) {
            nodeMap.put(basicResourcesInfo.getImsi(), "IMSI,,");
        }
        if (!StringUtils.isEmpty(startNode)) {
            if (!relationMap.containsKey(startNode + "," + basicResourcesInfo.getImsi())) {
                relationMap.put(startNode + "," + basicResourcesInfo.getImsi(), "IMSI");
            }
        } else {
            startNode = basicResourcesInfo.getImsi();
        }
    }
    if (!StringUtils.isEmpty(basicResourcesInfo.getPhone())) {
        if (!nodeMap.containsKey(basicResourcesInfo.getPhone())) {
            nodeMap.put(basicResourcesInfo.getPhone(), "PHONE,,");
        }
        if (!StringUtils.isEmpty(startNode)) {
            if (!relationMap.containsKey(startNode + "," + basicResourcesInfo.getPhone())) {
                relationMap.put(startNode + "," + basicResourcesInfo.getPhone(), "PHONE");
            }
        } else {
            startNode = basicResourcesInfo.getPhone();
        }
    }
    if (!StringUtils.isEmpty(basicResourcesInfo.getAccount())) {
        if (!nodeMap.containsKey(basicResourcesInfo.getAccount())) {
            // Accounts whose value is just ":" are excluded from the node file
            if (!":".equals(basicResourcesInfo.getAccount())) {
                nodeMap.put(basicResourcesInfo.getAccount(),
                        "ACCOUNT," + basicResourcesInfo.getServiceType() + ",");
            }
        }
        if (!StringUtils.isEmpty(startNode)) {
            if (!relationMap.containsKey(startNode + "," + basicResourcesInfo.getAccount())) {
                relationMap.put(startNode + "," + basicResourcesInfo.getAccount(), "ACCOUNT");
            }
        } else {
            startNode = basicResourcesInfo.getAccount();
        }
    }
    if (!StringUtils.isEmpty(basicResourcesInfo.getIdCode())) {
        if (!nodeMap.containsKey(basicResourcesInfo.getIdCode())) {
            nodeMap.put(basicResourcesInfo.getIdCode(), "IDCODE,,");
        }
        if (!StringUtils.isEmpty(startNode)) {
            if (!relationMap.containsKey(startNode + "," + basicResourcesInfo.getIdCode())) {
                relationMap.put(startNode + "," + basicResourcesInfo.getIdCode(), "IDCODE");
            }
        } else {
            startNode = basicResourcesInfo.getIdCode();
        }
    }
}

// --- body of the execute method (the method signature and its opening try block are omitted) ---

Map<String, Object> map = new HashMap<String, Object>();
map.put("createDate", yesterday);
// Page through the source table 100,000 rows at a time
int totalPage = (int) (countNum / 100000 + 1);
map.put("rowNum", 100000);
// Holds the node and relationship rows
Map<String, String> nodeMap = new HashMap<String, String>();
Map<String, String> relationMap = new HashMap<String, String>();
// Create the node CSV file
String nodeFileName = yesterday + "-nodes-" + System.currentTimeMillis();
String head = "name:ID,value,serviceType:int,hometown";
CSVFileUtil nodeCsvFileUtil = new CSVFileUtil();
BufferedWriter nodeBufferedWriter = nodeCsvFileUtil.createCSVFile(head, filePath, nodeFileName);
// Create the relationship CSV file
String relationshipFileName = yesterday + "-relationship-" + System.currentTimeMillis();
CSVFileUtil relationCSVFileUtil = new CSVFileUtil();
head = ":START_ID,:END_ID,:TYPE";
BufferedWriter relationBufferedWriter = relationCSVFileUtil.createCSVFile(head, filePath, relationshipFileName);
for (int i = 0; i < totalPage; i++) {
    try {
        map.put("startRow", i * 100000);
        List<BasicResourcesInfo> list = logStatisticsMonitorDao.findBasicInfoByDate(map);
        XxlJobLogger.log(i + "----------" + totalPage + "---------" + list.size());
        for (BasicResourcesInfo basicResourcesInfo : list) {
            // Convert each record into node and relationship rows
            saveNodeList(basicResourcesInfo, nodeMap, relationMap);
        }
        // Flush this page's rows to the CSV files and start fresh maps
        nodeCsvFileUtil.writeRow(nodeMap);
        nodeMap = new HashMap<String, String>();
        relationCSVFileUtil.writeRow(relationMap);
        relationMap = new HashMap<String, String>();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
if (nodeBufferedWriter != null) {
    nodeBufferedWriter.flush();
    nodeBufferedWriter.close();
}
if (relationBufferedWriter != null) {
    relationBufferedWriter.flush();
    relationBufferedWriter.close();
}
XxlJobLogger.log("Finished cleaning node and relationship data, starting import........");
// Stop neo4j
execCommand(filePath + "/../bin/neo4j stop");
// Dump the existing neo4j data as a backup
dumpFileName = filePath + "/" + System.currentTimeMillis() + ".dump";
execCommand(filePath + "/../bin/neo4j-admin dump --database=graph.db --to=" + dumpFileName);
// Delete the existing neo4j database files
execCommand("rm -rf " + filePath + "/../data/databases/graph.db/*");
// Build the shell command that imports the node and relationship files
StringBuffer sb = new StringBuffer();
sb.append(filePath + "/../bin/neo4j-admin import ");
sb.append(" --ignore-extra-columns=true --ignore-missing-nodes=true --ignore-duplicate-nodes=true");
// The original built this path with a stray "/" before ".csv"; fixed here
if (new File(filePath + "/" + nodeFileName + ".csv").exists()) {
    sb.append(" --nodes:BasicNode " + filePath + "/" + nodeFileName + ".csv");
    if (new File(filePath + "/" + relationshipFileName + ".csv").exists()) {
        sb.append(" --relationships " + filePath + "/" + relationshipFileName + ".csv");
    }
    execCommand(sb.toString());
} else {
    XxlJobLogger.log("Node import failed, node file not found: "
            + (filePath + "/" + nodeFileName + ".csv"));
}
return ReturnT.SUCCESS;
} catch (Exception e) { // closes the try opened in the omitted part of the method
    e.printStackTrace();
    return ReturnT.FAIL;
} finally {
    try {
        XxlJobLogger.log("Starting data restore........");
        File graph = new File(filePath + "/../data/databases/graph.db");
        // Recreate the database directory if missing (the original tested
        // exists() here, which never recreated anything)
        if (!graph.exists()) {
            graph.mkdirs();
        }
        if (!StringUtils.isEmpty(dumpFileName) && new File(dumpFileName).exists()) {
            execCommand(filePath + "/../bin/neo4j-admin load --database=graph.db --force --from=" + dumpFileName);
        }
        // Restart neo4j and recreate the index on the node name
        execCommand(filePath + "/../bin/neo4j start");
        session.run("create index on :BasicNode(name)");
        XxlJobLogger.log("createGraphDataBaseNodes, end.");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```
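Once the job finishes and Neo4j is back up, it is worth sanity-checking the graph. A minimal sketch using the same neo4j-java-driver as in the first example; the URI and credentials are placeholders, and `Result` is the 4.x driver type (1.x drivers call it `StatementResult`):

```java
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;

public class ImportSanityCheck {
    public static void main(String[] args) {
        // Placeholder connection details, as in the earlier sketch
        try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                AuthTokens.basic("neo4j", "password"));
             Session session = driver.session()) {
            // Count the imported nodes to confirm the import landed
            Result result = session.run("MATCH (n:BasicNode) RETURN count(n) AS c");
            System.out.println("BasicNode count: " + result.single().get("c").asLong());
        }
    }
}
```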