spring_boot应用转发websocket

需求或场景

当需要对接多个第三方的websocket服务时，如果让前端直连，会出现业务逻辑全部堆在前端、第三方服务的添加与删除不好控制（需要前端版本升级）等问题。所以需要搭建自己的websocket服务，给前端提供标准api，再由后端去连接第三方的websocket服务，相当于做了一层转发。

添加存储session的容器至配置类

代码如下：
@Configuration
@EnableWebSocket
@Slf4j
public class WebSocketConfig implements WebSocketConfigurer {

private Map<String, WebSocketSession> aiClientSessionsMap = new ConcurrentHashMap<>();

private Map<String, WebSocketSession> myClientSessionsMap = new ConcurrentHashMap<>();

public WebSocketSession getAiClientSession(String clientId) {
try {
return aiClientSessionsMap.get(clientId);
} catch (Exception e) {
// 记录错误信息
log.error("Error occurred while getting WebSocket session for client: {}", clientId, e);
return null;
}
}

public synchronized void addAiClientSession(String clientId, WebSocketSession session) {
aiClientSessionsMap.put(clientId, session);
log.info("Added WebSocket session for client: {}", clientId);
}

public synchronized void removeAiClientSession(String clientId) {
WebSocketSession session = aiClientSessionsMap.remove(clientId);
if (session != null && session.isOpen()) {
try {
session.close();
} catch (IOException e) {
// 处理关闭失败的情况
log.error("Failed to close WebSocket session for client: {}", clientId, e);
}
}
}

public WebSocketSession getMyClientSession(String clientId) {
try {
return myClientSessionsMap.get(clientId);
} catch (Exception e) {
// 记录错误信息
log.error("Error occurred while getting WebSocket session for client: {}", clientId, e);
return null;
}
}

public synchronized void addMyClientSession(String clientId, WebSocketSession session) {
myClientSessionsMap.put(clientId, session);
log.info("Added WebSocket session for client: {}", clientId);
}

public synchronized void removeMyClientSession(String clientId) {
WebSocketSession session = myClientSessionsMap.remove(clientId);
if (session != null && session.isOpen()) {
try {
session.close();
} catch (IOException e) {
// 处理关闭失败的情况
log.error("Failed to close WebSocket session for client: {}", clientId, e);
}
}
}


@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new MyWebSocketHandler(this), "/websocket-endpoint")
.setAllowedOrigins("*"); // 允许跨域
}

设置session

代码如下：
/**
 * Handler for front-end WebSocket connections. For every front-end session it
 * opens one upstream connection to the third-party ("AI") WebSocket server and
 * forwards front-end messages to it.
 *
 * <p>Mappings are stored in {@link WebSocketConfig}: the upstream session is
 * registered under the front-end session id, and the front-end session is
 * registered under the upstream session id so that
 * {@code AiClientWebSocketHandler} can route replies back.
 */
@Component
@Slf4j
public class MyWebSocketHandler implements WebSocketHandler {

    /** Address of the upstream WebSocket server. */
    private static final String AI_SERVER_URI = "ws://127.0.0.1:5003/xxx";

    private final WebSocketConfig webSocketConfig;

    public MyWebSocketHandler(WebSocketConfig webSocketConfig) {
        this.webSocketConfig = webSocketConfig;
    }

    /**
     * Ensures an upstream session exists for the new front-end session and
     * registers the reverse mapping. If the upstream connection cannot be
     * established, the front-end session is closed with a server-error status
     * instead of failing later with a {@link NullPointerException}.
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        log.info("New WebSocket connection established: {}" , session.getId());
        WebSocketSession aiClientSession = webSocketConfig.getAiClientSession(session.getId());
        if (aiClientSession == null) {
            aiClientSession = connectToAiServer(session.getId());
        }
        if (aiClientSession == null) {
            // Without an upstream there is nothing to proxy to; tell the client and stop.
            session.close(CloseStatus.SERVER_ERROR);
            return;
        }
        webSocketConfig.addMyClientSession(aiClientSession.getId(), session);
    }

    /**
     * Forwards a front-end message to the upstream session. When no usable
     * upstream session exists, an error text message is sent back to the
     * front-end instead.
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        log.info("Received message");
        WebSocketSession aiClientSession = webSocketConfig.getAiClientSession(session.getId());
        if (aiClientSession == null || !aiClientSession.isOpen()) {
            session.sendMessage(new TextMessage("无法获取ai websocket session"));
            return;
        }
        aiClientSession.sendMessage(message);
    }

    /** Releases the upstream session belonging to the closed front-end session. */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        log.info("WebSocket connection closed: {}" , session.getId());
        closeAiClientSession(session.getId());
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /** Logs the transport failure and releases the associated upstream session. */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error("WebSocket transport error: ", exception);
        closeAiClientSession(session.getId());
    }

    /**
     * Removes both mappings that belong to the front-end session {@code clientId}:
     * the reverse entry (upstream id -> front-end session) and the upstream
     * session itself. Removal through {@link WebSocketConfig} also closes any
     * session that is still open.
     *
     * @param clientId id of the front-end session
     */
    public void closeAiClientSession(String clientId) {
        WebSocketSession aiClientSession = webSocketConfig.getAiClientSession(clientId);
        if (aiClientSession != null) {
            // Drop the reverse mapping first; otherwise it would never be cleaned up.
            webSocketConfig.removeMyClientSession(aiClientSession.getId());
        }
        webSocketConfig.removeAiClientSession(clientId);
    }

    /**
     * Opens the upstream connection and registers it under {@code clientId}.
     *
     * @param clientId id of the front-end session the upstream belongs to
     * @return the upstream session, or {@code null} if the handshake failed
     */
    private WebSocketSession connectToAiServer(String clientId) {
        try {
            StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
            WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
            URI uri = new URI(AI_SERVER_URI);
            WebSocketSession aiClientSession = webSocketClient
                    .doHandshake(new AiClientWebSocketHandler(webSocketConfig), headers, uri)
                    .get();
            webSocketConfig.addAiClientSession(clientId, aiClientSession);
            return aiClientSession;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
            log.error("获取aiwebsocket session 失败", e);
            return null;
        } catch (Exception e) {
            log.error("获取aiwebsocket session 失败", e);
            return null;
        }
    }

}
连接第三方websocket服务的消息处理器，代码如下：
@Slf4j
public class AiClientWebSocketHandler implements WebSocketHandler {

private WebSocketConfig webSocketConfig;

public AiClientWebSocketHandler(WebSocketConfig webSocketConfig) {
this.webSocketConfig = webSocketConfig;
}

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 连接建立后的处理逻辑
log.info("连接建立后的处理逻辑");
}

@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
// 处理接收到的消息
log.info("处理接收到的消息");
WebSocketSession myClientSession = webSocketConfig.getMyClientSession(session.getId());
myClientSession.sendMessage(message);
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
// 处理传输错误
log.error("处理传输错误");
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
// 连接关闭后的处理逻辑
log.info("连接关闭后的处理逻辑");
}

@Override
public boolean supportsPartialMessages() {
return false;
}

}

spring_boot应用转发websocket
http://hanqichuan.com/2024/10/14/spring/spring_boot应用转发websocket/
作者
韩启川
发布于
2024年10月14日
许可协议