00e666b2dc3a2a52c85b87f421108e69f8fbee6e.svn-base 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package org.jeecg.boot.starter.rabbitmq.config;
  2. import java.util.UUID;
  3. import org.jeecg.boot.starter.rabbitmq.event.JeecgRemoteApplicationEvent;
  4. import org.jeecg.common.config.mqtoken.TransmitUserTokenFilter;
  5. import org.springframework.amqp.core.AcknowledgeMode;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  8. import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
  9. import org.springframework.amqp.support.ConsumerTagStrategy;
  10. import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.context.annotation.Configuration;
  13. /**
  14. * 消息队列配置类
  15. *
  16. * @author zyf
  17. */
  18. @Configuration
  19. @RemoteApplicationEventScan(basePackageClasses = JeecgRemoteApplicationEvent.class)
  20. public class RabbitMqConfig {
  21. @Bean
  22. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  23. RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  24. //设置忽略声明异常
  25. rabbitAdmin.setIgnoreDeclarationExceptions(true);
  26. return rabbitAdmin;
  27. }
  28. /**
  29. * 注入获取token过滤器
  30. * @return
  31. */
  32. @Bean
  33. public TransmitUserTokenFilter transmitUserInfoFromHttpHeader(){
  34. return new TransmitUserTokenFilter();
  35. }
  36. @Bean
  37. public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
  38. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  39. container.setConnectionFactory(connectionFactory);
  40. //手动确认
  41. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  42. //当前的消费者数量
  43. container.setConcurrentConsumers(1);
  44. //最大的消费者数量
  45. container.setMaxConcurrentConsumers(1);
  46. //是否重回队列
  47. container.setDefaultRequeueRejected(true);
  48. //消费端的标签策略
  49. container.setConsumerTagStrategy(new ConsumerTagStrategy() {
  50. @Override
  51. public String createConsumerTag(String queue) {
  52. return queue + "_" + UUID.randomUUID().toString();
  53. }
  54. });
  55. return container;
  56. }
  57. }