若依java服务端添加websocket 包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
业务类
package com.ruoyi.project.websocket;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.framework.security.LoginUser;
import com.ruoyi.framework.security.service.TokenService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@ServerEndpoint("/websocket")
public class WebSocketServer {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
private static final AtomicInteger onlineCount = new AtomicInteger(0);
private static final Map<String, Session> sessionMap = new ConcurrentHashMap<>();
private static final Map<String, Long> lastActiveTimeMap = new ConcurrentHashMap<>();
private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private static final long HEARTBEAT_TIMEOUT = 30000;
private static TokenService tokenService;
@Autowired
public void setTokenService(TokenService tokenService) {
WebSocketServer.tokenService = tokenService;
}
private String token;
private LoginUser loginUser;
static {
scheduler.scheduleAtFixedRate(() -> {
long currentTime = System.currentTimeMillis();
lastActiveTimeMap.forEach((token, lastActiveTime) -> {
if (currentTime - lastActiveTime > HEARTBEAT_TIMEOUT) {
Session session = sessionMap.get(token);
if (session != null) {
try {
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "心跳超时"));
} catch (IOException e) {
log.error("关闭WebSocket连接失败", e);
}
}
}
});
}, 0, HEARTBEAT_TIMEOUT / 2, TimeUnit.MILLISECONDS);
}
@OnOpen
public void onOpen(Session session) {
log.info("有新连接加入,等待认证...");
}
@OnClose
public void onClose() {
if (this.token != null) {
sessionMap.remove(this.token);
lastActiveTimeMap.remove(this.token);
subOnlineCount();
log.info("连接关闭:用户ID={},当前在线人数为:{}",
loginUser != null ? loginUser.getUserId() : "unknown",
getOnlineCount());
}
}
@OnMessage
public void onMessage(String message, Session session) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(message);
// 处理认证消息
if (this.token == null) {
if (!jsonNode.has("type") || !"AUTH".equals(jsonNode.get("type").asText())) {
sendError(session, "请先发送认证消息");
return;
}
String token = jsonNode.get("token").asText();
LoginUser user = tokenService.getLoginUser(token);
if (user == null) {
sendError(session, "无效的token");
session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "认证失败"));
return;
}
this.token = token;
this.loginUser = user;
sessionMap.put(token, session);
lastActiveTimeMap.put(token, System.currentTimeMillis());
addOnlineCount();
log.info("认证成功:用户ID={},用户名={}", user.getUserId(), user.getUsername());
// 发送认证成功响应
session.getBasicRemote().sendText(mapper.writeValueAsString(Map.of(
"type", "AUTH_RESPONSE",
"success", true,
"message", "认证成功",
"userId", user.getUserId()
)));
return;
}
// 更新最后活跃时间
lastActiveTimeMap.put(this.token, System.currentTimeMillis());
// 处理心跳消息
if (jsonNode.has("type") && "HEARTBEAT".equals(jsonNode.get("type").asText())) {
session.getBasicRemote().sendText(mapper.writeValueAsString(Map.of(
"type", "HEARTBEAT_ACK"
)));
return;
}
// 处理业务消息
if (jsonNode.has("type") && "MESSAGE".equals(jsonNode.get("type").asText())) {
String content = jsonNode.get("content").asText();
log.info("收到来自用户{}的消息: {}", loginUser.getUserId(), content);
// 业务处理逻辑...
session.getBasicRemote().sendText(mapper.writeValueAsString(Map.of(
"type", "MESSAGE_RESPONSE",
"content", "服务器收到消息: " + content,
"timestamp", System.currentTimeMillis()
)));
}
} catch (Exception e) {
log.error("处理消息异常", e);
try {
sendError(session, "处理消息时发生错误: " + e.getMessage());
} catch (IOException ex) {
log.error("发送错误消息失败", ex);
}
}
}
@OnError
public void onError(Session session, Throwable error) {
log.error("WebSocket发生错误", error);
}
private void sendError(Session session, String errorMessage) throws IOException {
session.getBasicRemote().sendText(new ObjectMapper().writeValueAsString(Map.of(
"type", "ERROR",
"message", errorMessage
)));
}
public static void sendToUser(String token, String message) {
Session session = sessionMap.get(token);
if (session != null && session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("发送消息失败", e);
}
}
}
public static void sendToUserByUserId(Long userId, String message) {
sessionMap.forEach((token, session) -> {
try {
LoginUser user = tokenService.getLoginUser(token);
if (user != null && userId.equals(user.getUserId())) {
session.getBasicRemote().sendText(message);
}
} catch (Exception e) {
log.error("通过用户ID发送消息异常", e);
}
});
}
public static int getOnlineCount() {
return onlineCount.get();
}
public static void addOnlineCount() {
onlineCount.incrementAndGet();
}
public static void subOnlineCount() {
onlineCount.decrementAndGet();
}
}
前端添加websocket操作类 且作为全局单例模式 任何文件都可以调用
import { getToken } from '@/utils/auth'
import { Message, MessageBox } from 'element-ui'
class WebSocketClient {
constructor(options = {}) {
const defaultOptions = {
url: '',
heartBeat: 30000, // 心跳间隔30秒
reconnectDelay: 5000, // 重连延迟5秒
maxReconnectAttempts: 5, // 最大重连次数
onOpen: () => {},
onMessage: () => {},
onClose: () => {},
onError: () => {},
onAuthenticated: () => {} // 新增认证成功回调
}
this.options = { ...defaultOptions, ...options }
this.ws = null
this.reconnectAttempts = 0
this.heartBeatTimer = null
this.isManualClose = false
this.isAuthenticated = false // 认证状态
}
connect() {
if (this.ws) {
this.close()
}
this.ws = new WebSocket(this.options.url)
this.ws.onopen = (event) => {
this.reconnectAttempts = 0
this.options.onOpen(event)
// 连接建立后立即发送认证消息
this.sendAuthMessage()
this.startHeartBeat()
}
this.ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data)
// 处理认证响应
if (data.type === 'AUTH_RESPONSE') {
if (data.success) {
this.isAuthenticated = true
this.options.onAuthenticated(data)
Message.success('WebSocket认证成功')
} else {
Message.error(data.message || 'WebSocket认证失败')
this.close()
}
return
}
// 其他消息处理
this.options.onMessage(event)
this.resetHeartBeat()
} catch (e) {
console.error('消息解析失败:', e)
}
}
this.ws.onclose = (event) => {
this.isAuthenticated = false
this.options.onClose(event)
this.stopHeartBeat()
if (!this.isManualClose) {
this.reconnect()
}
}
this.ws.onerror = (error) => {
this.options.onError(error)
this.stopHeartBeat()
if (!this.isManualClose) {
this.reconnect()
}
}
}
// 发送认证消息
sendAuthMessage() {
const token = getToken()
if (!token) {
Message.error('未获取到登录Token,请重新登录')
this.close()
return
}
this.send({
type: 'AUTH',
token: token,
timestamp: new Date().getTime()
})
}
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data))
} else {
console.error('WebSocket未连接')
}
}
// 其他方法保持不变...
close() { /* ... */ }
reconnect() { /* ... */ }
startHeartBeat() { /* ... */ }
stopHeartBeat() { /* ... */ }
resetHeartBeat() { /* ... */ }
}
// 全局单例
let wsInstance = null
export function initWebSocket() {
if (wsInstance) {
return wsInstance
}
wsInstance = new WebSocketClient({
url: process.env.VUE_APP_WS_API,
onAuthenticated: (data) => {
// 认证成功后的处理
console.log('WebSocket认证成功', data)
},
onMessage: (event) => {
try {
const data = JSON.parse(event.data)
if (data.type === 'NOTIFICATION') {
MessageBox.alert(data.content, data.title || '系统通知', {
confirmButtonText: '确定',
type: data.level || 'info'
})
}
} catch (e) {
console.error('消息处理错误:', e)
}
},
onError: (error) => {
Message.error('WebSocket连接错误: ' + error.message)
}
})
return wsInstance
}
export function getWebSocket() {
return wsInstance
}
export function closeWebSocket() {
if (wsInstance) {
wsInstance.close()
wsInstance = null
}
}