flume拓扑结构之多副本结构案例

    技术2025-02-24  13

    1.案例需求 使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。

    2.需求分析:这里在Flume-1中没有使用一个Channel的原因是Sink组中并没有像Channel Selector具有Replicating这个功能。

    3.实现步骤 先在hadoop102主节点机器上的/opt/module/flume/job目录下创建group1文件夹 在/opt/module/datas/目录下创建flume3文件夹

    1)在group1目录下创建flume-file-flume.conf这个文件 配置1个接收日志文件的source和两个channel、两个sink,分别输送给flume-flume-hdfs和flume-flume-dir。

    在文件中添加如下内容,这里我监控的是启动hive中目录下/opt/module/hive/logs/hive.log的日志内容

    # Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 将数据流复制给所有channel a1.sources.r1.selector.type = replicating # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log a1.sources.r1.shell = /bin/bash -c # Describe the sink # sink端的avro是一个数据发送者 a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142 # Describe the channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2

    2)在group1目录中创建flume-flume-hdfs.conf文件,配置上级Flume输出的Source,输出是到HDFS的Sink。

    # Name the components on this agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # Describe/configure the source # source端的avro是一个数据接收服务 a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141 # Describe the sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = flume2- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是128M a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k1.hdfs.rollCount = 0 # Describe the channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1

    3)在group1中创建文件flume-flume-dir.conf 并填入如下内容,配置上级Flume输出的Source,输出是到本地目录的Sink。

    注意sink中输出目录/opt/module/data/flume3需要自己建好 ,如果没有的话会报错

    # Name the components on this agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # Describe/configure the source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142 # Describe the sink a3.sinks.k1.type = file_roll a3.sinks.k1.sink.directory = /opt/module/data/flume3 # Describe the channel a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2

    4)在Xshell中将hadoop102复制3个方便我们开启flume任务,因为每一个任务的开启之后都会是阻塞状态

    然后在复制的三个机器中分别启动对应的flume进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。 注意启动的顺序应该是 在复制的第三个机器上的/opt/module/flume目录下执行bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

    在复制的第二个机器上执行bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

    在复制的第一个机器上执行bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

    启动hadoop集群,然后开始启动hive(因为我们监控的是hive的日志,所以在启动hive之前的话要先把hadoop启动起来)

    在没开启hive之前我们看一下hdfs的web端

    开启后我们再去web端刷新一下发现多出来一个flume2目录

    将这个文件下载下来也正是我们的登录hive的日志信息(后缀.tmp的这个临时文件要过60秒才能生存正式文件)

    这时我们再去检查一下/opt/module/datas/flume3目录中数据会发现多出来很多数据,其实数据量大小为0的数据都是空文件,原因是我们设置的是30秒滚动刷新一次。

    这时我们打开一下数据量大小为12741这个来查看一下发现他是我们的登录hive日志的一部分。

    上述内容也就完成了我们的多副本结构

    Processed: 0.010, SQL: 9