27dfd554d1336f8ae518be170321809fd71d695d.svn-base 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package com.xxl.job.admin.core.thread;
  2. import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
  3. import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
  4. import com.xxl.job.admin.core.trigger.XxlJobTrigger;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.util.concurrent.*;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /**
  10. * job trigger thread pool helper
  11. *
  12. * @author xuxueli 2018-07-03 21:08:07
  13. */
  14. public class JobTriggerPoolHelper {
  15. private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
  16. // ---------------------- trigger pool ----------------------
  17. // fast/slow thread pool
  18. private ThreadPoolExecutor fastTriggerPool = null;
  19. private ThreadPoolExecutor slowTriggerPool = null;
  20. public void start(){
  21. fastTriggerPool = new ThreadPoolExecutor(
  22. 10,
  23. XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
  24. 60L,
  25. TimeUnit.SECONDS,
  26. new LinkedBlockingQueue<Runnable>(1000),
  27. new ThreadFactory() {
  28. @Override
  29. public Thread newThread(Runnable r) {
  30. return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
  31. }
  32. });
  33. slowTriggerPool = new ThreadPoolExecutor(
  34. 10,
  35. XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
  36. 60L,
  37. TimeUnit.SECONDS,
  38. new LinkedBlockingQueue<Runnable>(2000),
  39. new ThreadFactory() {
  40. @Override
  41. public Thread newThread(Runnable r) {
  42. return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
  43. }
  44. });
  45. }
  46. public void stop() {
  47. //triggerPool.shutdown();
  48. fastTriggerPool.shutdownNow();
  49. slowTriggerPool.shutdownNow();
  50. logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
  51. }
  52. // job timeout count
  53. private volatile long minTim = System.currentTimeMillis()/60000; // ms > min
  54. private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
  55. /**
  56. * add trigger
  57. */
  58. public void addTrigger(final int jobId,
  59. final TriggerTypeEnum triggerType,
  60. final int failRetryCount,
  61. final String executorShardingParam,
  62. final String executorParam,
  63. final String addressList) {
  64. // choose thread pool
  65. ThreadPoolExecutor triggerPool_ = fastTriggerPool;
  66. AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
  67. if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
  68. triggerPool_ = slowTriggerPool;
  69. }
  70. // trigger
  71. triggerPool_.execute(new Runnable() {
  72. @Override
  73. public void run() {
  74. long start = System.currentTimeMillis();
  75. try {
  76. // do trigger
  77. XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
  78. } catch (Exception e) {
  79. logger.error(e.getMessage(), e);
  80. } finally {
  81. // check timeout-count-map
  82. long minTim_now = System.currentTimeMillis()/60000;
  83. if (minTim != minTim_now) {
  84. minTim = minTim_now;
  85. jobTimeoutCountMap.clear();
  86. }
  87. // incr timeout-count-map
  88. long cost = System.currentTimeMillis()-start;
  89. if (cost > 500) { // ob-timeout threshold 500ms
  90. AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
  91. if (timeoutCount != null) {
  92. timeoutCount.incrementAndGet();
  93. }
  94. }
  95. }
  96. }
  97. });
  98. }
  99. // ---------------------- helper ----------------------
  100. private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
  101. public static void toStart() {
  102. helper.start();
  103. }
  104. public static void toStop() {
  105. helper.stop();
  106. }
  107. /**
  108. * @param jobId
  109. * @param triggerType
  110. * @param failRetryCount
  111. * >=0: use this param
  112. * <0: use param from job info config
  113. * @param executorShardingParam
  114. * @param executorParam
  115. * null: use job param
  116. * not null: cover job param
  117. */
  118. public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
  119. helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
  120. }
  121. }