123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- package com.xxl.job.core.executor;
- import com.xxl.job.core.biz.AdminBiz;
- import com.xxl.job.core.biz.client.AdminBizClient;
- import com.xxl.job.core.handler.IJobHandler;
- import com.xxl.job.core.log.XxlJobFileAppender;
- import com.xxl.job.core.server.EmbedServer;
- import com.xxl.job.core.thread.JobLogFileCleanThread;
- import com.xxl.job.core.thread.JobThread;
- import com.xxl.job.core.thread.TriggerCallbackThread;
- import com.xxl.job.core.util.IpUtil;
- import com.xxl.job.core.util.NetUtil;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- /**
- * 重写目的修改默认端口9999为10000避免和网关冲突
- */
- public class XxlJobExecutor {
- private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);
- // ---------------------- param ----------------------
- private String adminAddresses;
- private String accessToken;
- private String appname;
- private String address;
- private String ip;
- private int port;
- private String logPath;
- private int logRetentionDays;
- public void setAdminAddresses(String adminAddresses) {
- this.adminAddresses = adminAddresses;
- }
- public void setAccessToken(String accessToken) {
- this.accessToken = accessToken;
- }
- public void setAppname(String appname) {
- this.appname = appname;
- }
- public void setAddress(String address) {
- this.address = address;
- }
- public void setIp(String ip) {
- this.ip = ip;
- }
- public void setPort(int port) {
- this.port = port;
- }
- public void setLogPath(String logPath) {
- this.logPath = logPath;
- }
- public void setLogRetentionDays(int logRetentionDays) {
- this.logRetentionDays = logRetentionDays;
- }
- // ---------------------- start + stop ----------------------
- public void start() throws Exception {
- // init logpath
- XxlJobFileAppender.initLogPath(logPath);
- // init invoker, admin-client
- initAdminBizList(adminAddresses, accessToken);
- // init JobLogFileCleanThread
- JobLogFileCleanThread.getInstance().start(logRetentionDays);
- // init TriggerCallbackThread
- TriggerCallbackThread.getInstance().start();
- // init executor-server
- initEmbedServer(address, ip, port, appname, accessToken);
- }
- public void destroy() {
- // destory executor-server
- stopEmbedServer();
- // destory jobThreadRepository
- if (jobThreadRepository.size() > 0) {
- for (Map.Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) {
- JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
- // wait for job thread push result to callback queue
- if (oldJobThread != null) {
- try {
- oldJobThread.join();
- } catch (InterruptedException e) {
- logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
- }
- }
- }
- jobThreadRepository.clear();
- }
- jobHandlerRepository.clear();
- // destory JobLogFileCleanThread
- JobLogFileCleanThread.getInstance().toStop();
- // destory TriggerCallbackThread
- TriggerCallbackThread.getInstance().toStop();
- }
- // ---------------------- admin-client (rpc invoker) ----------------------
- private static List<AdminBiz> adminBizList;
- private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
- if (adminAddresses != null && adminAddresses.trim().length() > 0) {
- for (String address : adminAddresses.trim().split(",")) {
- if (address != null && address.trim().length() > 0) {
- AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
- if (adminBizList == null) {
- adminBizList = new ArrayList<AdminBiz>();
- }
- adminBizList.add(adminBiz);
- }
- }
- }
- }
- public static List<AdminBiz> getAdminBizList() {
- return adminBizList;
- }
- // ---------------------- executor-server (rpc provider) ----------------------
- private EmbedServer embedServer = null;
- private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
- // fill ip port 修改默认端口
- port = port > 0 ? port : NetUtil.findAvailablePort(10000);
- ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
- // generate address
- if (address == null || address.trim().length() == 0) {
- String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
- address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
- }
- // accessToken
- if (accessToken == null || accessToken.trim().length() == 0) {
- logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
- }
- // start
- embedServer = new EmbedServer();
- embedServer.start(address, port, appname, accessToken);
- }
- private void stopEmbedServer() {
- // stop provider factory
- if (embedServer != null) {
- try {
- embedServer.stop();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
- // ---------------------- job handler repository ----------------------
- private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
- public static IJobHandler loadJobHandler(String name) {
- return jobHandlerRepository.get(name);
- }
- public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
- logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
- return jobHandlerRepository.put(name, jobHandler);
- }
- // ---------------------- job thread repository ----------------------
- private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
- public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
- JobThread newJobThread = new JobThread(jobId, handler);
- newJobThread.start();
- logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
- JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
- if (oldJobThread != null) {
- oldJobThread.toStop(removeOldReason);
- oldJobThread.interrupt();
- }
- return newJobThread;
- }
- public static JobThread removeJobThread(int jobId, String removeOldReason) {
- JobThread oldJobThread = jobThreadRepository.remove(jobId);
- if (oldJobThread != null) {
- oldJobThread.toStop(removeOldReason);
- oldJobThread.interrupt();
- return oldJobThread;
- }
- return null;
- }
- public static JobThread loadJobThread(int jobId) {
- JobThread jobThread = jobThreadRepository.get(jobId);
- return jobThread;
- }
- }
|