|| package com.xxl.job.admin.core.thread;import com.xxl.job.admin.core.conf.XxlJobAdminConfig;import com.xxl.job.admin.core.cron.CronExpression;import com.xxl.job.admin.core.model.XxlJobInfo;import com.xxl.job.admin.core.trigger.TriggerTypeEnum;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.SQLException;import java.text.ParseException;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.TimeUnit;/** * @author xuxueli 2019-05-21 */public class JobScheduleHelper {    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);    private static JobScheduleHelper instance = new JobScheduleHelper();    public static JobScheduleHelper getInstance(){        return instance;    }    public static final long PRE_READ_MS = 5000;    // pre read    private Thread scheduleThread;    private Thread ringThread;    private volatile boolean scheduleThreadToStop = false;    private volatile boolean ringThreadToStop = false;    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();    public void start(){        // schedule thread        scheduleThread = new Thread(new Runnable() {            @Override            public void run() {                try {                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );                } catch (InterruptedException e) {                    if (!scheduleThreadToStop) {                        logger.error(e.getMessage(), e);                    }                }                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;                while (!scheduleThreadToStop) {                    // Scan Job                    long start = System.currentTimeMillis();                    Connection conn = null;                    Boolean connAutoCommit = null;                    PreparedStatement preparedStatement = null;                    boolean preReadSuc = true;                    try {                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();                        connAutoCommit = conn.getAutoCommit();                        conn.setAutoCommit(false);                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );                        preparedStatement.execute();                        // tx start                        // 1、pre read                        long nowTime = System.currentTimeMillis();                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);                        if (scheduleList!=null && scheduleList.size()>0) {                            // 2、push time-ring                            for (XxlJobInfo jobInfo: scheduleList) {                                // time-ring jump                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());                                    // fresh next                                    refreshNextValidTime(jobInfo, new Date());                                } else if (nowTime > jobInfo.getTriggerNextTime()) {                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time                                    // 1、trigger                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );                                    // 2、fresh next                                    refreshNextValidTime(jobInfo, new Date());                                    // next-trigger-time in 5s, pre-read again                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {                                        // 1、make ring second                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);                                        // 2、push time ring                                        pushTimeRing(ringSecond, jobInfo.getId());                                        // 3、fresh next                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));                                    }                                } else {                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time                                    // 1、make ring second                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);                                    // 2、push time ring                                    pushTimeRing(ringSecond, jobInfo.getId());                                    // 3、fresh next                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));                                }                            }                            // 3、update trigger info                            for (XxlJobInfo jobInfo: scheduleList) {                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);                            }                        } else {                            preReadSuc = false;                        }                        // tx stop                    } catch (Exception e) {                        if (!scheduleThreadToStop) {                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);                        }                    } finally {                        // commit                        if (conn != null) {                            try {                                conn.commit();                            } catch (SQLException e) {                                if (!scheduleThreadToStop) {                                    logger.error(e.getMessage(), e);                                }                            }                            try {                                conn.setAutoCommit(connAutoCommit);                            } catch (SQLException e) {                                if (!scheduleThreadToStop) {                                    logger.error(e.getMessage(), e);                                }                            }                            try {                                conn.close();                            } catch (SQLException e) {                                if (!scheduleThreadToStop) {                                    logger.error(e.getMessage(), e);                                }                            }                        }                        // close PreparedStatement                        if (null != preparedStatement) {                            try {                                preparedStatement.close();                            } catch (SQLException e) {                                if (!scheduleThreadToStop) {                                    logger.error(e.getMessage(), e);                                }                            }                        }                    }                    long cost = System.currentTimeMillis()-start;                    // Wait seconds, align second                    if (cost < 1000) {  // scan-overtime, not wait                        try {                            // pre-read period: success > scan each second; fail > skip this period;                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);                        } catch (InterruptedException e) {                            if (!scheduleThreadToStop) {                                logger.error(e.getMessage(), e);                            }                        }                    }                }                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");            }        });        scheduleThread.setDaemon(true);        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");        scheduleThread.start();        // ring thread        ringThread = new Thread(new Runnable() {            @Override            public void run() {                // align second                try {                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );                } catch (InterruptedException e) {                    if (!ringThreadToStop) {                        logger.error(e.getMessage(), e);                    }                }                while (!ringThreadToStop) {                    try {                        // second data                        List<Integer> ringItemData = new ArrayList<>();                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;                        for (int i = 0; i < 2; i++) {                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );                            if (tmpData != null) {                                ringItemData.addAll(tmpData);                            }                        }                        // ring trigger                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );                        if (ringItemData.size() > 0) {                            // do trigger                            for (int jobId: ringItemData) {                                // do trigger                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);                            }                            // clear                            ringItemData.clear();                        }                    } catch (Exception e) {                        if (!ringThreadToStop) {                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);                        }                    }                    // next second, align second                    try {                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);                    } catch (InterruptedException e) {                        if (!ringThreadToStop) {                            logger.error(e.getMessage(), e);                        }                    }                }                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");            }        });        ringThread.setDaemon(true);        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");        ringThread.start();    }    private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException {        Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(fromTime);        if (nextValidTime != null) {            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());            jobInfo.setTriggerNextTime(nextValidTime.getTime());        } else {            jobInfo.setTriggerStatus(0);            jobInfo.setTriggerLastTime(0);            jobInfo.setTriggerNextTime(0);        }    }    private void pushTimeRing(int ringSecond, int jobId){        // push async ring        List<Integer> ringItemData = ringData.get(ringSecond);        if (ringItemData == null) {            ringItemData = new ArrayList<Integer>();            ringData.put(ringSecond, ringItemData);        }        ringItemData.add(jobId);        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );    }    public void toStop(){        // 1、stop schedule        scheduleThreadToStop = true;        try {            TimeUnit.SECONDS.sleep(1);  // wait        } catch (InterruptedException e) {            logger.error(e.getMessage(), e);        }        if (scheduleThread.getState() != Thread.State.TERMINATED){            // interrupt and wait            scheduleThread.interrupt();            try {                scheduleThread.join();            } catch (InterruptedException e) {                logger.error(e.getMessage(), e);            }        }        // if has ring data        boolean hasRingData = false;        if (!ringData.isEmpty()) {            for (int second : ringData.keySet()) {                List<Integer> tmpData = ringData.get(second);                if (tmpData!=null && tmpData.size()>0) {                    hasRingData = true;                    break;                }            }        }        if (hasRingData) {            try {                TimeUnit.SECONDS.sleep(8);            } catch (InterruptedException e) {                logger.error(e.getMessage(), e);            }        }        // stop ring (wait job-in-memory stop)        ringThreadToStop = true;        try {            TimeUnit.SECONDS.sleep(1);        } catch (InterruptedException e) {            logger.error(e.getMessage(), e);        }        if (ringThread.getState() != Thread.State.TERMINATED){            // interrupt and wait            ringThread.interrupt();            try {                ringThread.join();            } catch (InterruptedException e) {                logger.error(e.getMessage(), e);            }        }        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");    }}
 |