package org.jeecg.modules.message.websocket;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.jeecg.common.base.BaseMap;
import org.jeecg.common.constant.WebsocketConst;
import org.jeecg.common.modules.redis.client.JeecgRedisClient;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import lombok.extern.slf4j.Slf4j;

/**
 * WebSocket server endpoint exposed at {@code /websocket/{userId}}.
 *
 * <p>Every opened connection is tracked in two static registries: the set of endpoint
 * instances (used for broadcasts) and a map from the {@code userId} path value to the
 * connection's {@link Session} (used for targeted sends). The class also offers
 * {@code sendMessage(...)} overloads that publish a message to the Redis topic
 * {@link #REDIS_TOPIC_NAME} through {@link JeecgRedisClient} instead of writing to a
 * session directly.
 *
 * @Author scott
 * @Date 2019/11/29 9:41
 */
@Component
@Slf4j
// The path below is the URL clients connect to.
@ServerEndpoint("/websocket/{userId}")
public class WebSocket {

    /**
     * Name of the Redis topic the {@code sendMessage(...)} overloads publish to.
     */
    public static final String REDIS_TOPIC_NAME = "socketHandler";

    /** Session of the connection this endpoint instance was opened for. */
    private Session session;

    /**
     * Key under which {@link #session} was registered in {@link #sessionPool};
     * remembered so that {@link #onClose()} can remove the entry again.
     */
    private String sessionKey;

    /** All endpoint instances that have been opened and not yet closed. */
    private static final CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();

    /**
     * Sessions keyed by the {@code userId} path value. The map is static and is both
     * mutated (open/close) and iterated (send methods), so a concurrent map is used.
     */
    private static final Map<String, Session> sessionPool = new ConcurrentHashMap<>();

    @Resource
    private JeecgRedisClient jeecgRedisClient;

    /**
     * Registers a newly opened connection in both registries.
     *
     * @param session the session of the new connection
     * @param userId  value of the {@code userId} path segment; used as the pool key
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "userId") String userId) {
        try {
            this.session = session;
            this.sessionKey = userId;
            webSockets.add(this);
            // ConcurrentHashMap rejects null keys; a null id is simply not pooled.
            if (userId != null) {
                sessionPool.put(userId, session);
            }
            log.info("【websocket消息】有新的连接,总数为:{}", webSockets.size());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Unregisters this connection from both registries.
     */
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            // Remove the pool entry only if it still maps to this connection's session,
            // so a newer session registered under the same key is left untouched.
            if (sessionKey != null) {
                sessionPool.remove(sessionKey, session);
            }
            log.info("【websocket消息】连接断开,总数为:{}", webSockets.size());
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Answers any incoming client message with a heartbeat-check JSON payload.
     *
     * @param message text received from the client (only logged)
     */
    @OnMessage
    public void onMessage(String message) {
        // TODO: a scheduled task currently triggers this periodically; it should be removed.
        log.debug("【websocket消息】收到客户端消息:{}", message);
        JSONObject obj = new JSONObject();
        // business type
        obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
        // message content
        obj.put(WebsocketConst.MSG_TXT, "心跳响应");
        session.getAsyncRemote().sendText(obj.toJSONString());
    }

    /**
     * Pushes a message over WebSocket to every pooled session whose key contains
     * {@code userId}. Failures on one session are logged and do not stop the loop.
     *
     * @param userId  substring to look for in the pool keys; {@code null} sends nothing
     * @param message text to send
     */
    public void pushMessage(String userId, String message) {
        if (userId == null) {
            return;
        }
        for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
            // Pool key = {user id + "_" + md5 of the login token}.
            // TODO: vue2 has not adopted the new key rule yet; this does not affect the logic for now.
            if (!item.getKey().contains(userId)) {
                continue;
            }
            Session target = item.getValue();
            // Skip sessions that are no longer open, as the other send methods do.
            if (target == null || !target.isOpen()) {
                continue;
            }
            try {
                // update-author:taoyan date:20211012 for: websocket error
                // https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                // Blocking sends on the same session are serialized by locking on it.
                synchronized (target) {
                    log.info("【系统 WebSocket】推送单人消息:{}", message);
                    target.getBasicRemote().sendText(message);
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Broadcasts a message to every registered connection whose session is open.
     *
     * @param message text to send
     */
    public void sendAllMessage(String message) {
        for (WebSocket webSocket : webSockets) {
            try {
                if (webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Sends a message to the single session registered under exactly {@code userId}.
     * Does nothing when no open session is registered under that key.
     *
     * @param userId  exact pool key of the recipient
     * @param message text to send
     */
    public void sendOneMessage(String userId, String message) {
        // A null key cannot be present in the pool (and ConcurrentHashMap.get(null) throws).
        if (userId == null) {
            return;
        }
        Session target = sessionPool.get(userId);
        if (target == null || !target.isOpen()) {
            return;
        }
        try {
            log.info("【websocket消息】 单点消息:{}", message);
            target.getAsyncRemote().sendText(message);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Sends the same message to several recipients, one exact-key lookup per id.
     *
     * @param userIds exact pool keys of the recipients; {@code null} sends nothing
     * @param message text to send
     */
    public void sendMoreMessage(String[] userIds, String message) {
        if (userIds == null) {
            return;
        }
        for (String userId : userIds) {
            sendOneMessage(userId, message);
        }
    }

    //========== [Message push via Redis publish/subscribe] ==========

    /**
     * Publishes a message to the Redis topic with an empty {@code userId}.
     *
     * @param message text to publish
     */
    public void sendMessage(String message) {
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", "");
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
     * Publishes a message addressed to one user to the Redis topic.
     *
     * @param userId  id of the recipient, stored under the {@code "userId"} key
     * @param message text to publish
     */
    public void sendMessage(String userId, String message) {
        BaseMap baseMap = new BaseMap();
        baseMap.put("userId", userId);
        baseMap.put("message", message);
        jeecgRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
    }

    /**
     * Publishes the same message to the Redis topic once per recipient.
     *
     * @param userIds ids of the recipients
     * @param message text to publish
     */
    public void sendMessage(String[] userIds, String message) {
        for (String userId : userIds) {
            sendMessage(userId, message);
        }
    }
}