Overview
This follows up on the previous post, where we joined HBase dimension data with Flink SQL. This time we implement the same logic with the DataStream API, and we also solve the problem raised last time: an order can be refunded after it has been paid, and once the refund completes the order status becomes invalid, so the statistics must not include data from successfully refunded orders. The code below builds on the previous post, so only the newly added logic is shown.
Implementation
// main method
// 3.2 The dimension data that goes straight into HBase and the data that needs real-time
// computation are handled by separate jobs, because they differ in a few ways
// 3.2.1 Dimension data processing
JobDefine.userJob(env, props);
JobDefine.productJob(env, props);
// Real-time computation
JobDefine.orderJob1(env, props); // changed compared to the SQL version
// Execute the job
env.execute("orderJob");
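One thing worth noting: OrderProcessWindowFunction.open() further down reads hbase.zookeeper.quorum from the global job parameters, so they have to be registered on the environment before env.execute(). A minimal sketch of that wiring, assuming props already carries the ZooKeeper address under the same key:

// Sketch (assumption): expose the HBase ZooKeeper address to the operators as global job
// parameters, so OrderProcessWindowFunction.open() can read it via getGlobalJobParameters().toMap()
Map<String, String> globalProps = new HashMap<>();
globalProps.put("hbase.zookeeper.quorum", props.getProperty("hbase.zookeeper.quorum"));
env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(globalProps));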
// JobDefine.orderJob1 method
public static void orderJob1(StreamExecutionEnvironment env, Properties props) {
    SingleOutputStreamOperator<String> orderSource =
            env.addSource(new FlinkKafkaConsumer010<>("topic of the order table", new SimpleStringSchema(), props));
    // Convert the records read from Kafka (JSON strings, deserialized with SimpleStringSchema)
    // into Java objects; custom class, see the sketch after this method
    SingleOutputStreamOperator<Order> orderMessageStream =
            orderSource.flatMap(new MyKafkaRichFlatMapFunction<Order>()).returns(Order.class);
    // Run the real-time computation on the order data and write the results to MySQL
    new OrderFunction().handle(orderMessageStream, env);
}
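MyKafkaRichFlatMapFunction and the Order POJO were introduced in the previous post and are not repeated here. For reference, here is a minimal sketch of what the JSON-to-POJO flatMap does, written specifically for Order and using Jackson; the class name OrderJsonFlatMapFunction and the error handling are assumptions, not the original implementation:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical sketch; the real MyKafkaRichFlatMapFunction from the previous post is generic
public class OrderJsonFlatMapFunction extends RichFlatMapFunction<String, Order> {

    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
    }

    @Override
    public void flatMap(String value, Collector<Order> out) {
        try {
            // one Kafka record = one JSON order message
            out.collect(mapper.readValue(value, Order.class));
        } catch (Exception e) {
            // drop records that cannot be parsed (assumption)
        }
    }
}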
// OrderFunction.handle method
public void handle(SingleOutputStreamOperator<Order> streamOperator, StreamExecutionEnvironment env) {
    // key the stream first
    SingleOutputStreamOperator<List<Result>> process = streamOperator.keyBy("orderID", "userID", "productID")
            // tumbling processing-time window of 10 seconds
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            // full-window aggregation
            .process(new OrderProcessWindowFunction());
    process.addSink(new OrderMysqlSinkFunction()).name("OrderMysqlSinkFunction");
}
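OrderMysqlSinkFunction is the JDBC sink that writes each window's result list into MySQL; its code is not shown in this post. A minimal sketch of such a sink, where the JDBC URL, credentials, table and column names are all assumptions:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sketch of OrderMysqlSinkFunction: upsert each window result into MySQL
public class OrderMysqlSinkFunction extends RichSinkFunction<List<Result>> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // JDBC URL, credentials and schema are placeholders (assumptions)
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/report", "root", "******");
        statement = connection.prepareStatement(
                "INSERT INTO order_stat (buy_date, user_name, product_name, product_type, product_count, total_money) "
              + "VALUES (?, ?, ?, ?, ?, ?) "
              + "ON DUPLICATE KEY UPDATE product_count = VALUES(product_count), total_money = VALUES(total_money)");
    }

    @Override
    public void invoke(List<Result> results, Context context) throws Exception {
        for (Result r : results) {
            statement.setString(1, r.getBuyDate());
            statement.setString(2, r.getUserName());
            statement.setString(3, r.getProductName());
            statement.setString(4, r.getProductType());
            statement.setInt(5, r.getProductCount());
            statement.setDouble(6, r.getTotalMoney());
            statement.addBatch();
        }
        statement.executeBatch();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}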
OrderProcessWindowFunction, the full-window aggregation function
import java.util.Iterator;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class OrderProcessWindowFunction extends ProcessWindowFunction<Order, List<Result>, Tuple, TimeWindow> {

    private MapState<String, Result> mapState;
    protected Connection connection;
    protected HTable userTable = null;
    protected HTable productTable = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Keyed state holding one Result per (orderID:userID:productID:buyDate) key
        MapStateDescriptor<String, Result> orderMapStateDesc =
                new MapStateDescriptor<>("orderMapState", String.class, Result.class);
        mapState = getRuntimeContext().getMapState(orderMapStateDesc);
        // Read the HBase ZooKeeper address from the global job parameters set in the main method
        ExecutionConfig.GlobalJobParameters globalParams =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", globalParams.toMap().get("hbase.zookeeper.quorum"));
        connection = ConnectionFactory.createConnection(conf);
    }

    @Override
    public void process(Tuple tuple, Context context, Iterable<Order> elements,
                        Collector<List<Result>> out) throws Exception {
        Iterator<Order> it = elements.iterator();
        while (it.hasNext()) {
            Order order = it.next();
            String key = order.getOrderID() + ":" + order.getUserID() + ":"
                    + order.getProductID() + ":" + order.getBuyDate();
            Result state = mapState.get(key);
            if (state == null) {
                // First time we see this key: build the Result and enrich it with the
                // user and product dimension data looked up from HBase
                Result result = new Result();
                result.setBuyDate(order.getBuyDate());
                result.setProductCount(order.getProductCount());
                result.setTotalMoney(order.getMoney() * order.getProductCount());

                Integer userID = order.getUserID();
                userTable = (HTable) connection.getTable(TableName.valueOf("t_user"));
                org.apache.hadoop.hbase.client.Result user = userTable.get(new Get(Bytes.toBytes(userID))
                        .addColumn(Bytes.toBytes("f"), Bytes.toBytes("uname")));
                userTable.close();
                result.setUserName(Bytes.toString(user.getValue(Bytes.toBytes("f"), Bytes.toBytes("uname"))));

                Integer productID = order.getProductID();
                productTable = (HTable) connection.getTable(TableName.valueOf("t_product"));
                org.apache.hadoop.hbase.client.Result product = productTable.get(new Get(Bytes.toBytes(productID))
                        .addColumn(Bytes.toBytes("f"), Bytes.toBytes("pname"))
                        .addColumn(Bytes.toBytes("f"), Bytes.toBytes("pt")));
                productTable.close();
                result.setProductName(Bytes.toString(product.getValue(Bytes.toBytes("f"), Bytes.toBytes("pname"))));
                result.setProductType(Bytes.toString(product.getValue(Bytes.toBytes("f"), Bytes.toBytes("pt"))));

                mapState.put(key, result);
            } else {
                if (order.getOrderStatus().equals("0")) {
                    // Paid order: add its amount to the accumulated totals
                    state.setProductCount(order.getProductCount() + state.getProductCount());
                    state.setTotalMoney(order.getMoney() * order.getProductCount() + state.getTotalMoney());
                } else {
                    // Refunded/invalid order: subtract it so refunds are excluded from the statistics
                    state.setProductCount(state.getProductCount() - order.getProductCount());
                    state.setTotalMoney(state.getTotalMoney() - (order.getMoney() * order.getProductCount()));
                }
                mapState.put(key, state);
            }
        }
        out.collect(Lists.newArrayList(mapState.values()));
    }

    @Override
    public void close() throws Exception {
        connection.close();
    }
}
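For completeness: the Order and Result POJOs used above also come from the previous post. Their shape, as inferred from the getters and setters referenced in the code, is roughly the following sketch (field semantics marked as assumptions; each class lives in its own file, with Lombok @Data generating the accessors):

import lombok.Data;

// Hypothetical shape of the two POJOs referenced above; each class in its own file
@Data
public class Order {
    private Integer orderID;
    private Integer userID;
    private Integer productID;
    private String buyDate;
    private Double money;         // unit price
    private Integer productCount;
    private String orderStatus;   // "0" = paid; any other value = refunded/invalid (assumption)
}

@Data
public class Result {
    private String buyDate;
    private Integer productCount;
    private Double totalMoney;
    private String userName;
    private String productName;
    private String productType;
}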
As you can see, although the SQL approach is simpler to write, it is not as flexible as the native API approach once the logic becomes more complex.