package com.xxl.job.core.executor; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.client.AdminBizClient; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.server.EmbedServer; import com.xxl.job.core.thread.JobLogFileCleanThread; import com.xxl.job.core.thread.JobThread; import com.xxl.job.core.thread.TriggerCallbackThread; import com.xxl.job.core.util.IpUtil; import com.xxl.job.core.util.NetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * 重写目的修改默认端口9999为10000避免和网关冲突 */ public class XxlJobExecutor { private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); // ---------------------- param ---------------------- private String adminAddresses; private String accessToken; private String appname; private String address; private String ip; private int port; private String logPath; private int logRetentionDays; public void setAdminAddresses(String adminAddresses) { this.adminAddresses = adminAddresses; } public void setAccessToken(String accessToken) { this.accessToken = accessToken; } public void setAppname(String appname) { this.appname = appname; } public void setAddress(String address) { this.address = address; } public void setIp(String ip) { this.ip = ip; } public void setPort(int port) { this.port = port; } public void setLogPath(String logPath) { this.logPath = logPath; } public void setLogRetentionDays(int logRetentionDays) { this.logRetentionDays = logRetentionDays; } // ---------------------- start + stop ---------------------- public void start() throws Exception { // init logpath XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client initAdminBizList(adminAddresses, accessToken); // init JobLogFileCleanThread JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread TriggerCallbackThread.getInstance().start(); // init executor-server initEmbedServer(address, ip, port, appname, accessToken); } public void destroy() { // destory executor-server stopEmbedServer(); // destory jobThreadRepository if (jobThreadRepository.size() > 0) { for (Map.Entry item : jobThreadRepository.entrySet()) { JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job."); // wait for job thread push result to callback queue if (oldJobThread != null) { try { oldJobThread.join(); } catch (InterruptedException e) { logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e); } } } jobThreadRepository.clear(); } jobHandlerRepository.clear(); // destory JobLogFileCleanThread JobLogFileCleanThread.getInstance().toStop(); // destory TriggerCallbackThread TriggerCallbackThread.getInstance().toStop(); } // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses != null && adminAddresses.trim().length() > 0) { for (String address : adminAddresses.trim().split(",")) { if (address != null && address.trim().length() > 0) { AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList(); } adminBizList.add(adminBiz); } } } } public static List getAdminBizList() { return adminBizList; } // ---------------------- executor-server (rpc provider) ---------------------- private EmbedServer embedServer = null; private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { // fill ip port 修改默认端口 port = port > 0 ? port : NetUtil.findAvailablePort(10000); ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp(); // generate address if (address == null || address.trim().length() == 0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } // accessToken if (accessToken == null || accessToken.trim().length() == 0) { logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken."); } // start embedServer = new EmbedServer(); embedServer.start(address, port, appname, accessToken); } private void stopEmbedServer() { // stop provider factory if (embedServer != null) { try { embedServer.stop(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } // ---------------------- job handler repository ---------------------- private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap(); public static IJobHandler loadJobHandler(String name) { return jobHandlerRepository.get(name); } public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) { logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); return jobHandlerRepository.put(name, jobHandler); } // ---------------------- job thread repository ---------------------- private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) { JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); } return newJobThread; } public static JobThread removeJobThread(int jobId, String removeOldReason) { JobThread oldJobThread = jobThreadRepository.remove(jobId); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); return oldJobThread; } return null; } public static JobThread loadJobThread(int jobId) { JobThread jobThread = jobThreadRepository.get(jobId); return jobThread; } }