Hive Built-in Functions and Custom Functions (UDF, UDTF)


Built-in functions. The commands to list all of Hive's built-in functions and to view how a particular function is used:

    SHOW FUNCTIONS;
    DESCRIBE FUNCTION <function_name>;
    DESCRIBE FUNCTION EXTENDED <function_name>;
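For example, to look up the built-in upper function (the descriptions shown in the comments are approximate and may differ between Hive versions):

    DESCRIBE FUNCTION upper;
    -- upper(str) - Returns str with all characters changed to uppercase
    DESCRIBE FUNCTION EXTENDED upper;
    -- additionally prints an example query and synonyms such as ucase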

Details of every built-in function are in the official documentation: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inOperators

Custom functions. The workflow for creating a custom function (UDF or UDTF):
1. Write a Java class.
2. A UDF extends the UDF class; a UDTF extends the GenericUDTF class.
3. A UDF overrides the evaluate method; a UDTF overrides the initialize, process, and close methods.
4. Package the class into a JAR.
5. Run add jar in Hive.
6. Create a temporary function in Hive.
7. Use the function in HQL.

Worked example. 1) Add the dependency to the Maven project's pom.xml:

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
    </dependency>
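The UDF and UDTF below also import org.json and org.apache.commons.lang. These classes are normally on the Hive classpath at runtime via hive-exec, but if the local build cannot resolve the imports you may need to declare them explicitly; a sketch, with versions chosen as illustrative assumptions:

    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20140107</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>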

2) Write the UDF class:

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.json.JSONException;
    import org.json.JSONObject;

    public class BaseFieldUDF extends UDF {

        public static void main(String[] args) {
            // Sample log line for local testing; inner quotes are escaped so the Java literal compiles
            // and the string stays valid JSON
            String line = "{\"source\": \"/data/logs/hhh/test/access-web.2019-03-28.0.log\", \"thread\": \"test_1-thread-5\", \"log_date\": \"2019-03-28 16:10:43.273\", \"log_level\": \"INFO\", \"content\": \"com.xsluo.test.TokenFilter - token verify, reqPath=/test/proxy/api/publish, tokenVerifyUrl=http://192.168.2.100:8080/test/user/authorize, req={method=POST, module=test-module-admin, url=/test/proxy/api/publish}, token=bbalfed5-5b6a-43b4-82a6-7b6899822c98, res={\\\"code\\\":\\\"000000\\\",\\\"message\\\":\\\"操作成功\\\",\\\"data\\\": {\\\"field1\\\":\\\"value1\\\",\\\"field2\\\":\\\"value2\\\",\\\"field3\\\":\\\"value3\\\",\\\"field4\\\":\\\"value4\\\",\\\"field5\\\":\\\"value5\\\",\\\"field6\\\":\\\"value6\\\"} }\", \"@version\": \"1\", \"log\": {\"file\":{\"path\":\"/data/logs/hhh/test/access-web.2019-03-28.0.log\"}}, \"offset\": \"12646428\", \"server_ip\": \"192.168.2.101\", \"app_name\": \"test-web\", \"@timestamp\": \"2019-03-28T08:10:43.273Z\"}";
            String jsonkeysString = "source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp";
            String res = new BaseFieldUDF().evaluate(line, jsonkeysString);
            System.out.println(res);
        }

        // Extracts the requested JSON fields from a log line and joins them with tabs
        public String evaluate(String line, String jsonkeysString) {
            StringBuilder sb = new StringBuilder();
            String[] jsonkeys = jsonkeysString.split(",");
            if (StringUtils.isBlank(line)) {
                return "";
            }
            try {
                JSONObject jsonObject = new JSONObject(line);
                for (int i = 0; i < jsonkeys.length; i++) {
                    String fieldName = jsonkeys[i].trim();
                    if (jsonObject.has(fieldName)) {
                        sb.append(jsonObject.getString(fieldName)).append("\t");
                    } else if (jsonObject.has("@" + fieldName)) {
                        // Fields such as version and timestamp carry an "@" prefix in the raw log
                        sb.append(jsonObject.getString("@" + fieldName)).append("\t");
                    } else {
                        sb.append("\t");
                    }
                }
            } catch (JSONException e) {
                e.printStackTrace();
            }
            return sb.toString();
        }
    }
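Once the JAR is built and the class registered as base_analizer (step 4 below), a quick sanity check is to run the function directly against a few rows; a sketch that assumes the ods_gammao_log table used in step 5:

    select base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp')
    from ods_gammao_log
    where dt='2019-03-28'
    limit 10;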

3) Write the UDTF class:

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.json.JSONException;
    import org.json.JSONObject;

    import java.util.ArrayList;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class InterfaceJsonUDTF extends GenericUDTF {

        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
            // Declare the nine output columns, all of type string
            ArrayList<String> fieldNames = new ArrayList<String>();
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
            fieldNames.add("reqPath");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("reqMethod");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("reqModule");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("reqUrl");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("tokenVerifyUrl");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("token");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("resCode");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("resMessage");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("resData");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }

        @Override
        public void process(Object[] objects) throws HiveException {
            // The incoming argument is the content column
            String input = objects[0].toString();
            // Filter out the row if the input is blank
            if (StringUtils.isBlank(input)) {
                return;
            }
            String pattern = "reqPath=(.*?),\\s?tokenVerifyUrl=(.*?),\\s?.*?method=(.*?),\\s?module=(.*?),\\s?url=(.*?)},\\s?token=(.*?),\\s?res=(.*?}}?)";
            Pattern r = Pattern.compile(pattern);
            Matcher m = r.matcher(input);
            String res = null;
            String[] result = new String[9];
            if (m.find()) {
                result[0] = m.group(1);   // reqPath
                result[1] = m.group(3);   // reqMethod
                result[2] = m.group(4);   // reqModule
                result[3] = m.group(5);   // reqUrl
                result[4] = m.group(2);   // tokenVerifyUrl
                result[5] = m.group(6);   // token
                res = m.group(7);         // res JSON body
            }
            // Filter out the row if the regex did not capture a res body (avoids an NPE below)
            if (res == null) {
                return;
            }
            try {
                JSONObject resJsonObject = new JSONObject(res);
                if (resJsonObject.has("code")) {
                    result[6] = resJsonObject.getString("code");
                }
                if (resJsonObject.has("message")) {
                    result[7] = resJsonObject.getString("message");
                }
                if (resJsonObject.has("data")) {
                    result[8] = resJsonObject.getString("data");
                }
            } catch (JSONException e) {
                // If the res body is not valid JSON, forward the row with null response fields
                e.printStackTrace();
            }
            forward(result);
        }

        @Override
        public void close() throws HiveException {
        }
    }
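Because the UDTF's forward call needs a Hive collector, the easiest way to test the extraction logic locally is to exercise the regular expression on its own. A minimal harness (not part of the UDTF), where the sample content string is an assumption modeled on the log format shown earlier:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class InterfaceJsonRegexTest {
        public static void main(String[] args) {
            // Hypothetical content value following the access-web log format from this article
            String content = "com.xsluo.test.TokenFilter - token verify, reqPath=/test/proxy/api/publish, "
                    + "tokenVerifyUrl=http://192.168.2.100:8080/test/user/authorize, "
                    + "req={method=POST, module=test-module-admin, url=/test/proxy/api/publish}, "
                    + "token=bbalfed5-5b6a-43b4-82a6-7b6899822c98, "
                    + "res={\"code\":\"000000\",\"message\":\"操作成功\",\"data\":{\"field1\":\"value1\"}}";
            // Same pattern as in InterfaceJsonUDTF.process
            String pattern = "reqPath=(.*?),\\s?tokenVerifyUrl=(.*?),\\s?.*?method=(.*?),\\s?module=(.*?),"
                    + "\\s?url=(.*?)},\\s?token=(.*?),\\s?res=(.*?}}?)";
            Matcher m = Pattern.compile(pattern).matcher(content);
            if (m.find()) {
                // Print every capture group so the mapping to output columns can be verified by eye
                for (int i = 1; i <= m.groupCount(); i++) {
                    System.out.println("group " + i + " = " + m.group(i));
                }
            } else {
                System.out.println("no match");
            }
        }
    }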

4) After building the JAR, upload it to the Hive home directory, then run add jar and create the temporary functions in Hive:

    add jar /home/hadoop/app/hive-1.2.1-bin/HiveFunction-1.0-SNAPSHOT.jar;
    create temporary function base_analizer as 'com.xsluo.udf.BaseFieldUDF';
    create temporary function flat_analizer as 'com.xsluo.udtf.InterfaceJsonUDTF';
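Temporary functions exist only for the current session. If the functions should persist across sessions, Hive 0.13 and later also supports permanent functions registered from a JAR on HDFS; a sketch, where the HDFS path is an assumption:

    create function base_analizer as 'com.xsluo.udf.BaseFieldUDF'
      using jar 'hdfs:///user/hive/jars/HiveFunction-1.0-SNAPSHOT.jar';
    create function flat_analizer as 'com.xsluo.udtf.InterfaceJsonUDTF'
      using jar 'hdfs:///user/hive/jars/HiveFunction-1.0-SNAPSHOT.jar';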

5) The parsing statement for the DWD-layer base table in the Hive data warehouse:

    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table dwd_gammao_base_log partition (dt='2019-03-28')
    select source, thread, log_date, log_level,
           reqPath, reqMethod, reqModule, reqUrl, tokenVerifyUrl, token, resCode, resMessage, resData,
           version, offset, server_ip, app_name, sjc
    from (
        select split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[0] as source,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[1] as thread,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[2] as log_date,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[3] as log_level,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[4] as content,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[5] as version,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[6] as offset,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[7] as server_ip,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[8] as app_name,
               split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t')[9] as sjc
        from ods_gammao_log
        where dt='2019-03-28'
          and base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp') <> ''
    ) t
    lateral view flat_analizer(content) t2 as
        reqPath, reqMethod, reqModule, reqUrl, tokenVerifyUrl, token, resCode, resMessage, resData;
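As written, base_analizer is re-evaluated for every projected column plus once in the WHERE clause. A common simplification, sketched below on the assumption that the optimizer does not already deduplicate these calls, is to split the UDF's output once in the inner subquery and index into the resulting array in the outer query:

    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table dwd_gammao_base_log partition (dt='2019-03-28')
    select arr[0] as source, arr[1] as thread, arr[2] as log_date, arr[3] as log_level,
           reqPath, reqMethod, reqModule, reqUrl, tokenVerifyUrl, token, resCode, resMessage, resData,
           arr[5] as version, arr[6] as offset, arr[7] as server_ip, arr[8] as app_name, arr[9] as sjc
    from (
        select split(base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp'),'\t') as arr
        from ods_gammao_log
        where dt='2019-03-28'
          and base_analizer(line,'source,thread,log_date,log_level,content,version,offset,server_ip,app_name,timestamp') <> ''
    ) t
    lateral view flat_analizer(arr[4]) t2 as
        reqPath, reqMethod, reqModule, reqUrl, tokenVerifyUrl, token, resCode, resMessage, resData;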