需求背景:根据一个父级id查询所有关联在该id下的子级数据
图示大概为:
1 2 3 4 5 6 7 8 9 10 ............................................. .....................................................如查找父节点1下面所有的子节点数量为9(2,3,4,5,6,7,8,9,10)
库表设计为: id parentId 1 -1 2 1 3 1 4 2 5 2 … 如使用sql查询:
String querySql = "select count(id) from (" + " select t1.*," + " if(find_in_set(parent_id, @pids) > 0, @pids := concat(@pids, ',', id), 0) as ischild" + " from (" + " select * from table t order by parent_id, id" + " ) t1," + " (select @pids := :parentId) t2" + " ) t3 where ischild != 0 ";在表规模较大的情况下,@pids拼凑长度会无法控制.
替换为levelDB存储上下级关系,用磁盘db遍历替换数据库查询过程.
具体数据操作过程:
1.记录插入levelDB key的格式为:parentId_selfId_xxx(xxx为附属信息,如状态等) value 为记录使用 protoBuffer 序列化后的数据
插入流程为: 构建记录的key(parentId_selfId),如插入id:2 parentId:1 的记录,构建key(1_2),此时先检查 levelDB里面是否有索引键:(1_0),该索引键用于iterator.seek()的key,用于快速定位 以父级id作为key前缀开始的地方. 最终levelDB中存储的key为: -1_1 1_0 1_2 1_3 2_0 2_4 2_5 .........2.查询列表和统计数量 查询指定parentId下级记录列表和统计下级数量 查询的时候使用TreeSet存储要递归遍历的父级id,初始化为待查询的parentId. levelDB 的迭代器iterator初始化seek到parentId_0的key,然后从该key往后迭代 就是直接以parentId为父级id的记录数,出去key中parentId_id.split("_")[1]即是下级 记录的主键id,然后依次将遍历出来的下级id添加到TreeSet中作为遍历下下级记录的 parentId,重复上述过程.直到TreeSet为空或者达到分页条件.
levelDB数据在初始化判断db文件不存在的时候会加载全表数据,然后通过binlog订阅事件异步更新 levelDB数据,binlog订阅服务可以看看:https://gitee.com/Rezar/dbSub,还在维护,有问题留言沟通.
具体代码为:
import static org.fusesource.leveldbjni.JniDBFactory.factory; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.Comparator; import java.util.List; import java.util.Map.Entry; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomUtils; import org.dumas.common.enums.EnumBusinessErrorCode; import org.dumas.common.enums.RecordStatus; import org.dumas.common.exceptions.BusinessException; import org.dumas.common.utils.JacksonUtil; import org.dumas.common.utils.ProtoBufSerializeUtils; import org.dumas.user.base.enums.EnumIteratorFlag; import org.dumas.user.base.enums.EnumRewardLevel; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; import org.iq80.leveldb.Options; import org.iq80.leveldb.ReadOptions; import com.Rezar.dbSub.base.enums.ChangeType; import com.google.common.base.Stopwatch; import com.zmeng.rinascimento.dumas.repository.po.user.UserAssignCodeInfo; import lombok.extern.slf4j.Slf4j; /** * 基于levelDB记录关系,不查DB * * @say little Boy, don't be sad. * @name Rezar * @time Jun 19, 2020 2:13:24 PM * @Desc 些年若许,不负芳华. * * 用于父级找子级 * */ @Slf4j public class CodeRelationStorageTopToSub implements Closeable, CodeInfoStorage { private File storageDir; private DB db; private volatile CountDownLatch initLatch = null; public CodeRelationStorageTopToSub() { this.storageDir = new File(ServerConstants.SAVE_BASE_FILE.getAbsoluteFile(), "codeRelationTopToSub"); log.info("CodeRelationStorageTopToSub storageDir :{}", this.storageDir.getAbsolutePath()); try { if (!this.storageDir.exists()) { this.initLatch = new CountDownLatch(1); } Options options = new Options(); options.comparator(ServerConstants.comparatorForTopToSub()); options.createIfMissing(true); DB db = factory.open(storageDir, options); this.db = db; } catch (IOException e) { log.error("BinlogDataStorageWithLevelDB init db error:{}", e); throw new BusinessException(EnumBusinessErrorCode.SYSTEM_ERROR, "unknow exception while initDbInfo:" + this.storageDir.getAbsolutePath()); } } public static void main(String[] args) throws IOException { CodeRelationStorageTopToSub storage = new CodeRelationStorageTopToSub(); Consumer<UserAssignCodeInfo> doLoadConsumer = storage.doLoadConsumer(); if (doLoadConsumer != null) { List<Long> parentId = new ArrayList<Long>(); parentId.add(-1l); for (int i = 0; i < 200000; i++) { UserAssignCodeInfo build = UserAssignCodeInfo.builder() .parentId(parentId.get(RandomUtils.nextInt(0, parentId.size()))).id((long) i) .level(RandomUtils.nextInt(0, 4)).build(); if (RandomUtils.nextInt(0, 1000) >= 888 && !parentId.contains(build.getId())) { parentId.add(build.getId()); } doLoadConsumer.accept(build); } storage.onLoadOver(); } storage.iteratorAll(); Stopwatch watch = Stopwatch.createStarted(); long parentId = 8l; System.out.println(storage.countSubCode(parentId, true, Arrays.asList(0))); System.out.println("use time:" + watch.elapsed(TimeUnit.MILLISECONDS)); watch = Stopwatch.createStarted(); System.out.println(storage.countSubCode(parentId, true, Arrays.asList(1, 2, 3, 4))); System.out.println("use time:" + watch.elapsed(TimeUnit.MILLISECONDS)); watch = Stopwatch.createStarted(); System.out.println(storage.countSubCode(parentId, false, EnumRewardLevel.list(false))); System.out.println("use time:" + watch.elapsed(TimeUnit.MILLISECONDS)); PageDto pageDto = new PageDto(); pageDto.setPageNum(2); pageDto.setPageSize(2); System.out.println(storage.listSubCodeInfo(parentId, Arrays.asList(0), pageDto).stream() .map(code -> code.getId()).collect(Collectors.toList())); System.out.println("totalCount:" + pageDto.getCount()); IOUtils.closeQuietly(storage); } public void iteratorAll() { DBIterator iterator = this.db.iterator(); iterator.seekToFirst(); while (iterator.hasNext()) { System.out.println(new String(iterator.next().getKey())); } } /** * 用于初始化 * * @return */ @Override public Consumer<UserAssignCodeInfo> doLoadConsumer() { return this.initLatch == null ? null : codeInfo -> { if (log.isDebugEnabled()) { log.debug("init codeInfo:{}", JacksonUtil.obj2Str(codeInfo)); } this.insertCodeInfo(codeInfo); }; } public static byte[] INDEX_VALUE = new byte[] {}; /** * 插入一个code信息:插入parentId_codeId作为key,value还是codeInfo pb 后的二进制数据 * * @param codeInfo */ private void insertCodeInfo(UserAssignCodeInfo codeInfo) { byte[] indexStartKey = (codeInfo.getParentId() + "_0").getBytes(); byte[] bs = this.db.get(indexStartKey); if (bs == null) { // 设置索引起始段 this.db.put(indexStartKey, INDEX_VALUE); } this.db.put((codeInfo.getParentId() + "_" + codeInfo.getId() + "_" + codeInfo.getLevel()).getBytes(), ProtoBufSerializeUtils.serialize(codeInfo)); } @Override public void onLoadOver() { log.info("CodeRelationStorageTopToSub 数据加载完成"); this.initLatch.countDown(); } private void waitLoadOver() { if (initLatch != null) { try { initLatch.await(); } catch (InterruptedException e) { log.error("InterruptedException onChange"); } } } /** * 查指定vip等级的下级用户的数量 * * @param assignCodeId * @param level * @return */ public int countSubCode(Long assignCodeId, boolean justDirect, List<Integer> level) { Stopwatch watch = Stopwatch.createStarted(); try { AtomicInteger countRes = new AtomicInteger(0); BitSet bitSet = new BitSet(); if (level != null) { level.stream().forEach(val -> { bitSet.set(val); }); } this.iteratorSubCodeInfo(assignCodeId, justDirect ? 1 : null, (codeLevel, codeInfo) -> { if (level == null || bitSet.get(codeLevel)) { countRes.incrementAndGet(); } return null; }); return countRes.get(); } finally { watch.stop(); log.info("query assignCodeId:{} with level:{} justDirect:{} useTime:{}", assignCodeId, level, justDirect, watch.elapsed(TimeUnit.MILLISECONDS)); } } /** * 根据父级id找指定条件的下级code * * @param assignCodeId * @return */ public List<UserAssignCodeInfo> listSubCodeInfo(Long assignCodeId, List<Integer> level, PageDto pageDto) { Stopwatch watch = Stopwatch.createStarted(); try { int startIndex = (pageDto.getPageNum() - 1) * pageDto.getPageSize(); int endIndex = startIndex + pageDto.getPageSize(); List<byte[]> codeInfoList = new ArrayList<>(); AtomicInteger curReadIndex = new AtomicInteger(0); BitSet bitSet = new BitSet(); if (level != null) { level.stream().forEach(val -> { bitSet.set(val); }); } AtomicInteger countRes = new AtomicInteger(0); this.iteratorSubCodeInfo(assignCodeId, null, (infoLevel, data) -> { if (level == null || bitSet.get(infoLevel)) { if (curReadIndex.getAndIncrement() >= startIndex && curReadIndex.get() <= endIndex) { codeInfoList.add(data); } countRes.incrementAndGet(); } return null; }); pageDto.setCount(countRes.get()); return codeInfoList.parallelStream() .map(data -> ProtoBufSerializeUtils.deserialize(data, UserAssignCodeInfo.class)).sorted(comparator) .collect(Collectors.toList()); } finally { watch.stop(); log.info("query assignCodeId:{} with level:{} useTime:{}", assignCodeId, level, watch.elapsed(TimeUnit.MILLISECONDS)); } } private static Comparator<UserAssignCodeInfo> comparator = (code1, code2) -> { int res = code1.getParentId().compareTo(code2.getParentId()); if (res != 0) { return res; } else { return code1.getId().compareTo(code2.getId()); } }; public void iteratorSubCodeInfo(Long assignCodeId, Integer iteratorLevel, BiFunction<Integer, byte[], EnumIteratorFlag> codeInfoFunction) { ReadOptions option = new ReadOptions(); option.fillCache(false); int curIteratorLevel = 0; BitSet bitSet = new BitSet(); TreeSet<Long> parentIdSet = new TreeSet<Long>(); parentIdSet.add(assignCodeId); OUTTER: while (!parentIdSet.isEmpty()) { DBIterator iterator = this.db.iterator(option); Long curParentId = parentIdSet.pollFirst(); bitSet.set(curParentId.intValue()); String keyStartPrefix = curParentId + "_"; iterator.seek((keyStartPrefix + "0").getBytes()); boolean match = false; while (iterator.hasNext()) { Entry<byte[], byte[]> next = iterator.next(); if (!new String(next.getKey()).startsWith(keyStartPrefix)) { break; } match = true; if (next.getValue().length != 0) { String[] split = new String(next.getKey()).split("_"); EnumIteratorFlag apply = codeInfoFunction.apply(Integer.parseInt(split[2]), next.getValue()); if (apply != null && apply == EnumIteratorFlag.BREAK) { break OUTTER; } int codeId = Integer.parseInt(split[1]); if (!bitSet.get(codeId)) { parentIdSet.add((long) codeId); } } } IOUtils.closeQuietly(iterator); if (!match) { break; } if (iteratorLevel != null && ++curIteratorLevel >= iteratorLevel) { break; } } } @Override public void close() throws IOException { log.info("try to close CodeRelationStorageTopToSub"); if (this.db != null) { IOUtils.closeQuietly(db); } } public void delCodeInfo(String assignCode) { waitLoadOver(); this.db.delete(assignCode.getBytes()); } @Override public void onChange(UserAssignCodeInfo codeInfo, ChangeType changeType) { waitLoadOver(); if (changeType == ChangeType.DELETE) { this.db.delete((codeInfo.getParentId() + "_" + codeInfo.getId()).getBytes()); } else { if (codeInfo.getAssignStatus() == RecordStatus.NORMAL.status) { return; } this.insertCodeInfo(codeInfo); } } } comparatorForTopToSub()实现: public static DBComparator comparatorForTopToSub() { return new DBComparator() { @Override public int compare(byte[] o1, byte[] o2) { String key1 = new String(o1); String key2 = new String(o2); String[] key1Array = key1.split("_"); String[] key2Array = key2.split("_"); for (int i = 0; i < 2; i++) { int res = Integer.valueOf(key1Array[i]).compareTo(Integer.valueOf(key2Array[i])); if (res != 0) { return res; } } return 0; } @Override public String name() { return "comparatorForTopToSub"; } @Override public byte[] findShortestSeparator(byte[] start, byte[] limit) { return start; } @Override public byte[] findShortSuccessor(byte[] key) { return key; } }; }