1,因业务需要把hive的数据通过WaterDrop抽取到clickhouse,但是每次都要手写配置文件,故写一个hive udf来自动生成配置信息.
1,版本信息:
waterdrop版本: 1.5.0
spark版本: 3.0.0
hive版本: 3.0.0
3. pom.xml文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>hdp-udf</groupId> <artifactId>hdp-udf</artifactId> <version>4.1</version> <packaging>jar</packaging> <name>hdp-udf</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <hive.version>3.1.0</hive.version> <hadoop.version>3.1.0</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0.2.6.4.76-1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-service</artifactId> <version>2.1.0</version> <scope>provided</scope> </dependency> <!-- 自研组件依赖 --> <dependency> <groupId>com.crgecent</groupId> <artifactId>crgt-util</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> 
<groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.25</version> </dependency> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies> <build> <finalName>hiveudf_${version}</finalName> <resources> <resource> <directory>src/main/java</directory> </resource> </resources> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.3.1</version> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <phase>compile</phase> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <!-- <archive>--> <!-- <manifest>--> <!-- <mainClass>com.crgt.BlockInstance2ck</mainClass>--> <!-- </manifest>--> <!-- </archive>--> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <repositories> <repository> <id>ownaliyunmaven</id> <name>own aliyun 
maven repository</name> <url>http://10.3.1.29:8081/repository/aliyun/</url> </repository> <repository> <id>ownmaven</id> <name>own maven repository</name> <url>http://10.3.1.29:8081/repository/maven-central/</url> </repository> <repository> <id>Hortonworks Repository</id> <url>http://repo.hortonworks.com/content/repositories/releases/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> <repository> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> <id>hortonworks.other</id> <name>Hortonworks Other Dependencies</name> <url>http://repo.hortonworks.com/content/groups/public</url> </repository> </repositories> </project>
2,因为waterdrop抽取hive的分区表有点问题,所以目前只支持非分区表的情况;如果需要支持分区表,可以修改udf来完成支持。
package com.xxx;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hive generic UDF that, given {@code database.tablename}, returns text containing the
 * ClickHouse {@code CREATE TABLE} statements for that table followed by a Waterdrop
 * configuration that copies the Hive table into ClickHouse.
 *
 * <p>Reference: https://blog.csdn.net/xiao_jun_0820/article/details/76638198
 *
 * <p>The input is expected to be the intermediate Hive table whose name ends in {@code _ck};
 * that suffix is stripped to obtain the ClickHouse table name.
 *
 * <p>Written by wpp. Deployed on host 06 under /home/admin/waterdrop.
 *
 * <p>NOTE(review): the Hive and ClickHouse credentials and the ClickHouse host are hard-coded
 * in this class.
 */
public class TableCreateGenericUDF extends GenericUDF {

    private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";

    /** Inspector for the single string argument; assigned in {@link #initialize}. */
    StringObjectInspector keyElementOI;

    /**
     * Builds the DDL and the Waterdrop configuration for the table named by the first argument.
     *
     * @param arg0 UDF arguments; {@code arg0[0]} is the {@code database.tablename} string
     * @return a {@link Text} with the generated output, or a short error message when the
     *     argument is missing or carries no database name
     * @throws HiveException if the deferred argument cannot be read
     */
    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
        String str = keyElementOI.getPrimitiveJavaObject(arg0[0].get());
        // Compare content, not references: `str == "null"` was never true for runtime strings.
        if (str == null || "null".equals(str)) {
            return new Text("获取参数错误,需要");
        }

        StringBuilder res = new StringBuilder();
        System.out.println("输入值 : " + str);
        // Drop quote characters that may surround the name.
        String newStr = str.replace("'", "").replace("\"", "");
        System.out.println("table: " + newStr);

        // "." splits into an empty array, so test for fewer than two parts rather than == 1.
        String[] parts = newStr.split("\\.");
        if (parts.length < 2) {
            return new Text("请输入数据库名");
        }
        String databaseName = parts[0];
        String tableName = parts[1]; // expected to carry the _ck suffix

