94ccdd2b72c58faf9fd19b6f595c25217a8e656c.svn-base 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package com.xxl.job.admin.core.trigger;
  2. import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
  3. import com.xxl.job.admin.core.model.XxlJobGroup;
  4. import com.xxl.job.admin.core.model.XxlJobInfo;
  5. import com.xxl.job.admin.core.model.XxlJobLog;
  6. import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
  7. import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
  8. import com.xxl.job.admin.core.util.I18nUtil;
  9. import com.xxl.job.core.biz.ExecutorBiz;
  10. import com.xxl.job.core.biz.model.ReturnT;
  11. import com.xxl.job.core.biz.model.TriggerParam;
  12. import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
  13. import com.xxl.job.core.util.IpUtil;
  14. import com.xxl.job.core.util.ThrowableUtil;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import java.util.Date;
  18. /**
  19. * xxl-job trigger
  20. * Created by xuxueli on 17/7/13.
  21. */
  22. public class XxlJobTrigger {
  23. private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);
  24. /**
  25. * trigger job
  26. *
  27. * @param jobId
  28. * @param triggerType
  29. * @param failRetryCount
  30. * >=0: use this param
  31. * <0: use param from job info config
  32. * @param executorShardingParam
  33. * @param executorParam
  34. * null: use job param
  35. * not null: cover job param
  36. * @param addressList
  37. * null: use executor addressList
  38. * not null: cover
  39. */
  40. public static void trigger(int jobId,
  41. TriggerTypeEnum triggerType,
  42. int failRetryCount,
  43. String executorShardingParam,
  44. String executorParam,
  45. String addressList) {
  46. // load data
  47. XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
  48. if (jobInfo == null) {
  49. logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
  50. return;
  51. }
  52. if (executorParam != null) {
  53. jobInfo.setExecutorParam(executorParam);
  54. }
  55. int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
  56. XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
  57. // cover addressList
  58. if (addressList!=null && addressList.trim().length()>0) {
  59. group.setAddressType(1);
  60. group.setAddressList(addressList.trim());
  61. }
  62. // sharding param
  63. int[] shardingParam = null;
  64. if (executorShardingParam!=null){
  65. String[] shardingArr = executorShardingParam.split("/");
  66. if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
  67. shardingParam = new int[2];
  68. shardingParam[0] = Integer.valueOf(shardingArr[0]);
  69. shardingParam[1] = Integer.valueOf(shardingArr[1]);
  70. }
  71. }
  72. if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
  73. && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
  74. && shardingParam==null) {
  75. for (int i = 0; i < group.getRegistryList().size(); i++) {
  76. processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
  77. }
  78. } else {
  79. if (shardingParam == null) {
  80. shardingParam = new int[]{0, 1};
  81. }
  82. processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
  83. }
  84. }
  85. private static boolean isNumeric(String str){
  86. try {
  87. int result = Integer.valueOf(str);
  88. return true;
  89. } catch (NumberFormatException e) {
  90. return false;
  91. }
  92. }
  93. /**
  94. * @param group job group, registry list may be empty
  95. * @param jobInfo
  96. * @param finalFailRetryCount
  97. * @param triggerType
  98. * @param index sharding index
  99. * @param total sharding index
  100. */
  101. private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
  102. // param
  103. ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
  104. ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
  105. String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
  106. // 1、save log-id
  107. XxlJobLog jobLog = new XxlJobLog();
  108. jobLog.setJobGroup(jobInfo.getJobGroup());
  109. jobLog.setJobId(jobInfo.getId());
  110. jobLog.setTriggerTime(new Date());
  111. XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
  112. logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
  113. // 2、init trigger-param
  114. TriggerParam triggerParam = new TriggerParam();
  115. triggerParam.setJobId(jobInfo.getId());
  116. triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
  117. triggerParam.setExecutorParams(jobInfo.getExecutorParam());
  118. triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
  119. triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
  120. triggerParam.setLogId(jobLog.getId());
  121. triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
  122. triggerParam.setGlueType(jobInfo.getGlueType());
  123. triggerParam.setGlueSource(jobInfo.getGlueSource());
  124. triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
  125. triggerParam.setBroadcastIndex(index);
  126. triggerParam.setBroadcastTotal(total);
  127. // 3、init address
  128. String address = null;
  129. ReturnT<String> routeAddressResult = null;
  130. if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
  131. if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
  132. if (index < group.getRegistryList().size()) {
  133. address = group.getRegistryList().get(index);
  134. } else {
  135. address = group.getRegistryList().get(0);
  136. }
  137. } else {
  138. routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
  139. if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
  140. address = routeAddressResult.getContent();
  141. }
  142. }
  143. } else {
  144. routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
  145. }
  146. // 4、trigger remote executor
  147. ReturnT<String> triggerResult = null;
  148. if (address != null) {
  149. triggerResult = runExecutor(triggerParam, address);
  150. } else {
  151. triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
  152. }
  153. // 5、collection trigger info
  154. StringBuffer triggerMsgSb = new StringBuffer();
  155. triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
  156. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
  157. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
  158. .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
  159. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
  160. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
  161. if (shardingParam != null) {
  162. triggerMsgSb.append("("+shardingParam+")");
  163. }
  164. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
  165. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
  166. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
  167. triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
  168. .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
  169. // 6、save log trigger-info
  170. jobLog.setExecutorAddress(address);
  171. jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
  172. jobLog.setExecutorParam(jobInfo.getExecutorParam());
  173. jobLog.setExecutorShardingParam(shardingParam);
  174. jobLog.setExecutorFailRetryCount(finalFailRetryCount);
  175. //jobLog.setTriggerTime();
  176. jobLog.setTriggerCode(triggerResult.getCode());
  177. jobLog.setTriggerMsg(triggerMsgSb.toString());
  178. XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
  179. logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
  180. }
  181. /**
  182. * run executor
  183. * @param triggerParam
  184. * @param address
  185. * @return
  186. */
  187. public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
  188. ReturnT<String> runResult = null;
  189. try {
  190. ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
  191. runResult = executorBiz.run(triggerParam);
  192. } catch (Exception e) {
  193. logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
  194. runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
  195. }
  196. StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
  197. runResultSB.append("<br>address:").append(address);
  198. runResultSB.append("<br>code:").append(runResult.getCode());
  199. runResultSB.append("<br>msg:").append(runResult.getMsg());
  200. runResult.setMsg(runResultSB.toString());
  201. return runResult;
  202. }
  203. }