Categories
java

Ruoyi websocket 对接

若依java服务端添加websocket 包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

业务类

package com.ruoyi.project.websocket;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.framework.security.LoginUser;
import com.ruoyi.framework.security.service.TokenService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@ServerEndpoint("/websocket")
public class WebSocketServer {
    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
    
    private static final AtomicInteger onlineCount = new AtomicInteger(0);
    private static final Map<String, Session> sessionMap = new ConcurrentHashMap<>();
    private static final Map<String, Long> lastActiveTimeMap = new ConcurrentHashMap<>();
    private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private static final long HEARTBEAT_TIMEOUT = 30000;
    
    private static TokenService tokenService;
    
    @Autowired
    public void setTokenService(TokenService tokenService) {
        WebSocketServer.tokenService = tokenService;
    }
    
    private String token;
    private LoginUser loginUser;
    
    static {
        scheduler.scheduleAtFixedRate(() -> {
            long currentTime = System.currentTimeMillis();
            lastActiveTimeMap.forEach((token, lastActiveTime) -> {
                if (currentTime - lastActiveTime > HEARTBEAT_TIMEOUT) {
                    Session session = sessionMap.get(token);
                    if (session != null) {
                        try {
                            session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
                        } catch (IOException e) {
                            log.error("关闭WebSocket连接失败", e);
                        }
                    }
                }
            });
        }, 0, HEARTBEAT_TIMEOUT / 2, TimeUnit.MILLISECONDS);
    }
    
    @OnOpen
    public void onOpen(Session session) {
        log.info("有新连接加入,等待认证...");
    }
    
    @OnClose
    public void onClose() {
        if (this.token != null) {
            sessionMap.remove(this.token);
            lastActiveTimeMap.remove(this.token);
            subOnlineCount();
            log.info("连接关闭:用户ID={},当前在线人数为:{}", 
                    loginUser != null ? loginUser.getUserId() : "unknown", 
                    getOnlineCount());
        }
    }
    
    @OnMessage
    public void onMessage(String message, Session session) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode jsonNode = mapper.readTree(message);
            
            // 处理认证消息
            if (this.token == null) {
                if (!jsonNode.has("type") || !"AUTH".equals(jsonNode.get("type").asText())) {
                    sendError(session, "请先发送认证消息");
                    return;
                }
                
                String token = jsonNode.get("token").asText();
                LoginUser user = tokenService.getLoginUser(token);
                if (user == null) {
                    sendError(session, "无效的token");
                    session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "认证失败"));
                    return;
                }
                
                this.token = token;
                this.loginUser = user;
                sessionMap.put(token, session);
                lastActiveTimeMap.put(token, System.currentTimeMillis());
                addOnlineCount();
                
                log.info("认证成功:用户ID={},用户名={}", user.getUserId(), user.getUsername());
                
                // 发送认证成功响应
                session.getBasicRemote().sendText(mapper.writeValueAsString(Map.of(
                    "type", "AUTH_RESPONSE",
                    "success", true,
                    "message", "认证成功",
                    "userId", user.getUserId()
                )));
                return;
            }
            
            // 更新最后活跃时间
            lastActiveTimeMap.put(this.token, System.currentTimeMillis());
            
            // 处理心跳消息
            if (jsonNode.has("type") && "HEARTBEAT".equals(jsonNode.get("type").asText())) {
                session.getBasicRemote().sendText(mapper.writeValueAsString(Map.of(
                    "type", "HEARTBEAT_ACK"
                )));
                return;
            }
            
            // 处理业务消息
            if (jsonNode.has("type") && "MESSAGE".equals(jsonNode.get("type").asText())) {
                String content = jsonNode.get("content").asText();
                log.info("收到来自用户{}的消息: {}", loginUser.getUserId(), content);
                
                // 业务处理逻辑...
                session.getBasicRemote().sendText(mapper.writeValueAsString(Map.of(
                    "type", "MESSAGE_RESPONSE",
                    "content", "服务器收到消息: " + content,
                    "timestamp", System.currentTimeMillis()
                )));
            }
            
        } catch (Exception e) {
            log.error("处理消息异常", e);
            try {
                sendError(session, "处理消息时发生错误: " + e.getMessage());
            } catch (IOException ex) {
                log.error("发送错误消息失败", ex);
            }
        }
    }
    
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket发生错误", error);
    }
    
    private void sendError(Session session, String errorMessage) throws IOException {
        session.getBasicRemote().sendText(new ObjectMapper().writeValueAsString(Map.of(
            "type", "ERROR",
            "message", errorMessage
        )));
    }
    
    public static void sendToUser(String token, String message) {
        Session session = sessionMap.get(token);
        if (session != null && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("发送消息失败", e);
            }
        }
    }
    
    public static void sendToUserByUserId(Long userId, String message) {
        sessionMap.forEach((token, session) -> {
            try {
                LoginUser user = tokenService.getLoginUser(token);
                if (user != null && userId.equals(user.getUserId())) {
                    session.getBasicRemote().sendText(message);
                }
            } catch (Exception e) {
                log.error("通过用户ID发送消息异常", e);
            }
        });
    }
    
    public static int getOnlineCount() {
        return onlineCount.get();
    }
    
    public static void addOnlineCount() {
        onlineCount.incrementAndGet();
    }
    
    public static void subOnlineCount() {
        onlineCount.decrementAndGet();
    }
}

