flink实战(二)flink API方式关联hbase维度数据处理

    技术2025-04-27  24

    概述

    接上一篇flink-sql关联hbase维度数据处理。这次我们使用api的方式来实现。并解决上次提到的问题:订单支付成功后,可以退款,退款完成后订单状态会变成失效,那么统计结果中不应该包含退款成功后相关数据,这次的代码是在上一篇总结的基础上进行的改造,因此只给出了新增的代码逻辑。

    实现代码

    //main方法 //3.2 直接入库Hbase库的维度数据和需要进行实时计算的数据这里分别写了一个,因为还是有些不同的 //3.2.1 维度数据的处理 JobDefine.userJob(env, props); JobDefine.productJob(env, props); //实时计算的处理 JobDefine.orderJob1(env,tEnv,props);//修改项 //执行任务 env.execute("orderJob"); //JobDefine.orderJob1方法 public static void orderJob1(StreamExecutionEnvironment env, Properties props){ SingleOutputStreamOperator orderSource = env.addSource(new FlinkKafkaConsumer010<>("order表对应的topic", new SimpleStringSchema(), props)); //将kafka读取的数据(为json字符串SimpleStringSchema)转换成Java对象 自定义类 见后续代码 SingleOutputStreamOperator<Order> orderMessageStream = orderSource.flatMap(new MyKafkaRichFlatMapFunction<Order>()).returns(Order.class); //将order数据进行实时计算并将结果保存到MySQL中 new OrderFunction().handle(orderMessageStream, env); } //OrderFunction.handle方法 public void handle(SingleOutputStreamOperator<Order> streamOperator, StreamExecutionEnvironment envtEnv){ //先分组 SingleOutputStreamOperator<List<Result>> process = streamOperator.keyBy("orderID", "userID", "productID") //进行滑动处理时间窗口计算 .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) //全量聚合 .process(new OrderProcessWindowFunction()); process.addSink(new OrderMysqlSinkFunction()).name("OrderMysqlSinkFunction"); }

    OrderProcessWindowFunction全量聚合函数

    /** * 继承ProcessWindowFunction类,输入类型、输出类型、key、时间窗口 */ public class OrderProcessWindowFunction extends ProcessWindowFunction<Order, List<Result>, Tuple, TimeWindow> { //中间状态数据 使用的是mapstate private MapState<String,Result> mapState; //hbase连接 protected Connection connection; //hbase表 protected HTable userTable = null; protected HTable productTable = null; /** * 启动的时候执行一次,进行相关数据的初始化 */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //初始化中间状态数据 MapStateDescriptor<String, Result> orderMapStatedesc = new MapStateDescriptor<>( "orderMapState", String.class, Result.class); mapState = getRuntimeContext().getMapState(orderMapStatedesc); //初始化hbase连接 ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig() .getGlobalJobParameters(); org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", globalParams.toMap().get("hbase.zookeeper.quorum")); connection = ConnectionFactory.createConnection(conf); } @Override public void process(Tuple tuple, Context context, Iterable<Order> elements, Collector<List<Result>> out) throws Exception { Iterator<Order> it = elements.iterator(); //遍历时间窗口中的所有订单数据 while(it.hasNext()){ Order order = it.next(); String key = order.getOrderID()+":"+order.getUserID()+":"+order.getProductID()+":"+order.getBuyDate(); //根据key从中间状态中获取结果 Result state = mapState.get(key); if(state==null){ //如果中间状态中没有结果,则根据当前订单创建一个结果,并为结果设置相关值 Result result = new Result(); result.setBuyDate(order.getBuyDate()); result.setProductCount(order.getProductCount()); result.setTotalMoney(order.getMoney()*order.getProductCount()); //关联hbase,查询用户表 Integer userID = order.getUserID(); userTable = (HTable) connection.getTable(TableName.valueOf("t_user")); org.apache.hadoop.hbase.client.Result user = userTable.get(new Get(Bytes.toBytes(userID)) .addColumn(Bytes.toBytes("f"), Bytes.toBytes("uname"))); userTable.close(); result.setUserName(Bytes.toString(user.getValue(Bytes.toBytes("f"), Bytes.toBytes("uname")))); //管理hbase,查询产品表 Integer productID = order.getProductID(); productTable = (HTable) connection.getTable(TableName.valueOf("t_user")); org.apache.hadoop.hbase.client.Result product = productTable.get(new Get(Bytes.toBytes(productID)) .addColumn(Bytes.toBytes("f"), Bytes.toBytes("pname")) .addColumn(Bytes.toBytes("f"), Bytes.toBytes("pt"))); productTable.close(); result.setProductName(Bytes.toString(user.getValue(Bytes.toBytes("f"), Bytes.toBytes("pname")))); result.setProductType(Bytes.toString(user.getValue(Bytes.toBytes("f"), Bytes.toBytes("pt")))); //将结果保存到中间状态中 mapState.put(key, result); }else { //判断当前订单状态,如果是正常状态,则执行数据相加 if(order.getOrderStatus().equals("0")){ state.setProductCount(order.getProductCount()+state.getProductCount()); state.setTotalMoney(order.getMoney()*order.getProductCount()+state.getTotalMoney()); }else{ //判断当前订单状态,如果是异常状态,则执行数据相减 state.setProductCount(state.getProductCount()-order.getProductCount()); state.setTotalMoney(state.getTotalMoney()-(order.getMoney()*order.getProductCount())); } //更新中间状态数据 mapState.put(key, state); } } //将数据发送给流程下游节点 out.collect(Lists.newArrayList(mapState.values())); } @Override public void close() throws Exception { //关闭连接 connection.close(); } }

    可以看到,虽然写sql的方式比较简单,但遇到逻辑叫复杂的情况时,还是没有使用原生api方式灵活。

    Processed: 0.011, SQL: 9