6275ccbdb507ae39d118ca46b192cccd9fdbce37.svn-base 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package org.jeecg.boot.starter.rabbitmq.client;
  2. import cn.hutool.core.util.ObjectUtil;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.jeecg.boot.starter.rabbitmq.event.EventObj;
  5. import org.jeecg.boot.starter.rabbitmq.event.JeecgRemoteApplicationEvent;
  6. import org.jeecg.boot.starter.rabbitmq.exchange.DelayExchangeBuilder;
  7. import org.jeecg.common.annotation.RabbitComponent;
  8. import org.jeecg.common.base.BaseMap;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.amqp.core.*;
  12. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  13. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  14. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  15. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.cloud.bus.BusProperties;
  18. import org.springframework.context.ApplicationContext;
  19. import org.springframework.context.ApplicationEventPublisher;
  20. import org.springframework.context.annotation.Bean;
  21. import org.springframework.context.annotation.Configuration;
  22. import javax.annotation.Resource;
  23. import java.lang.reflect.Method;
  24. import java.text.SimpleDateFormat;
  25. import java.util.Date;
  26. import java.util.HashMap;
  27. import java.util.Map;
  28. import java.util.Properties;
  29. /**
  30. * 消息队列客户端
  31. */
  32. @Slf4j
  33. @Configuration
  34. public class RabbitMqClient {
  35. private static final Logger logger = LoggerFactory.getLogger(RabbitMqClient.class);
  36. private final RabbitAdmin rabbitAdmin;
  37. private final RabbitTemplate rabbitTemplate;
  38. @Resource
  39. private SimpleMessageListenerContainer messageListenerContainer;
  40. @Resource
  41. BusProperties busProperties;
  42. @Resource
  43. private ApplicationEventPublisher publisher;
  44. @Resource
  45. private ApplicationContext applicationContext;
  46. @Bean
  47. public void initQueue() {
  48. Map<String, Object> beansWithRqbbitComponentMap = this.applicationContext.getBeansWithAnnotation(RabbitComponent.class);
  49. Class<? extends Object> clazz = null;
  50. for (Map.Entry<String, Object> entry : beansWithRqbbitComponentMap.entrySet()) {
  51. log.info("初始化队列............");
  52. //获取到实例对象的class信息
  53. clazz = entry.getValue().getClass();
  54. Method[] methods = clazz.getMethods();
  55. RabbitListener rabbitListener = clazz.getAnnotation(RabbitListener.class);
  56. if (ObjectUtil.isNotEmpty(rabbitListener)) {
  57. createQueue(rabbitListener);
  58. }
  59. for (Method method : methods) {
  60. RabbitListener methodRabbitListener = method.getAnnotation(RabbitListener.class);
  61. if (ObjectUtil.isNotEmpty(methodRabbitListener)) {
  62. createQueue(methodRabbitListener);
  63. }
  64. }
  65. }
  66. }
  67. /**
  68. * 初始化队列
  69. *
  70. * @param rabbitListener
  71. */
  72. private void createQueue(RabbitListener rabbitListener) {
  73. String[] queues = rabbitListener.queues();
  74. DirectExchange directExchange = createExchange(DelayExchangeBuilder.DELAY_EXCHANGE);
  75. //创建交换机
  76. rabbitAdmin.declareExchange(directExchange);
  77. if (ObjectUtil.isNotEmpty(queues)) {
  78. for (String queueName : queues) {
  79. Properties result = rabbitAdmin.getQueueProperties(queueName);
  80. if (ObjectUtil.isEmpty(result)) {
  81. Queue queue = new Queue(queueName);
  82. addQueue(queue);
  83. Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);
  84. rabbitAdmin.declareBinding(binding);
  85. log.info("创建队列:" + queueName);
  86. }else{
  87. log.info("已有队列:" + queueName);
  88. }
  89. }
  90. }
  91. }
  92. private Map sentObj = new HashMap<>();
  93. @Autowired
  94. public RabbitMqClient(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate) {
  95. this.rabbitAdmin = rabbitAdmin;
  96. this.rabbitTemplate = rabbitTemplate;
  97. }
  98. /**
  99. * 发送远程事件
  100. *
  101. * @param handlerName
  102. * @param baseMap
  103. */
  104. public void publishEvent(String handlerName, BaseMap baseMap) {
  105. EventObj eventObj = new EventObj();
  106. eventObj.setHandlerName(handlerName);
  107. eventObj.setBaseMap(baseMap);
  108. publisher.publishEvent(new JeecgRemoteApplicationEvent(eventObj, busProperties.getId()));
  109. }
  110. /**
  111. * 转换Message对象
  112. *
  113. * @param messageType 返回消息类型 MessageProperties类中常量
  114. * @param msg
  115. * @return
  116. */
  117. public Message getMessage(String messageType, Object msg) {
  118. MessageProperties messageProperties = new MessageProperties();
  119. messageProperties.setContentType(messageType);
  120. Message message = new Message(msg.toString().getBytes(), messageProperties);
  121. return message;
  122. }
  123. /**
  124. * 有绑定Key的Exchange发送
  125. *
  126. * @param routingKey
  127. * @param msg
  128. */
  129. public void sendMessageToExchange(TopicExchange topicExchange, String routingKey, Object msg) {
  130. Message message = getMessage(MessageProperties.CONTENT_TYPE_JSON, msg);
  131. rabbitTemplate.send(topicExchange.getName(), routingKey, message);
  132. }
  133. /**
  134. * 没有绑定KEY的Exchange发送
  135. *
  136. * @param exchange
  137. * @param msg
  138. */
  139. public void sendMessageToExchange(TopicExchange topicExchange, AbstractExchange exchange, String msg) {
  140. addExchange(exchange);
  141. logger.info("RabbitMQ send " + exchange.getName() + "->" + msg);
  142. rabbitTemplate.convertAndSend(topicExchange.getName(), msg);
  143. }
  144. /**
  145. * 发送消息
  146. *
  147. * @param queueName 队列名称
  148. * @param params 消息内容map
  149. */
  150. public void sendMessage(String queueName, Object params) {
  151. log.info("发送消息到mq");
  152. try {
  153. rabbitTemplate.convertAndSend(DelayExchangeBuilder.DELAY_EXCHANGE, queueName, params, message -> {
  154. return message;
  155. });
  156. } catch (Exception e) {
  157. e.printStackTrace();
  158. }
  159. }
  160. /**
  161. * 发送消息
  162. *
  163. * @param queueName 队列名称
  164. */
  165. public void sendMessage(String queueName) {
  166. this.send(queueName, this.sentObj, 0);
  167. this.sentObj.clear();
  168. }
  169. public RabbitMqClient put(String key, Object value) {
  170. this.sentObj.put(key, value);
  171. return this;
  172. }
  173. /**
  174. * 延迟发送消息
  175. *
  176. * @param queueName 队列名称
  177. * @param params 消息内容params
  178. * @param expiration 延迟时间 单位毫秒
  179. */
  180. public void sendMessage(String queueName, Object params, Integer expiration) {
  181. this.send(queueName, params, expiration);
  182. }
  183. private void send(String queueName, Object params, Integer expiration) {
  184. Queue queue = new Queue(queueName);
  185. addQueue(queue);
  186. CustomExchange customExchange = DelayExchangeBuilder.buildExchange();
  187. rabbitAdmin.declareExchange(customExchange);
  188. Binding binding = BindingBuilder.bind(queue).to(customExchange).with(queueName).noargs();
  189. rabbitAdmin.declareBinding(binding);
  190. SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  191. log.debug("发送时间:" + sf.format(new Date()));
  192. messageListenerContainer.setQueueNames(queueName);
  193. /* messageListenerContainer.setMessageListener(new MqListener<Message>() {
  194. @Override
  195. public void onMessage(Message message, Channel channel) {
  196. MqListener messageListener = SpringContextHolder.getHandler(queueName + "Listener", MqListener.class);
  197. if (ObjectUtil.isNotEmpty(messageListener)) {
  198. messageListener.onMessage(message, channel);
  199. }
  200. }
  201. });*/
  202. rabbitTemplate.convertAndSend(DelayExchangeBuilder.DEFAULT_DELAY_EXCHANGE, queueName, params, message -> {
  203. if (expiration != null && expiration > 0) {
  204. message.getMessageProperties().setHeader("x-delay", expiration);
  205. }
  206. return message;
  207. });
  208. }
  209. /**
  210. * 给queue发送消息
  211. *
  212. * @param queueName
  213. */
  214. public String receiveFromQueue(String queueName) {
  215. return receiveFromQueue(DirectExchange.DEFAULT, queueName);
  216. }
  217. /**
  218. * 给direct交换机指定queue发送消息
  219. *
  220. * @param directExchange
  221. * @param queueName
  222. */
  223. public String receiveFromQueue(DirectExchange directExchange, String queueName) {
  224. Queue queue = new Queue(queueName);
  225. addQueue(queue);
  226. Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
  227. rabbitAdmin.declareBinding(binding);
  228. String messages = (String) rabbitTemplate.receiveAndConvert(queueName);
  229. System.out.println("Receive:" + messages);
  230. return messages;
  231. }
  232. /**
  233. * 创建Exchange
  234. *
  235. * @param exchange
  236. */
  237. public void addExchange(AbstractExchange exchange) {
  238. rabbitAdmin.declareExchange(exchange);
  239. }
  240. /**
  241. * 删除一个Exchange
  242. *
  243. * @param exchangeName
  244. */
  245. public boolean deleteExchange(String exchangeName) {
  246. return rabbitAdmin.deleteExchange(exchangeName);
  247. }
  248. /**
  249. * 声明其名称自动命名的队列。它是用exclusive=true、autoDelete=true和 durable = false
  250. *
  251. * @return Queue
  252. */
  253. public Queue addQueue() {
  254. return rabbitAdmin.declareQueue();
  255. }
  256. /**
  257. * 创建一个指定的Queue
  258. *
  259. * @param queue
  260. * @return queueName
  261. */
  262. public String addQueue(Queue queue) {
  263. return rabbitAdmin.declareQueue(queue);
  264. }
  265. /**
  266. * 删除一个队列
  267. *
  268. * @param queueName the name of the queue.
  269. * @param unused true if the queue should be deleted only if not in use.
  270. * @param empty true if the queue should be deleted only if empty.
  271. */
  272. public void deleteQueue(String queueName, boolean unused, boolean empty) {
  273. rabbitAdmin.deleteQueue(queueName, unused, empty);
  274. }
  275. /**
  276. * 删除一个队列
  277. *
  278. * @param queueName
  279. * @return true if the queue existed and was deleted.
  280. */
  281. public boolean deleteQueue(String queueName) {
  282. return rabbitAdmin.deleteQueue(queueName);
  283. }
  284. /**
  285. * 绑定一个队列到一个匹配型交换器使用一个routingKey
  286. *
  287. * @param queue
  288. * @param exchange
  289. * @param routingKey
  290. */
  291. public void addBinding(Queue queue, TopicExchange exchange, String routingKey) {
  292. Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
  293. rabbitAdmin.declareBinding(binding);
  294. }
  295. /**
  296. * 绑定一个Exchange到一个匹配型Exchange 使用一个routingKey
  297. *
  298. * @param exchange
  299. * @param topicExchange
  300. * @param routingKey
  301. */
  302. public void addBinding(Exchange exchange, TopicExchange topicExchange, String routingKey) {
  303. Binding binding = BindingBuilder.bind(exchange).to(topicExchange).with(routingKey);
  304. rabbitAdmin.declareBinding(binding);
  305. }
  306. /**
  307. * 去掉一个binding
  308. *
  309. * @param binding
  310. */
  311. public void removeBinding(Binding binding) {
  312. rabbitAdmin.removeBinding(binding);
  313. }
  314. /**
  315. * 创建交换器
  316. *
  317. * @param exchangeName
  318. * @return
  319. */
  320. public DirectExchange createExchange(String exchangeName) {
  321. return new DirectExchange(exchangeName, true, false);
  322. }
  323. }