【数据仓库】元数据血缘分析

    技术2022-07-11  78

    现在数据仓库基本上采用Hadoop平台了,那么数据仓库里面元数据的血缘分析的思路有哪些呢

    基本上有下面这两种思路:

    1、解析hql脚本,通过正则表达式去匹配每一行字符串

    2、采用Hive自带的语法分析类解析

    这里比较建议采用第二种,比较直接简单,因为第一种方式比较复杂,需要考虑场景比较多,容易出现遗漏

    Hive 自带的类 org.apache.hadoop.hive.ql.tools.LineageInfo

    将hql语句通过解析语法tree,获取hive表的源表和目标表,达到血缘分析的目的

    但是这个类有一点缺陷就是对于create table xx as 这种hql语句无法解析

    我们稍加修改代码就可以解决了

    代码如下:

    package com.neo.datamanager;

    import org.apache.hadoop.hive.ql.lib.*;
    import org.apache.hadoop.hive.ql.parse.*;

    import java.io.IOException;
    import java.util.*;

    /**
     * Extracts table-level lineage (source and target tables) from a single HQL
     * statement by walking the Hive AST.
     *
     * <p>Unlike the stock {@code org.apache.hadoop.hive.ql.tools.LineageInfo},
     * this version also records the target of {@code CREATE TABLE ... AS SELECT}
     * by treating {@code TOK_CREATETABLE} as an output node.
     *
     * <p>Not thread-safe: the two result sets are instance state that is cleared
     * on every call to {@link #getLineageInfo(String)}.
     */
    public class HiveLineageInfo implements NodeProcessor {

        /** Source tables read by the query (sorted, de-duplicated). */
        private final TreeSet<String> inputTableList = new TreeSet<>();

        /** Target tables written by the query (sorted, de-duplicated). */
        private final TreeSet<String> outputTableList = new TreeSet<>();

        /** @return source tables found by the last {@link #getLineageInfo(String)} call */
        public TreeSet<String> getInputTableList() {
            return inputTableList;
        }

        /** @return target tables found by the last {@link #getLineageInfo(String)} call */
        public TreeSet<String> getOutputTableList() {
            return outputTableList;
        }

        /**
         * Callback invoked by the graph walker for every AST node; collects
         * table names from the node types that carry them.
         *
         * @param nd          current AST node
         * @param stack       walker's ancestor stack (unused here)
         * @param procCtx     walker context (unused here)
         * @param nodeOutputs outputs of already-processed children (unused here)
         * @return always {@code null}; results accumulate in the two sets
         */
        @Override
        public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
                throws SemanticException {
            ASTNode pt = (ASTNode) nd;
            switch (pt.getToken().getType()) {
                // Both an INSERT target (TOK_TAB) and a CTAS target
                // (TOK_CREATETABLE) are outputs; the stock LineageInfo misses
                // the latter, which is the fix this class exists for.
                case HiveParser.TOK_CREATETABLE:
                case HiveParser.TOK_TAB:
                    outputTableList.add(
                            BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)));
                    break;
                case HiveParser.TOK_TABREF:
                    ASTNode tabTree = (ASTNode) pt.getChild(0);
                    // One child: bare table name; two children: "db.table".
                    String tableName = (tabTree.getChildCount() == 1)
                            ? BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0))
                            : BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0))
                                    + "." + tabTree.getChild(1);
                    inputTableList.add(tableName);
                    break;
                default:
                    // Other node types carry no lineage information.
                    break;
            }
            return null;
        }

        /**
         * Parses the given query and populates the input/output table sets,
         * replacing any results from a previous call.
         *
         * @param query a single HQL statement
         * @throws ParseException    if the statement cannot be parsed
         * @throws SemanticException if AST traversal fails
         */
        public void getLineageInfo(String query) throws ParseException, SemanticException {
            // Build the AST, then strip leading wrapper nodes that carry no token.
            ParseDriver parseDriver = new ParseDriver();
            ASTNode tree = parseDriver.parse(query);
            while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
                tree = (ASTNode) tree.getChild(0);
            }

            // Reset state so one instance can analyze multiple queries.
            inputTableList.clear();
            outputTableList.clear();

            // With an empty rule map, the dispatcher falls back to the default
            // processor (this), so process() sees every node of the DFS walk.
            Map<Rule, NodeProcessor> rules = new LinkedHashMap<>();
            Dispatcher dispatcher = new DefaultRuleDispatcher(this, rules, null);
            GraphWalker walker = new DefaultGraphWalker(dispatcher);

            ArrayList<Node> topNodes = new ArrayList<>();
            topNodes.add(tree);
            walker.startWalking(topNodes, null);
        }

        /** Demo entry point: prints the lineage of a sample INSERT ... UNION ALL query. */
        public static void main(String[] args)
                throws IOException, ParseException, SemanticException {
            String query = "insert into table aa select * from bb union all select * from cc";
            HiveLineageInfo lep = new HiveLineageInfo();
            lep.getLineageInfo(query);
            System.out.println("Input tables = " + lep.getInputTableList());
            System.out.println("Output tables = " + lep.getOutputTableList());
        }
    }

    运行之后结果如下:

    Input tables = [bb, cc]
    Output tables = [aa]