55f715bb6cd9dd236da3ffbb02e3e01a5bd4a368.svn-base 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package com.xxl.job.admin.core.thread;
  2. import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
  3. import com.xxl.job.admin.core.model.XxlJobGroup;
  4. import com.xxl.job.admin.core.model.XxlJobRegistry;
  5. import com.xxl.job.core.enums.RegistryConfig;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import java.util.*;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * job registry instance
  12. * @author xuxueli 2016-10-02 19:10:24
  13. */
  14. public class JobRegistryMonitorHelper {
  15. private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class);
  16. private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper();
  17. public static JobRegistryMonitorHelper getInstance(){
  18. return instance;
  19. }
  20. private Thread registryThread;
  21. private volatile boolean toStop = false;
  22. public void start(){
  23. registryThread = new Thread(new Runnable() {
  24. @Override
  25. public void run() {
  26. while (!toStop) {
  27. try {
  28. // auto registry group
  29. List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
  30. if (groupList!=null && !groupList.isEmpty()) {
  31. // remove dead address (admin/executor)
  32. List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
  33. if (ids!=null && ids.size()>0) {
  34. XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
  35. }
  36. // fresh online address (admin/executor)
  37. HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
  38. List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
  39. if (list != null) {
  40. for (XxlJobRegistry item: list) {
  41. if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
  42. String appname = item.getRegistryKey();
  43. List<String> registryList = appAddressMap.get(appname);
  44. if (registryList == null) {
  45. registryList = new ArrayList<String>();
  46. }
  47. if (!registryList.contains(item.getRegistryValue())) {
  48. registryList.add(item.getRegistryValue());
  49. }
  50. appAddressMap.put(appname, registryList);
  51. }
  52. }
  53. }
  54. // fresh group address
  55. for (XxlJobGroup group: groupList) {
  56. List<String> registryList = appAddressMap.get(group.getAppname());
  57. String addressListStr = null;
  58. if (registryList!=null && !registryList.isEmpty()) {
  59. Collections.sort(registryList);
  60. addressListStr = "";
  61. for (String item:registryList) {
  62. addressListStr += item + ",";
  63. }
  64. addressListStr = addressListStr.substring(0, addressListStr.length()-1);
  65. }
  66. group.setAddressList(addressListStr);
  67. XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
  68. }
  69. }
  70. } catch (Exception e) {
  71. if (!toStop) {
  72. logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
  73. }
  74. }
  75. try {
  76. TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
  77. } catch (InterruptedException e) {
  78. if (!toStop) {
  79. logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
  80. }
  81. }
  82. }
  83. logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
  84. }
  85. });
  86. registryThread.setDaemon(true);
  87. registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
  88. registryThread.start();
  89. }
  90. public void toStop(){
  91. toStop = true;
  92. // interrupt and wait
  93. registryThread.interrupt();
  94. try {
  95. registryThread.join();
  96. } catch (InterruptedException e) {
  97. logger.error(e.getMessage(), e);
  98. }
  99. }
  100. }