c11c8891ed62b1e8029aaadec9c00c2ea1b9d3f8.svn-base 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package com.xxl.job.core.executor;
  2. import com.xxl.job.core.biz.AdminBiz;
  3. import com.xxl.job.core.biz.client.AdminBizClient;
  4. import com.xxl.job.core.handler.IJobHandler;
  5. import com.xxl.job.core.log.XxlJobFileAppender;
  6. import com.xxl.job.core.server.EmbedServer;
  7. import com.xxl.job.core.thread.JobLogFileCleanThread;
  8. import com.xxl.job.core.thread.JobThread;
  9. import com.xxl.job.core.thread.TriggerCallbackThread;
  10. import com.xxl.job.core.util.IpUtil;
  11. import com.xxl.job.core.util.NetUtil;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16. import java.util.Map;
  17. import java.util.concurrent.ConcurrentHashMap;
  18. import java.util.concurrent.ConcurrentMap;
  19. /**
  20. * 重写目的修改默认端口9999为10000避免和网关冲突
  21. */
  22. public class XxlJobExecutor {
  23. private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
  24. // ---------------------- param ----------------------
  25. private String adminAddresses;
  26. private String accessToken;
  27. private String appname;
  28. private String address;
  29. private String ip;
  30. private int port;
  31. private String logPath;
  32. private int logRetentionDays;
  33. public void setAdminAddresses(String adminAddresses) {
  34. this.adminAddresses = adminAddresses;
  35. }
  36. public void setAccessToken(String accessToken) {
  37. this.accessToken = accessToken;
  38. }
  39. public void setAppname(String appname) {
  40. this.appname = appname;
  41. }
  42. public void setAddress(String address) {
  43. this.address = address;
  44. }
  45. public void setIp(String ip) {
  46. this.ip = ip;
  47. }
  48. public void setPort(int port) {
  49. this.port = port;
  50. }
  51. public void setLogPath(String logPath) {
  52. this.logPath = logPath;
  53. }
  54. public void setLogRetentionDays(int logRetentionDays) {
  55. this.logRetentionDays = logRetentionDays;
  56. }
  57. // ---------------------- start + stop ----------------------
  58. public void start() throws Exception {
  59. // init logpath
  60. XxlJobFileAppender.initLogPath(logPath);
  61. // init invoker, admin-client
  62. initAdminBizList(adminAddresses, accessToken);
  63. // init JobLogFileCleanThread
  64. JobLogFileCleanThread.getInstance().start(logRetentionDays);
  65. // init TriggerCallbackThread
  66. TriggerCallbackThread.getInstance().start();
  67. // init executor-server
  68. initEmbedServer(address, ip, port, appname, accessToken);
  69. }
  70. public void destroy() {
  71. // destory executor-server
  72. stopEmbedServer();
  73. // destory jobThreadRepository
  74. if (jobThreadRepository.size() > 0) {
  75. for (Map.Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) {
  76. JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
  77. // wait for job thread push result to callback queue
  78. if (oldJobThread != null) {
  79. try {
  80. oldJobThread.join();
  81. } catch (InterruptedException e) {
  82. logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
  83. }
  84. }
  85. }
  86. jobThreadRepository.clear();
  87. }
  88. jobHandlerRepository.clear();
  89. // destory JobLogFileCleanThread
  90. JobLogFileCleanThread.getInstance().toStop();
  91. // destory TriggerCallbackThread
  92. TriggerCallbackThread.getInstance().toStop();
  93. }
  94. // ---------------------- admin-client (rpc invoker) ----------------------
  95. private static List<AdminBiz> adminBizList;
  96. private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
  97. if (adminAddresses != null && adminAddresses.trim().length() > 0) {
  98. for (String address : adminAddresses.trim().split(",")) {
  99. if (address != null && address.trim().length() > 0) {
  100. AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
  101. if (adminBizList == null) {
  102. adminBizList = new ArrayList<AdminBiz>();
  103. }
  104. adminBizList.add(adminBiz);
  105. }
  106. }
  107. }
  108. }
  109. public static List<AdminBiz> getAdminBizList() {
  110. return adminBizList;
  111. }
  112. // ---------------------- executor-server (rpc provider) ----------------------
  113. private EmbedServer embedServer = null;
  114. private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
  115. // fill ip port 修改默认端口
  116. port = port > 0 ? port : NetUtil.findAvailablePort(10000);
  117. ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
  118. // generate address
  119. if (address == null || address.trim().length() == 0) {
  120. String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
  121. address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
  122. }
  123. // accessToken
  124. if (accessToken == null || accessToken.trim().length() == 0) {
  125. logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
  126. }
  127. // start
  128. embedServer = new EmbedServer();
  129. embedServer.start(address, port, appname, accessToken);
  130. }
  131. private void stopEmbedServer() {
  132. // stop provider factory
  133. if (embedServer != null) {
  134. try {
  135. embedServer.stop();
  136. } catch (Exception e) {
  137. logger.error(e.getMessage(), e);
  138. }
  139. }
  140. }
  141. // ---------------------- job handler repository ----------------------
  142. private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
  143. public static IJobHandler loadJobHandler(String name) {
  144. return jobHandlerRepository.get(name);
  145. }
  146. public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
  147. logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
  148. return jobHandlerRepository.put(name, jobHandler);
  149. }
  150. // ---------------------- job thread repository ----------------------
  151. private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
  152. public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
  153. JobThread newJobThread = new JobThread(jobId, handler);
  154. newJobThread.start();
  155. logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
  156. JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
  157. if (oldJobThread != null) {
  158. oldJobThread.toStop(removeOldReason);
  159. oldJobThread.interrupt();
  160. }
  161. return newJobThread;
  162. }
  163. public static JobThread removeJobThread(int jobId, String removeOldReason) {
  164. JobThread oldJobThread = jobThreadRepository.remove(jobId);
  165. if (oldJobThread != null) {
  166. oldJobThread.toStop(removeOldReason);
  167. oldJobThread.interrupt();
  168. return oldJobThread;
  169. }
  170. return null;
  171. }
  172. public static JobThread loadJobThread(int jobId) {
  173. JobThread jobThread = jobThreadRepository.get(jobId);
  174. return jobThread;
  175. }
  176. }