大数据面试·Spark篇(二)
目录:Spark Streaming任务延迟监控及告警 — 1.需求;2.自定义StreamingListener;3.添加到streamingContext中
Spark Streaming任务延迟监控及告警
1.需求
监控每个批次的总延迟(totalDelay),若达到阈值(times × duration 秒)则发送告警;同一应用的两次告警之间至少间隔2分钟。
2.自定义StreamingListener
/**
 * Streaming listener that sends an alert when a completed batch's total delay
 * reaches `times * duration` seconds, throttled to at most one alert per
 * `appName` every two minutes.
 *
 * The epoch-second timestamp of the last alert is kept in Redis under the key
 * `appName`, so the throttle is shared by every listener instance that uses
 * the same name.
 *
 * @param appName  application name; used as the Redis key and in the alert text
 * @param duration factor of the delay threshold, in seconds
 *                 (presumably the batch interval -- confirm against the caller)
 * @param times    multiplier applied to `duration` to obtain the threshold
 */
class SparkStreamingDelayListener(private val appName: String,
                                  private val duration: Int,
                                  private val times: Int) extends StreamingListener {

  private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

  /** Minimum number of seconds that must pass between two alerts. */
  private val alertIntervalSeconds = 120L

  /**
   * Checks the total delay of the completed batch and, when it reaches the
   * threshold and the throttle window has expired, prints the batch metrics
   * and sends the alert through an HTTP GET.
   *
   * @param batchCompleted event carrying the metrics of the finished batch
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    // Long arithmetic: the Int product could overflow for large intervals.
    val thresholdMs = times.toLong * duration * 1000L

    // Redis is consulted only for batches that are actually late; a batch
    // without a total delay is skipped instead of failing on Option.get.
    batchInfo.totalDelay.filter(_ >= thresholdMs).foreach { totalDelayMs =>
      val jedis = RedisClusterClient.getJedisClusterClient()
      val nowSeconds = System.currentTimeMillis / 1000
      val lastAlertSeconds = Option(jedis.get(appName)).flatMap(parseEpochSeconds)

      // No usable previous timestamp, or the throttle window has passed.
      if (lastAlertSeconds.forall(nowSeconds - _ > alertIntervalSeconds)) {
        // The timestamp is refreshed only when an alert is really sent, so a
        // healthy batch can no longer suppress the next genuine alert. It is
        // written before the HTTP call so that a failing alert endpoint does
        // not cause an alert attempt on every batch.
        jedis.set(appName, nowSeconds.toString)

        val totalDelaySeconds = totalDelayMs / 1000
        val processingDelaySeconds =
          batchInfo.processingDelay.fold("unknown")(ms => (ms / 1000).toString)

        val monitorContent =
          s"$appName: numRecords ->${batchInfo.numRecords}" +
            s",processingDelay ->$processingDelaySeconds s,totalDelay -> ${totalDelaySeconds}s"
        println(monitorContent)

        val msg = s"Streaming_${appName}_DelayTime:${totalDelaySeconds}S"
        // NOTE(review): msg is appended to the query string without URL
        // encoding -- confirm whether HttpClient.doGet encodes it.
        val getURL = "http://node1:8002/message/weixin?msg=" + msg
        HttpClient.doGet(getURL)
      }
    }
  }

  /** Parses a stored epoch-second value; a malformed value is logged and treated as absent. */
  private def parseEpochSeconds(raw: String): Option[Long] = {
    val parsed = scala.util.Try(raw.trim.toLong).toOption
    if (parsed.isEmpty) {
      logger.warn("Ignoring malformed last-alert timestamp in Redis for " + appName + ": " + raw)
    }
    parsed
  }
}
3.添加到streamingContext中
// Build the delay listener for the "Userid2Redis" job and register it on the
// streaming context so that it receives batch-completed events.
val delayListener = new SparkStreamingDelayListener("Userid2Redis", duration, times)
ssc.addStreamingListener(delayListener)