0fdfb12b300866f98f6371785a8df041482af2d7.svn-base 14 KB


  1. package com.xxl.job.admin.core.thread;
  2. import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
  3. import com.xxl.job.admin.core.cron.CronExpression;
  4. import com.xxl.job.admin.core.model.XxlJobInfo;
  5. import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import java.sql.Connection;
  9. import java.sql.PreparedStatement;
  10. import java.sql.SQLException;
  11. import java.text.ParseException;
  12. import java.util.*;
  13. import java.util.concurrent.ConcurrentHashMap;
  14. import java.util.concurrent.TimeUnit;
  15. /**
  16. * @author xuxueli 2019-05-21
  17. */
  18. public class JobScheduleHelper {
  19. private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
  20. private static JobScheduleHelper instance = new JobScheduleHelper();
  21. public static JobScheduleHelper getInstance(){
  22. return instance;
  23. }
  24. public static final long PRE_READ_MS = 5000; // pre read
  25. private Thread scheduleThread;
  26. private Thread ringThread;
  27. private volatile boolean scheduleThreadToStop = false;
  28. private volatile boolean ringThreadToStop = false;
  29. private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
  30. public void start(){
  31. // schedule thread
  32. scheduleThread = new Thread(new Runnable() {
  33. @Override
  34. public void run() {
  35. try {
  36. TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
  37. } catch (InterruptedException e) {
  38. if (!scheduleThreadToStop) {
  39. logger.error(e.getMessage(), e);
  40. }
  41. }
  42. logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
  43. // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
  44. int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
  45. while (!scheduleThreadToStop) {
  46. // Scan Job
  47. long start = System.currentTimeMillis();
  48. Connection conn = null;
  49. Boolean connAutoCommit = null;
  50. PreparedStatement preparedStatement = null;
  51. boolean preReadSuc = true;
  52. try {
  53. conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
  54. connAutoCommit = conn.getAutoCommit();
  55. conn.setAutoCommit(false);
  56. preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
  57. preparedStatement.execute();
  58. // tx start
  59. // 1、pre read
  60. long nowTime = System.currentTimeMillis();
  61. List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
  62. if (scheduleList!=null && scheduleList.size()>0) {
  63. // 2、push time-ring
  64. for (XxlJobInfo jobInfo: scheduleList) {
  65. // time-ring jump
  66. if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
  67. // 2.1、trigger-expire > 5s:pass && make next-trigger-time
  68. logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
  69. // fresh next
  70. refreshNextValidTime(jobInfo, new Date());
  71. } else if (nowTime > jobInfo.getTriggerNextTime()) {
  72. // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
  73. // 1、trigger
  74. JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
  75. logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
  76. // 2、fresh next
  77. refreshNextValidTime(jobInfo, new Date());
  78. // next-trigger-time in 5s, pre-read again
  79. if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
  80. // 1、make ring second
  81. int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
  82. // 2、push time ring
  83. pushTimeRing(ringSecond, jobInfo.getId());
  84. // 3、fresh next
  85. refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
  86. }
  87. } else {
  88. // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
  89. // 1、make ring second
  90. int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
  91. // 2、push time ring
  92. pushTimeRing(ringSecond, jobInfo.getId());
  93. // 3、fresh next
  94. refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
  95. }
  96. }
  97. // 3、update trigger info
  98. for (XxlJobInfo jobInfo: scheduleList) {
  99. XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
  100. }
  101. } else {
  102. preReadSuc = false;
  103. }
  104. // tx stop
  105. } catch (Exception e) {
  106. if (!scheduleThreadToStop) {
  107. logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
  108. }
  109. } finally {
  110. // commit
  111. if (conn != null) {
  112. try {
  113. conn.commit();
  114. } catch (SQLException e) {
  115. if (!scheduleThreadToStop) {
  116. logger.error(e.getMessage(), e);
  117. }
  118. }
  119. try {
  120. conn.setAutoCommit(connAutoCommit);
  121. } catch (SQLException e) {
  122. if (!scheduleThreadToStop) {
  123. logger.error(e.getMessage(), e);
  124. }
  125. }
  126. try {
  127. conn.close();
  128. } catch (SQLException e) {
  129. if (!scheduleThreadToStop) {
  130. logger.error(e.getMessage(), e);
  131. }
  132. }
  133. }
  134. // close PreparedStatement
  135. if (null != preparedStatement) {
  136. try {
  137. preparedStatement.close();
  138. } catch (SQLException e) {
  139. if (!scheduleThreadToStop) {
  140. logger.error(e.getMessage(), e);
  141. }
  142. }
  143. }
  144. }
  145. long cost = System.currentTimeMillis()-start;
  146. // Wait seconds, align second
  147. if (cost < 1000) { // scan-overtime, not wait
  148. try {
  149. // pre-read period: success > scan each second; fail > skip this period;
  150. TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
  151. } catch (InterruptedException e) {
  152. if (!scheduleThreadToStop) {
  153. logger.error(e.getMessage(), e);
  154. }
  155. }
  156. }
  157. }
  158. logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
  159. }
  160. });
  161. scheduleThread.setDaemon(true);
  162. scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
  163. scheduleThread.start();
  164. // ring thread
  165. ringThread = new Thread(new Runnable() {
  166. @Override
  167. public void run() {
  168. // align second
  169. try {
  170. TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
  171. } catch (InterruptedException e) {
  172. if (!ringThreadToStop) {
  173. logger.error(e.getMessage(), e);
  174. }
  175. }
  176. while (!ringThreadToStop) {
  177. try {
  178. // second data
  179. List<Integer> ringItemData = new ArrayList<>();
  180. int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
  181. for (int i = 0; i < 2; i++) {
  182. List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
  183. if (tmpData != null) {
  184. ringItemData.addAll(tmpData);
  185. }
  186. }
  187. // ring trigger
  188. logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
  189. if (ringItemData.size() > 0) {
  190. // do trigger
  191. for (int jobId: ringItemData) {
  192. // do trigger
  193. JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
  194. }
  195. // clear
  196. ringItemData.clear();
  197. }
  198. } catch (Exception e) {
  199. if (!ringThreadToStop) {
  200. logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
  201. }
  202. }
  203. // next second, align second
  204. try {
  205. TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
  206. } catch (InterruptedException e) {
  207. if (!ringThreadToStop) {
  208. logger.error(e.getMessage(), e);
  209. }
  210. }
  211. }
  212. logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
  213. }
  214. });
  215. ringThread.setDaemon(true);
  216. ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
  217. ringThread.start();
  218. }
  219. private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws ParseException {
  220. Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(fromTime);
  221. if (nextValidTime != null) {
  222. jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
  223. jobInfo.setTriggerNextTime(nextValidTime.getTime());
  224. } else {
  225. jobInfo.setTriggerStatus(0);
  226. jobInfo.setTriggerLastTime(0);
  227. jobInfo.setTriggerNextTime(0);
  228. }
  229. }
  230. private void pushTimeRing(int ringSecond, int jobId){
  231. // push async ring
  232. List<Integer> ringItemData = ringData.get(ringSecond);
  233. if (ringItemData == null) {
  234. ringItemData = new ArrayList<Integer>();
  235. ringData.put(ringSecond, ringItemData);
  236. }
  237. ringItemData.add(jobId);
  238. logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
  239. }
  240. public void toStop(){
  241. // 1、stop schedule
  242. scheduleThreadToStop = true;
  243. try {
  244. TimeUnit.SECONDS.sleep(1); // wait
  245. } catch (InterruptedException e) {
  246. logger.error(e.getMessage(), e);
  247. }
  248. if (scheduleThread.getState() != Thread.State.TERMINATED){
  249. // interrupt and wait
  250. scheduleThread.interrupt();
  251. try {
  252. scheduleThread.join();
  253. } catch (InterruptedException e) {
  254. logger.error(e.getMessage(), e);
  255. }
  256. }
  257. // if has ring data
  258. boolean hasRingData = false;
  259. if (!ringData.isEmpty()) {
  260. for (int second : ringData.keySet()) {
  261. List<Integer> tmpData = ringData.get(second);
  262. if (tmpData!=null && tmpData.size()>0) {
  263. hasRingData = true;
  264. break;
  265. }
  266. }
  267. }
  268. if (hasRingData) {
  269. try {
  270. TimeUnit.SECONDS.sleep(8);
  271. } catch (InterruptedException e) {
  272. logger.error(e.getMessage(), e);
  273. }
  274. }
  275. // stop ring (wait job-in-memory stop)
  276. ringThreadToStop = true;
  277. try {
  278. TimeUnit.SECONDS.sleep(1);
  279. } catch (InterruptedException e) {
  280. logger.error(e.getMessage(), e);
  281. }
  282. if (ringThread.getState() != Thread.State.TERMINATED){
  283. // interrupt and wait
  284. ringThread.interrupt();
  285. try {
  286. ringThread.join();
  287. } catch (InterruptedException e) {
  288. logger.error(e.getMessage(), e);
  289. }
  290. }
  291. logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
  292. }
  293. }