Installing Flume on Linux


    1. Prerequisites

    Flume requires JDK 1.8+. For JDK setup, see:

    Installing JDK on Linux
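
    A quick way to confirm the JDK is in place (assuming java is already on the PATH):

    java -version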

    2. Installation Steps

    2.1 Download and Extract

    Download the required version of Flume; here I use the Apache release. Download URL: http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

    # Extract after downloading
    tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/

    2.2 Configure Environment Variables

    # vim /etc/profile

    Add the environment variables:

    export FLUME_HOME=/opt/apache-flume-1.9.0-bin
    export PATH=$FLUME_HOME/bin:$PATH

    Make the environment variables take effect immediately:

    # source /etc/profile
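
    You can confirm the variables took effect in the current shell, e.g.:

    # Should print /opt/apache-flume-1.9.0-bin and the path of the flume-ng script
    echo $FLUME_HOME
    which flume-ng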

    2.3 Modify the Configuration

    Go into the conf/ directory under the installation directory and copy Flume's environment configuration template flume-env.sh.template:

    # cp flume-env.sh.template flume-env.sh

    Edit flume-env.sh and set the JDK installation path:

    # Environment variables can be set here.
    export JAVA_HOME=/opt/jdk1.8.0_181

    2.4 Verify

    Since Flume's bin directory has already been added to the PATH, verify the setup directly with the following command:

    # flume-ng version

    If the corresponding version information appears, the setup succeeded:

    Flume 1.9.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: d4fcab4f501d41597bc616921329a4339f73585e
    Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
    From source with checksum 35db629a3bda49d23e9b3690c80737f9

    3. Testing

    3.1 Flume: Loading Data from a File into Kafka

    flume-kafka.conf:

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1

    # Describe the source
    a1.sources.s1.type = exec
    a1.sources.s1.command = tail -n0 -F /opt/gzgtest/flumekafka/kafka.log
    a1.sources.s1.channels = c1

    # Configure the Kafka sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # Kafka broker addresses
    a1.sinks.k1.brokerList = 192.168.73.130:9092,192.168.73.131:9092,192.168.73.132:9092
    # Kafka topic to send to
    a1.sinks.k1.topic = test
    # Serialization
    a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
    a1.sinks.k1.channel = c1

    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100

    Start the agent:

    nohup flume-ng agent -c /opt/apache-flume-1.9.0-bin/conf -f /data/flume/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console > /data/flume/nohup.out 2>&1 &
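
    To generate some test traffic, you can append a line to the tailed file (the path matches a1.sources.s1.command above; the message text is arbitrary):

    echo "hello flume $(date)" >> /opt/gzgtest/flumekafka/kafka.log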

    You can start kafka-console-consumer.sh to watch the messages being consumed.
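
    A minimal sketch, assuming the Kafka CLI tools live under /opt/kafka/bin and the broker/topic match the sink configuration above:

    /opt/kafka/bin/kafka-console-consumer.sh \
        --bootstrap-server 192.168.73.130:9092 \
        --topic test \
        --from-beginning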

    3.2 Flume: Loading Data from a File into HDFS

    flume-hdfs.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -n0 -F /opt/gzgtest/flumekafka/kafka.log

    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://gzgtest/user/hive/warehouse/tmp.db/rt_minipc_dfh_goodnews/dt=20200701
    a1.sinks.k1.hdfs.filePrefix = 192.168.73.132_log_%Y%m%d%H%M
    a1.sinks.k1.hdfs.inUsePrefix = .
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.rollInterval = 600
    a1.sinks.k1.hdfs.minBlockReplicas = 1
    a1.sinks.k1.hdfs.batchDurationMillis = 10000
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.threadsPoolSize = 250
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.fileType = DataStream
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.callTimeout = 120000
    a1.sinks.k1.hdfs.idleTimeout = 600
    a1.sinks.k1.hdfs.rollTimerPoolSize = 10

    Start the agent:

    nohup flume-ng agent -c /opt/apache-flume-1.9.0-bin/conf -f /data/flume/flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console > /data/flume/nohup.out 2>&1 &
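
    Once the agent is running, you can check that files are landing under the configured path (the same path as a1.sinks.k1.hdfs.path above; files still being written carry the "." inUsePrefix):

    hdfs dfs -ls hdfs://gzgtest/user/hive/warehouse/tmp.db/rt_minipc_dfh_goodnews/dt=20200701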

    Note: I ran all of the above as the hadoop user. You may hit file permission problems, so change the ownership of the Flume installation directory and every file that needs to be executed to hadoop:hadoop, for example:
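
    # A sketch using the paths from this article; adjust to your own layout
    chown -R hadoop:hadoop /opt/apache-flume-1.9.0-bin /data/flume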

    Possible issues:

    2020-07-01 16:54:47,501 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)

    For a fix, see: Flume throws NoSuchMethodError: com.google.common.base.Preconditions.checkArgument at runtime
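
    This NoSuchMethodError is usually a Guava version conflict: Flume 1.9.0 ships guava-11.0.2.jar, while Hadoop 3.x uses a much newer Guava. A common workaround (a sketch; the exact jar name depends on your Hadoop release) is to replace Flume's bundled Guava with the one shipped by Hadoop:

    # Assumption: Hadoop 3.x shipping guava-27.0-jre.jar; adjust the version to your release
    mv $FLUME_HOME/lib/guava-11.0.2.jar $FLUME_HOME/lib/guava-11.0.2.jar.bak
    cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $FLUME_HOME/lib/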

    Reference: Installing Flume on Linux

    The official documentation deserves repeated reading: Flume User Guide

    The meaning of Flume's configuration parameters
