通过OPC-DA协议获取硬件数据,将数据进行json格式化,通过flume存入opentsdb
flume-1.9.0
jdk-1.8.0
安装包:
[apache-flume-1.9.0-bin.tar.gz]: http://flume.apache.org/download.html 直接解压,只要有Java环境就能运行
编写Java程序,得到数据并解析,将解析好的数据发送到flume所对应的主机和端口
利用Java模拟生成数据,数据内容包括:
[ 硬件对应的端口标识,要发送的数据]
例如: [“portmark1”,“1234567890123456”]
[“portmark2”,“1234567890123456”]
[“portmark3”,“1234567890123456”]
数据生成代码如下:
import java.util.ArrayList; import java.util.List; import java.util.Random; import org.junit.Test; public class MakeData { private String mark=null; private String value=null; /** * 模拟生成传输数据 * */ public List<String> makeData(){ List<String> resultList = new ArrayList<String>(); int random = (int)(Math.random()*100);; if(random <= 33) mark="port44444"; else if(random <= 66) mark="port55555"; else mark="port33333"; value=RandomString(); resultList.add(mark); resultList.add(value); return resultList; } /** * 生成随机字符串 * */ private String RandomString(){ String s="qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM123456789/*-+,./;'[]!@#$%^&*()_"; Random random=new Random(); StringBuffer stringBuffer=new StringBuffer(); for(int i=0;i<16;i++){ int number=random.nextInt(80); stringBuffer.append(s.charAt(number)); } return stringBuffer.toString(); } }代表意思为:
portMark1为端口标识,为其赋值port44444
port1为端口,为其赋值44444
整体代码如下:
import org.apache.commons.net.telnet.TelnetClient; import javax.sound.sampled.Port; import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class PortSend { private static String portMark1; private static String portMark2; private static String portMark3; private static String port1; private static String port2; private static String port3; public static void main(String args[]) throws IOException { while (true){ try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } MakeData makeData=new MakeData(); List<String> dataString=makeData.makeData(); configuration(); int sendPort= Integer.parseInt(AnalysisData(dataString)); System.out.println(dataString); TelnetClient telnetClient = new TelnetClient(); telnetClient.setDefaultTimeout(5000); telnetClient.connect("127.0.0.1",sendPort); PrintStream printStream = new PrintStream(telnetClient.getOutputStream()); //写命令的流 printStream.println(dataString.get(1));//写命令 printStream.flush();//将命令发送到telnet Server printStream.close(); telnetClient.disconnect(); } } /** * 加载配置文件 * */ private static void configuration() throws FileNotFoundException { InputStream ips = new FileInputStream("src/config.properties"); Properties props = new Properties(); try { props.load(ips); ips.close(); } catch (IOException e) { e.printStackTrace(); } portMark1=props.getProperty("portMark1"); portMark2=props.getProperty("portMark2"); portMark3=props.getProperty("portMark3"); port1=props.getProperty("port1"); port2=props.getProperty("port2"); port3=props.getProperty("port3"); } /** * 解析数据需要反送到哪个端口 * */ private static String AnalysisData(List<String> s){ if(s.get(0).equals(portMark1)){ return port1; }else if(s.get(0).equals(portMark2)){ return port2; }else if(s.get(0).equals(portMark3)){ return port3; } return null; } }思路为:
首先调用数据生成类的数据生成方法,得到一条数据:
根据配置文件读取到的信息,配置文件的标识位与数据标识位进行匹配,匹配成功则将端口值置为与配置文件中标识位对应的端口
之后通过main函数中写的端口数据发送,将相应标识位的数据发到相应端口
这是其中一个的配置文件,其余两个配置文件只需要把端口号修改为对应的,同时因为在同一台电脑上做测试,
所以agent是不能同名的 其余需要修改对应的agent名
启动命令:
flume-ng agent --conf ../conf --conf-file ../conf/flume44444.conf --name a1 -property flume.root.logger=INFO,console flume-ng agent --conf ../conf --conf-file ../conf/flume55555.conf --name a2 -property flume.root.logger=INFO,console flume-ng agent --conf ../conf --conf-file ../conf/flume33333.conf --name a3 -property flume.root.logger=INFO,console可以看到对应端口只会收到对应标识的数据,也就是说明硬件只会获取到自己对应的控制信息
并且使用端口转发数据可以实现在不同主机上控制不同硬件,只需配置主机ip和相应端口号
可以看到对应端口只会收到对应标识的数据,也就是说明硬件只会获取到自己对应的控制信息
并且使用端口转发数据可以实现在不同主机上控制不同硬件,只需配置主机ip和相应端口号
测试成功!
这是我读研以来接到的第一个工业级项目,历时一个月完成了接近一半,项目内容主要是数据采集和传输,因为项目中涉及工程中的其他内容,所以只能将测试案例进行分享交流,这一部分内容主要是对flume数据传输的使用,欢迎留言交流。