        try {
            String createTableInfo = getCKCreateTableInfo(databaseName, tableName);
            res.append(createTableInfo);
            res.append("\n========================配置信息 " + tableName + ".conf ==================\n");
            String tableMetaInfo = TableParse.getTableMetaInfo(newStr);
            String ckConfInfo = getCKConfInfo(databaseName, tableName, tableMetaInfo);
            res.append(ckConfInfo);
        } catch (Exception e) {
            // Best effort: keep whatever was generated, but surface the failure in the returned
            // text as well, so the caller does not mistake truncated output for success.
            e.printStackTrace();
            System.out.println("获取结果异常:" + e.getMessage());
            res.append("\n获取结果异常:").append(e.getMessage()).append('\n');
        }
        return new Text(res.toString());
    }

    /**
     * Renders the Waterdrop configuration (spark / input / filter / output sections).
     *
     * @param databaseName Hive database, also used as the ClickHouse database
     * @param tableName Hive table name; a trailing {@code _ck} is removed for the ClickHouse table
     * @param tableMetaInfo filter entries inserted verbatim into the {@code filter} section
     * @return the configuration text
     */
    public static String getCKConfInfo(String databaseName, String tableName, String tableMetaInfo) {
        String fullTableName = databaseName + "." + tableName;
        // Strip only the trailing suffix, matching getCKCreateTableInfo; removing every "_ck"
        // occurrence could point the output at a table the generated DDL never creates.
        String ckFullTableName = databaseName + "." + tableName.replaceAll("_ck$", "");

        String res = "spark {\n"
                + " spark.app.name = \"" + tableName + "_2_ck\"\n"
                + " spark.executor.instances = 6\n"
                + " spark.executor.cores = 2\n"
                + " spark.executor.memory = \"2g\"\n"
                + " spark.sql.catalogImplementation = \"hive\"\n"
                + "}\n"
                + "\n"
                + "input {\n"
                + " hive {\n"
                + " pre_sql = \"select * from " + fullTableName + "\" \n"
                + " table_name = \"" + tableName + "\"\n"
                + " }\n"
                + "}\n"
                + "\n"
                + "filter {\n"
                + " remove {\n"
                + " # source_field = [\"bizdate\"]\n"
                + " }\n"
                + " \n"
                + tableMetaInfo + "\n"
                + "}\n"
                + "\n"
                + "output {\n"
                + " clickhouse {\n"
                + " host = \"10.2.12.56:8123\"\n"
                + " database = \"" + databaseName + "\"\n"
                + " table = \"" + ckFullTableName + "_cls\"\n"
                + " username = \"root\"\n"
                + " password = \"root\"\n"
                + " }\n"
                + "}\n";
        return res;
    }

    /**
     * Runs {@code describe database.table} against Hive and renders two ClickHouse statements:
     * the local MergeTree table and the {@code _cls} Distributed table on top of it.
     *
     * @param databaseName Hive database name
     * @param tableName Hive table name; a trailing {@code _ck} is removed for the ClickHouse name
     * @return a reminder header followed by both {@code CREATE TABLE} statements
     * @throws SQLException if the Hive JDBC driver is not on the classpath or the query fails
     */
    public static String getCKCreateTableInfo(String databaseName, String tableName) throws SQLException {
        String dbTableName = databaseName + "." + tableName;
        try {
            Class.forName(DRIVER_NAME);
        } catch (ClassNotFoundException e) {
            // Never System.exit() from inside a UDF: it would terminate the hosting JVM.
            throw new SQLException("Hive JDBC driver not found: " + DRIVER_NAME, e);
        }

        String ckTable = tableName.replaceAll("_ck$", "");
        String ckDbTableName = databaseName + "." + ckTable;

        // Both statements share the same column list, so it is collected once.
        List<String> columnLines = new ArrayList<>();
        String sql = "describe " + dbTableName;

        // try-with-resources closes the connection, statement and result set on every path.
        try (Connection con = DriverManager.getConnection(Utils.HIVE_JDBC_URL, "admin", "admin");
             Statement stmt = con.createStatement()) {
            System.out.println("Running: " + sql);
            try (ResultSet res = stmt.executeQuery(sql)) {
                while (res.next()) {
                    String dataKey = res.getString(1);
                    String dataType = res.getString(2);
                    // NOTE(review): for partitioned tables Hive's describe output appears to be
                    // followed by a blank row and '#'-prefixed header rows - confirm. Stop at the
                    // first such row instead of failing on a null data type.
                    if (dataKey == null || dataKey.trim().isEmpty()
                            || dataKey.startsWith("#") || dataType == null) {
                        break;
                    }
                    System.out.println(dataKey + "\t" + dataType);
                    String ckDataType = Utils.getParseType(dataType.toLowerCase());
                    if (dataKey.equals("ckbizdate")) {
                        // The MergeTree date column must not be Nullable.
                        columnLines.add(" `" + dataKey + "` Date");
                    } else {
                        columnLines.add(" `" + dataKey + "` Nullable(" + ckDataType + ")");
                    }
                }
            }
        }

        // Joining avoids the trailing-comma trimming that corrupted the text for an empty list.
        String columns = String.join(",\n", columnLines);

        StringBuilder result = new StringBuilder();
        result.append("==============请确定主键和date类型的列,并把Nullable去掉=============\n");
        result.append("CREATE TABLE " + ckDbTableName + " on cluster crm_4shards_1replicas (\n");
        result.append(columns);
        result.append("\n)ENGINE = MergeTree(ckbizdate, id, 8192);\n\n");
        result.append("CREATE TABLE " + ckDbTableName + "_cls on cluster crm_4shards_1replicas (\n");
        result.append(columns);
        result.append("\n)ENGINE = Distributed(crm_4shards_1replicas," + databaseName + " ," + ckTable + ",rand());");
        return result.toString();
    }

    /** Returns the fixed usage string for this UDF. */
    @Override
    public String getDisplayString(String[] arg0) {
        return "TableCreateGenericUDF(database.tablename)";
    }

    /**
     * Validates that exactly one string argument was supplied and stores its inspector.
     *
     * @param arg0 inspectors for the call arguments
     * @return the writable string inspector describing the UDF result
     * @throws UDFArgumentException if the argument count is not one or the argument is not a string
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
        if (arg0.length != 1) {
            throw new UDFArgumentException(" Expecting one arguments: database.tablename");
        }
        // Check that the argument has the expected type.
        ObjectInspector key = arg0[0];
        if (!(key instanceof StringObjectInspector)) {
            throw new UDFArgumentException("one argument must be a string");
        }
        this.keyElementOI = (StringObjectInspector) key;
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }
}
// 2.1 utils.java
/**
 * Shared constants and type-mapping helpers that translate Hive / MySQL column types into
 * ClickHouse column types.
 *
 * <p>Declared in package {@code com.crgt} in the original listing.
 *
 * @author wpp (2019/10/21 19:41)
 */
