flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一个数据库连接。在有些情况下他会一直创建然后销毁,创建销毁。举例: 重点在第三行的注释
val value
= env
.socketTextStream("192.168.13.11", 9090)
val value2
= value
.filter(x
=> {
try {
var a
= 1 / 0
} catch {
case e
: Exception
=>
}
isInter(x
)
}).map(fun
= x
=> {
x
.toLong
})
val value1
= value2
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long
](Time
.seconds(1)) {
override def
extractTimestamp(element
: Long
): Long
= {
println(element
+ "***************")
element
}
})
try {
var a
= 1 / 0
} catch {
case e
: Exception
=>
}
value1
.map(new mymap)
env
.execute("test")
}
def
isInter(input
: String
): Boolean
= {
val matcher
= Pattern
.compile("^[0-9]+$").matcher(input
)
matcher
.find()
}
}
class myRichMapfun6() extends RichMapFunction[ListBuffer
[String
], Unit
] {
var conn
: Connection
= _
var pst
: PreparedStatement
= _
override def
open(parameters
: Configuration
): Unit
= {
conn
= DriverManager
.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql")
println(conn
)
pst
= conn
.prepareStatement("insert into testa (str) values (?)")
}
override def
close(): Unit
= {
conn
.close()
pst
.close()
}
override def
map(in
: ListBuffer
[String
]): Unit
= {
pst
.setString(1, in
.head
)
pst
.execute()
}
}
所以你是不是觉得那就价格异常处理不就得了?
NO
再看:
这个时候,如果传进来line不是数字或者格式不对,就会触发异常,然而此时就不会像上面那样帮你解决问题,而是一遍遍创建对象销毁对象,一条消息创建一个连接,我就问你慌不慌,
原因
据观察是因为,输入的数据有问题,直接导致
val value1
= value2
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long
](Time
.seconds(1)) {
override def
extractTimestamp(element
: Long
): Long
= {
println(element
+ "***************")
element
}
})
这个崩溃了,不走这行代码了,没有获得eventime,然后估计。。。 剩下的我也没详细测。。。
解决方案
先fiiter过滤任何可能导致异常的脏数据确保数据都没问题就可以了。