HBase-4 hbase过滤器

    技术2025-02-07  42

    hbase过滤器

    概念种类比较过滤器比较过滤器的运算符比较过滤器的比较器连接代码RowFilter(rowkey过滤器)FamilyFilter(列族过滤器)QualifierFilter(列过滤器)ValueFilter(列值过滤器) 专用过滤器SingleColumnValueFilter(单列值过滤器)SingleColumnValueExcludeFilter(列值排除过滤器)PrefixFilter(前缀过滤器)PageFilter(分页过滤器PageFilter) 多过滤器组合使用官网连接

    概念

    过滤器的作用是在服务端判断是否满足条件,满足则将数据返回给客户端

    种类

    比较过滤器、专用过滤器

    比较过滤器

    比较过滤器有两个参数:

    比较运算符、比较器

    使用比较过滤器前,先确定运算符和比较器,然后用这两者构造过滤器,最后通过scan等加载该过滤器。

    比较过滤器的运算符

    LESS < LESS_OR_EQUAL <= EQUAL = NOT_EQUAL <> GREATER_OR_EQUAL >= GREATER > NO_OP 排除所有

    比较过滤器的比较器

    BinaryComparator:按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[]);BinaryPrefixComparator:与前者相同,只是仅比较左端前缀的数据是否相同;NullComparator:判断给定的值是否为空;BitComparator:按位比较;RegexStringComparator:提供一个正则的比较器,仅支持 EQUAL 和 NOT_EQUAL;SubstringComparator:判断提供的子串是否出现在值中

    连接代码

    /** Handle to the table under test; opened in {@link #initTable()}, released in {@link #close()}. */
    private Table table;

    /** Cluster connection shared by every test; opened in {@link #initTable()}. */
    private Connection connection;

    /** Name of the HBase table that all filter examples scan. */
    private static final String TABLE_NAME = "myUser";

    /**
     * Opens the cluster connection and the table handle before each test.
     *
     * @throws IOException if the connection or the table cannot be obtained
     */
    @Before
    public void initTable() throws IOException {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        connection = ConnectionFactory.createConnection(configuration);
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
    }

    /**
     * Releases the table handle and the connection after each test.
     *
     * <p>Both fields are null-checked because this method also runs when
     * {@link #initTable()} failed part-way, and the connection is closed in a
     * {@code finally} so that a failing {@code table.close()} cannot leak it.
     *
     * @throws IOException if closing the table or the connection fails
     */
    @After
    public void close() throws IOException {
        try {
            if (table != null) {
                table.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    RowFilter(rowkey过滤器)

    /**
     * Scans for every row whose rowkey sorts before {@code 0003} and prints each cell.
     *
     * @throws IOException if the scan fails
     */
    @Test
    public void rowFilter() throws IOException {
        Scan scan = new Scan();
        // Bytes.toBytes encodes as UTF-8, independent of the platform default charset.
        BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes("0003"));
        scan.setFilter(new RowFilter(CompareFilter.CompareOp.LESS, binaryComparator));

        // The scanner holds server-side resources, so close it when iteration ends.
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                List<Cell> cells = result.listCells();
                for (Cell cell : cells) {
                    String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    byte[] value = CellUtil.cloneValue(cell);

                    // The "age" and "id" columns are decoded as ints; everything else as a string.
                    boolean intColumn = "age".equals(qualifier) || "id".equals(qualifier);
                    String valueText = intColumn
                            ? String.valueOf(Bytes.toInt(value))
                            : Bytes.toString(value);

                    System.out.println("rowkey -" + rowkey
                            + "family - " + family
                            + "column - " + qualifier
                            + "value - " + valueText);
                }
            }
        }
    }
    查询结果:

    FamilyFilter(列族过滤器)

    /** * 通过familyFilter来实现列族的过滤 * 需要过滤,列族名包含f2 * f1 f2 hello world */ @Test public void FamlitFilter() throws IOException { Scan scan = new Scan(); SubstringComparator substringComparator = new SubstringComparator("f2"); FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator); scan.setFilter(familyFilter); ResultScanner scanner = table.getScanner(scan); //处理scanner结果同RowFilter }

    QualifierFilter(列过滤器)

    /** * 列名过滤器 只查询包含name列的值 */ @Test public void qualifierFilter() throws IOException { Scan scan = new Scan(); SubstringComparator substringComparator = new SubstringComparator("name"); QualifierFilter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator); Scan scan1 = scan.setFilter(qualifierFilter); ResultScanner scanner = table.getScanner(scan); //处理scanner逻辑同上 }

    ValueFilter(列值过滤器)

    /** * 查询哪些字段值 包含数字8 */ @Test public void valueFilter() throws IOException { Scan scan = new Scan(); SubstringComparator substringComparator = new SubstringComparator("8"); ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, substringComparator); scan.setFilter(valueFilter); ResultScanner scanner = table.getScanner(scan); //处理scanner逻辑同上 }

    专用过滤器

    SingleColumnValueFilter(单列值过滤器)

    类似行查询查询符合条件的cell所在的一行所有的数据SingleColumnValueFilter中的参数:列族和列名起仅起定位作用,最终查询得到的结果不止该列族and列,而是该行
    /**
     * Returns every row whose {@code f1:name} column equals "刘备" and prints all of its cells.
     *
     * <p>From the {@code SingleColumnValueFilter} constructor documentation
     * (family, qualifier, compareOp, value): if the column is found and the
     * condition passes, all columns of the row are emitted; if the condition
     * fails, the row is not emitted. The {@code filterIfColumnMissing} flag
     * controls whether the rest of the row is emitted when the tested column is
     * not found in the row.
     *
     * @throws IOException if the scan fails
     */
    @Test
    public void singleColumnValueFilter() throws IOException {
        Scan scan = new Scan();
        // Encode with Bytes.toBytes (UTF-8): the non-ASCII value must not depend on the
        // platform default charset, and Bytes.toString below decodes as UTF-8.
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("f1"),
                Bytes.toBytes("name"),
                CompareFilter.CompareOp.EQUAL,
                Bytes.toBytes("刘备"));
        scan.setFilter(singleColumnValueFilter);

        // Each Result wraps one row; close the scanner once iteration is done.
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                List<Cell> cells = result.listCells();
                for (Cell cell : cells) {
                    String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                    String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                    byte[] value = CellUtil.cloneValue(cell);

                    // The "id" and "age" columns hold int values; everything else is a string.
                    boolean intColumn = "age".equals(qualifier) || "id".equals(qualifier);
                    String valueText = intColumn
                            ? String.valueOf(Bytes.toInt(value))
                            : Bytes.toString(value);

                    System.out.println("数据的rowkey为" + rowkey
                            + "======数据的列族为" + family
                            + "======数据的列名为" + qualifier
                            + "==========数据的值为" + valueText);
                }
            }
        }
    }
    查询结果

    SingleColumnValueExcludeFilter(列值排除过滤器)

    类似行查询。SingleColumnValueExcludeFilter中的参数:列族和列名仅起定位作用,最终查询得到的结果不止该列族和列,而是该行。和上面类似,查询符合条件的cell所在的行的所有数据,排除掉该cell之后返回剩余的cells。 /** * Constructor for binary compare of the value of a single column. If the * column is found and the condition passes, all columns of the row will be * emitted; except for the tested column value. If the column is not found or * the condition fails, the row will not be emitted. * * @param family name of column family * @param qualifier name of column qualifier * @param compareOp operator * @param value value to compare column values against */ 查询结果

    PrefixFilter(前缀过滤器)

    /**
     * Scans for every row whose rowkey starts with the prefix {@code 00}.
     *
     * @throws IOException if the scan fails
     */
    @Test
    public void prefixFilter() throws IOException {
        Scan scan = new Scan();
        // Keep only rows whose rowkey begins with "00"; Bytes.toBytes encodes as UTF-8.
        scan.setFilter(new PrefixFilter(Bytes.toBytes("00")));

        // try-with-resources releases the scanner even if processing throws.
        try (ResultScanner scanner = table.getScanner(scan)) {
            // process the scanner results the same way as above
        }
    }
    部分查询结果

    PageFilter(分页过滤器PageFilter)

    hbase没有提供直接查询到指定位置row的API,所以查询分页可以按下面的步骤查询:

    先确定要查询哪页,然后 若首页:直接从rowkey位置为0开始遍历,即设置startrow = “”若非首页:先通过PageFilter定位到该页的首个rowkey,然后以创建新的PageFilter,以该rowkey为首个startrow 举例:设一页有2个rowkey,想得到第五个rowkey的值,那么需要做以下计算查询(3-1)*2 + 1个rowkey。即(目标页-1)*每页的rowkey数量+偏移以新的rowkey为startrow
    /**
     * Reads one page of rows using {@code PageFilter}.
     *
     * <p>HBase has no API to jump to the n-th row, so for any page other than the
     * first a preliminary scan walks {@code (pageNum - 1) * pageSize + 1} rows to
     * find the rowkey that starts the requested page; a second scan then starts at
     * that rowkey and reads {@code pageSize} rows.
     *
     * <p>{@code PageFilter} is evaluated separately on each region server, so a
     * scan may hand the client more rows than the requested page size. Both scans
     * therefore also count rows on the client side.
     *
     * @throws IOException if a scan fails
     */
    @Test
    public void pageFilter() throws IOException {
        // page number to fetch (1-based)
        int pageNum = 3;
        // number of rows per page
        int pageSize = 2;

        Scan scan = new Scan();
        if (pageNum == 1) {
            // Rows are sorted by rowkey and the empty key sorts first, so the first
            // page simply starts at the empty start row.
            scan.setStartRow(Bytes.toBytes(""));
        } else {
            // Number of rows to walk so that the last one is the first row of the page,
            // e.g. page 3 with 2 rows per page starts at row (3 - 1) * 2 + 1 = 5.
            int rowsToLocate = (pageNum - 1) * pageSize + 1;
            Scan locateScan = new Scan();
            locateScan.setFilter(new PageFilter(rowsToLocate));

            byte[] pageStartRow = null;
            int seen = 0;
            try (ResultScanner locator = table.getScanner(locateScan)) {
                for (Result result : locator) {
                    seen++;
                    if (seen == rowsToLocate) {
                        // Keep the raw bytes: a String round trip could corrupt binary rowkeys.
                        pageStartRow = result.getRow();
                        break;
                    }
                }
            }
            if (pageStartRow == null) {
                // The table has fewer rows than needed to reach this page: nothing to read.
                return;
            }
            scan.setStartRow(pageStartRow);
        }

        // limit the maximum result size fetched per RPC call
        scan.setMaxResultSize(1024);
        // PageFilter's argument is the maximum number of rows returned for the page
        scan.setFilter(new PageFilter(pageSize));

        try (ResultScanner scanner = table.getScanner(scan)) {
            int rowsInPage = 0;
            for (Result result : scanner) {
                if (rowsInPage == pageSize) {
                    // Extra rows can arrive when the scan spans several region servers.
                    break;
                }
                rowsInPage++;
                // process this page's row here
            }
        }
    }

    多过滤器组合使用

    /** * 查询 f1 列族 name 为刘备数据值 * 并且rowkey 前缀以 00开头数据 */ @Test public void filterList() throws IOException { Scan scan = new Scan(); SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes()); PrefixFilter prefixFilter = new PrefixFilter("00".getBytes()); FilterList filterList = new FilterList(); filterList.addFilter(singleColumnValueFilter); filterList.addFilter(prefixFilter); scan.setFilter(filterList); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { List<Cell> cells = result.listCells(); for (Cell cell : cells) { byte[] family_name = CellUtil.cloneFamily(cell); byte[] qualifier_name = CellUtil.cloneQualifier(cell); byte[] rowkey = CellUtil.cloneRow(cell); byte[] value = CellUtil.cloneValue(cell); //判断id和age字段,这两个字段是整形值 if ("age".equals(Bytes.toString(qualifier_name)) || "id".equals(Bytes.toString(qualifier_name))) { System.out.println("数据的rowkey为" + Bytes.toString(rowkey) + "======数据的列族为" + Bytes.toString(family_name) + "======数据的列名为" + Bytes.toString(qualifier_name) + "==========数据的值为" + Bytes.toInt(value)); } else { System.out.println("数据的rowkey为" + Bytes.toString(rowkey) + "======数据的列族为" + Bytes.toString(family_name) + "======数据的列名为" + Bytes.toString(qualifier_name) + "==========数据的值为" + Bytes.toString(value)); } } } } 查询结果:

    官网连接

    官网连接

    Processed: 0.015, SQL: 9