| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 | package org.jeecg.modules.demo.mock.vxe.websocket;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import lombok.extern.slf4j.Slf4j;import org.jeecg.common.constant.VXESocketConst;import org.springframework.stereotype.Component;import javax.websocket.OnClose;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.websocket.Session;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.util.Collection;import java.util.HashMap;import java.util.Map;/** * vxe WebSocket,用于实现实时无痕刷新的功能 */@Slf4j@Component@ServerEndpoint("/vxeSocket/{userId}/{pageId}")public class VXESocket {    /**     * 当前 session     */    private Session session;    /**     * 当前用户id     */    private String userId;    /**     * 页面id,用于标识同一用户,不同页面的数据     */    private String pageId;    /**     * 当前socket唯一id     */    private String socketId;    /**     * 用户连接池,包含单个用户的所有socket连接;     * 因为一个用户可能打开多个页面,多个页面就会有多个连接;     * key是userId,value是Map对象;子Map的key是pageId,value是VXESocket对象     */    private static Map<String, Map<String, VXESocket>> userPool = new HashMap<>();    /**     * 连接池,包含所有WebSocket连接;     * key是socketId,value是VXESocket对象     */    private static Map<String, VXESocket> socketPool = new HashMap<>();    /**     * 获取某个用户所有的页面     */    public static Map<String, VXESocket> getUserPool(String userId) {        return userPool.computeIfAbsent(userId, k -> new HashMap<>());    }    /**     * 向当前用户发送消息     *     * @param message 消息内容     */    public void sendMessage(String message) {        try {            this.session.getAsyncRemote().sendText(message);        } catch (Exception e) {            log.error("【vxeSocket】消息发送失败:" + e.getMessage());        }    }    /**     * 封装消息json     *     * @param data 消息内容     */    public static String packageMessage(String type, Object data) {        JSONObject message = new JSONObject();        message.put(VXESocketConst.TYPE, type);        message.put(VXESocketConst.DATA, data);        return message.toJSONString();    }    /**     * 向指定用户的所有页面发送消息     *     * @param userId  接收消息的用户ID     * @param message 消息内容     */    public static void sendMessageTo(String userId, String message) {        Collection<VXESocket> values = getUserPool(userId).values();        if (values.size() > 0) {            for (VXESocket socketItem : values) {                socketItem.sendMessage(message);            }        } else {            log.warn("【vxeSocket】消息发送失败:userId\"" + userId + "\"不存在或未在线!");        }    }    /**     * 向指定用户的指定页面发送消息     *     * @param userId  接收消息的用户ID     * @param message 消息内容     */    public static void sendMessageTo(String userId, String pageId, String message) {        VXESocket socketItem = getUserPool(userId).get(pageId);        if (socketItem != null) {            socketItem.sendMessage(message);        } else {            log.warn("【vxeSocket】消息发送失败:userId\"" + userId + "\"的pageId\"" + pageId + "\"不存在或未在线!");        }    }    /**     * 向多个用户的所有页面发送消息     *     * @param userIds 接收消息的用户ID数组     * @param message 消息内容     */    public static void sendMessageTo(String[] userIds, String message) {        for (String userId : userIds) {            VXESocket.sendMessageTo(userId, message);        }    }    /**     * 向所有用户的所有页面发送消息     *     * @param message 消息内容     */    public static void sendMessageToAll(String message) {        for (VXESocket socketItem : socketPool.values()) {            socketItem.sendMessage(message);        }    }    /**     * websocket 开启连接     */    @OnOpen    public void onOpen(Session session, @PathParam("userId") String userId, @PathParam("pageId") String pageId) {        try {            this.userId = userId;            this.pageId = pageId;            this.socketId = userId + pageId;            this.session = session;            socketPool.put(this.socketId, this);            getUserPool(userId).put(this.pageId, this);            log.info("【vxeSocket】有新的连接,总数为:" + socketPool.size());        } catch (Exception ignored) {        }    }    /**     * websocket 断开连接     */    @OnClose    public void onClose() {        try {            socketPool.remove(this.socketId);            getUserPool(this.userId).remove(this.pageId);            log.info("【vxeSocket】连接断开,总数为:" + socketPool.size());        } catch (Exception ignored) {        }    }    /**     * websocket 收到消息     */    @OnMessage    public void onMessage(String message) {        // log.info("【vxeSocket】onMessage:" + message);        JSONObject json;        try {            json = JSON.parseObject(message);        } catch (Exception e) {            log.warn("【vxeSocket】收到不合法的消息:" + message);            return;        }        String type = json.getString(VXESocketConst.TYPE);        switch (type) {            // 心跳检测            case VXESocketConst.TYPE_HB:                this.sendMessage(VXESocket.packageMessage(type, true));                break;            // 更新form数据            case VXESocketConst.TYPE_UVT:                this.handleUpdateForm(json);                break;            default:                log.warn("【vxeSocket】收到不识别的消息类型:" + type);                break;        }    }    /**     * 处理 UpdateForm 事件     */    private void handleUpdateForm(JSONObject json) {        // 将事件转发给所有人        JSONObject data = json.getJSONObject(VXESocketConst.DATA);        VXESocket.sendMessageToAll(VXESocket.packageMessage(VXESocketConst.TYPE_UVT, data));    }}
 |