public class Utils {

    /** JDBC URL of the HiveServer2 instance queried for table metadata. */
    public static String HIVE_JDBC_URL = "jdbc:hive2://xx.xx.xx.xx:10000/default";

    /** Marker returned by {@link #getParseType} for Hive types with no mapping. */
    private static final String UNKNOWN_HIVE_TYPE = "999999999";

    /** Marker returned by {@link #getMysqlParseType} for MySQL types with no mapping. */
    private static final String UNKNOWN_MYSQL_TYPE = "9999999999999";

    /**
     * Maps a Hive column type to a ClickHouse type.
     *
     * <p>Matching is case-insensitive and ignores a parenthesised precision/length suffix, so
     * {@code DECIMAL(10,2)} is treated as {@code decimal}.
     *
     * @param oriDataType Hive type name, e.g. {@code "bigint"} or {@code "decimal(10,2)"}
     * @return {@code "Int64"}, {@code "Float64"} or {@code "String"}; {@code "999999999"} when
     *     the type is {@code null} or not recognised
     */
    public static String getParseType(String oriDataType) {
        if (oriDataType == null) {
            return UNKNOWN_HIVE_TYPE;
        }
        // Locale.ROOT keeps the lowering independent of the JVM's default locale.
        String normalized = oriDataType.trim().toLowerCase(java.util.Locale.ROOT);
        int paren = normalized.indexOf('(');
        if (paren > 0) {
            normalized = normalized.substring(0, paren).trim();
        }
        switch (normalized) {
            case "boolean":
            // The labels must be lower-case: the value is lower-cased above, so the former
            // "TINYINT"/"SMALLINT" labels could never match.
            case "tinyint":
            case "smallint":
            case "int":
            case "bigint":
                return "Int64";
            case "float":
            case "double":
            case "decimal":
                return "Float64";
            case "string":
            case "datetime":
            case "timestamp":
                return "String";
            default:
                return UNKNOWN_HIVE_TYPE;
        }
    }

