package com.xxl.job.core.executor;

import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.server.EmbedServer;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Executor bootstrap: wires up the admin clients, the log-cleanup and callback
 * threads, and the embedded executor server, and keeps the static registries of
 * job handlers and running job threads.
 *
 * <p>This class overrides the stock implementation in order to change the
 * default executor port from 9999 to 10000, avoiding a conflict with the gateway.
 */
public class XxlJobExecutor {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);

    // ---------------------- param ----------------------

    /** Comma-separated list of admin addresses; may be null/blank. */
    private String adminAddresses;
    /** Token passed to the admin clients and the embedded server. */
    private String accessToken;
    /** Application name handed to the embedded server. */
    private String appname;
    /** Explicit registry address; when blank it is derived from ip and port. */
    private String address;
    /** Executor ip; when blank {@link IpUtil#getIp()} is used. */
    private String ip;
    /** Executor port; when not positive an available port is looked up. */
    private int port;
    /** Log directory passed to {@link XxlJobFileAppender#initLogPath(String)}. */
    private String logPath;
    /** Retention setting passed to the log-file clean thread. */
    private int logRetentionDays;

    public void setAdminAddresses(String adminAddresses) {
        this.adminAddresses = adminAddresses;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public void setAppname(String appname) {
        this.appname = appname;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public void setLogRetentionDays(int logRetentionDays) {
        this.logRetentionDays = logRetentionDays;
    }

    // ---------------------- start + stop ----------------------

    /**
     * Starts the executor: initializes the log path, the admin clients, the
     * log-file clean thread, the trigger callback thread and finally the
     * embedded executor server, in that order.
     *
     * @throws Exception if any of the initialization steps fails
     */
    public void start() throws Exception {
        // init logpath
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken);

        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        initEmbedServer(address, ip, port, appname, accessToken);
    }

    /**
     * Shuts the executor down: stops the embedded server, stops and joins every
     * registered job thread, clears both repositories and stops the helper
     * threads.
     *
     * <p>If the calling thread is interrupted while waiting for a job thread,
     * the error is logged, teardown continues, and the interrupt status is
     * restored before this method returns so the caller can still observe it.
     */
    public void destroy() {
        // Remember an interruption instead of swallowing it; the flag is restored
        // only at the very end so the remaining teardown steps are not cut short.
        boolean interrupted = false;
        try {
            // destroy executor-server
            stopEmbedServer();

            // destroy jobThreadRepository
            if (jobThreadRepository.size() > 0) {
                for (Map.Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) {
                    JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
                    // wait for job thread push result to callback queue
                    if (oldJobThread != null) {
                        try {
                            oldJobThread.join();
                        } catch (InterruptedException e) {
                            interrupted = true;
                            logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
                        }
                    }
                }
                jobThreadRepository.clear();
            }
            jobHandlerRepository.clear();

            // destroy JobLogFileCleanThread
            JobLogFileCleanThread.getInstance().toStop();

            // destroy TriggerCallbackThread
            TriggerCallbackThread.getInstance().toStop();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // ---------------------- admin-client (rpc invoker) ----------------------

    /** Admin clients created by {@link #initAdminBizList}; stays null until one is added. */
    private static List<AdminBiz> adminBizList;

    /**
     * Creates one {@link AdminBizClient} per non-blank entry of the
     * comma-separated {@code adminAddresses} and appends it to the shared list.
     * The list is created lazily, so it remains {@code null} when no address
     * is configured.
     *
     * @param adminAddresses comma-separated admin addresses; may be null or blank
     * @param accessToken    token handed to every client
     * @throws Exception declared for compatibility with the original signature
     */
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses == null || adminAddresses.trim().length() == 0) {
            return;
        }
        for (String adminAddress : adminAddresses.trim().split(",")) {
            if (adminAddress == null || adminAddress.trim().length() == 0) {
                continue;
            }
            AdminBiz adminBiz = new AdminBizClient(adminAddress.trim(), accessToken);
            if (adminBizList == null) {
                adminBizList = new ArrayList<AdminBiz>();
            }
            adminBizList.add(adminBiz);
        }
    }

    /**
     * @return the shared admin-client list, or {@code null} if no admin address
     *         has been initialized
     */
    public static List<AdminBiz> getAdminBizList() {
        return adminBizList;
    }

    // ---------------------- executor-server (rpc provider) ----------------------

    private EmbedServer embedServer = null;

    /**
     * Resolves the effective port, ip and registry address, warns when no access
     * token is configured, then creates and starts the embedded server.
     *
     * @param address     explicit registry address; derived from ip:port when blank
     * @param ip          executor ip; {@link IpUtil#getIp()} is used when blank
     * @param port        executor port; looked up via {@link NetUtil} when not positive
     * @param appname     application name passed to the server
     * @param accessToken access token passed to the server
     * @throws Exception if the server fails to start
     */
    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
        // fill ip port -- the default port is changed here (10000 instead of 9999)
        port = port > 0 ? port : NetUtil.findAvailablePort(10000);
        ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();

        // generate address
        // registry-address: use the configured address by default, otherwise fall back to ip:port
        if (address == null || address.trim().length() == 0) {
            String ipPortAddress = IpUtil.getIpPort(ip, port);
            address = "http://{ip_port}/".replace("{ip_port}", ipPortAddress);
        }

        // accessToken
        if (accessToken == null || accessToken.trim().length() == 0) {
            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }

        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }

    /** Stops the embedded server if one was started; failures are logged, not rethrown. */
    private void stopEmbedServer() {
        // stop provider factory
        if (embedServer != null) {
            try {
                embedServer.stop();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    // ---------------------- job handler repository ----------------------

    private static final ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

    /**
     * @param name handler name
     * @return the handler registered under {@code name}, or {@code null} if none
     */
    public static IJobHandler loadJobHandler(String name) {
        return jobHandlerRepository.get(name);
    }

    /**
     * Registers {@code jobHandler} under {@code name}, replacing any previous entry.
     *
     * @param name       handler name
     * @param jobHandler handler to register
     * @return the handler previously registered under {@code name}, or {@code null}
     */
    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }

    // ---------------------- job thread repository ----------------------

    private static final ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();

    /**
     * Creates and starts a new {@link JobThread} for {@code jobId} and stores it
     * in the repository. A thread previously stored for the same id is asked to
     * stop with {@code removeOldReason} and interrupted.
     *
     * @param jobId           job id used as the repository key
     * @param handler         handler the new thread runs
     * @param removeOldReason reason passed to the replaced thread's {@code toStop}
     * @return the newly started job thread
     */
    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", jobId, handler);

        // put (not putIfAbsent): the map's put returns the old value, which must be stopped
        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }

        return newJobThread;
    }

    /**
     * Removes the job thread stored for {@code jobId}, asks it to stop with
     * {@code removeOldReason} and interrupts it.
     *
     * @param jobId           job id used as the repository key
     * @param removeOldReason reason passed to the removed thread's {@code toStop}
     * @return the removed thread, or {@code null} if none was stored
     */
    public static JobThread removeJobThread(int jobId, String removeOldReason) {
        JobThread oldJobThread = jobThreadRepository.remove(jobId);
        if (oldJobThread == null) {
            return null;
        }
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
        return oldJobThread;
    }

    /**
     * @param jobId job id used as the repository key
     * @return the job thread stored for {@code jobId}, or {@code null} if none
     */
    public static JobThread loadJobThread(int jobId) {
        return jobThreadRepository.get(jobId);
    }
}