场景:用户在不同客户端登录时,通过websocket可以进行实时通知,同时需要发布项目到多台服务器上,使用nginx对websocket进行负载均衡。
具体示例:以用户为单位,登录A 、B 浏览器,后台通过websocket进行实时推送(部署在多台服务器),用户在 A 浏览器 操作 通知 已阅读按钮(只能看到自己的未处理消息),同时在 A、 B 浏览器会收到最新的 剩余阅读数量提示。
解决思想:通过redis的 发布订阅 消息通信模式-》用户完成订阅频道后,业务逻辑 进行消息发布,最后 webcokset进行消息推送。
- 各个用户登录后 前端通过 websocket 与后台进行通讯,关键:根据用户唯一性 进行redis的 指定频道 订阅(我这里以用户id为指定频道,保证每个用户只能订阅自己的消息,防止多用户消息串联)
- 业务逻辑以用户为单位,代码逻辑以sesion 为单位。单用户不同端登录时,每个端都是一个session。同时对session进行保存(不管在哪台服务器,只要监听到就能发送出去)
- 实现 redis的监听器接口: 监听是否发布新消息,如有 则进行推送
- nginx配置域名访问websocket,多台服务器部署
代码实践:
一、相关依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
二、websocket相关
- websocket配置类
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;/*** @ClassName WebSocketConfig* @Description* @Author 董海* @Date 2020/6/29 14:22**/
@Configuration
public class WebSocketConfig {/** 本地测试时,需要放开下面注释 。因为使用的是springboot 自身的tomcat。。。声明将@ServerEndPoint的注解交由Spring管理* 因为线上环境发布使用的是 外部的 tomcat* *//*@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}*//*** 通过此方式进行 注入 -----PushWebSocketServer 需要用到*/@Autowiredpublic void setRedisTemplate(RedisTemplate redisTemplate) {PushWebSocketServer.redisTemplate = redisTemplate;}
}
2.websocket推送类
import com.alibaba.fastjson.JSONObject;import com.rexel.backstage.framework.websocket.RedisReceiverChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.ConcurrentHashMap;/*** @ClassName WebSocketServer* @Description* @Author 董海* @Date 2020/6/28 16:25**/
@ServerEndpoint(value = "/rexel/pushServer/{userId}")
@Component
@Slf4j
public class PushWebSocketServer {/*** 必须按照此方式注入service*/public static RedisTemplate redisTemplate;/*** 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/private static int onlineCount = 0;/*** concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/private static ConcurrentHashMap<String, PushWebSocketServer> webSocketMap = new ConcurrentHashMap<>();/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*/private Session session;/*** 接收userId*/private String userId = "";/*** 用来引入刚才在webcoketConfig注入的类*/private RedisMessageListenerContainer container = SpringUtils.getBean("redisMessageListenerContainer");/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId) {//websocket会话this.session = session;this.userId = userId;if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);//加入set中webSocketMap.put(userId, this);} else {//加入set中webSocketMap.put(userId, this);//在线数加1addOnlineCount();}//初次连接 进行推送sendAsyInfo("xx具体消息xx", userId);RedisReceiverChannel listener = new RedisReceiverChannel();//关键:放入sessionlistener.setSession(session);container.addMessageListener(listener, new PatternTopic(userId));}/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);//从set中删除subOnlineCount();}}/*** @param error XX*/@OnErrorpublic void onError(Throwable error) {error.printStackTrace();}/*** 实现服务器主动推送** @param message 推送消息*/@OnMessagepublic void sendStringMessage(String message) {try {if (session.isOpen()) {this.session.getBasicRemote().sendText(message);}} catch (Exception e) {log.error();}}/*** 发送string 自定义消息*/public static void sendInfo(String message, @PathParam("userId") String userId) {if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {webSocketMap.get(userId).sendStringMessage(message);} }/*** 发布消息: message.getString("userId")-》根据业务逻辑指定通道** @param message xx*/public void sendPubsh(JSONObject message) {//1.将信息变更发布到redis中redisTemplate.convertAndSend(message.getString("userId"), message);}/*** 发送object 自定义消息** @param message 消息* @param userId 用户id*/public static void sendAsyInfo(String message, @PathParam("userId") String userId) {if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {webSocketMap.get(userId).sendStringMessage(message);} }public static synchronized int getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {PushWebSocketServer.onlineCount++;}public static synchronized void subOnlineCount() {PushWebSocketServer.onlineCount--;}
三、redis相关
1.redis配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration
public class RedisListenerConfig {/*** 初始化redis监听容器** @param connectionFactory RedisConnectionFactory* @return RedisMessageListenerContainer*/@Bean("redisMessageListenerContainer")RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}/*** 监听器1,监听频道** @param redisReceiver RedisReceiverChannel* @return MessageListenerAdapter*/@Bean("listenerAdapter")MessageListenerAdapter listenerAdapter(RedisReceiverChannel redisReceiver) {return new MessageListenerAdapter(redisReceiver, "onMessage");}/*** 初始化redis操作模板** @param connectionFactory RedisConnectionFactory* @return StringRedisTemplate*/@BeanStringRedisTemplate stringRedisTemplate(RedisConnectionFactory connectionFactory) {return new StringRedisTemplate(connectionFactory);}
2.实现redis监听类
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.websocket.Session;
import java.io.IOException;
/*** @ClassName RedisReceiverChannel* @Description 创建消息订阅监听者类* @Author 董海* @Date 2020/7/27 11:27**/
@Slf4j
@Component
public class RedisReceiverChannel implements MessageListener {/*** websocket客户端连接会话对象*/private Session session;public void setSession(Session session) {this.session = session;}/*** 实现MessageListener接口的onMessage()方法** @param message 消息* @param bytes 消息*/@Overridepublic void onMessage(Message message, byte[] bytes) {// message 为 业务代码推送过来的消息JSONObject jsonObject = JSONObject.parseObject(message.toString());if (session != null && session.isOpen()) {try {//进行推送session.getBasicRemote().sendText(jsonObject.getString("message"));} catch (IOException e) {log.error(e.toString());}}}
四、业务代码推送
public class tets{@Autowiredprivate PushWebSocketServer pushWebSocketServer;public void a(){//业务逻辑代码//进行推送JSONObject jsonObject = new JSONObject();jsonObject.put("userId", "你的唯一id");jsonObject.put("message", "你的消息");// 发布到redis 指定频道pushWebSocketServer.sendPubsh(jsonObject);}
}