Overview
I recently used Flink for real-time computation on a project. The pipeline reads data from Kafka: dimension data is written to HBase, while data that needs real-time computation is processed on the fly and the results are saved to MySQL. During the computation we may need to look up dimension data stored in HBase. For development efficiency everything is implemented with Flink SQL. Flink SQL is a high-level abstraction on top of Flink's streaming runtime; it makes development simpler and faster, but understanding what actually happens behind a SQL statement still requires a careful study of Flink's streaming internals. This article implements the requirements below with Flink SQL.
Assume the requirements are as follows:
User data is read from Kafka and written to HBase as dimension data. Product data is read from Kafka and written to HBase as dimension data. Order data (orders that have been paid successfully) is read from Kafka, processed in real time, and the results are written to MySQL.
Assume we need to aggregate each user's orders per day.
User data: userID, userName, sex, address (user ID, user name, sex, address)
Product data: productID, productName, productType (product ID, product name, product type)
Order data: orderID, userID, productID, productCount, money, buyDate (order ID, user ID, product ID, quantity, unit price, purchase date)
Final result: userName, productType, productName, buyDate, productCount, totalMoney (user name, product type, product name, purchase date, quantity, total amount). In other words: for each user, day, product type and product name, compute how many items were bought and how much was spent.
Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hbase_2.11</artifactId>
        <version>1.10.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-shaded-client</artifactId>
        <version>1.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-client_2.11</artifactId>
        <version>1.10.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-shaded-hadoop2</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-shaded-hive</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-all</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.21</version>
    </dependency>
    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib</artifactId>
        <version>2.2</version>
    </dependency>
</dependencies>
Entity classes
@Data
public class User implements Serializable {
    private Integer userID;
    private String userName;
    private String sex;
    private String address;
}

@Data
public class Product implements Serializable {
    private Integer productID;
    private String productName;
    private String productType;
}

@Data
public class Order implements Serializable {
    private Integer orderID;
    private Integer userID;
    private Integer productID;
    private Integer productCount;
    private Double money;
    private String buyDate;
}

@Data
public class Result implements Serializable {
    private String userName;
    private String productType;
    private String productName;
    private String buyDate;
    private Integer productCount;
    private Double totalMoney;
}
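To make the Kafka message format concrete, here is a minimal sketch of what one order record is assumed to look like on the wire: one flat JSON object per message, with field names matching the entities above. The sample values are purely illustrative; the real producers are outside the scope of this article.

import com.alibaba.fastjson.JSONObject;

public class OrderJsonSample {
    public static void main(String[] args) {
        // Illustrative payload only.
        String sample = "{\"orderID\":1,\"userID\":10,\"productID\":100,"
                + "\"productCount\":2,\"money\":9.9,\"buyDate\":\"2020-05-01\"}";
        // The same fastjson call is used later in MyKafkaRichFlatMapFunction.
        Order order = JSONObject.parseObject(sample, Order.class);
        System.out.println(order.getUserID() + " bought " + order.getProductCount()
                + " item(s) on " + order.getBuyDate());
    }
}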
Creating the execution environment
public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setTolerableCheckpointFailureNumber(0);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000));

        // Pass connection information in as program arguments and expose it to all
        // operators through the global job parameters.
        Map<String, String> maps = new ImmutableMap.Builder<String, String>()
                .put("kafka.url", args[0])
                .put("hbase.zookeeper.quorum", args[1])
                .put("dbUrl", args[2])
                .put("dbName", args[3])
                .put("user", args[4])
                .put("psw", args[5])
                .build();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(maps));

        // Register the scalar function used in SQL.
        tEnv.registerFunction("time_format", new DateFormatFunction());

        // Register the HBase dimension tables as lookup functions.
        String zkUrl = env.getConfig().getGlobalJobParameters().toMap()
                .getOrDefault("hbase.zookeeper.quorum", "");
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkUrl);
        TableDefine.defineUserHbaseTable(tEnv, conf);
        TableDefine.defineProductHbaseTable(tEnv, conf);

        // Kafka consumer properties.
        String kafkaUrl = env.getConfig().getGlobalJobParameters().toMap()
                .getOrDefault("kafka.url", "");
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaUrl);
        props.setProperty("group.id", "your-consumer-group-id");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        JobDefine.userJob(env, props);
        JobDefine.productJob(env, props);
        JobDefine.orderJob(env, tEnv, props);

        env.execute("orderJob");
    }
}
Converting the JSON strings read from Kafka into Java entities
public class MyKafkaRichFlatMapFunction<OUT> extends RichFlatMapFunction<String, OUT> {

    private static final Logger logger = LogManager.getLogger(MyKafkaRichFlatMapFunction.class);

    // The target class is passed in explicitly: because of type erasure the generic
    // parameter is not available at runtime when the function is instantiated
    // directly, e.g. new MyKafkaRichFlatMapFunction<>(User.class).
    private final Class<OUT> targetClass;

    public MyKafkaRichFlatMapFunction(Class<OUT> targetClass) {
        this.targetClass = targetClass;
    }

    @Override
    public void flatMap(String value, Collector<OUT> collector) {
        try {
            if (value != null && !"".equals(value)) {
                OUT out = JSONObject.parseObject(value, targetClass);
                collector.collect(out);
            }
        } catch (Exception e) {
            // Swallow bad records so one malformed message does not fail the job.
            logger.error("MyKafkaRichFlatMapFunction failed to parse message: " + value, e);
        }
    }
}
The date formatting function used in SQL
public class DateFormatFunction extends ScalarFunction {
    public String eval(Timestamp time, String format) {
        return new SimpleDateFormat(format).format(new Date(time.getTime()));
    }
}
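The function is registered under the name time_format in Main. The query later in this article does not actually need it (buyDate is already a string), so the following is only a hedged sketch of how it could be called from SQL; the payTime column is a hypothetical TIMESTAMP field that is not part of the Order schema above.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TimeFormatUsageSketch {
    // Assumes time_format has already been registered, as done in Main, and that
    // t_order has a TIMESTAMP column named payTime (an assumption for illustration).
    public static Table withFormattedDay(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
                "select o.orderID, time_format(o.payTime, 'yyyy-MM-dd') as buyDay " +
                "from t_order o");
    }
}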
HBase table definitions
public class TableDefine {

    public static void defineUserHbaseTable(StreamTableEnvironment tEnv, Configuration conf) {
        HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_user");
        hBaseTableSource.setRowKey("rowKey", Integer.class);
        hBaseTableSource.addColumn("f", "uid", Integer.class);
        hBaseTableSource.addColumn("f", "uname", String.class);
        hBaseTableSource.addColumn("f", "sex", String.class);
        hBaseTableSource.addColumn("f", "address", String.class);
        tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
    }

    public static void defineProductHbaseTable(StreamTableEnvironment tEnv, Configuration conf) {
        HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_product");
        hBaseTableSource.setRowKey("rowKey", Integer.class);
        hBaseTableSource.addColumn("f", "pid", Integer.class);
        hBaseTableSource.addColumn("f", "pname", String.class);
        hBaseTableSource.addColumn("f", "pt", String.class);
        tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
    }
}
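Registered this way, t_user and t_product behave as table functions in SQL, so each stream record can be joined against HBase with LATERAL TABLE, keyed by the rowKey argument. A minimal sketch is shown below; the full query appears in the OrderFunction section later, and the sketch assumes t_order has already been registered as a temporary view (see JobDefine below).

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class LookupJoinSketch {
    // Enrich each order with the user name looked up from HBase by userID (the rowKey).
    public static Table enrichWithUser(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(
                "select o.orderID, u.uname " +
                "from t_order o, lateral table(t_user(o.userID)) u " +
                "where o.userID = u.uid");
    }
}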
HBase sink definitions
public abstract class AbstractHbaseSinkFunction<OUT> extends RichSinkFunction<OUT> {

    protected static String cf_String = "f";
    protected static byte[] cf = Bytes.toBytes(cf_String);

    protected String tableName = null;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Read the ZooKeeper quorum from the global job parameters set in Main.
        ExecutionConfig.GlobalJobParameters globalParams =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", globalParams.toMap().get("hbase.zookeeper.quorum"));
        if (null == connection) {
            connection = ConnectionFactory.createConnection(conf);
        }
    }

    @Override
    public void invoke(OUT value, Context context) throws Exception {
        HTable table = null;
        try {
            table = (HTable) connection.getTable(TableName.valueOf(tableName));
            handle(value, context, table);
        } finally {
            if (table != null) {
                table.close();
            }
        }
    }

    protected abstract void handle(OUT value, Context context, HTable table) throws Exception;

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }

    protected byte[] getByteValue(Object value) {
        if (value == null) {
            return Bytes.toBytes("");
        } else {
            return Bytes.toBytes(value.toString());
        }
    }
}
public class UserHbaseSinkFunction extends AbstractHbaseSinkFunction<User> {

    public UserHbaseSinkFunction() {
        tableName = "t_user";
    }

    @Override
    protected void handle(User value, Context context, HTable table) throws Exception {
        Integer rowkey1 = value.getUserID();
        Put put1 = new Put(Bytes.toBytes(rowkey1));
        put1.addColumn(cf, Bytes.toBytes("uid"), getByteValue(value.getUserID()));
        put1.addColumn(cf, Bytes.toBytes("uname"), getByteValue(value.getUserName()));
        put1.addColumn(cf, Bytes.toBytes("sex"), getByteValue(value.getSex()));
        // Qualifier must match the "address" column declared in TableDefine.
        put1.addColumn(cf, Bytes.toBytes("address"), getByteValue(value.getAddress()));
        table.put(put1);
    }
}
public class ProductHbaseSinkFunction extends AbstractHbaseSinkFunction<Product> {

    public ProductHbaseSinkFunction() {
        tableName = "t_product";
    }

    @Override
    protected void handle(Product value, Context context, HTable table) throws Exception {
        Integer rowkey1 = value.getProductID();
        Put put1 = new Put(Bytes.toBytes(rowkey1));
        put1.addColumn(cf, Bytes.toBytes("pid"), getByteValue(value.getProductID()));
        put1.addColumn(cf, Bytes.toBytes("pname"), getByteValue(value.getProductName()));
        put1.addColumn(cf, Bytes.toBytes("pt"), getByteValue(value.getProductType()));
        table.put(put1);
    }
}
MySQL sink definition
public class OrderMysqlSinkFunction extends RichSinkFunction<List<Result>> {

    public static Logger logger = LogManager.getLogger(OrderMysqlSinkFunction.class);

    private DataSource dataSource = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("MysqlSinkFunction open");
        super.open(parameters);
        Map<String, String> globalParams =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
        String dbUrl = globalParams.get("dbUrl");
        String dbName = globalParams.get("dbName");
        String user = globalParams.get("user");
        String psw = globalParams.get("psw");
        try {
            dataSource = DBUtil.getDataSource(dbUrl, dbName, user, psw);
        } catch (Exception e) {
            logger.error("failed to initialize the data source", e);
        }
    }

    @Override
    public void invoke(List<Result> results, Context context) throws Exception {
        Connection connection = dataSource.getConnection();
        PreparedStatement ps = null;
        try {
            // Upsert so that repeated window emissions for the same key overwrite the
            // previous values instead of creating duplicates.
            String sql = "INSERT INTO t_result (userName, productType, productName, buyDate, productCount, totalMoney) " +
                    " values (?,?,?,?,?,?) on duplicate key " +
                    " update productCount = values(productCount), totalMoney = values(totalMoney) ";
            ps = connection.prepareStatement(sql);
            for (Result record : results) {
                ps.setObject(1, record.getUserName());
                ps.setObject(2, record.getProductType());
                ps.setObject(3, record.getProductName());
                ps.setObject(4, record.getBuyDate());
                ps.setObject(5, record.getProductCount());
                ps.setObject(6, record.getTotalMoney());
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (Exception e) {
            logger.error("failed to write results to MySQL", e);
            throw new RuntimeException("failed to save t_result records: " + e);
        } finally {
            // Return the statement and the pooled connection.
            DBUtil.close(connection, ps);
        }
    }

    @Override
    public void close() {
        logger.info("MysqlSinkFunction close");
        DBUtil.closeDataSource();
    }
}
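The upsert above implies that t_result has a unique key over (userName, productType, productName, buyDate). The article does not show the table DDL, so the following is only a hypothetical sketch of a schema that would make ON DUPLICATE KEY UPDATE behave as intended, wrapped in a small JDBC helper; the column types and key name are assumptions.

import java.sql.Connection;
import java.sql.Statement;

public class ResultTableDdlSketch {
    // Hypothetical DDL: not taken from the article.
    private static final String CREATE_T_RESULT =
            "CREATE TABLE IF NOT EXISTS t_result (" +
            "  userName     VARCHAR(64)  NOT NULL," +
            "  productType  VARCHAR(64)  NOT NULL," +
            "  productName  VARCHAR(128) NOT NULL," +
            "  buyDate      VARCHAR(32)  NOT NULL," +
            "  productCount INT," +
            "  totalMoney   DOUBLE," +
            "  UNIQUE KEY uk_result (userName, productType, productName, buyDate)" +
            ")";

    public static void createIfAbsent(Connection connection) throws Exception {
        try (Statement st = connection.createStatement()) {
            st.execute(CREATE_T_RESULT);
        }
    }
}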
Job definitions
public class JobDefine {

    public static void userJob(StreamExecutionEnvironment env, Properties props) {
        SingleOutputStreamOperator<String> userSource = env.addSource(
                new FlinkKafkaConsumer010<>("topic-of-user-data", new SimpleStringSchema(), props));
        SingleOutputStreamOperator<User> userMessageStream = userSource
                .flatMap(new MyKafkaRichFlatMapFunction<>(User.class))
                .returns(User.class);
        userMessageStream.addSink(new UserHbaseSinkFunction()).name("UserHbaseSinkFunction");
    }

    public static void productJob(StreamExecutionEnvironment env, Properties props) {
        SingleOutputStreamOperator<String> productSource = env.addSource(
                new FlinkKafkaConsumer010<>("topic-of-product-data", new SimpleStringSchema(), props));
        SingleOutputStreamOperator<Product> productStream = productSource
                .flatMap(new MyKafkaRichFlatMapFunction<>(Product.class))
                .returns(Product.class);
        productStream.addSink(new ProductHbaseSinkFunction()).name("ProductHbaseSinkFunction");
    }

    public static void orderJob(StreamExecutionEnvironment env, StreamTableEnvironment tEnv, Properties props) {
        SingleOutputStreamOperator<String> orderSource = env.addSource(
                new FlinkKafkaConsumer010<>("topic-of-order-data", new SimpleStringSchema(), props));
        SingleOutputStreamOperator<Order> orderMessageStream = orderSource
                .flatMap(new MyKafkaRichFlatMapFunction<>(Order.class))
                .returns(Order.class);
        // Register the order stream as a table so it can be queried with SQL.
        tEnv.createTemporaryView("t_order", orderMessageStream);
        new OrderFunction().handle(orderMessageStream, env, tEnv);
    }
}
The real-time computation
public class OrderFunction {

    public void handle(SingleOutputStreamOperator streamOperator, StreamExecutionEnvironment env,
                       StreamTableEnvironment tEnv) {
        // Join the order stream with the HBase dimension tables and aggregate by
        // user, product type, product name and day. The column aliases must match
        // the field names of the Result POJO.
        Table collectionTaskResult = tEnv.sqlQuery(
                "select u.uname as userName, p.pt as productType, p.pname as productName, o.buyDate as buyDate, " +
                " sum(o.productCount) as productCount, sum(o.money * o.productCount) as totalMoney " +
                " from t_order o, lateral table(t_user(o.userID)) u, lateral table(t_product(o.productID)) p " +
                " where o.userID = u.uid and o.productID = p.pid " +
                " group by u.uname, p.pt, p.pname, o.buyDate");

        // A grouped aggregation on an unbounded stream produces a retract stream:
        // each element is a (flag, row) pair where flag == true means add/update.
        DataStream<Tuple2<Boolean, Result>> stream = tEnv.toRetractStream(collectionTaskResult, Result.class);

        stream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessAllWindowFunction<Tuple2<Boolean, Result>, List<Result>, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Tuple2<Boolean, Result>> values,
                                        Collector<List<Result>> out) throws Exception {
                        // Keep only the latest accumulate record per key within the window.
                        Map<String, Result> result = Maps.newHashMap();
                        for (Tuple2<Boolean, Result> tuple : values) {
                            if (tuple.f0) {
                                Result item = tuple.f1;
                                result.put(item.getUserName() + ":" + item.getProductType() + ":"
                                        + item.getProductName() + ":" + item.getBuyDate(), item);
                            }
                        }
                        out.collect(Lists.newArrayList(result.values()));
                    }
                }).setParallelism(1)
                .addSink(new OrderMysqlSinkFunction()).name("OrderMysqlSinkFunction");
    }
}
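During development it helps to inspect the retract stream before wiring up the MySQL sink. A minimal sketch for local debugging, assuming printing to stdout is acceptable in your environment:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RetractStreamDebugSketch {
    // Prints (isAccumulate, Result) pairs; a false flag retracts a previously emitted row.
    public static void debugPrint(StreamTableEnvironment tEnv, Table aggregated) {
        tEnv.toRetractStream(aggregated, Result.class)
            .print("order-agg");
    }
}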
Flink SQL looks pretty powerful, doesn't it? But, as stated at the beginning, it is a high-level abstraction on top of Flink's streaming runtime, and it cannot express everything.
An order that was paid successfully can later be refunded; once the refund completes the order becomes invalid, and the statistics should no longer include it. How do you express that in SQL? I don't know of a clean way, but it can be handled with the DataStream API, which I will cover in a follow-up post.
MySQL connection utility class
public class DBUtil {

    private static Logger logger = LogManager.getLogger(DBUtil.class);

    private static DruidDataSource dataSource;

    public static DruidDataSource getDataSource(String dbUrl, String dbName, String username, String password) {
        try {
            if (dataSource == null) {
                synchronized (DBUtil.class) {
                    if (dataSource == null) {
                        dataSource = new DruidDataSource();
                        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
                        dataSource.setUrl("jdbc:mysql://#/%?autoReconnect=true&useUnicode=true&useAffectedRows=true&characterEncoding=utf8"
                                .replace("#", dbUrl).replace("%", dbName));
                        dataSource.setUsername(username);
                        dataSource.setPassword(password);
                        dataSource.setInitialSize(20);
                        dataSource.setMinIdle(20);
                        dataSource.setMaxActive(100);
                        dataSource.setMaxWait(60000);
                        dataSource.setTimeBetweenEvictionRunsMillis(60000);
                        dataSource.setMinEvictableIdleTimeMillis(300000);
                        dataSource.setValidationQuery("SELECT 1 FROM DUAL");
                        dataSource.setTestWhileIdle(true);
                        dataSource.setTestOnBorrow(false);
                        dataSource.setTestOnReturn(false);
                        dataSource.setPoolPreparedStatements(true);
                        dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
                        dataSource.setFilters("stat,wall,log4j");
                        dataSource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");
                    }
                }
            }
        } catch (Exception e) {
            logger.error("failed to initialize the data source", e);
        }
        return dataSource;
    }

    public static DataSource getDataSource() {
        return dataSource;
    }

    public static void closeDataSource() {
        logger.info("datasource close!");
        if (dataSource != null) {
            dataSource.close();
        }
    }

    public static Connection getConnection() {
        Connection conn = null;
        try {
            conn = dataSource.getConnection();
        } catch (SQLException e) {
            logger.error("failed to get a database connection", e);
        }
        return conn;
    }

    public static void close(Connection conn, Statement st, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                logger.error("failed to close ResultSet", e);
            }
        }
        close(conn, st);
    }

    public static void close(Connection conn, Statement st) {
        if (st != null) {
            try {
                st.close();
            } catch (SQLException e) {
                logger.error("failed to close Statement", e);
            }
        }
        close(conn);
    }

    public static void close(Statement st) {
        if (st != null) {
            try {
                st.close();
            } catch (SQLException e) {
                logger.error("failed to close Statement", e);
            }
        }
    }

    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                logger.error("failed to close Connection", e);
            }
        }
    }

    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                logger.error("failed to commit transaction", e);
            }
        }
    }

    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                logger.error("failed to roll back transaction", e);
            }
        }
    }
}