在讲flume之前我们先来看一下hadoop的流程
一:flume的概述:
1.flume的定义: Flume 是Cloudera提供的高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。 Flume 支持定制各类数据发送方,用于收集各类型数据;Flume 支持定制各类数据发送方,用于收集各类型数据;同时, Flume 提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 一般的采集需求,通过对 flume 的简单配置即可实现。 针对特殊场景也具备良好的自定义扩展能力。 因此, flume 可以适用于大部分的日常数据采集场景。并且flume基于流式架构,灵活简单。
2.flume的基础架构 数据流模型:Flume事件定义为具有字节有效负载和可选的字符串属性集的数据流单位。Flume代理是一个(JVM)进程,承载了组件,事件通过这些组件从外部源流到下一个目标(跳)。
2.1:Agent Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。 Agent主要有3个部分组成,Source、Channel、Sink。
2.2:Source Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、 exec、jms、 spooling directory、 netcat、sequence generator、syslog、http、legacy。
2.3:Sink
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。 Sink组件目的地包括 hdfs、 logger、 avro、thrift、ipc、 file、 HBase、solr、自定义。
2.4:Channel
具有解耦和缓冲的作用,Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。 Flume自带两种Channel: Memory Channel和 File Channel以及 Kafka Channel。 Memory Channel是内存中的队列,File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据,所以在一定程度不追求速度的情况下用File Channel是相对安全的。
2.5:Event
传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由 Header和 Body两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。
二:flume的安装部署(flume的安装部署基本上在主节点上面就可以了,以后有特别的需求可以在子节点上面在部署) 1.下载地址:http://archive.apache.org/dist/flume/ ,这里我下载的是flume1.7的版本。
2.安装部署 2.1 将apache-flume-1.7.0-bin.tar.gz上传到linux的/opt/software目录下
2.2 解压apache-flume-1.7.0-bin.tar.gz到/opt/module/目录下 tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
2.3 进入到/opt/module目录修改apache-flume-1.7.0-bin的名称为flume mv apache-flume-1.7.0-bin flume
2.4 将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
在flume-env.sh文件中添加jdk的安装路径,将注释的#去掉,将本机的jdk实际路径替换上去,如果不知道jdk安装在哪里可以先用echo $JAVA_HOME来查看
安装完以后在flume的目录下执行 bin/flume-ng version,如果出现类似下面的信息说明flume已经安装完成。
[root@hadoop102 flume]# bin/flume-ng version Flume 1.7.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707 Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016 From source with checksum 0d21b3ffdc55a07e1d08875872c00523三:flume的入门案例
案例1.监控端口数据官方案例
案例步骤 1)安装netcat工具,yum install -y nc(在每一台hadoop的机器上都安装一下) 2)判断44444端口是否被占用 netstat -tunlp | grep 44444 3)创建Flume Agent配置文件netcat-flume-logger.conf,这一步先在flume目录下创建一个叫job的目录,然后在job目录下新建netcat-flume-logger.conf文件,然后在文件中添加如下的内容。一下的配置信息在官网均可查到 http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1这里解释一下每个参数的意思
4).启动flume的命令,官网所给的命令是如下部分(以下的命令都要运行在flume目录下)
第一种开启方式: bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
第二种开启方式:(推荐用这种写法,因为写法稍微简单一些) bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
参数说明: –conf/-c:表示配置文件存储在conf/目录 –name/-n:表示给agent起名为a1 –conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件。 -Dflume.root.logger=INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。
这里我先用第一种方式启动一下,注意配置文件的话可能每个人命名都不一样,bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
如果是第二种方式启动的话: bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
此时的flume相当于开启了一个服务端,所以现在是处于阻塞状态
这时我们在hadoop102这台主节点上面在复制一个hadoop102,然后在这台复制的hadoop充当客户端,可以看到这时客户端也是处于阻塞状态
这时我们在客户端上面发送消息hello,haha等数据会发现服务端这边实时会接收到数据 客户端: 服务端:
如果关闭的话在服务端ctrl c来关闭或者用kill来关闭
案例2:实时监控单个追加文件 1)需求:实时监控Hive日志,并上传到HDFS中 2)对需求进行分析
实现步骤: 3)a Flume要想将数据输出到HDFS,须持有Hadoop相关jar包,并将这些jar包拷贝到/opt/module/flume/lib文件夹下。 网盘链接 https://pan.baidu.com/s/1eeIijS33YeDb29GhZ87wGQ 提取码:xlvn
3)b:在/opt/module/flume/job 目录下创建file-flume-hdfs.conf文件 文件里面添加如下内容
# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log # 这一步根据自己的hive日志来写路径 # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 30 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2开启flume
bin/flume-ng agent -c conf/ -f job/file-flume-hdfs.conf -n a2这里没有添加-Dflume.root.logger=INFO,console是因为我们是把日志写入到hdfs所以不需要打印到控制台了。
启动完以后这时在hdfs文件系统的/flume/20200702/14目录下已经生成了 临时文件,这里我截取的是实际文件,因为我们配置的是过了30秒之后生成的临时文件就会变成为实际文件。
然后我们在复制的这个hadoop102机器上启动一下hive(启动hive之前要先启动一下hadoop)
这里的三个日志分别是我启动hive,show tables,select * from fruit 的日志
通过上述案例我们就可以将本地文件系统实时变化的日志给它采集到hdfs中,然后将本地的日志load到hive表中进行分析了。
案例3:实时监控目录下多个新文件 1)需求:使用Flume监听整个目录的文件,并上传至HDFS 2)需求分析:
3)a 在/opt/module/flume/job下创建文件 dir-flume-hdfs.conf 在文件中添加如下内容
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /opt/module/flume/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true #忽略所有以.tmp结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*\.tmp) # Describe the sink a3.sinks.k3.type = hdfs #文件上传到hdfs的路径 a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c33)b: 启动监控文件夹命令 bin/flume-ng agent -c conf/ -n a3 -f job/dir-flume-hdfs.conf
3)c: 向upload文件夹中添加文件 在/opt/module/flume目录下创建upload目录 重新来一个hadoop102的窗口将/opt/module/flume作为测试的三个文件test1.txt,test2.txt,test3.txt 上传到upload目录中去,进入到updata目录中发现它里面的文件自动地按照我们的预期添加了一个.COMPLETED后缀名
这里也可以看到hdfs目录中flume/upload/20200702/17生成了如下两个文件/
我们将这两个文件下载到桌面发现正是我们test1.txt和test2.txt中的内容。
这里说明一下: 在使用Spooling Directory Source时不要在监控目录中创建并持续修改文件 上传完成的文件会以.COMPLETED结尾 被监控文件夹每500毫秒扫描一次文件变动