分布式定时任务—xxl-job学习(三)——调度中心(xxl-job-admin)的启动和任务调度过程源码分析

    技术2022-07-12  78

    分布式定时任务—xxl-job学习(三)——调度中心(xxl-job-admin)的启动和任务调度过程源码分析

    RabbitsInTheGrass 2020-06-30 10:31:08  813  收藏  原力计划 分类专栏: # xxl-job Java分布式 版权 分布式定时任务—xxl-job学习(三)调度中心(xxl-job-admin)的启动和任务调度过程源码分析 前言 一、调度中心的启动 1.1 分析XxlJobAdminConfig类 1.2 分析XxlJobScheduler.init() 1.2.1 initI18n() 1.2.2 JobRegistryMonitorHelper.getInstance().start() 1.2.3 JobFailMonitorHelper.getInstance().start() 1.2.3.1 分析失败重试中的JobTriggerPoolHelper.trigger()方法 1.2.3.2 分析失败预警中的XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log)方法 1.2.3.2.1 分析JobAlarmer预警类 1.2.3.2.2 分析EmailJobAlarm邮件预警类 1.2.4 JobLosedMonitorHelper.getInstance().start() 1.2.5 JobTriggerPoolHelper.toStart() 1.2.6 JobLogReportHelper.getInstance().start() 1.2.7 JobScheduleHelper.getInstance().start() 1.2.7.1 scheduleThread 1.2.7.2 ringThread 1.3 分析XxlJobTrigger核心类 1.3.1 trigger()方法 1.3.2 processTrigger()方法 1.3.3 runExecutor()方法 二、彩蛋 前言 接上一篇:分布式定时任务—xxl-job学习(二)——执行器的启动过程源码分析 更早:分布式定时任务—xxl-job学习(一):简单demo搭建

    在上一篇我们深度分析了下xxl-job执行器组件的启动加载原理,本篇我们来深度挖掘一下调度中心(xxl-job-admin)的启动和调度过程。

    xxl.job.version使用的依旧是2.2.1-SNAPSHOT版本

    一、调度中心的启动

    通过上边xxl-job-admin的代码目录层级我们可以知道它包括:controller层支持在Web页面进行CRUD、核心类core、持久层dao以及业务实现层service。

    首先我们先看com.xxl.job.admin.core.conf目录下的XxlJobAdminConfig.java

    1.1 分析XxlJobAdminConfig类 @Component public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

        private static XxlJobAdminConfig adminConfig = null;     public static XxlJobAdminConfig getAdminConfig() {         return adminConfig;     }

        // ---------------------- XxlJobScheduler ----------------------

        private XxlJobScheduler xxlJobScheduler;

        @Override     public void afterPropertiesSet() throws Exception {         adminConfig = this;

            xxlJobScheduler = new XxlJobScheduler();         xxlJobScheduler.init();     }

        @Override     public void destroy() throws Exception {         xxlJobScheduler.destroy();     }

        // ---------------------- XxlJobScheduler ----------------------

        // conf     @Value("${xxl.job.i18n}")     private String i18n;

        @Value("${xxl.job.accessToken}")     private String accessToken;

        @Value("${spring.mail.from}")     private String emailFrom;

        @Value("${xxl.job.triggerpool.fast.max}")     private int triggerPoolFastMax;

        @Value("${xxl.job.triggerpool.slow.max}")     private int triggerPoolSlowMax;

        @Value("${xxl.job.logretentiondays}")     private int logretentiondays;

        // dao, service

        @Resource     private XxlJobLogDao xxlJobLogDao;     @Resource     private XxlJobInfoDao xxlJobInfoDao;     @Resource     private XxlJobRegistryDao xxlJobRegistryDao;     @Resource     private XxlJobGroupDao xxlJobGroupDao;     @Resource     private XxlJobLogReportDao xxlJobLogReportDao;     @Resource     private JavaMailSender mailSender;     @Resource     private DataSource dataSource;     @Resource     private JobAlarmer jobAlarmer;

        public String getI18n() {         if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) {             return "zh_CN";         }         return i18n;     }

        public String getAccessToken() {         return accessToken;     }

        public String getEmailFrom() {         return emailFrom;     }

        public int getTriggerPoolFastMax() {         if (triggerPoolFastMax < 200) {             return 200;         }         return triggerPoolFastMax;     }

        public int getTriggerPoolSlowMax() {         if (triggerPoolSlowMax < 100) {             return 100;         }         return triggerPoolSlowMax;     }

        public int getLogretentiondays() {         if (logretentiondays < 7) {             return -1;  // Limit greater than or equal to 7, otherwise close         }         return logretentiondays;     }

        public XxlJobLogDao getXxlJobLogDao() {         return xxlJobLogDao;     }

        public XxlJobInfoDao getXxlJobInfoDao() {         return xxlJobInfoDao;     }

        public XxlJobRegistryDao getXxlJobRegistryDao() {         return xxlJobRegistryDao;     }

        public XxlJobGroupDao getXxlJobGroupDao() {         return xxlJobGroupDao;     }

        public XxlJobLogReportDao getXxlJobLogReportDao() {         return xxlJobLogReportDao;     }

        public JavaMailSender getMailSender() {         return mailSender;     }

        public DataSource getDataSource() {         return dataSource;     }

        public JobAlarmer getJobAlarmer() {         return jobAlarmer;     }

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 配置了i18n(调度中心国际化配置)、accessToken(调度中心通讯TOKEN)、emailFrom(报警邮箱)、triggerPoolFastMax( 调度线程池最大线程配置)、triggerPoolSlowMax( 调度线程池最大线程配置)、logretentiondays(调度中心日志表数据保存天数)等参数; 注入了DataSource、XxlJobLogDao、XxlJobInfoDao、XxlJobRegistryDao、XxlJobGroupDao、XxlJobLogReportDao、JavaMailSender、JobAlarmer等dao和service接口; 同时因为它实现了InitializingBean, DisposableBean接口,所以在这个对象初始化的时候会调用afterPropertiesSet()方法。 注意: 在afterPropertiesSet()方法中新建了一个XxlJobScheduler对象,并调用了它的init()方法。

    1.2 分析XxlJobScheduler.init() public void init() throws Exception {     // init i18n     initI18n();

        // admin registry monitor run     JobRegistryMonitorHelper.getInstance().start();

        // admin fail-monitor run     JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run     JobLosedMonitorHelper.getInstance().start();

        // admin trigger pool start     JobTriggerPoolHelper.toStart();

        // admin log report start     JobLogReportHelper.getInstance().start();

        // start-schedule     JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success."); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 1.2.1 initI18n() // ---------------------- I18n ----------------------

    private void initI18n(){     for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {         item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));     } } 1 2 3 4 5 6 7 初始化新增任务页面中的阻塞处理策略相关的国际化信息。如果没有配置国际化语言则默认为中文。

    1.2.2 JobRegistryMonitorHelper.getInstance().start() private Thread registryThread; private volatile boolean toStop = false; public void start(){     registryThread = new Thread(new Runnable() {         @Override         public void run() {             while (!toStop) {                 try {                     // auto registry group                     List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);                     if (groupList!=null && !groupList.isEmpty()) {

                            // remove dead address (admin/executor)                         List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());                         if (ids!=null && ids.size()>0) {                             XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);                         }

                            // fresh online address (admin/executor)                         HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();                         List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());                         if (list != null) {                             for (XxlJobRegistry item: list) {                                 if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {                                     String appname = item.getRegistryKey();                                     List<String> registryList = appAddressMap.get(appname);                                     if (registryList == null) {                                         registryList = new ArrayList<String>();                                     }

                                        if (!registryList.contains(item.getRegistryValue())) {                                         registryList.add(item.getRegistryValue());                                     }                                     appAddressMap.put(appname, registryList);                                 }                             }                         }

                            // fresh group address                         for (XxlJobGroup group: groupList) {                             List<String> registryList = appAddressMap.get(group.getAppname());                             String addressListStr = null;                             if (registryList!=null && !registryList.isEmpty()) {                                 Collections.sort(registryList);                                 addressListStr = "";                                 for (String item:registryList) {                                     addressListStr += item + ",";                                 }                                 addressListStr = addressListStr.substring(0, addressListStr.length()-1);                             }                             group.setAddressList(addressListStr);                             XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);                         }                     }                 } catch (Exception e) {                     if (!toStop) {                         logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);                     }                 }                 try {                     TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);                 } catch (InterruptedException e) {                     if (!toStop) {                         logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);                     }                 }             }             logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");         }     });     registryThread.setDaemon(true);     registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");     registryThread.start(); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 分析: 该类主要是创建一个守护线程管理注册地址(每30秒执行一次)

    从xxl_job_group表查询所有自动注册的执行器分组groupList; 删除过期注册地址,查询xxl_job_registry表中update_time小于当前时间前90秒的记录并做delete删除; 查询xxl_job_registry表中update_time大于当前时间前90秒的注册记录,并存入HashMap<String, List<String>> appAddressMap中,key为执行器的appname,value为执行器注册地址集合; 循环groupList,从HashMap<String, List<String>> appAddressMap中获取每个分组对应的地址集合,拼接每一个地址,以","分隔,并更新到xxl_job_group中。 1.2.3 JobFailMonitorHelper.getInstance().start() public void start(){     monitorThread = new Thread(new Runnable() {

            @Override         public void run() {

                // monitor             while (!toStop) {                 try {

                        List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);                     if (failLogIds!=null && !failLogIds.isEmpty()) {                         for (long failLogId: failLogIds) {

                                // lock log                             int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);                             if (lockRet < 1) {                                 continue;                             }                             XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);                             XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

                                // 1、fail retry monitor                             if (log.getExecutorFailRetryCount() > 0) {                                 JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);                                 String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";                                 log.setTriggerMsg(log.getTriggerMsg() + retryMsg);                                 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);                             }

                                // 2、fail alarm monitor                             int newAlarmStatus = 0;        // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败                             if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {                                 boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);                                 newAlarmStatus = alarmResult?2:3;                             } else {                                 newAlarmStatus = 1;                             }

                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);                         }                     }

                    } catch (Exception e) {                     if (!toStop) {                         logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);                     }                 }

                       try {                        TimeUnit.SECONDS.sleep(10);                    } catch (Exception e) {                        if (!toStop) {                            logger.error(e.getMessage(), e);                        }                    }

                   }

                logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

            }     });     monitorThread.setDaemon(true);     monitorThread.setName("xxl-job, admin JobFailMonitorHelper");     monitorThread.start(); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 分析:

    新建一个预警守护线程(每十秒工作一次) TimeUnit.SECONDS.sleep(10);

    从xxl_job_log表查询失败日志记录id集合,每次取1000条; List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);

        <select id="findFailJobLogIds" resultType="long" >         SELECT id FROM `xxl_job_log`         WHERE !(             (trigger_code in (0, 200) and handle_code = 0)             OR             (handle_code = 200)         )         AND `alarm_status` = 0         ORDER BY id ASC         LIMIT #{pagesize}     </select> 1 2 3 4 5 6 7 8 9 10 11 如果failLogIds不为空,则循环failLogIds

    锁住这条日志记录(其实是把这条记录的alarm_status字段从0变成了-1);

    根据日志记录id从数据库加载日志记录对象XxlJobLog,根据日志记录里的jobId加载任务对象XxlJobInfo;

    校验是否重试:如果任务设置了失败重试次数,那么进行失败重试。

    将任务信息、失败次数-1加入调度线程队列中进行调度; JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null) 将重试信息更新到本条日志记录中; String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); 1 2 3 校验预警状态: 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败 如果任务配置了预警邮箱,调用所有实现JobAlarm接口的预警实现类的doAlarm(XxlJobInfo info, XxlJobLog jobLog)方法得到最终状态,并根据日志id把当前日志记录的状态从-1更新到最终状态。

    1.2.3.1 分析失败重试中的JobTriggerPoolHelper.trigger()方法 public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {     helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } 1 2 3 // job timeout count private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

    /**  * add trigger  */ public void addTrigger(final int jobId,                        final TriggerTypeEnum triggerType,                        final int failRetryCount,                        final String executorShardingParam,                        final String executorParam,                        final String addressList) {

        // choose thread pool     ThreadPoolExecutor triggerPool_ = fastTriggerPool;     AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);     if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min         triggerPool_ = slowTriggerPool;     }

        // trigger     triggerPool_.execute(new Runnable() {         @Override         public void run() {

                long start = System.currentTimeMillis();

                try {                 // do trigger                 XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);             } catch (Exception e) {                 logger.error(e.getMessage(), e);             } finally {

                    // check timeout-count-map                 long minTim_now = System.currentTimeMillis()/60000;                 if (minTim != minTim_now) {                     minTim = minTim_now;                     jobTimeoutCountMap.clear();                 }

                    // incr timeout-count-map                 long cost = System.currentTimeMillis()-start;                 if (cost > 500) {       // ob-timeout threshold 500ms                     AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));                     if (timeoutCount != null) {                         timeoutCount.incrementAndGet();                     }                 }

                }

            }     }); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 分析:

    JobTriggerPoolHelper类中有一个ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap变量,保存每个任务的执行次数,key为任务的jobId; 根据传入的jobId,从jobTimeoutCountMap取到对应的值jobTimeoutCount; 如果jobTimeoutCount不为空并且大于10(即该任务在一分钟内调度了10次)则使用慢线程池来执行任务,否则使用默认的快线程池; 调用XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);执行任务; check timeout-count-map;(==注意:==此处暂时没明白为什么要判断minTim != minTim_now,待补充。。。。) 计算调度任务的花费时间,如果大于500ms,则先调用jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)),如果已经存在对应的key,则调用incrementAndGet()方法加1。 XxlJobTrigger.trigger()方法稍后再详细讲解。

    1.2.3.2 分析失败预警中的XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log)方法 1.2.3.2.1 分析JobAlarmer预警类 @Component public class JobAlarmer implements ApplicationContextAware, InitializingBean {     private static Logger logger = LoggerFactory.getLogger(JobAlarmer.class);

        private ApplicationContext applicationContext;     private List<JobAlarm> jobAlarmList;

        @Override     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {         this.applicationContext = applicationContext;     }

        @Override     public void afterPropertiesSet() throws Exception {         Map<String, JobAlarm> serviceBeanMap = applicationContext.getBeansOfType(JobAlarm.class);         if (serviceBeanMap != null && serviceBeanMap.size() > 0) {             jobAlarmList = new ArrayList<JobAlarm>(serviceBeanMap.values());         }     }

        /**      * job alarm      *      * @param info      * @param jobLog      * @return      */     public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {

            boolean result = false;         if (jobAlarmList!=null && jobAlarmList.size()>0) {             result = true;  // success means all-success             for (JobAlarm alarm: jobAlarmList) {                 boolean resultItem = false;                 try {                     resultItem = alarm.doAlarm(info, jobLog);                 } catch (Exception e) {                     logger.error(e.getMessage(), e);                 }                 if (!resultItem) {                     result = false;                 }             }         }

            return result;     }

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 分析:

    这个类ApplicationContextAware, InitializingBean接口,重写了setApplicationContext()方法和afterPropertiesSet()方法,那么这个对象在初始化的时候会从Spring容器对象ApplicationContext中取到指定的类的list。在这里就是所有的JobAlarm,于是我们拿到了jobAlarmList。 alarm(info, log)方法其实是循环jobAlarmList,执行每个JobAlarm类的doAlarm方法。 1.2.3.2.2 分析EmailJobAlarm邮件预警类 @Component public class EmailJobAlarm implements JobAlarm {     private static Logger logger = LoggerFactory.getLogger(EmailJobAlarm.class);

        /**      * fail alarm      *      * @param jobLog      */     public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog){         boolean alarmResult = true;

            // send monitor email         if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {

                // alarmContent             String alarmContent = "Alarm Job LogId=" + jobLog.getId();             if (jobLog.getTriggerCode() != ReturnT.SUCCESS_CODE) {                 alarmContent += "<br>TriggerMsg=<br>" + jobLog.getTriggerMsg();             }             if (jobLog.getHandleCode()>0 && jobLog.getHandleCode() != ReturnT.SUCCESS_CODE) {                 alarmContent += "<br>HandleCode=" + jobLog.getHandleMsg();             }

                // email info             XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(Integer.valueOf(info.getJobGroup()));             String personal = I18nUtil.getString("admin_name_full");             String title = I18nUtil.getString("jobconf_monitor");             String content = MessageFormat.format(loadEmailJobAlarmTemplate(),                     group!=null?group.getTitle():"null",                     info.getId(),                     info.getJobDesc(),                     alarmContent);

                Set<String> emailSet = new HashSet<String>(Arrays.asList(info.getAlarmEmail().split(",")));             for (String email: emailSet) {

                    // make mail                 try {                     MimeMessage mimeMessage = XxlJobAdminConfig.getAdminConfig().getMailSender().createMimeMessage();

                        MimeMessageHelper helper = new MimeMessageHelper(mimeMessage, true);                     helper.setFrom(XxlJobAdminConfig.getAdminConfig().getEmailFrom(), personal);                     helper.setTo(email);                     helper.setSubject(title);                     helper.setText(content, true);

                        XxlJobAdminConfig.getAdminConfig().getMailSender().send(mimeMessage);                 } catch (Exception e) {                     logger.error(">>>>>>>>>>> xxl-job, job fail alarm email send error, JobLogId:{}", jobLog.getId(), e);

                        alarmResult = false;                 }

                }         }

            return alarmResult;     }

        /**      * load email job alarm template      *      * @return      */     private static final String loadEmailJobAlarmTemplate(){         String mailBodyTemplate = "<h5>" + I18nUtil.getString("jobconf_monitor_detail") + ":</span>" +                 "<table border=\"1\" cellpadding=\"3\" style=\"border-collapse:collapse; width:80%;\" >\n" +                 "   <thead style=\"font-weight: bold;color: #ffffff;background-color: #ff8c00;\" >" +                 "      <tr>\n" +                 "         <td width=\"20%\" >"+ I18nUtil.getString("jobinfo_field_jobgroup") +"</td>\n" +                 "         <td width=\"10%\" >"+ I18nUtil.getString("jobinfo_field_id") +"</td>\n" +                 "         <td width=\"20%\" >"+ I18nUtil.getString("jobinfo_field_jobdesc") +"</td>\n" +                 "         <td width=\"10%\" >"+ I18nUtil.getString("jobconf_monitor_alarm_title") +"</td>\n" +                 "         <td width=\"40%\" >"+ I18nUtil.getString("jobconf_monitor_alarm_content") +"</td>\n" +                 "      </tr>\n" +                 "   </thead>\n" +                 "   <tbody>\n" +                 "      <tr>\n" +                 "         <td>{0}</td>\n" +                 "         <td>{1}</td>\n" +                 "         <td>{2}</td>\n" +                 "         <td>"+ I18nUtil.getString("jobconf_monitor_alarm_type") +"</td>\n" +                 "         <td>{3}</td>\n" +                 "      </tr>\n" +                 "   </tbody>\n" +                 "</table>";

            return mailBodyTemplate;     }

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 分析:

    实现了JobAlarm预警接口 如果配置了报警邮件地址(多个地址使用","分隔),则发送预警邮件。 发送邮件这块实际上是使用了spring-boot-starter-mail, 其他发送邮件的各种方法可以参考本人的 SpringBoot学习历程(十一):SpringBoot2.X集成mail发送邮件

    如果要实现自己业务的其他预警方法,只需要实现JobAlarm接口,重写doAlarm方法即可。(比如钉钉等)

    1.2.4 JobLosedMonitorHelper.getInstance().start() public class JobLosedMonitorHelper {     private static Logger logger = LoggerFactory.getLogger(JobLosedMonitorHelper.class);          private static JobLosedMonitorHelper instance = new JobLosedMonitorHelper();     public static JobLosedMonitorHelper getInstance(){         return instance;     }

        // ---------------------- monitor ----------------------

        private Thread monitorThread;     private volatile boolean toStop = false;     public void start(){         monitorThread = new Thread(new Runnable() {

                @Override             public void run() {

                    // monitor                 while (!toStop) {                     try {                         // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;                         Date losedTime = DateUtil.addMinutes(new Date(), -10);                         List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

                            if (losedJobIds!=null && losedJobIds.size()>0) {                             for (Long logId: losedJobIds) {

                                    XxlJobLog jobLog = new XxlJobLog();                                 jobLog.setId(logId);

                                    jobLog.setHandleTime(new Date());                                 jobLog.setHandleCode(ReturnT.FAIL_CODE);                                 jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

                                    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(jobLog);                             }

                            }                     } catch (Exception e) {                         if (!toStop) {                             logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);                         }                     }

                        try {                         TimeUnit.SECONDS.sleep(60);                     } catch (Exception e) {                         if (!toStop) {                             logger.error(e.getMessage(), e);                         }                     }

                    }

                    logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

                }         });         monitorThread.setDaemon(true);         monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");         monitorThread.start();     }

        public void toStop(){         toStop = true;         // interrupt and wait         monitorThread.interrupt();         try {             monitorThread.join();         } catch (InterruptedException e) {             logger.error(e.getMessage(), e);         }     }

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 分析: 该类主要是任务结果丢失处理

    Date losedTime = DateUtil.addMinutes(new Date(), -10)取到十分钟前的时间; 从xxl_job_log表查询调度记录停留在 “运行中” 状态超过10min,且对应执行器心跳注册失败不在线的记录id集合;     <select id="findLostJobIds" resultType="long" >         SELECT t.id         FROM xxl_job_log AS t         WHERE t.trigger_code = 200             and t.handle_code = 0             and t.trigger_time <![CDATA[ <= ]]> #{losedTime}             and t.executor_address not in (                 SELECT t2.registry_value                 FROM xxl_job_registry AS t2             )     </select> 1 2 3 4 5 6 7 8 9 10 11 如果id集合不为空,则循环更新xxl_job_log表对应记录,将本地调度主动标记失败。 1.2.5 JobTriggerPoolHelper.toStart() // ---------------------- helper ----------------------

    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();

    public static void toStart() {     helper.start(); } public static void toStop() {     helper.stop(); } 1 2 3 4 5 6 7 8 9 10 // ---------------------- trigger pool ----------------------

    // fast/slow thread pool private ThreadPoolExecutor fastTriggerPool = null; private ThreadPoolExecutor slowTriggerPool = null;

    public void start(){     fastTriggerPool = new ThreadPoolExecutor(             10,             XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),             60L,             TimeUnit.SECONDS,             new LinkedBlockingQueue<Runnable>(1000),             new ThreadFactory() {                 @Override                 public Thread newThread(Runnable r) {                     return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());                 }             });

        slowTriggerPool = new ThreadPoolExecutor(             10,             XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),             60L,             TimeUnit.SECONDS,             new LinkedBlockingQueue<Runnable>(2000),             new ThreadFactory() {                 @Override                 public Thread newThread(Runnable r) {                     return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());                 }             }); }

    public void stop() {     //triggerPool.shutdown();     fastTriggerPool.shutdownNow();     slowTriggerPool.shutdownNow();     logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 分析: 初始化快慢两个线程池:fastTriggerPool和slowTriggerPool。当任务执行的时候默认使用快线程池fastTriggerPool;当任务执行失败并且重试次数达到阈值的时候就会使用慢线程池slowTriggerPool来执行任务

    1.2.6 JobLogReportHelper.getInstance().start() public class JobLogReportHelper {     private static Logger logger = LoggerFactory.getLogger(JobLogReportHelper.class);

        private static JobLogReportHelper instance = new JobLogReportHelper();     public static JobLogReportHelper getInstance(){         return instance;     }

        private Thread logrThread;     private volatile boolean toStop = false;     public void start(){         logrThread = new Thread(new Runnable() {

                @Override             public void run() {

                    // last clean log time                 long lastCleanLogTime = 0;

                    while (!toStop) {

                        // 1、log-report refresh: refresh log report in 3 days                     try {

                            for (int i = 0; i < 3; i++) {

                                // today                             Calendar itemDay = Calendar.getInstance();                             itemDay.add(Calendar.DAY_OF_MONTH, -i);                             itemDay.set(Calendar.HOUR_OF_DAY, 0);                             itemDay.set(Calendar.MINUTE, 0);                             itemDay.set(Calendar.SECOND, 0);                             itemDay.set(Calendar.MILLISECOND, 0);

                                Date todayFrom = itemDay.getTime();

                                itemDay.set(Calendar.HOUR_OF_DAY, 23);                             itemDay.set(Calendar.MINUTE, 59);                             itemDay.set(Calendar.SECOND, 59);                             itemDay.set(Calendar.MILLISECOND, 999);

                                Date todayTo = itemDay.getTime();

                                // refresh log-report every minute                             XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();                             xxlJobLogReport.setTriggerDay(todayFrom);                             xxlJobLogReport.setRunningCount(0);                             xxlJobLogReport.setSucCount(0);                             xxlJobLogReport.setFailCount(0);

                                Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);                             if (triggerCountMap!=null && triggerCountMap.size()>0) {                                 int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;                                 int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;                                 int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;                                 int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

                                    xxlJobLogReport.setRunningCount(triggerDayCountRunning);                                 xxlJobLogReport.setSucCount(triggerDayCountSuc);                                 xxlJobLogReport.setFailCount(triggerDayCountFail);                             }

                                // do refresh                             int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);                             if (ret < 1) {                                 XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);                             }                         }

                        } catch (Exception e) {                         if (!toStop) {                             logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);                         }                     }

                        // 2、log-clean: switch open & once each day                     if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0                             && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

                            // expire-time                         Calendar expiredDay = Calendar.getInstance();                         expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());                         expiredDay.set(Calendar.HOUR_OF_DAY, 0);                         expiredDay.set(Calendar.MINUTE, 0);                         expiredDay.set(Calendar.SECOND, 0);                         expiredDay.set(Calendar.MILLISECOND, 0);                         Date clearBeforeTime = expiredDay.getTime();

                            // clean expired log                         List<Long> logIds = null;                         do {                             logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);                             if (logIds!=null && logIds.size()>0) {                                 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);                             }                         } while (logIds!=null && logIds.size()>0);

                            // update clean time                         lastCleanLogTime = System.currentTimeMillis();                     }

                        try {                         TimeUnit.MINUTES.sleep(1);                     } catch (Exception e) {                         if (!toStop) {                             logger.error(e.getMessage(), e);                         }                     }

                    }

                    logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");

                }         });         logrThread.setDaemon(true);         logrThread.setName("xxl-job, admin JobLogReportHelper");         logrThread.start();     }

        public void toStop(){         toStop = true;         // interrupt and wait         logrThread.interrupt();         try {             logrThread.join();         } catch (InterruptedException e) {             logger.error(e.getMessage(), e);         }     }

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 分析: 创建一个守护线程(每一分钟工作一次)

    刷新前三天的日志相关报表数据: 取到前N天的开始时间和结束时间; 从xxl_job_log表统计相关指标;     <select id="findLogReport" resultType="java.util.Map" >         SELECT             COUNT(handle_code) triggerDayCount,             SUM(CASE WHEN (trigger_code in (0, 200) and handle_code = 0) then 1 else 0 end) as triggerDayCountRunning,             SUM(CASE WHEN handle_code = 200 then 1 else 0 end) as triggerDayCountSuc         FROM xxl_job_log         WHERE trigger_time BETWEEN #{from} and #{to}     </select> 1 2 3 4 5 6 7 8 保存结果到xxl_job_log_report表,有则更新,没有则新增 清理N天前的日志记录(N就是我们配置的xxl.job.logretentiondays参数,必须大于7,否则无效) 删除xxl_job_log表trigger_time是N天前的数据。 1.2.7 JobScheduleHelper.getInstance().start() public class JobScheduleHelper {     private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);

        private static JobScheduleHelper instance = new JobScheduleHelper();     public static JobScheduleHelper getInstance(){         return instance;     }

        public static final long PRE_READ_MS = 5000;    // pre read

        private Thread scheduleThread;     private Thread ringThread;     private volatile boolean scheduleThreadToStop = false;     private volatile boolean ringThreadToStop = false;     private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

        public void start(){

            // schedule thread         scheduleThread = new Thread(new Runnable() {             @Override             public void run() {

                    try {                     TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()00 );                 } catch (InterruptedException e) {                     if (!scheduleThreadToStop) {                         logger.error(e.getMessage(), e);                     }                 }                 logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                    // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)                 int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                    while (!scheduleThreadToStop) {

                        // Scan Job                     long start = System.currentTimeMillis();

                        Connection conn = null;                     Boolean connAutoCommit = null;                     PreparedStatement preparedStatement = null;

                        boolean preReadSuc = true;                     try {

                            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();                         connAutoCommit = conn.getAutoCommit();                         conn.setAutoCommit(false);

                            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );                         preparedStatement.execute();

                            // tx start

                            // 1、pre read                         long nowTime = System.currentTimeMillis();                         List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);                         if (scheduleList!=null && scheduleList.size()>0) {                             // 2、push time-ring                             for (XxlJobInfo jobInfo: scheduleList) {

                                    // time-ring jump                                 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {                                     // 2.1、trigger-expire > 5s:pass && make next-trigger-time                                     logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                        // fresh next                                     refreshNextValidTime(jobInfo, new Date());

                                    } else if (nowTime > jobInfo.getTriggerNextTime()) {                                     // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                        // 1、trigger                                     JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);                                     logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                        // 2、fresh next                                     refreshNextValidTime(jobInfo, new Date());

                                        // next-trigger-time in 5s, pre-read again                                     if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                            // 1、make ring second                                         int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)`);

                                            // 2、push time ring                                         pushTimeRing(ringSecond, jobInfo.getId());

                                            // 3、fresh next                                         refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                        }

                                    } else {                                     // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                                        // 1、make ring second                                     int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)`);

                                        // 2、push time ring                                     pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next                                     refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                }

                                // 3、update trigger info                             for (XxlJobInfo jobInfo: scheduleList) {                                 XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);                             }

                            } else {                             preReadSuc = false;                         }

                            // tx stop

                        } catch (Exception e) {                         if (!scheduleThreadToStop) {                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);                         }                     } finally {

                            // commit                         if (conn != null) {                             try {                                 conn.commit();                             } catch (SQLException e) {                                 if (!scheduleThreadToStop) {                                     logger.error(e.getMessage(), e);                                 }                             }                             try {                                 conn.setAutoCommit(connAutoCommit);                             } catch (SQLException e) {                                 if (!scheduleThreadToStop) {                                     logger.error(e.getMessage(), e);                                 }                             }                             try {                                 conn.close();                             } catch (SQLException e) {                                 if (!scheduleThreadToStop) {                                     logger.error(e.getMessage(), e);                                 }                             }                         }

                            // close PreparedStatement                         if (null != preparedStatement) {                             try {                                 preparedStatement.close();                             } catch (SQLException e) {                                 if (!scheduleThreadToStop) {                                     logger.error(e.getMessage(), e);                                 }                             }                         }                     }                     long cost = System.currentTimeMillis()-start;

                        // Wait seconds, align second                     if (cost < 1000) {  // scan-overtime, not wait                         try {                             // pre-read period: success > scan each second; fail > skip this period;                             TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()00);                         } catch (InterruptedException e) {                             if (!scheduleThreadToStop) {                                 logger.error(e.getMessage(), e);                             }                         }                     }

                    }

                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");             }         });         scheduleThread.setDaemon(true);         scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");         scheduleThread.start();

            // ring thread         ringThread = new Thread(new Runnable() {             @Override             public void run() {

                    // align second                 try {                     TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()00 );                 } catch (InterruptedException e) {                     if (!ringThreadToStop) {                         logger.error(e.getMessage(), e);                     }                 }

                    while (!ringThreadToStop) {

                        try {                         // second data                         List<Integer> ringItemData = new ArrayList<>();                         int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;                         for (int i = 0; i < 2; i++) {                             List<Integer> tmpData = ringData.remove( (nowSecond+60-i)` );                             if (tmpData != null) {                                 ringItemData.addAll(tmpData);                             }                         }

                            // ring trigger                         logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );                         if (ringItemData.size() > 0) {                             // do trigger                             for (int jobId: ringItemData) {                                 // do trigger                                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);                             }                             // clear                             ringItemData.clear();                         }                     } catch (Exception e) {                         if (!ringThreadToStop) {                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);                         }                     }

                        // next second, align second                     try {                         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()00);                     } catch (InterruptedException e) {                         if (!ringThreadToStop) {                             logger.error(e.getMessage(), e);                         }                     }                 }                 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");             }         });         ringThread.setDaemon(true);         ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");         ringThread.start();     }

        private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException {         Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(fromTime);         if (nextValidTime != null) {             jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());             jobInfo.setTriggerNextTime(nextValidTime.getTime());         } else {             jobInfo.setTriggerStatus(0);             jobInfo.setTriggerLastTime(0);             jobInfo.setTriggerNextTime(0);         }     }

        private void pushTimeRing(int ringSecond, int jobId){         // push async ring         List<Integer> ringItemData = ringData.get(ringSecond);         if (ringItemData == null) {             ringItemData = new ArrayList<Integer>();             ringData.put(ringSecond, ringItemData);         }         ringItemData.add(jobId);

            logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );     }

        public void toStop(){

            // 1、stop schedule         scheduleThreadToStop = true;         try {             TimeUnit.SECONDS.sleep(1);  // wait         } catch (InterruptedException e) {             logger.error(e.getMessage(), e);         }         if (scheduleThread.getState() != Thread.State.TERMINATED){             // interrupt and wait             scheduleThread.interrupt();             try {                 scheduleThread.join();             } catch (InterruptedException e) {                 logger.error(e.getMessage(), e);             }         }

            // if has ring data         boolean hasRingData = false;         if (!ringData.isEmpty()) {             for (int second : ringData.keySet()) {                 List<Integer> tmpData = ringData.get(second);                 if (tmpData!=null && tmpData.size()>0) {                     hasRingData = true;                     break;                 }             }         }         if (hasRingData) {             try {                 TimeUnit.SECONDS.sleep(8);             } catch (InterruptedException e) {                 logger.error(e.getMessage(), e);             }         }

            // stop ring (wait job-in-memory stop)         ringThreadToStop = true;         try {             TimeUnit.SECONDS.sleep(1);         } catch (InterruptedException e) {             logger.error(e.getMessage(), e);         }         if (ringThread.getState() != Thread.State.TERMINATED){             // interrupt and wait             ringThread.interrupt();             try {                 ringThread.join();             } catch (InterruptedException e) {                 logger.error(e.getMessage(), e);             }         }

            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");     }

    }分析: 创建了两个守护线程scheduleThread和ringThread

    1.2.7.1 scheduleThread 根据treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)计算预读取任务数; 执行for update语句锁定任务的获取资格; 从xxl_job_info表查询未来5秒内即将要执行的任务集合;     <select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">         SELECT <include refid="Base_Column_List" />         FROM xxl_job_info AS t         WHERE t.trigger_status = 1             and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}         ORDER BY id ASC         LIMIT #{pagesize}     </select> 1 2 3 4 5 6 7 8 如果当前任务的触发时间已经超时5秒以上,跳过本次执行,计算下一次的执行时间 如果当前时间大于任务的下次触发时间,则调用JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);执行调度;计算下一次的执行时间;如果任务正在运行中并且下一次执行时间在5秒内,则进行此任务数据的缓存,处理逻辑就是第6步; 其他情况下则存入Map<Integer, List<Integer>> ringData 中,key为下次执行时间数,value为待执行任务id集合; 更新任务的触发调度信息; 1.2.7.2 ringThread 避免处理耗时太长,跨过刻度,向前校验一个刻度; 从Map<Integer, List<Integer>> ringData中取两个刻度(当前秒和上一秒)的任务id集合; 循环第二步的任务id集合,调用JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);触发调度。 1.3 分析XxlJobTrigger核心类 1.3.1 trigger()方法 public static void trigger(int jobId,                            TriggerTypeEnum triggerType,                            int failRetryCount,                            String executorShardingParam,                            String executorParam,                            String addressList) {

        // load data     XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);     if (jobInfo == null) {         logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);         return;     }     if (executorParam != null) {         jobInfo.setExecutorParam(executorParam);     }     int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();     XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

        // cover addressList     if (addressList!=null && addressList.trim().length()>0) {         group.setAddressType(1);         group.setAddressList(addressList.trim());     }

        // sharding param     int[] shardingParam = null;     if (executorShardingParam!=null){         String[] shardingArr = executorShardingParam.split("/");         if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {             shardingParam = new int[2];             shardingParam[0] = Integer.valueOf(shardingArr[0]);             shardingParam[1] = Integer.valueOf(shardingArr[1]);         }     }     if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)             && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()             && shardingParam==null) {         for (int i = 0; i < group.getRegistryList().size(); i++) {             processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());         }     } else {         if (shardingParam == null) {             shardingParam = new int[]{0, 1};         }         processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);     }

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 分析:

    根据jobId从数据库xxl_job_info表中加载任务信息XxlJobInfo; 校验方法入参executorParam,不为空则覆盖任务信息的executorParam; 校验方法入参failRetryCount,大于等于0则覆盖; 从数据库xxl_job_group表加载执行器信息,校验方法入参addressList,不为空则覆盖执行器的地址类型为手动注入1,地址列表为入参addressList; 校验方法入参executorShardingParam, 如果任务的路由策略是分片广播且执行器的注册地址列表不为空,需要根据分片参数调用本类中的processTrigger()方法。 1.3.2 processTrigger()方法 private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

        // param     ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy     ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy     String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

        // 1、save log-id     XxlJobLog jobLog = new XxlJobLog();     jobLog.setJobGroup(jobInfo.getJobGroup());     jobLog.setJobId(jobInfo.getId());     jobLog.setTriggerTime(new Date());     XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);     logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2、init trigger-param     TriggerParam triggerParam = new TriggerParam();     triggerParam.setJobId(jobInfo.getId());     triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());     triggerParam.setExecutorParams(jobInfo.getExecutorParam());     triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());     triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());     triggerParam.setLogId(jobLog.getId());     triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());     triggerParam.setGlueType(jobInfo.getGlueType());     triggerParam.setGlueSource(jobInfo.getGlueSource());     triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());     triggerParam.setBroadcastIndex(index);     triggerParam.setBroadcastTotal(total);

        // 3、init address     String address = null;     ReturnT<String> routeAddressResult = null;     if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {         if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {             if (index < group.getRegistryList().size()) {                 address = group.getRegistryList().get(index);             } else {                 address = group.getRegistryList().get(0);             }         } else {             routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());             if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {                 address = routeAddressResult.getContent();             }         }     } else {         routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));     }

        // 4、trigger remote executor     ReturnT<String> triggerResult = null;     if (address != null) {         triggerResult = runExecutor(triggerParam, address);     } else {         triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);     }

        // 5、collection trigger info     StringBuffer triggerMsgSb = new StringBuffer();     triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")             .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());     if (shardingParam != null) {         triggerMsgSb.append("("+shardingParam+")");     }     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

        triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")             .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

        // 6、save log trigger-info     jobLog.setExecutorAddress(address);     jobLog.setExecutorHandler(jobInfo.getExecutorHandler());     jobLog.setExecutorParam(jobInfo.getExecutorParam());     jobLog.setExecutorShardingParam(shardingParam);     jobLog.setExecutorFailRetryCount(finalFailRetryCount);     //jobLog.setTriggerTime();     jobLog.setTriggerCode(triggerResult.getCode());     jobLog.setTriggerMsg(triggerMsgSb.toString());     XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 分析:

    处理方法入参:阻塞处理策略、路由策略、分片参数; 保存一条日志记录到xxl_job_log表; 初始化任务调度参数(包括执行器参数、日志记录id、任务信息和分片参数); 初始化调度地址(如果路由策略是分片广播,则从执行器地址列表中获取分片参数i对应的地址即可;如果是其他路由策略则调用对应ExecutorRouter的route方法获取最终执行器地址); 调用runExecutor(triggerParam, address)方法执行远程任务调度; 组装任务触发调度信息; 将触发调度信息更新到对应日志记录中。 1.3.3 runExecutor()方法 public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){     ReturnT<String> runResult = null;     try {         ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);         runResult = executorBiz.run(triggerParam);     } catch (Exception e) {         logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);         runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));     }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");     runResultSB.append("<br>address:").append(address);     runResultSB.append("<br>code:").append(runResult.getCode());     runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());     return runResult; } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 分析:

    根据执行器地址获取对应的ExecutorBizClient(所有的执行器客户端会保存在ConcurrentMap<String, ExecutorBiz> executorBizRepository中); 调用ExecutorBizClient.run(triggerParam)方法; @Override public ReturnT<String> run(TriggerParam triggerParam) {     return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); } 1 2 3 4 使用XxlJobRemotingUtil工具类发送了一个post请求 处理返回的结果信息; 二、彩蛋 本篇跟进了一下调度中心(xxl-job-admin)的启动过程和任务调度触发原理,下一篇我们继续跟进下调度中心的web页面端api调用源码分析。 ———————————————— 版权声明:本文为博主「RabbitsInTheGrass」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/RabbitInTheGrass/article/details/106996040

    Processed: 0.031, SQL: 9