Big Data Interview · Spark (Part 2)

    Tech  2022-07-11

    Spark Streaming Job Delay Monitoring and Alerting

    1. Requirements

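    Monitor the delay of every batch (total delay = scheduling delay + processing delay). If it exceeds a threshold, defined as a multiple of the batch interval, send an alert, but send at most one alert every 2 minutes.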
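    The alerting rule can be written as a small pure function, which makes it easy to unit-test without Spark or Redis. This is a minimal sketch: `DelayAlertRule`, `shouldAlert` and its parameter names are hypothetical, not part of Spark. It applies the same threshold as the listener below (total delay of at least `times` batch intervals) and the same 2-minute minimum gap between alerts.

```scala
object DelayAlertRule {
  // Hypothetical helper: decides whether a completed batch should trigger an alert.
  // totalDelayMs:   total delay of the batch in milliseconds (like BatchInfo.totalDelay)
  // batchSeconds:   batch interval in seconds
  // times:          threshold, as a multiple of the batch interval
  // lastAlertSec:   epoch second of the previous alert, or None if none was sent
  // nowSec:         current epoch second
  // minIntervalSec: minimum gap between two alerts (120 s in this article)
  def shouldAlert(totalDelayMs: Long,
                  batchSeconds: Int,
                  times: Int,
                  lastAlertSec: Option[Long],
                  nowSec: Long,
                  minIntervalSec: Long = 120): Boolean = {
    val overThreshold = totalDelayMs >= times.toLong * batchSeconds * 1000
    val intervalElapsed = lastAlertSec.forall(last => nowSec - last > minIntervalSec)
    overThreshold && intervalElapsed
  }

  def main(args: Array[String]): Unit = {
    // 10 s batches, threshold = 3 x 10 s = 30 s
    println(shouldAlert(35000, 10, 3, None, 1000))      // true: over threshold, no earlier alert
    println(shouldAlert(35000, 10, 3, Some(950), 1000)) // false: last alert only 50 s ago
    println(shouldAlert(20000, 10, 3, None, 1000))      // false: below the 30 s threshold
  }
}
```

    Note that the delay check comes first: a batch that is not delayed never counts as an alert, so it cannot push back the next legitimate alert.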

    2. Custom StreamingListener

    import java.net.URLEncoder

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
    import org.slf4j.LoggerFactory

    // appName:  application name, also used as the Redis key holding the last alert time
    // duration: batch interval in seconds
    // times:    alert when totalDelay >= times * duration
    // RedisClusterClient and HttpClient are project-specific helpers (not shown)
    class SparkStreamingDelayListener(private val appName: String,
                                      private val duration: Int,
                                      private val times: Int) extends StreamingListener {
      private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

      // Invoked each time a batch completes
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val batchInfo = batchCompleted.batchInfo
        val numRecords = batchInfo.numRecords
        // Option[Long] values in milliseconds, defined once the batch has completed
        val processingDelay = batchInfo.processingDelay.getOrElse(0L)
        val totalDelay = batchInfo.totalDelay.getOrElse(0L) // scheduling delay + processing delay

        // Check the delay first, so batches that do not alert never refresh the Redis timestamp
        if (totalDelay >= times.toLong * duration * 1000) {
          // Store each alert time in Redis so that alerts are more than 2 minutes apart
          val jedis = RedisClusterClient.getJedisClusterClient()
          val currentTime = (System.currentTimeMillis / 1000).toInt
          val redisTime = jedis.get(appName)
          if (redisTime == null || currentTime - redisTime.toInt > 120) {
            jedis.set(appName, currentTime.toString)
            val monitorContent = s"$appName: numRecords -> $numRecords, " +
              s"processingDelay -> ${processingDelay / 1000} s, totalDelay -> ${totalDelay / 1000} s"
            logger.warn(monitorContent)
            val msg = s"Streaming_${appName}_DelayTime:${totalDelay / 1000}S"
            // URL-encode the message before placing it in the query string
            val getURL = "http://node1:8002/message/weixin?msg=" + URLEncoder.encode(msg, "UTF-8")
            HttpClient.doGet(getURL)
          }
        }
      }
    }
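    The Redis `get` followed by `set` is not atomic, and it depends on an external service. If a single driver is enough, the 2-minute gap could instead be enforced in memory. This is a minimal sketch under that assumption; `AlertThrottle` and `tryAcquire` are hypothetical names. It uses `java.util.concurrent.atomic.AtomicLong.compareAndSet`, so even concurrent callers claim at most one alert per interval. The trade-off is that this state is lost when the driver restarts and is not shared between applications, whereas a Redis key survives restarts.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical in-memory alternative to the Redis get/set throttle.
class AlertThrottle(minIntervalMs: Long) {
  // Long.MinValue means "no alert has been sent yet"
  private val lastAlertMs = new AtomicLong(Long.MinValue)

  // Returns true if the caller may send an alert at time nowMs.
  // compareAndSet ensures only one caller wins when several race for the same slot.
  def tryAcquire(nowMs: Long): Boolean = {
    val last = lastAlertMs.get()
    // Check the sentinel first so nowMs - Long.MinValue is never evaluated (it would overflow)
    val elapsed = last == Long.MinValue || nowMs - last > minIntervalMs
    elapsed && lastAlertMs.compareAndSet(last, nowMs)
  }
}

object AlertThrottleDemo {
  def main(args: Array[String]): Unit = {
    val throttle = new AlertThrottle(120000L) // 2 minutes
    println(throttle.tryAcquire(0L))          // true: first alert
    println(throttle.tryAcquire(60000L))      // false: only 60 s later
    println(throttle.tryAcquire(120001L))     // true: more than 120 s after the first alert
  }
}
```

    Inside the listener, the Redis block would become something like `if (throttle.tryAcquire(System.currentTimeMillis)) { ... }`, with `throttle` held as a field of the listener.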

    3. Register the listener with the StreamingContext

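    // duration: batch interval in seconds (the same value used when creating ssc)
    // times:    alert threshold, as a multiple of the batch interval
    ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration, times))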