Collecting Nginx log files with Flume and storing them in MySQL


    We need traffic statistics for the query API our project team exposes, so the plan is to use Flume to collect Nginx's host.access.log file for analysis.

    Initial plan: deploy one Flume agent on each Nginx host, then one aggregation Flume agent behind them that runs a custom MySQL sink and writes into MySQL (writing to HDFS would be the better target....).
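    Before the sink can insert anything, the target table has to exist. The post never shows the schema, so here is a minimal sketch of the nginx_logger table, with column names matching the columnNames setting in the job conf further down; the column types are assumptions, since the sink sends every field as a quoted string:

    -- Hypothetical schema: column names come from the job conf, types are guesses.
    CREATE TABLE nginx_logger (
        id         BIGINT AUTO_INCREMENT PRIMARY KEY,
        ip         VARCHAR(64)   COMMENT 'client IP',
        time       DATETIME      COMMENT 'request time, reformatted by AccessLogUtil',
        url        VARCHAR(1024) COMMENT 'request line, e.g. GET / HTTP/1.1',
        sourcePath VARCHAR(1024) COMMENT 'second-to-last quoted field of the log line',
        status     VARCHAR(8)    COMMENT 'HTTP status code',
        traffic    VARCHAR(32)   COMMENT 'response size in bytes'
    );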

    The code:

    The MySQL sink class

    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    /**
     * Custom sink that writes Nginx access-log events into MySQL.
     */
    public class NginxToMySQLSink extends AbstractSink implements Configurable {

        private Connection conn;
        private Statement stmt;
        private String columnNames;
        private String url;
        private String user;
        private String password;
        private String tableName;

        // Runs once when the sink starts: load the driver, open the connection.
        @Override
        public synchronized void start() {
            super.start();
            try {
                System.out.println("Driver loading....");
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection(url, user, password);
                stmt = conn.createStatement();
                System.out.println("Driver OK");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Runs once when the sink stops: release the JDBC resources.
        @Override
        public synchronized void stop() {
            try {
                if (stmt != null) stmt.close();
                if (conn != null) conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            super.stop();
        }

        // Called in a loop by the sink runner.
        @Override
        public Status process() throws EventDeliveryException {
            Channel channel = getChannel();
            Transaction tx = channel.getTransaction();
            tx.begin();
            try {
                Event event = channel.take();
                if (event == null) {
                    // Channel is empty: commit the empty transaction and back off
                    // instead of busy-waiting on take().
                    tx.commit();
                    return Status.BACKOFF;
                }
                String body = new String(event.getBody(), "UTF-8");
                System.out.println("get body:" + body);
                if (AccessLogUtil.isNginxLog(body)) {
                    String sql = "insert into " + tableName + "(" + columnNames + ") values("
                            + AccessLogUtil.getSqlValuesString(body) + ")";
                    stmt.executeUpdate(sql);
                } else {
                    System.out.println("input error! passing...");
                }
                tx.commit();
                return Status.READY;
            } catch (Exception e) {
                // Roll back so the event stays in the channel and can be retried.
                tx.rollback();
                throw new EventDeliveryException(e);
            } finally {
                tx.close();
            }
        }

        @Override
        public void configure(Context context) {
            columnNames = context.getString("columnNames");
            url = context.getString("url");
            user = context.getString("user");
            password = context.getString("password");
            tableName = context.getString("tableName");
        }
    }
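    One caveat worth noting: process() builds the INSERT by string concatenation, so a URL or User-Agent containing a double quote will break the statement, or worse, inject SQL. A minimal sketch of a safer write path using a PreparedStatement, reusing the AccessLogUtil below (this helper is illustrative, not part of the original sink):

    // Sketch only: same insert, but the JDBC driver handles the escaping.
    // Column order matches the columnNames setting in the job conf.
    private void insertWithPreparedStatement(java.sql.Connection conn, String tableName, String body)
            throws java.sql.SQLException {
        String sql = "insert into " + tableName
                + " (ip, time, url, sourcePath, status, traffic) values (?, ?, ?, ?, ?, ?)";
        try (java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
            java.util.Map<String, String> fields = AccessLogUtil.parse(body);
            String[] keys = {"ip", "time", "url", "sourcePath", "status", "traffic"};
            for (int i = 0; i < keys.length; i++) {
                // parse() wraps every value in double quotes; strip them here
                // and let setString() do the escaping.
                String v = fields.get(keys[i]);
                ps.setString(i + 1, v.substring(1, v.length() - 1));
            }
            ps.executeUpdate();
        }
    }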

    The utility class that cleans and parses the Nginx log format

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    /**
     * Utility class for parsing Nginx access-log lines.
     */
    public class AccessLogUtil {

        public static final SimpleDateFormat FORMAT =
                new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        public static final SimpleDateFormat dateformat1 =
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Parse the English-locale timestamp used in the log.
         */
        private static Date parseDateFormat(String string) {
            Date parse = null;
            try {
                parse = FORMAT.parse(string);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return parse;
        }

        /**
         * Decide whether a line is an Nginx access-log line by checking
         * that its first field is an IP address.
         */
        public static Boolean isNginxLog(String string) {
            String ip = string.substring(0, string.indexOf("-")).trim();
            String pattern = "((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})(\\.((2(5[0-5]|[0-4]\\d))|[0-1]?\\d{1,2})){3}";
            Matcher m = Pattern.compile(pattern).matcher(ip);
            return m.matches();
        }

        /**
         * Parse one log line into its fields; every value is wrapped in
         * double quotes so it can drop straight into a VALUES(...) clause.
         */
        public static Map<String, String> parse(String line) {
            ConcurrentHashMap<String, String> result = new ConcurrentHashMap<String, String>();
            result.put("ip", "\"" + parseIP(line) + "\"");
            result.put("time", "\"" + parseTime(line) + "\"");
            result.put("url", "\"" + parseURL(line) + "\"");
            result.put("status", "\"" + parseStatus(line) + "\"");
            result.put("traffic", "\"" + parseTraffic(line) + "\"");
            result.put("sourcePath", "\"" + parseSource(line) + "\"");
            return result;
        }

        // Response size: second token between the 2nd and 3rd double quotes.
        private static String parseTraffic(String line) {
            int start = line.indexOf("\"");
            int second = line.indexOf("\"", start + 1);
            int third = line.indexOf("\"", second + 1);
            return line.substring(second + 1, third).trim().split(" ")[1];
        }

        // HTTP status code: first token between the 2nd and 3rd double quotes.
        private static String parseStatus(String line) {
            int start = line.indexOf("\"");
            int second = line.indexOf("\"", start + 1);
            int third = line.indexOf("\"", second + 1);
            return line.substring(second + 1, third).trim().split(" ")[0];
        }

        // Request line: everything between the first pair of double quotes.
        private static String parseURL(String line) {
            int first = line.indexOf("\"");
            int second = line.indexOf("\"", first + 1);
            return line.substring(first + 1, second);
        }

        // Timestamp between '[' and the (hard-coded) +0800 timezone offset.
        private static String parseTime(String line) {
            int first = line.indexOf("[");
            int last = line.indexOf("+0800]");
            String time = line.substring(first + 1, last).trim();
            return dateformat1.format(parseDateFormat(time));
        }

        // Client IP: everything before the first '-'.
        private static String parseIP(String line) {
            return line.substring(0, line.indexOf("-")).trim();
        }

        // Second-to-last quoted field of the line.
        private static String parseSource(String line) {
            int end = line.lastIndexOf("\"");
            int end1 = line.lastIndexOf("\"", end - 3);
            int end2 = line.lastIndexOf("\"", end1 - 1);
            return line.substring(end2 + 1, end1).trim();
        }

        public static String getSqlValuesString(String body) {
            Map<String, String> result = parse(body);
            return result.get("ip") + "," + result.get("time") + "," + result.get("url") + ","
                    + result.get("sourcePath") + "," + result.get("status") + "," + result.get("traffic");
        }

        public static void main(String[] args) {
            // Other sample formats tried during development:
            // String s1 = "10.25.24.133 - admin [07/Mar/2019:14:19:53 +0800] \"GET /oss-eureka-server/console HTTP/1.1\" 200 21348 \"http://218.200.65.200:9425/oss-web/main.jsp\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36\"\n";
            // String s1 = "192.168.20.1 - - [29/Jun/2020:12:50:40 +0800] \"GET /favicon.ico HTTP/1.1\" 502 537 \"http://192.168.20.128/\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36\" \"-\"";
            // String s1 = "192.168.20.1 - - [16/Jun/2020:17:31:58 +0800] \"GET /test HTTP/1.1\" 200 6 \"-\" \"PostmanRuntime/7.25.0\" \"-\"";
            String s1 = "192.168.20.1 - - [29/Jun/2020:15:50:02 +0800] \"GET / HTTP/1.1\" 502 537 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleW/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36\" \"-\"";
            Map<String, String> parse = AccessLogUtil.parse(s1);
            System.out.println(parse);
            System.out.println(parse.get("ip"));
            System.out.println(getSqlValuesString(s1));
        }
    }
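    For reference, with the sample line above, getSqlValuesString should produce a pre-quoted value list like the following. Note that with a log format ending in "$http_x_forwarded_for", the second-to-last quoted field (and hence sourcePath) is actually the User-Agent:

    "192.168.20.1","2020-06-29 15:50:02","GET / HTTP/1.1","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleW/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36","502","537"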

    After packaging this into a jar, put both the MySQL JDBC driver jar and the custom sink jar into Flume's lib directory.
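    For example (the jar file names and the Flume install path are assumptions; substitute whatever your build and environment actually use):

    cp nginx-to-mysql-sink.jar /opt/modules/flume/lib/
    cp mysql-connector-java-5.1.47.jar /opt/modules/flume/lib/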

    Below are the two job conf files and the startup commands.

    #===== Flume agent on each Nginx server (job/flume-nginx-sender.conf) =====
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2

    a2.sources.r2.type = exec
    a2.sources.r2.command = tail -F /opt/modules/nginx/logs/host.access.log
    a2.sources.r2.shell = /bin/bash -c

    a2.sinks.k2.type = avro
    a2.sinks.k2.hostname = centos03
    a2.sinks.k2.port = 4141

    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100

    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2

    #===== Aggregation Flume agent (job/flume-nginx-receiver.conf) =====
    a2.sources = r2
    a2.sinks = k2
    a2.channels = c2

    a2.sources.r2.type = avro
    a2.sources.r2.bind = centos03
    a2.sources.r2.port = 4141

    a2.sinks.k2.type = com.zr.loggertomysql.NginxToMySQLSink
    a2.sinks.k2.url = jdbc:mysql://localhost:3306/db_test?useSSL=false
    a2.sinks.k2.tableName = nginx_logger
    a2.sinks.k2.user = root
    a2.sinks.k2.password = xxxxxx
    a2.sinks.k2.columnNames = ip,time,url,sourcePath,status,traffic

    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100

    a2.sources.r2.channels = c2
    a2.sinks.k2.channel = c2

    #=== Startup commands ===
    bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-nginx-sender.conf -Dflume.root.logger=INFO,console
    bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-nginx-receiver.conf -Dflume.root.logger=INFO,console
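    One caveat on the conf above: a memory channel drops any buffered events if an agent dies. If that matters for your statistics, Flume's file channel is a drop-in replacement on either agent; a minimal sketch (the directories are assumptions):

    a2.channels.c2.type = file
    a2.channels.c2.checkpointDir = /opt/modules/flume/checkpoint
    a2.channels.c2.dataDirs = /opt/modules/flume/data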

     