    /**
     * Maps a MySQL column type to a ClickHouse type by substring matching, in this order:
     * text-like, unsigned integer, signed integer, floating point.
     *
     * <p>Matching is case-insensitive. Because the check is a plain substring test, any type
     * whose name contains {@code int} is treated as an integer.
     *
     * @param oriDataType MySQL type, e.g. {@code "int(11) unsigned"}
     * @return {@code "String"}, {@code "UInt64"}, {@code "Int64"} or {@code "Float64"};
     *     {@code "9999999999999"} when the type is {@code null} or not recognised
     */
    public static String getMysqlParseType(String oriDataType) {
        if (oriDataType == null) {
            return UNKNOWN_MYSQL_TYPE;
        }
        String type = oriDataType.toLowerCase(java.util.Locale.ROOT);

        // "time" also covers datetime and timestamp.
        if (type.contains("varchar") || type.contains("time") || type.contains("text")) {
            return "String";
        }
        // "int" also covers bigint and tinyint; unsigned must appear after it.
        int intAt = type.indexOf("int");
        if (intAt >= 0) {
            boolean unsigned = type.indexOf("unsigned", intAt + "int".length()) >= 0;
            return unsigned ? "UInt64" : "Int64";
        }
        if (type.contains("float") || type.contains("double") || type.contains("decimal")) {
            return "Float64";
        }
        return UNKNOWN_MYSQL_TYPE;
    }
}
// 3. Create the permanent UDF function
create function tableinfo as 'com.xxx.TableCreateGenericUDF' using jar 'hdfs:///hiveudf/hiveudf_3.7-jar-with-dependencies.jar';
4,在hive命令行执行如下代码(只能输入非分区的表):
select default.tableinfo('database_name.hive_table_name');
5,查询结果如下,主要是对hive的int型做转换(对应clickhouse的Int64或者Float64):
######
###### This config file is a demonstration of batch processing in waterdrop config
######

spark {
  spark.app.name = "dwd_ord_coupon_base_df_2_ck"
  spark.executor.instances = 6
  spark.executor.cores = 2
  spark.executor.memory = "2g"
  spark.sql.catalogImplementation = "hive"
}

input {
  hive {
    pre_sql = "select * from cdm_dwd.dwd_ord_coupon_base_df where bizdate='20191013' limit 10 "
    #result_table_name = "ads.ads_user_portrait_vertical_df"
    table_name = "dwd_ord_coupon_base_df"
  }
}

filter {
  # split data by specific delimiter
  remove {
    source_field = ["bizdate"]
  }
  convert {
    source_field = "coupon_type"
    new_type = "long"
  }
  convert {
    source_field = "scene"
    new_type = "long"
  }
  convert {
    source_field = "status"
    new_type = "long"
  }
  convert {
    source_field = "threshold"
    new_type = "long"
  }
  convert {
    source_field = "discount"
    new_type = "long"
  }
  convert {
    source_field = "time_sharing"
    new_type = "long"
  }
  convert {
    source_field = "version"
    new_type = "long"
  }
}

output {
  # choose stdout output plugin to output data to console
  stdout {
    host = "xx.xx.xx.xx:8123"
    database = "cdm_dwd"
    table = "cdm_dwd.dwd_ord_coupon_base_df2_cls"
    # fields = ["user_no","first_classification","second_classification","third_classification","fourth_classification","options","detail","create_time"]
    username = "root"
    password = "root"
  }
}
