服务器主动发送响应?聊天模块如何实现?
一、背景知识
当我们在某聊天界面中发送一个消息时,如A给B发送了一个消息,而B马上就收到,仔细思考会发现以下问题 :
1. A给服务器发送请求,但服务器却给B发送了响应
2.B没有向服务器发送请求,却收到了响应
很明显,单纯的使用HTTP协议很难做到这一点,因此需要使用另一种应用层协议——WebSocket
二、了解WebSocket报文格式
2.1 简单了解websocket报文格式
1.FIN( 结束标志位):表示是否关闭连接;
2.RSV1~RSV2(保留位) :为后续可能的功能扩展保留
3.opcode(操作码):有多个,用于判断如何处理数据(如1表示文本数据)
4.MASK(掩码):与数据安全相关
5. 1)Payload len(载荷):7个bit,表示0-127字节
2)Extended payload length(扩展载荷):16个bit,
3)Extend payload length continued(扩展载荷):64个bit
(如果payload len小于126 ,模式1生效,如果等于126,模式2生效,等于127,模式3生效)
6.payload data(载荷)
2.2 了解WebSocket握手过程
WebSocket是由HTTP升级而成:
在这之后就建立好了WebSocket连接,后续就通过WebSocket进行数据传输了
三、基于WebSocket编写代码
使用WebSocket,需要引入相关依赖:
org.springframework.boot
spring-boot-starter-websocket
3.1 编写服务器示例代码
编写服务器代码主要有以下几步:
1. 创建一个类继承TextWebSocketHandler(处理通信流程);
2. 重写afterConnectionEstablished、handleTextMessage、handleTransportError、afterConnectionClosed四个方法;
3.实现WebSocketConfigurer接口,重写方法并配置路由(关联对应Handler的路径);
一、继承TextWebSockeHandler并从写上述4个方法:
@Component public class TestWebSocketController extends TextWebSocketHandler { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { //这个方法会在websocket连接建立之后,自动调用 System.out.println("TestSocket 连接成功"); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { //websocket收到消息时自动调用 System.out.println("TestSocket 收到消息" + message.toString()); //session是个会话,记录了通信双方是谁 session.sendMessage(message); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { //连接异常时自动调用 System.out.println("TestSocket 连接异常"); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { //连接正常关闭后,自动调用 System.out.println("Test Socket 连接正常关闭"); } }
其中,handleTextMessage是这几个方法的重点,主要的代码逻辑都是在这里实现,形参message表示载荷中的数据,需要做的就是将这里的message发送给通信双方的另一方
二、实现WebSocketConfigurer接口,配置路由:
@Configuration @EnableWebSocket//启动websocket public class WebSocketConfig implements WebSocketConfigurer { @Autowired private TestWebSocketController testWebSocketController; @Autowired private WebSocketController webSocketController; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // 通过这个方法,创建好的handler类注册到具体的路径 // 当浏览器,websocket请求路径时“/test”时,就会调用到TestWebSocketController这个类里的方法 registry.addHandler(testWebSocketController,"/test"); } }
3.2 编写前端示例代码
websocket 测试
3.3 效果展示
四、基于WebSocket实现网页聊天项目的消息转发
这里的网页聊天项目和网页聊天相似,如:
具体实现分为以下几步:
一、约定前后端交互接口
二、实现TextWebSocketHandler接口
1> 服务器消息转发逻辑分析
上图了解到,每个会话的好友可能是一个,也可能是多个,并且未上线的好友无法接收消息,而服务器转发消息,需要知道对端的WebSocketSession,因此,可以使用map存储上线用户的userId 和 webSocketSession
1.用户上线,map插入值;
2.用户下线,map 删除值;
@Component @Slf4j public class OnLineUserManager { //此处的 哈希表 要考虑线程安全问题 private ConcurrentHashMap
sessions = new ConcurrentHashMap<>(); // 1) 用户上线,哈希表插入值 public void online(Integer userId,WebSocketSession session){ if(sessions.get(userId) != null){ //说明用户已经上线,登录失败 log.info("[" + userId + "] 登录失败,已在其它地方登录!"); return; } sessions.put(userId,session); log.info("[" + userId + "] 上线!"); } // 2)用户下线,哈希表删除值 public void offline(Integer userId,WebSocketSession session){ WebSocketSession exitSession = sessions.get(userId); if(exitSession == session){ //两个session是同一个,才下线 sessions.remove(userId); log.info("[" + userId + "] 下线!"); } } // 3)根据userId 获取 WebSocketSession public WebSocketSession getSession(Integer userId){ return sessions.get(userId); } }
2>实现 TextWebSocketHandler 接口,重写4个方法
@Slf4j @Component public class WebSocketController extends TextWebSocketHandler { @Autowired private OnLineUserManager onLineUserManager; private ObjectMapper objectMapper = new ObjectMapper(); @Autowired private MessageMapper messageMapper; @Autowired private SessionMapper sessionMapper; @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { log.info("websocket 连接成功"); Integer userId = (Integer) session.getAttributes().get("session_userId"); if(userId == null){ return; } onLineUserManager.online(userId,session); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { log.info("websocket 收到消息" + message.toString()); //1.获取用户信息 Integer userId = (Integer) session.getAttributes().get("session_userId"); String username = (String) session.getAttributes().get("session_username"); if(userId == null || username == null){ log.info("用户未登录,无法转发"); return; } //2.解析请求,注意把json字符串转化为java对象 MessageRequest req = objectMapper.readValue(message.getPayload(), MessageRequest.class); if(req.getType().equals("message")){ //转发消息 transferMessage(userId,username,req); }else { log.info("req.type 有误:" + message.getPayload()); } } //通过这个方法进行转发 private void transferMessage(Integer fromId, String fromName,MessageRequest req) throws IOException { //1.构造待转发的响应对象 MessageResponse messageResponse = new MessageResponse("message",fromId,fromName, req.getSessionId(), req.getContent()); //转成字符串 String responseJson = objectMapper.writeValueAsString(messageResponse); log.info("转发消息:responseJson" + responseJson); //2.根据请求中的sessionId ,获取到message_session中有那些用户(查询数据库) List
friends = sessionMapper.selectFriendBySessionId(req.getSessionId(),fromId); //3.遍历users,通过OnLineUserManager获取到websocketsession发送响应, // !!!也要给自己发送一个消息,因此需要将自己也加入friends,让自己可以看到自己发送的消息 Friend myself = new Friend(); myself.setFriendId(fromId); myself.setFriendName(fromName); friends.add(myself); for (Friend friend:friends ) { WebSocketSession webSocketSession = onLineUserManager.getSession(friend.getFriendId()); if(webSocketSession == null){ //用户不在线,不发送 continue; } webSocketSession.sendMessage(new TextMessage(responseJson)); } //4.转发的消息需要存储在数据库,这样下线之后,重新上线可以获取历史消息 //向message表写入记录 Message message = new Message(); message.setFromId(fromId); message.setSessionId(req.getSessionId()); message.setContent(req.getContent()); messageMapper.insertMessage(message); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.info("websocket 连接异常" + exception.toString()); Integer userId = (Integer) session.getAttributes().get("session_userId"); if(userId == null){ return; } onLineUserManager.offline(userId,session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { log.info("websocket 连接正常关闭" + status.toString()); Integer userId = (Integer) session.getAttributes().get("session_userId"); if(userId == null){ return; } onLineUserManager.offline(userId,session); } }
3> 实现 WebSocketConfigurer 接口,配置路由
@Configuration @EnableWebSocket//启动websocket public class WebSocketConfig implements WebSocketConfigurer { @Autowired private TestWebSocketController testWebSocketController; @Autowired private WebSocketController webSocketController; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(webSocketController,"/WebSocketMessage") //!!!由于userId 和 username 都是保存在httpSession中的,无法直接在webSocketSession中使用,需要添加以下带代码 .addInterceptors(new HttpSessionHandshakeInterceptor()); } }
4> 测试结果