flink RichFunction 之坑

    技术2022-07-20  62

    flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一个数据库连接。在有些情况下他会一直创建然后销毁,创建销毁。举例: 重点在第三行的注释 val value = env.socketTextStream("192.168.13.11", 9090) val value2 = value.filter(x => { try { var a = 1 / 0 //此处若没有异常处理,任务不会断,但是会重复打开数据库连接 } catch { case e: Exception => } isInter(x) }).map(fun = x => { x.toLong }) val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) { override def extractTimestamp(element: Long): Long = { println(element + "***************") element } }) try { var a = 1 / 0 } catch { case e: Exception => } value1.map(new mymap) env.execute("test") } def isInter(input: String): Boolean = { val matcher = Pattern.compile("^[0-9]+$").matcher(input) matcher.find() } } class myRichMapfun6() extends RichMapFunction[ListBuffer[String], Unit] { var conn: Connection = _ var pst: PreparedStatement = _ override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql") println(conn) pst = conn.prepareStatement("insert into testa (str) values (?)") } override def close(): Unit = { conn.close() pst.close() } override def map(in: ListBuffer[String]): Unit = { pst.setString(1, in.head) pst.execute() } }

    所以你是不是觉得那就价格异常处理不就得了?

    NO

    再看:

    这个时候,如果传进来line不是数字或者格式不对,就会触发异常,然而此时就不会像上面那样帮你解决问题,而是一遍遍创建对象销毁对象,一条消息创建一个连接,我就问你慌不慌,

    原因

    据观察是因为,输入的数据有问题,直接导致
    val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) { override def extractTimestamp(element: Long): Long = { println(element + "***************") element } })
    这个崩溃了,不走这行代码了,没有获得eventime,然后估计。。。 剩下的我也没详细测。。。

    解决方案

    先fiiter过滤任何可能导致异常的脏数据确保数据都没问题就可以了。
    Processed: 0.011, SQL: 9