c28896c6f09016c57cf9c8b04cb7daca1a939dd2.svn-base 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. package org.jeecg.boot.starter.rabbitmq.core;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
  5. import org.jeecg.common.config.mqtoken.UserTokenContext;
  6. import java.io.IOException;
  7. /**
  8. *
  9. * @author zyf
  10. */
  11. @Slf4j
  12. public class BaseRabbiMqHandler<T> {
  13. private String token= UserTokenContext.getToken();
  14. public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) {
  15. try {
  16. UserTokenContext.setToken(token);
  17. mqListener.handler(t, channel);
  18. channel.basicAck(deliveryTag, false);
  19. } catch (Exception e) {
  20. log.info("接收消息失败,重新放回队列");
  21. try {
  22. /**
  23. * deliveryTag:该消息的index
  24. * multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
  25. * requeue:被拒绝的是否重新入队列
  26. */
  27. channel.basicNack(deliveryTag, false, true);
  28. } catch (IOException ex) {
  29. ex.printStackTrace();
  30. }
  31. }
  32. }
  33. }