前端添加websocket操作类 且作为全局单例模式 任何文件都可以调用

import { getToken } from '@/utils/auth'
import { Message, MessageBox } from 'element-ui'

class WebSocketClient {
  constructor(options = {}) {
    const defaultOptions = {
      url: '',
      heartBeat: 30000,          // 心跳间隔30秒
      reconnectDelay: 5000,       // 重连延迟5秒
      maxReconnectAttempts: 5,    // 最大重连次数
      onOpen: () => {},
      onMessage: () => {},
      onClose: () => {},
      onError: () => {},
      onAuthenticated: () => {}   // 新增认证成功回调
    }

    this.options = { ...defaultOptions, ...options }
    this.ws = null
    this.reconnectAttempts = 0
    this.heartBeatTimer = null
    this.isManualClose = false
    this.isAuthenticated = false  // 认证状态
  }

  connect() {
    if (this.ws) {
      this.close()
    }

    this.ws = new WebSocket(this.options.url)

    this.ws.onopen = (event) => {
      this.reconnectAttempts = 0
      this.options.onOpen(event)
      
      // 连接建立后立即发送认证消息
      this.sendAuthMessage()
      
      this.startHeartBeat()
    }

    this.ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data)
        
        // 处理认证响应
        if (data.type === 'AUTH_RESPONSE') {
          if (data.success) {
            this.isAuthenticated = true
            this.options.onAuthenticated(data)
            Message.success('WebSocket认证成功')
          } else {
            Message.error(data.message || 'WebSocket认证失败')
            this.close()
          }
          return
        }
        
        // 其他消息处理
        this.options.onMessage(event)
        this.resetHeartBeat()
      } catch (e) {
        console.error('消息解析失败:', e)
      }
    }

    this.ws.onclose = (event) => {
      this.isAuthenticated = false
      this.options.onClose(event)
      this.stopHeartBeat()
      
      if (!this.isManualClose) {
        this.reconnect()
      }
    }

    this.ws.onerror = (error) => {
      this.options.onError(error)
      this.stopHeartBeat()
      
      if (!this.isManualClose) {
        this.reconnect()
      }
    }
  }

  // 发送认证消息
  sendAuthMessage() {
    const token = getToken()
    if (!token) {
      Message.error('未获取到登录Token,请重新登录')
      this.close()
      return
    }

    this.send({
      type: 'AUTH',
      token: token,
      timestamp: new Date().getTime()
    })
  }

  send(data) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data))
    } else {
      console.error('WebSocket未连接')
    }
  }

  // 其他方法保持不变...
  close() { /* ... */ }
  reconnect() { /* ... */ }
  startHeartBeat() { /* ... */ }
  stopHeartBeat() { /* ... */ }
  resetHeartBeat() { /* ... */ }
}

// 全局单例
let wsInstance = null

export function initWebSocket() {
  if (wsInstance) {
    return wsInstance
  }

  wsInstance = new WebSocketClient({
    url: process.env.VUE_APP_WS_API,
    onAuthenticated: (data) => {
      // 认证成功后的处理
      console.log('WebSocket认证成功', data)
    },
    onMessage: (event) => {
      try {
        const data = JSON.parse(event.data)
        if (data.type === 'NOTIFICATION') {
          MessageBox.alert(data.content, data.title || '系统通知', {
            confirmButtonText: '确定',
            type: data.level || 'info'
          })
        }
      } catch (e) {
        console.error('消息处理错误:', e)
      }
    },
    onError: (error) => {
      Message.error('WebSocket连接错误: ' + error.message)
    }
  })

  return wsInstance
}

export function getWebSocket() {
  return wsInstance
}

export function closeWebSocket() {
  if (wsInstance) {
    wsInstance.close()
    wsInstance = null
  }
}