websocket(java)集群方案

websocket(java)集群方案

redis 用来存用户与服务器的关系(redis可以任意替换为一种存储形式)

服务器 A 会订阅topic 为 WebSocket-A 的消息

服务器 B 会订阅topic 为 WebSocket-B 的消息

kafka 用来接收推送消息(替换为任意一种mq)

如果用户1需要给用户3发送信息,如果在单机情况下,由于用户3没有与server-A建立链接,所以推送不到。但是现在因为在redis存储了用户3和server-B的关联关系,所以直接推送到WebSocket-B主题之中即可。由server-B接受消息并推送给用户3。

优点:

1. 解决了集群情况下跨机器通讯问题。

缺点:

1. 没有达到随意增减节点的需求,新加机器需要配置server-id。

2. 编程复杂,需要手动注册kafka-listener消息节点。

再宕机的情况下,假如A服务器宕机了,WEBSOCKET-A的消息就没有消费者了,所以只要在能重启一台服务器名继续叫做A的机器即可,因为在代码中多了一次判断,调用逻辑如下

@KafkaListener(id="server-${websocket.server.id}",

topics = AppConstants.WEBSOCKET_TOPIC_PREFIX + "${websocket.server.id}",

containerFactory = "containerFactory")

public void handlerKafkaMessage(String data){

BaseMessage message = JsonUtils.deserialize(data, BaseMessage.class);

handleDataToUser(message);

}

public void handleDataToUser(BaseMessage message) {

if(StringUtils.isEmpty(message.getName())){

return;

}

String sessionMessage = cacheClient.getValue(AppConstants.WEBSOCKET_SESSION_ID_REDIS_PREFIX + message.getName());

if(StringUtils.isEmpty(sessionMessage)){

processAfterUserIsLogout(message);

return;

}

WebSocketSessionInfo sessionInfo = JsonUtils.deserialize(sessionMessage, WebSocketSessionInfo.class);

// 如果接收方连接在本机,直接发送

// 否则发送至kafka broker

if(Objects.equals(sessionInfo.getServerId(), serverId)) {

messagingTemplate.convertAndSendToUser(message.getName(), message.getDestination(), message.getMessageBody());

}else {

kafkaTemplate.send(AppConstants.WEBSOCKET_TOPIC_PREFIX + sessionInfo.getServerId(), JsonUtils.toJson(message));

}

}

private void processAfterUserIsLogout(BaseMessage message) {

logger.warn("用户[{}]已退出,未接收到消息[{}]", message.getName(), message.getMessageBody());

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

即即便是A服务器接收到了这条消息,我们也不确定客户是否在此中间重连了,所以需要去redis拿到当前客户真实的服务器,但是如果客户不断的重练,可能会导致消息一直发送不出去。但是这种也是能被接收的。

实现:https://github.com/wuhulala/spring-wesocket-cluster.git

相关推荐