Flume数据传输及多端口转发

    技术2023-06-04  69

    文章目录

    Flume发送数据测试1、概述1.1项目内容1.2使用工具1.3整体思路 2、测试过程2.1数据生成2.2配置端口标识和端口号2.3解析获取到的数据2.4flume配置文件 3、测试结果

    Flume发送数据测试

    1、概述

    1.1项目内容

    通过OPC-DA协议获取硬件数据,将数据进行json格式化,通过flume存入opentsdb

    1.2使用工具

    flume-1.9.0

    jdk-1.8.0

    安装包:

    [apache-flume-1.9.0-bin.tar.gz]: http://flume.apache.org/download.html 直接解压,只要有Java环境就能运行

    1.3整体思路

    编写Java程序,得到数据并解析,将解析好的数据发送到flume所对应的主机和端口

    2、测试过程

    2.1数据生成

    利用Java模拟生成数据,数据内容包括:

    [ 硬件对应的端口标识,要发送的数据]

    例如: [“portmark1”,“1234567890123456”]

    ​ [“portmark2”,“1234567890123456”]

    ​ [“portmark3”,“1234567890123456”]

    数据生成代码如下:

    import java.util.ArrayList; import java.util.List; import java.util.Random; import org.junit.Test; public class MakeData { private String mark=null; private String value=null; /** * 模拟生成传输数据 * */ public List<String> makeData(){ List<String> resultList = new ArrayList<String>(); int random = (int)(Math.random()*100);; if(random <= 33) mark="port44444"; else if(random <= 66) mark="port55555"; else mark="port33333"; value=RandomString(); resultList.add(mark); resultList.add(value); return resultList; } /** * 生成随机字符串 * */ private String RandomString(){ String s="qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM123456789/*-+,./;'[]!@#$%^&*()_"; Random random=new Random(); StringBuffer stringBuffer=new StringBuffer(); for(int i=0;i<16;i++){ int number=random.nextInt(80); stringBuffer.append(s.charAt(number)); } return stringBuffer.toString(); } }

    2.2配置端口标识和端口号

    portMark1 = port44444 port1 = 44444 portMark2 = port55555 port2 = 55555 portMark3 = port66666 port3 = 66666

    代表意思为:

    portMark1为端口标识,为其赋值port44444

    port1为端口,为其赋值44444

    2.3解析获取到的数据

    整体代码如下:

    import org.apache.commons.net.telnet.TelnetClient; import javax.sound.sampled.Port; import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class PortSend { private static String portMark1; private static String portMark2; private static String portMark3; private static String port1; private static String port2; private static String port3; public static void main(String args[]) throws IOException { while (true){ try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } MakeData makeData=new MakeData(); List<String> dataString=makeData.makeData(); configuration(); int sendPort= Integer.parseInt(AnalysisData(dataString)); System.out.println(dataString); TelnetClient telnetClient = new TelnetClient(); telnetClient.setDefaultTimeout(5000); telnetClient.connect("127.0.0.1",sendPort); PrintStream printStream = new PrintStream(telnetClient.getOutputStream()); //写命令的流 printStream.println(dataString.get(1));//写命令 printStream.flush();//将命令发送到telnet Server printStream.close(); telnetClient.disconnect(); } } /** * 加载配置文件 * */ private static void configuration() throws FileNotFoundException { InputStream ips = new FileInputStream("src/config.properties"); Properties props = new Properties(); try { props.load(ips); ips.close(); } catch (IOException e) { e.printStackTrace(); } portMark1=props.getProperty("portMark1"); portMark2=props.getProperty("portMark2"); portMark3=props.getProperty("portMark3"); port1=props.getProperty("port1"); port2=props.getProperty("port2"); port3=props.getProperty("port3"); } /** * 解析数据需要反送到哪个端口 * */ private static String AnalysisData(List<String> s){ if(s.get(0).equals(portMark1)){ return port1; }else if(s.get(0).equals(portMark2)){ return port2; }else if(s.get(0).equals(portMark3)){ return port3; } return null; } }

    思路为:

    首先调用数据生成类的数据生成方法,得到一条数据:

    根据配置文件读取到的信息,配置文件的标识位与数据标识位进行匹配,匹配成功则将端口值置为与配置文件中标识位对应的端口

    之后通过main函数中写的端口数据发送,将相应标识位的数据发到相应端口

    2.4flume配置文件

    # example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

    这是其中一个的配置文件,其余两个配置文件只需要把端口号修改为对应的,同时因为在同一台电脑上做测试,

    所以agent是不能同名的 其余需要修改对应的agent名

    启动命令:

    flume-ng agent --conf ../conf --conf-file ../conf/flume44444.conf --name a1 -property flume.root.logger=INFO,console flume-ng agent --conf ../conf --conf-file ../conf/flume55555.conf --name a2 -property flume.root.logger=INFO,console flume-ng agent --conf ../conf --conf-file ../conf/flume33333.conf --name a3 -property flume.root.logger=INFO,console

    3、测试结果

    可以看到对应端口只会收到对应标识的数据,也就是说明硬件只会获取到自己对应的控制信息

    并且使用端口转发数据可以实现在不同主机上控制不同硬件,只需配置主机ip和相应端口号

    可以看到对应端口只会收到对应标识的数据,也就是说明硬件只会获取到自己对应的控制信息

    并且使用端口转发数据可以实现在不同主机上控制不同硬件,只需配置主机ip和相应端口号

    测试成功!

    这是我读研以来接到的第一个工业级项目,历时一个月完成了接近一半,项目内容主要是数据采集和传输,因为项目中涉及工程中的其他内容,所以只能将测试案例进行分享交流,这一部分内容主要是对flume数据传输的使用,欢迎留言交流。

    Processed: 0.011, SQL: 10