123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- package com.xxl.job.admin.core.trigger;
- import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
- import com.xxl.job.admin.core.model.XxlJobGroup;
- import com.xxl.job.admin.core.model.XxlJobInfo;
- import com.xxl.job.admin.core.model.XxlJobLog;
- import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
- import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
- import com.xxl.job.admin.core.util.I18nUtil;
- import com.xxl.job.core.biz.ExecutorBiz;
- import com.xxl.job.core.biz.model.ReturnT;
- import com.xxl.job.core.biz.model.TriggerParam;
- import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
- import com.xxl.job.core.util.IpUtil;
- import com.xxl.job.core.util.ThrowableUtil;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.Date;
- /**
- * xxl-job trigger
- * Created by xuxueli on 17/7/13.
- */
- public class XxlJobTrigger {
- private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);
- /**
- * trigger job
- *
- * @param jobId
- * @param triggerType
- * @param failRetryCount
- * >=0: use this param
- * <0: use param from job info config
- * @param executorShardingParam
- * @param executorParam
- * null: use job param
- * not null: cover job param
- * @param addressList
- * null: use executor addressList
- * not null: cover
- */
- public static void trigger(int jobId,
- TriggerTypeEnum triggerType,
- int failRetryCount,
- String executorShardingParam,
- String executorParam,
- String addressList) {
- // load data
- XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
- if (jobInfo == null) {
- logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
- return;
- }
- if (executorParam != null) {
- jobInfo.setExecutorParam(executorParam);
- }
- int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
- XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
- // cover addressList
- if (addressList!=null && addressList.trim().length()>0) {
- group.setAddressType(1);
- group.setAddressList(addressList.trim());
- }
- // sharding param
- int[] shardingParam = null;
- if (executorShardingParam!=null){
- String[] shardingArr = executorShardingParam.split("/");
- if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
- shardingParam = new int[2];
- shardingParam[0] = Integer.valueOf(shardingArr[0]);
- shardingParam[1] = Integer.valueOf(shardingArr[1]);
- }
- }
- if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
- && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
- && shardingParam==null) {
- for (int i = 0; i < group.getRegistryList().size(); i++) {
- processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
- }
- } else {
- if (shardingParam == null) {
- shardingParam = new int[]{0, 1};
- }
- processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
- }
- }
- private static boolean isNumeric(String str){
- try {
- int result = Integer.valueOf(str);
- return true;
- } catch (NumberFormatException e) {
- return false;
- }
- }
- /**
- * @param group job group, registry list may be empty
- * @param jobInfo
- * @param finalFailRetryCount
- * @param triggerType
- * @param index sharding index
- * @param total sharding index
- */
- private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
- // param
- ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
- ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
- String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
- // 1、save log-id
- XxlJobLog jobLog = new XxlJobLog();
- jobLog.setJobGroup(jobInfo.getJobGroup());
- jobLog.setJobId(jobInfo.getId());
- jobLog.setTriggerTime(new Date());
- XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
- logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
- // 2、init trigger-param
- TriggerParam triggerParam = new TriggerParam();
- triggerParam.setJobId(jobInfo.getId());
- triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
- triggerParam.setExecutorParams(jobInfo.getExecutorParam());
- triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
- triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
- triggerParam.setLogId(jobLog.getId());
- triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
- triggerParam.setGlueType(jobInfo.getGlueType());
- triggerParam.setGlueSource(jobInfo.getGlueSource());
- triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
- triggerParam.setBroadcastIndex(index);
- triggerParam.setBroadcastTotal(total);
- // 3、init address
- String address = null;
- ReturnT<String> routeAddressResult = null;
- if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
- if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
- if (index < group.getRegistryList().size()) {
- address = group.getRegistryList().get(index);
- } else {
- address = group.getRegistryList().get(0);
- }
- } else {
- routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
- if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
- address = routeAddressResult.getContent();
- }
- }
- } else {
- routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
- }
- // 4、trigger remote executor
- ReturnT<String> triggerResult = null;
- if (address != null) {
- triggerResult = runExecutor(triggerParam, address);
- } else {
- triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
- }
- // 5、collection trigger info
- StringBuffer triggerMsgSb = new StringBuffer();
- triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
- .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
- if (shardingParam != null) {
- triggerMsgSb.append("("+shardingParam+")");
- }
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
- triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
- triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
- .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
- // 6、save log trigger-info
- jobLog.setExecutorAddress(address);
- jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
- jobLog.setExecutorParam(jobInfo.getExecutorParam());
- jobLog.setExecutorShardingParam(shardingParam);
- jobLog.setExecutorFailRetryCount(finalFailRetryCount);
- //jobLog.setTriggerTime();
- jobLog.setTriggerCode(triggerResult.getCode());
- jobLog.setTriggerMsg(triggerMsgSb.toString());
- XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
- logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
- }
- /**
- * run executor
- * @param triggerParam
- * @param address
- * @return
- */
- public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
- ReturnT<String> runResult = null;
- try {
- ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
- runResult = executorBiz.run(triggerParam);
- } catch (Exception e) {
- logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
- runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
- }
- StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
- runResultSB.append("<br>address:").append(address);
- runResultSB.append("<br>code:").append(runResult.getCode());
- runResultSB.append("<br>msg:").append(runResult.getMsg());
- runResult.setMsg(runResultSB.toString());
- return runResult;
- }
- }
|