flink实战(一) flink-sql关联hbase维度数据处理

    技术2025-04-29  19

    概述

    最近项目中用到flink进行实时计算,流程为从kafka读取数据,如果是维度数据,则插入到hbase中,如果是需要实时计算的数据, 则进行实时计算,并将计算结果保存到MySQL中。在实时计算过程中,可能会用到hbase中的维度数据,为了开发的效率,使用flink-sql的方式实现。 flink-sql是在flink流式计算的基础上进行了高度抽象,使开发过程更简单,更有效率,但要理解sql执行背后的原理还是需要 仔细学习flink流式计算的相关内容。本文主要以flink-sql实现相关功能。

    假设需求如下:

    用户数据,需要从kafka中读取,并保存到hbase中,作为维度数据。商品数据,需要从kafka中读取,并保存到hbase中,作为维度数据。订单数据,需要从kafka中读取,并进行实时计算,最后将计算结果保存到MySQL中(表示支付成功后的订单数据)。

    假设需要统计用户每天的订单数量

    用户数据:userID, userName, sex, address (用户ID,用户名,性别,地址)

    商品数据:productID, productName, productType (商品ID,商品名称,商品类型)

    订单数据:orderID, userID, productID, productCount, money, buyDate (订单ID,用户ID,商品ID,商品数量,商品单价,购买日期)

    最终计算结果:userName, productType, productName, buyDate, productCount, totalMoney (用户名,商品类型,商品名称,购买日期,商品数量,商品总价) 即:要求计算某人在某日购买了某种类别,某种商品名称的商品数量,花费了多少钱

    maven依赖

    <dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hbase_2.11</artifactId> <version>1.10.0</version> <exclusions> <exclusion> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> </exclusion> <exclusion> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-shaded-client</artifactId> <version>1.4.3</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.4.3</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-client_2.11</artifactId> <version>1.10.0</version> <exclusions> <exclusion> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop2</artifactId> </exclusion> <exclusion> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hive</artifactId> </exclusion> <exclusion> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-all</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.46</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.21</version> </dependency> <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.2</version> </dependency> </dependencies>

    实体类

    @Data public class User implements Serializable { private Integer userID; private String userName; private String sex; private String address; } @Data public class Product implements Serializable { private Integer productID; private String productName; private String productType; } @Data public class Order implements Serializable { private Integer orderID; private Integer userID; private Integer productID; private Integer productCount; private Double money; private String buyDate; } @Data public class Result implements Serializable { private String userName; private String productType; private String productName; private String buyDate; private Integer productCount; private Double totalMoney; }

    创建执行环境

    public class Main { public static void main(String[] args)throws Exception{ //1. 相关参数设置 //创建流执行环境,并创建表执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //如果需要使用sql和table的话,这个必须设置 EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); //非常关键,一定要设置启动检查点,设置最少一次处理语义和恰一次处理语义 env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE); //按照处理时间进行计算 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // checkpoint的清除策略 CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); checkpointConfig.setTolerableCheckpointFailureNumber(0); //设置重启策略:3次尝试,每次尝试间隔60s env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000)); Map<String, String> maps = new ImmutableMap.Builder<String, String>(). put("kafka.url", args[0]). put("hbase.zookeeper.quorum", args[1]). put("dbUrl", args[2]). put("dbName", args[3]). put("user", args[4]). put("psw", args[5]).build(); //相关数据设置成全局配置,以便再后面的function中获取 env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(maps)); //table中时间转换函数 tEnv.registerFunction("time_format", new DateFormatFunction()); //2. 创建hbase环境 //从全局配置中获取hbase的zookeeper地址 String zkUrl = env.getConfig().getGlobalJobParameters().toMap().getOrDefault("hbase.zookeeper.quorum", ""); Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", zkUrl); //2.1 创建hbase表 TableDefine.defineUserHbaseTable(tEnv, conf);//创建用户表 TableDefine.defineProductHbaseTable(tEnv, conf);//创建商品表 //3. 创建kafka环境 //3.1 从全局配置中获取kafka相关配置 String kafkaUrl = env.getConfig().getGlobalJobParameters().toMap().getOrDefault("kafka.url", ""); Properties props = new Properties(); props.setProperty("bootstrap.servers",kafkaUrl); props.setProperty("group.id", "配置的groupID"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //3.2 直接入库Hbase库的维度数据和需要进行实时计算的数据这里分别写了一个,因为还是有些不同的 //3.2.1 维度数据的处理 JobDefine.userJob(env, props); JobDefine.productJob(env, props); //实时计算的处理 JobDefine.orderJob(env,tEnv,props); //执行任务 env.execute("orderJob"); } }

    将kafka读取的json串转换成具体是Java实体类

    public class MyKafkaRichFlatMapFunction<OUT> extends RichFlatMapFunction<String, OUT> { private static Logger logger = LogManager.getLogger(MyKafkaRichFlatMapFunction.class); @Override public void flatMap(String value, Collector<OUT> collector) { try { if(value != null && !"".equals(value)){ Class<OUT> tClass = (Class<OUT>)((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0]; OUT out = JSONObject.parseObject(value, tClass); collector.collect(out); } } catch (Exception e) { logger.error("AbstractKafkaRichFlatMapFunction 发生异常:" + value, e); } } }

    sql中用到的时间转换函数

    public class DateFormatFunction extends ScalarFunction { public String eval(Timestamp time, String format) { return new SimpleDateFormat(format).format(new Date(time.getTime())); } }

    hbase 表定义

    public class TableDefine { public static void defineUserHbaseTable(StreamTableEnvironment tEnv,Configuration conf){ HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_user"); //设置hbase表的rowKey及其类型 hBaseTableSource.setRowKey("rowKey", Integer.class); //设置hbase表的字段及其类型 第一个参数为列簇,第二个参数为字段名(最后简写以减少存储空间和执行效率),第三个参数为类型 hBaseTableSource.addColumn("f", "uid", Integer.class); hBaseTableSource.addColumn("f", "uname", String.class); hBaseTableSource.addColumn("f", "sex", String.class); hBaseTableSource.addColumn("f", "address", String.class); //向flinktable注册处理函数 // 第一个参数为注册函数名字,即flink-sql中的表名,而new HBaseTableSource(conf, "t_user")中的t_user为hbase的表名 // 第二个参数是一个TableFunction,返回结果为要查询出的数据列,即hbase表addColumn的哪些列,参数为rowkey,表示根据rowkey即userID进行查询 tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"})); } public static void defineProductHbaseTable(StreamTableEnvironment tEnv,Configuration conf){ HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_product"); //设置hbase表的rowKey及其类型 hBaseTableSource.setRowKey("rowKey", Integer.class); //设置hbase表的字段及其类型 第一个参数为列簇,第二个参数为字段名(最后简写以减少存储空间和执行效率),第三个参数为类型 hBaseTableSource.addColumn("f", "pid", Integer.class); hBaseTableSource.addColumn("f", "pname", String.class); hBaseTableSource.addColumn("f", "pt", String.class); //向flinktable注册处理函数 // 第一个参数为注册函数名字,即flink-sql中的表名,而new HBaseTableSource(conf, "t_product")中的t_product为hbase的表名 // 第二个参数是一个TableFunction,返回结果为要查询出的数据列,即hbase表addColumn的哪些列,参数为rowkey,表示根据rowkey即userID进行查询 tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"})); } }

    hbase sink 定义

    //抽象类 public abstract class AbstractHbaseSinkFunction<OUT> extends RichSinkFunction<OUT> { protected static String cf_String = "f"; protected static byte[] cf = Bytes.toBytes(cf_String); protected String tableName = null; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig() .getGlobalJobParameters(); org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", globalParams.toMap().get("hbase.zookeeper.quorum")); if (null == connection) { connection = ConnectionFactory.createConnection(conf); } } @Override public void invoke(OUT value, Context context) throws Exception { HTable table = null; try { table = (HTable) connection.getTable(TableName.valueOf(tableName)); handle(value, context, table); } finally { table.close(); } } protected abstract void handle(OUT value, Context context, HTable table) throws Exception; @Override public void close() throws Exception { connection.close(); } protected byte[] getByteValue(Object value){ if(value==null){ return Bytes.toBytes(""); }else{ return Bytes.toBytes(value.toString()); } } } //用户sink public class UserHbaseSinkFunction extends AbstractHbaseSinkFunction<User> { public UserHbaseSinkFunction(){ tableName = "t_user"; } @Override protected void handle(User value, Context context, HTable table) throws Exception { Integer rowkey1 = value.getUserID(); Put put1 = new Put(Bytes.toBytes(rowkey1)); //这里的字段必须和定义表时的字段保存一致 put1.addColumn(cf, Bytes.toBytes("uid"), getByteValue(value.getUserID())); put1.addColumn(cf, Bytes.toBytes("uname"), getByteValue(value.getUserName())); put1.addColumn(cf, Bytes.toBytes("sex"), getByteValue(value.getSex())); put1.addColumn(cf, Bytes.toBytes("addr"), getByteValue(value.getAddress())); table.put(put1); } } //商品sink public class ProductHbaseSinkFunction extends AbstractHbaseSinkFunction<Product> { public ProductHbaseSinkFunction(){ tableName = "t_product"; } @Override protected void handle(Product value, Context context, HTable table) throws Exception { //这里的字段必须和定义表时的字段保存一致 Integer rowkey1 = value.getProductID(); Put put1 = new Put(Bytes.toBytes(rowkey1)); put1.addColumn(cf, Bytes.toBytes("pid"), getByteValue(value.getProductID())); put1.addColumn(cf, Bytes.toBytes("pname"), getByteValue(value.getProductName())); put1.addColumn(cf, Bytes.toBytes("pt"), getByteValue(value.getProductType())); table.put(put1); } }

    MySQL sink 定义

    public class OrderMysqlSinkFunction extends RichSinkFunction<List<Result>> { public static Logger logger = LogManager.getLogger(OrderMysqlSinkFunction.class); private DataSource dataSource = null; @Override public void open(Configuration parameters) throws Exception { logger.info("MysqlSinkFunction open"); super.open(parameters); Map<String, String> globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap(); String dbUrl = globalParams.get("dbUrl"); String dbName = globalParams.get("dbName"); String user = globalParams.get("user"); String psw = globalParams.get("psw"); try { dataSource = DBUtil.getDataSource(dbUrl, dbName, user, psw); } catch (Exception e) { logger.error("" + e); } } @Override public void invoke(List<Result> results, Context context) throws Exception { Connection connection = dataSource.getConnection(); PreparedStatement ps = null; try { //构建sql userName, productType, productName, buyDate, productCount, totalMoney String sql = "INSERT INTO t_result (userName, productType, productName, buyDate, productCount, totalMoney) " + " values (?,?,?,?,?,?) on duplicate key " + " update productCount = values(productCount),totalMoney=values(totalMoney) "; ps = connection.prepareStatement(sql); for (Result record : results) { ps.setObject(1, record.getUserName()); ps.setObject(2, record.getProductType()); ps.setObject(3, record.getProductName()); ps.setObject(4, record.getBuyDate()); ps.setObject(5, record.getProductCount()); ps.setObject(6, record.getTotalMoney()); ps.addBatch(); } ps.executeBatch(); }catch (Exception e){ logger.error("数据入库异常:"+e); throw new RuntimeException("gcoll_vmotask_sum保存数据库失败:"+e); }finally { DBUtil.close(ps); } } @Override public void close() { logger.info("MysqlSinkFunction close"); DBUtil.closeDataSource(); } }

    计算任务定义

    public class JobDefine { //用户数据直接保存hbase public static void userJob(StreamExecutionEnvironment env,Properties props){ //获取kafka对应的source流 SingleOutputStreamOperator<String> userSource = env.addSource(new FlinkKafkaConsumer010<>("user表对应的topic", new SimpleStringSchema(), props)); //将kafka读取的数据(为json字符串SimpleStringSchema)转换成Java对象 自定义类,这里必须调用.returns(User.class),不然启动会报错 SingleOutputStreamOperator<User> userMessageStream=userSource.flatMap(new MyKafkaRichFlatMapFunction<User>()).returns(User.class); //将Java对象保存到hbase中 userMessageStream.addSink(new UserHbaseSinkFunction()).name("UserHbaseSinkFunction"); } //商品数据直接保存hbase public static void productJob(StreamExecutionEnvironment env,Properties props){ //获取kafka对应的source流 SingleOutputStreamOperator<String> productSource = env.addSource(new FlinkKafkaConsumer010<>("product表对应的topic", new SimpleStringSchema(), props)); //将kafka读取的数据(为json字符串SimpleStringSchema)转换成Java对象 自定义类,这里必须调用.returns(Product.class),不然启动会报错 SingleOutputStreamOperator<Product> productStream=productSource.flatMap(new MyKafkaRichFlatMapFunction<Product>()).returns(Product.class); //将Java对象保存到hbase中 productStream.addSink(new ProductHbaseSinkFunction()).name("ProductHbaseSinkFunction"); } //订单数据需要进行计算统计 public static void orderJob(StreamExecutionEnvironment env, StreamTableEnvironment tEnv,Properties props){ SingleOutputStreamOperator orderSource = env.addSource(new FlinkKafkaConsumer010<>("order表对应的topic", new SimpleStringSchema(), props)); //将kafka读取的数据(为json字符串SimpleStringSchema)转换成Java对象 自定义类,这里必须调用.returns(Order.class),不然启动会报错 SingleOutputStreamOperator<Order> orderMessageStream = orderSource.flatMap(new MyKafkaRichFlatMapFunction<Order>()).returns(Order.class); //将当前订单流注册到flinktable 中,第一个参数就是表名,第二个参数就是数据流 tEnv.createTemporaryView("t_order", orderMessageStream); //将order数据进行实时计算并将结果保存到MySQL中 new OrderFunction().handle(orderMessageStream, env,tEnv); } }

    实时计算

    public class OrderFunction { public void handle(SingleOutputStreamOperator streamOperator,StreamExecutionEnvironment env, StreamTableEnvironment tEnv){ /** * 这里比较坑,需要说明下: * 1. 字段顺序的问题,select中的字段顺序和类型必须和Result类中一致,不然会报异常 Field types of query result and registered TableSink do not match * flink框架会根据返回结果Result类中的属性的字典升序进行排序,并将排序完后的字段及其字段类型和select 中的字段进行类型匹配,只要类型匹配成功则成功 * 如下面的返回类型Result类,字段按照字典升序排列后为:[buyDate: STRING, productCount: INT, productName: STRING, productType: STRING, totalMoney: DOUBLE, userName: STRING] * 那么我们的select字段的返回类型就必须为:[STRING, INT, STRING, STRING,DOUBLE,STRING] * 假设我们的sql为(sum(o.productCount) ,o.buyDate交换了位置):select sum(o.productCount) ,o.buyDate,p.pname,p.pt,sum(o.money*o.productCount) ,u.uname 则会报错Field types of query result and registered TableSink do not match * 假设我们的sql为(pt和pname交换了位置):select o.buyDate,sum(o.productCount),p.pt,p.pname,sum(o.money*o.productCount) ,u.uname,那么Result类 * 的productName的值将会为pt的值,productType的值将会为pname的值 * 2. 对于hbase维度数据,维度数据的字段为简写,如username被写成的uname,那么这里select 中的字段必须和hbase表定义的字段一致,如p.pname,u.uname * 3. 如果select中的字段类型和result类中字段类型不一样时,需要在sql中使用相关函数进行转换 * 4. 在这里的t_order必须和JobDefine中注册到table环境中的【视图名字】一致,如tEnv.createTemporaryView("t_order", orderMessageStream); * 5. 这里的t_user必须和TableDefine中注册到table环境中的【注册函数名】 一致,如:tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"})); * 6.这里的t_product必须和TableDefine中注册到table环境中的【注册函数名】 一致,如:tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"})); * */ Table collectionTaskResult = tEnv.sqlQuery("select o.buyDate,sum(o.productCount) ,p.pname,p.pt,sum(o.money*o.productCount) ,u.uname" + " from t_order o,lateral table(t_user(o.userID)) u,lateral table(t_product(o.productID)) p " + " where o.userID=u.uid and o.productID=p.pid group by u.uname,p.pt,p.pname,o.buyDate"); DataStream<Tuple2<Boolean, Result>> stream = tEnv.toRetractStream(collectionTaskResult, Result.class); //windAll 这里是为了减轻数据库的压力,每10秒保存一次库,在存库前将数据进行处理,只保存唯一键相同的一条数据 stream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) .process(new ProcessAllWindowFunction<Tuple2<Boolean, Result>, List<Result>, TimeWindow>() { //values的数量等于这个时间窗口范围内的所有数据量 public void process(Context context, Iterable<Tuple2<Boolean, Result>> values, Collector<List<Result>> out) throws Exception { Iterator<Tuple2<Boolean, Result>> iterator = values.iterator(); Map<String, Result> result = Maps.newHashMap(); while (iterator.hasNext()) { Tuple2<Boolean, Result> tuple = iterator.next(); //输入流进行收集 if (tuple.getField(0)) { Result item = tuple.getField(1); //这样map的结果表示为唯一键相同时的最新数据 result.put(item.getUserName()+":"+item.getProductType()+":"+item.getProductName()+":"+item.getBuyDate(), item); } } out.collect(Lists.newArrayList(result.values())); } }).setParallelism(1)//之所以要将并行度设置为1,是为了防止并发写入数据库导致数据错误 //将Java对象保存到MySQL中 .addSink(new OrderMysqlSinkFunction()).name("OrderMysqlSinkFunction"); } }

    是不是感觉flink-sql很牛逼呢,但就像文章最开始所述的flink-sql是在flink流式计算的基础上进行了高度抽象,它不能实现所有功能。

    订单支付成功后,可以退款,退款完成后订单状态会变成失效,那么统计结果中不应该包含退款成功后相关数据, 那么如何使用sql进行计算呢?我也不知道,但可以使用流式计算进行操作,下次在总结。

    MySQL数据库连接工具类

    public class DBUtil { private static Logger logger = LogManager.getLogger(DBUtil.class); private static DruidDataSource dataSource; public static DruidDataSource getDataSource(String dbUrl, String dbName, String username, String password) { try { if (dataSource == null) { synchronized (DBUtil.class) { if (dataSource == null) { dataSource = new DruidDataSource(); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); dataSource.setUrl("jdbc:mysql://#/%?autoReconnect=true&useUnicode=true&useAffectedRows=true&characterEncoding=utf8".replace("#", dbUrl).replace("%", dbName)); dataSource.setUsername(username); dataSource.setPassword(password); //configuration dataSource.setInitialSize(20); dataSource.setMinIdle(20); dataSource.setMaxActive(100); dataSource.setMaxWait(60000); dataSource.setTimeBetweenEvictionRunsMillis(60000); dataSource.setMinEvictableIdleTimeMillis(300000); dataSource.setValidationQuery("SELECT 1 FROM DUAL"); dataSource.setTestWhileIdle(true); dataSource.setTestOnBorrow(false); dataSource.setTestOnReturn(false); dataSource.setPoolPreparedStatements(true); dataSource.setMaxPoolPreparedStatementPerConnectionSize(20); dataSource.setFilters("stat,wall,log4j"); dataSource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000"); } } } } catch (Exception e) { logger.error("初始化数据源信息异常....", e); } return dataSource; } /** * 获取数据库连接池 */ public static DataSource getDataSource() { return dataSource; } /** * 关闭数据库连接池 */ public static void closeDataSource() { logger.info("method datasource close !"); if (dataSource != null) { dataSource.close(); } } /** * 获取数据库连接 */ public static Connection getConnection() { Connection conn = null; try { conn = dataSource.getConnection(); } catch (SQLException e) { logger.error("获取数据库连接异常", e); } return conn; } /** * 释放数据库连接 */ public static void close(Connection conn,Statement st,ResultSet rs) { if(rs!=null) { try { rs.close(); } catch (SQLException e) { logger.error("释放数据库连接异常", e); } } close(conn,st); } public static void close(Connection conn,Statement st) { if(st!=null) { try { st.close(); } catch (SQLException e) { logger.error("关闭Statement异常", e); } } close(conn); } public static void close(Statement st) { if(st!=null) { try { st.close(); } catch (SQLException e) { logger.error("关闭Statement异常", e); } } } public static void close(Connection conn) { if(conn!=null) { try { conn.close(); } catch (SQLException e) { logger.error("关闭SConnection异常", e); } } } public static void commit(Connection conn){ if(conn!=null) { try { conn.commit(); } catch (SQLException e) { logger.error("提交事务异常", e); } } } public static void rollback(Connection conn){ if(conn!=null) { try { conn.rollback(); } catch (SQLException e) { logger.error("回滚异常", e); } } } }
    Processed: 0.017, SQL: 9