伴随着离线分析的稳定后,下一步的目标就是进行实时的数据分析。从原理上看,也就是将每天的数据变成实时的处理,技术栈会有所不同,本着用新不用旧的原则,我们将目标定为:flink作为流式计算的基础;
数据来源采集,由于原系统希望主动推送过来,并且希望采用http的方式,所以,在kafka前加了一层http接入服务;
系统提供一个采集服务接口,当外系统有数据时,调用http请求将数据上送过来;数据格式为json。(这里要吐槽一点,现在系统间对接,上来就是希望使用json,过往的经历,在银行,在通讯行业,有很多优秀的报文传输protocal,不知道,为什么在互联网行业不使用,也许是因为开发比较麻烦吧。) 我们直接了当,采用了一个无状态的http服务,因为是系统间上送,为了安全起见,采用了IP白名单。数据接收之后,直接写入kafka,效率很高,但是有些极端情况,如kafka当时恰好不工作,或者数据写入失败等等,为避免这种情况,所以,对于写入失败的数据,采用落入文件系统再进行二次入库。(这个受限于时间因素,没有code)
数据进入kafka后,就进入了flink的范畴了。我们使用的是最新版本的flink1.10 ,只有3个job,一个是从原始kafka中获取数据并进行数据清洗和补全的,另外两个是进行数据分析,并sink到mysql中的。
整个过程没有太多的编码难度,主要的坑在于,flink对于python的支持也不是很友好,也许是我们programer的coding能力有待提升吧;所以,经过两天的痛苦研究后,我建议他们放弃python,而使用了scale。
读取kafka,并进行数据清洗,最后再写入下一级处理队列;这里面主要有两个问题,一个比较通用的,就是消费kafka的时候,需要检查checkpoint,一个是在清洗过程中,需要调用第三方如,调用hbase服务进行经纬度地址信息补齐等;
调用第三方服务: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/asyncio.html earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 earliest: automatically reset the offset to the earliest offset,自动将偏移量置为最早的清洗后的数据,统计分析到mysql相对来说比较简单; flink官方提供了两种方式:
DataStream application Table API query不过在实际使用过程中还是出现了很大的问题。原因就在于使用table API时,Flink的版本以及依赖的jar包很多都不一样。所以,导致运行时会出错;下面是使用官方的最终写的一个测试代码,在实际线上环境中可以正常运行:(我自己使用的是java做demo,team用scale来写代码)
package com.vwsds.bigdata.flink.job; import java.sql.Timestamp; import java.util.Properties; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import com.vwsds.bigdata.flink.pojo.UserPvEntity; import com.vwsds.bigdata.flink.pojo.VwmsdsData; import com.vwsds.bigdata.flink.sql.SinkUserPvToMySQL2; public class SDSStreamingJob { public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.36.249:9092,192.168.178.88:9092,192.168.204.104:9092"); properties.put("zookeeper.connect", "192.168.8.216:2181"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id", "sds-data"); properties.setProperty("auto.offset.reset", "latest") StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("****", new SimpleStringSchema(), properties); DataStream<String> stream = env.addSource(consumer); DataStream<VwmsdsData> map = stream.map(new MapFunction<String, VwmsdsData>() { private static final long serialVersionUID = 1471936326697828381L; @Override public VwmsdsData map(String value) throws Exception { return VwmsdsData.parseObjToQAObject(value); } }); //注册为user表 tableEnv.createTemporaryView("Users", map, "userId,itemId,categoryId,behavior,timestampin,proctime.proctime"); //执行sql查询 滚动窗口 10秒 计算10秒窗口内用户点击次数 Table sqlQuery = tableEnv.sqlQuery("SELECT TUMBLE_END(proctime, INTERVAL '10' SECOND) as processtime," + "userId,count(*) as pvcount " + "FROM Users " + "GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND), userId"); //Table 转化为 DataStream DataStream<Tuple3<Timestamp, String, Long>> appendStream = tableEnv.toAppendStream(sqlQuery,Types.TUPLE(Types.SQL_TIMESTAMP,Types.STRING,Types.LONG)); appendStream.print(); //sink to mysql appendStream.map(new MapFunction<Tuple3<Timestamp,String,Long>, UserPvEntity>() { private static final long serialVersionUID = -4770965496944515917L; @Override public UserPvEntity map(Tuple3<Timestamp, String, Long> value) throws Exception { try { return new UserPvEntity(value.f0.getTime(),value.f1,value.f2); }catch (Exception e) { e.printStackTrace(); return null; } } }).addSink(new SinkUserPvToMySQL2()); env.execute("userPv from Kafka"); } }Flink的部署方式分三种: Local模式 Standalone模式 Flink On Yarn模式 1).第一种是yarn session模式 2).第二种 直接在yarn上运行任务
Local模式比较简单: ./bin/start-cluster.sh 执行就可以了;
yarn操作的几种方式: ./bin/yarn-session.sh --config /opt/client/Yarn/config/ -jm 1024m -tm 2028m 提交任务: /app/soft/flink/bin/flink run -yid application_1584428803396_1190 -c com.test.kafka_datacollect_new /app/bigdata/flinkjob/cleaning_online.jar #停止服务: yarn --config /opt/client/Yarn/config/ application -kill application_1584428803396_0859 直接将服务提交到yarn上 /app/soft/flink/bin/flink run -m yarn-cluster -c com.test.sds_mobvoi_qaresultdata /app/bigdata/flinkjob/FlinkTutorial_online-1.0-SNAPSHOT-jar-with-dependencies.jar对于这些服务,我都碰到了同一个问题,就是flink的Task managers和Total Task Slots 经常不足的情况。目前也没有搞懂,如何在一台机器上开启多个TaskManager; 后面在部署到Yarn上也碰到了类似的问题。还未完全解决;