964fbbc86b52f18a90c80ba06ca76baa2c595f26.svn-base 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package org.jeecg.modules.message.websocket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;
  18. /**
  19. * @Author scott
  20. * @Date 2019/11/29 9:41
  21. * @Description: 此注解相当于设置访问URL
  22. */
  23. @Component
  24. @Slf4j
  25. @ServerEndpoint("/websocket/{userId}") //此注解相当于设置访问URL
  26. public class WebSocket {
  27. private Session session;
  28. private String userId;
  29. private static final String REDIS_TOPIC_NAME = "socketHandler";
  30. @Resource
  31. private JeecgRedisClient jeecgRedisClient;
  32. /**
  33. * 缓存 webSocket连接到单机服务class中(整体方案支持集群)
  34. */
  35. private static CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
  36. private static Map<String, Session> sessionPool = new HashMap<String, Session>();
  37. @OnOpen
  38. public void onOpen(Session session, @PathParam(value = "userId") String userId) {
  39. try {
  40. this.session = session;
  41. this.userId = userId;
  42. webSockets.add(this);
  43. sessionPool.put(userId, session);
  44. log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
  45. } catch (Exception e) {
  46. }
  47. }
  48. @OnClose
  49. public void onClose() {
  50. try {
  51. webSockets.remove(this);
  52. sessionPool.remove(this.userId);
  53. log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
  54. } catch (Exception e) {
  55. }
  56. }
  57. /**
  58. * 服务端推送消息
  59. *
  60. * @param userId
  61. * @param message
  62. */
  63. public void pushMessage(String userId, String message) {
  64. Session session = sessionPool.get(userId);
  65. if (session != null && session.isOpen()) {
  66. try {
  67. log.info("【websocket消息】 单点消息:" + message);
  68. session.getAsyncRemote().sendText(message);
  69. } catch (Exception e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. }
  74. /**
  75. * 服务器端推送消息
  76. */
  77. public void pushMessage(String message) {
  78. try {
  79. webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
  80. } catch (Exception e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. @OnMessage
  85. public void onMessage(String message) {
  86. //todo 现在有个定时任务刷,应该去掉
  87. log.debug("【websocket消息】收到客户端消息:" + message);
  88. JSONObject obj = new JSONObject();
  89. //业务类型
  90. obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
  91. //消息内容
  92. obj.put(WebsocketConst.MSG_TXT, "心跳响应");
  93. for (WebSocket webSocket : webSockets) {
  94. webSocket.pushMessage(message);
  95. }
  96. }
  97. /**
  98. * 后台发送消息到redis
  99. *
  100. * @param message
  101. */
  102. public void sendMessage(String message) {
  103. log.info("【websocket消息】广播消息:" + message);
  104. BaseMap baseMap = new BaseMap();
  105. baseMap.put("userId", "");
  106. baseMap.put("message", message);
  107. jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
  108. }
  109. /**
  110. * 此为单点消息
  111. *
  112. * @param userId
  113. * @param message
  114. */
  115. public void sendMessage(String userId, String message) {
  116. BaseMap baseMap = new BaseMap();
  117. baseMap.put("userId", userId);
  118. baseMap.put("message", message);
  119. jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
  120. }
  121. /**
  122. * 此为单点消息(多人)
  123. *
  124. * @param userIds
  125. * @param message
  126. */
  127. public void sendMessage(String[] userIds, String message) {
  128. for (String userId : userIds) {
  129. sendMessage(userId, message);
  130. }
  131. }
  132. }