feature: [netty] 基于netty实现聊天室功能

This commit is contained in:
ovo 2025-05-13 18:16:13 +08:00
parent e63053c3e2
commit 1a2a83335f
24 changed files with 586 additions and 72 deletions

View File

@ -1,3 +1 @@
进度表
webChat

View File

@ -4,7 +4,7 @@ import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import com.guwan.backend.Handler.MyMetaObjectHandler;
import com.guwan.backend.handler.MyMetaObjectHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

View File

@ -0,0 +1,49 @@
package com.guwan.backend.config;
import com.guwan.backend.netty.chat.CourseChatWebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.HttpServerCodec;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyConfig {
@Bean
public EventLoopGroup bossGroup() {
return new NioEventLoopGroup(1);
}
@Bean
public EventLoopGroup workerGroup() {
return new NioEventLoopGroup();
}
@Bean
public ServerBootstrap serverBootstrap(EventLoopGroup bossGroup,
EventLoopGroup workerGroup,
CourseChatWebSocketHandler chatHandler) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(chatHandler);
}
});
return bootstrap;
}
}

View File

@ -1,12 +1,10 @@
package com.guwan.backend.config;
import com.guwan.backend.Handler.CourseWebSocketHandler;
import com.guwan.backend.Handler.CoursesWebSocketHandler;
import com.guwan.backend.handler.CourseWebSocketHandler;
import com.guwan.backend.handler.CoursesWebSocketHandler;
import com.guwan.backend.websocket.ChatWebSocketHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

View File

@ -0,0 +1,64 @@
package com.guwan.backend.controller;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.guwan.backend.common.Result;
import com.guwan.backend.mapper.ChatMessagesMapper;
import com.guwan.backend.mapper.UserMapper;
import com.guwan.backend.pojo.entity.ChatMessages;
import com.guwan.backend.pojo.entity.User;
import com.guwan.backend.pojo.response.HistotyMessageVO;
import com.guwan.backend.service.ChatMessagesService;
import com.guwan.backend.service.UserService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.List;
@Slf4j
@RestController
@RequestMapping("/chatMessages")
@RequiredArgsConstructor
public class ChatMessagesController {
private final ChatMessagesService chatMessagesService;
private final UserService userService;
private final UserMapper userMapper;
@GetMapping("/getHistoryMessage")
public Result getHistoryMessage(@RequestParam String courseId, @RequestParam Integer limit) {
List<ChatMessages> latest30Messages = chatMessagesService.list(
new LambdaQueryWrapper<ChatMessages>()
.eq(ChatMessages::getType, "chat")
.eq(ChatMessages::getCourseId, courseId)
.orderByDesc(ChatMessages::getCreatedAt)
.last("LIMIT " + limit)
);
List<HistotyMessageVO> histotyMessageVOS = latest30Messages.stream().map(
chatMessages -> {
HistotyMessageVO histotyMessageVO = new HistotyMessageVO();
//id content imageUrl
BeanUtils.copyProperties(chatMessages, histotyMessageVO);
User user = userMapper.selectById(chatMessages.getSenderId());
histotyMessageVO.setSenderName(user.getUsername());
histotyMessageVO.setSenderAvatar(user.getAvatar());
histotyMessageVO.setTimestamp(chatMessages.getCreatedAt().getTime());
histotyMessageVO.setType(chatMessages.getInfoType());
return histotyMessageVO;
}
).toList();
return Result.success(histotyMessageVOS);
}
}

View File

@ -1,4 +1,4 @@
package com.guwan.backend.Handler;
package com.guwan.backend.handler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -46,7 +46,8 @@ public class CourseWebSocketHandler extends TextWebSocketHandler {
try {
JsonNode jsonNode = new ObjectMapper().readTree(message.getPayload());
String type = jsonNode.get("type").asText();
String courseId = jsonNode.get("courseId").asText();
JsonNode dataNode = jsonNode.get("data");
String courseId = dataNode.get("courseId").asText();
if ("JOIN_COURSE".equals(type)) {
courseSessions.computeIfAbsent(courseId, k -> ConcurrentHashMap.newKeySet()).add(session);

View File

@ -1,4 +1,4 @@
package com.guwan.backend.Handler;
package com.guwan.backend.handler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -1,16 +1,8 @@
package com.guwan.backend.Handler;
package com.guwan.backend.handler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import com.baomidou.mybatisplus.core.toolkit.ReflectionKit;
import com.guwan.backend.annotation.RecoverIfDeleted;
import com.guwan.backend.util.ReflectUtil;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.session.SqlSession;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.LocalDateTime;

View File

@ -0,0 +1,18 @@
package com.guwan.backend.mapper;
import com.guwan.backend.pojo.entity.ChatMessages;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @author 12455
* @description 针对表chat_messages的数据库操作Mapper
* @createDate 2025-05-13 17:04:56
* @Entity com.guwan.backend.pojo.entity.ChatMessages
*/
public interface ChatMessagesMapper extends BaseMapper<ChatMessages> {
}

View File

@ -2,10 +2,17 @@ package com.guwan.backend.netty.chat;
import lombok.Data;
import java.util.List;
@Data
public class ChatMessage {
private String type; // 消息类型: CONNECT, CHAT, DISCONNECT
private String from; // 发送者
private String content; // 消息内容
private Long timestamp; // 时间戳
}
private String id;
private String courseId;
private String content;
private String imageUrl;
private String senderId;
private String senderName;
private String senderAvatar;
private Long timestamp;
private List<String> mentionedUsers;
}

View File

@ -1,15 +1,6 @@
package com.guwan.backend.netty.chat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
@ -17,8 +8,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
@ -27,45 +16,20 @@ public class ChatServer {
@Value("${netty.chat.port}")
private int port;
private final ChatServerHandler chatServerHandler;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
// 注入 ServerBootstrap
private final ServerBootstrap serverBootstrap;
// 启动服务器
@PostConstruct
public void start() throws Exception {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new WebSocketServerProtocolHandler("/ws/chat"))
.addLast(chatServerHandler);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
log.info("聊天服务器启动成功WebSocket端口: {}", port);
} catch (Exception e) {
log.error("聊天服务器启动失败", e);
throw e;
}
serverBootstrap.bind(port).sync();
log.info("WebSocket server started on port {}", port);
}
// 优雅关闭
@PreDestroy
public void stop() {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
// 可添加 shutdown 逻辑
log.info("WebSocket server stopping...");
}
}
}

View File

@ -1,3 +1,4 @@
/*
package com.guwan.backend.netty.chat;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -89,4 +90,4 @@ public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocket
log.error("WebSocket error", cause);
ctx.close();
}
}
} */

View File

@ -0,0 +1,275 @@
package com.guwan.backend.netty.chat;
import cn.hutool.core.collection.ConcurrentHashSet;
import com.alibaba.fastjson.JSON;
import com.guwan.backend.pojo.entity.ChatMessages;
import com.guwan.backend.pojo.enums.MessageType;
import com.guwan.backend.service.ChatMessagesService;
import com.guwan.backend.service.impl.ChatMessagesServiceImpl;
import com.guwan.backend.util.UUIDUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.guwan.backend.pojo.enums.MessageType.*;
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class CourseChatWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static final Map<String, Set<Channel>> courseChannels = new ConcurrentHashMap<>();
private static final Map<String, UserInfo> channelUsers = new ConcurrentHashMap<>();
private final ChatMessagesService chatMessagesService;
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
try {
WebSocketMessage message = JSON.parseObject(frame.text(), WebSocketMessage.class);
handleMessage(ctx, message);
} catch (Exception e) {
log.error("Error handling message", e);
sendError(ctx, "消息处理失败");
}
}
private void handleMessage(ChannelHandlerContext ctx, WebSocketMessage message) {
switch (message.getType()) {
case JOIN_COURSE:
handleJoinCourse(ctx, message);
break;
case LEAVE_COURSE:
handleLeaveCourse(ctx, message);
break;
case CHAT_MESSAGE:
handleChatMessage(ctx, message);
break;
default:
sendError(ctx, "未知的消息类型");
}
}
private void handleJoinCourse(ChannelHandlerContext ctx, WebSocketMessage message) {
Map<String, Object> data = (Map<String, Object>) message.getData();
String courseId = (String) data.get("courseId");
//TODO userId String
String userId = data.get("userId").toString();
String username = (String) data.get("username");
// 保存用户信息
UserInfo userInfo = new UserInfo(userId, username);
channelUsers.put(ctx.channel().id().asLongText(), userInfo);
// 添加到课程频道
courseChannels.computeIfAbsent(courseId, k -> new ConcurrentHashSet<>())
.add(ctx.channel());
// 广播用户列表更新
broadcastUserList(courseId);
// 发送系统消息
sendSystemMessage(courseId, username + " 加入了聊天室");
}
private void handleLeaveCourse(ChannelHandlerContext ctx, WebSocketMessage message) {
Map<String, Object> data = (Map<String, Object>) message.getData();
String courseId = (String) data.get("courseId");
//String userId = (String) data.get("userId");
String username = (String) data.get("username");
// 从课程频道中移除用户
Set<Channel> channels = courseChannels.get(courseId);
if (channels != null) {
channels.remove(ctx.channel());
// 如果课程没有用户了可以移除整个课程频道
if (channels.isEmpty()) {
courseChannels.remove(courseId);
} else {
// 广播用户列表更新
broadcastUserList(courseId);
// 发送系统消息
sendSystemMessage(courseId, username + " 离开了聊天室");
}
}
// 注意这里不移除channelUsers中的用户信息因为用户可能加入了多个课程
// 这部分在handlerRemoved方法中处理当连接完全断开时
}
private void handleChatMessage(ChannelHandlerContext ctx, WebSocketMessage message) {
ChatMessage chatMessage = JSON.parseObject(JSON.toJSONString(message.getData()), ChatMessage.class);
String courseId = chatMessage.getCourseId();
// 保存消息到数据库
saveMessage(chatMessage);
// 广播消息给课程内的所有用户
broadcastMessage(courseId, chatMessage);
// 处理@消息
if (chatMessage.getMentionedUsers() != null && !chatMessage.getMentionedUsers().isEmpty()) {
handleMentions(courseId, chatMessage);
}
}
private void broadcastMessage(String courseId, ChatMessage message) {
WebSocketMessage wsMessage = new WebSocketMessage();
wsMessage.setType(CHAT_MESSAGE);
wsMessage.setData(message);
String messageJson = JSON.toJSONString(wsMessage);
Set<Channel> channels = courseChannels.get(courseId);
if (channels != null) {
channels.forEach(channel -> {
if (channel.isActive()) {
channel.writeAndFlush(new TextWebSocketFrame(messageJson));
}
});
}
}
private void broadcastUserList(String courseId) {
Set<Channel> channels = courseChannels.get(courseId);
if (channels != null) {
List<UserInfo> users = channels.stream()
.map(channel -> channelUsers.get(channel.id().asLongText()))
.filter(Objects::nonNull)
.collect(Collectors.toList());
WebSocketMessage message = new WebSocketMessage();
message.setType(MessageType.USER_LIST);
message.setData(users);
String messageJson = JSON.toJSONString(message);
channels.forEach(channel -> {
if (channel.isActive()) {
channel.writeAndFlush(new TextWebSocketFrame(messageJson));
}
});
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
String channelId = ctx.channel().id().asLongText();
UserInfo userInfo = channelUsers.remove(channelId);
if (userInfo != null) {
// 从所有课程频道中移除
courseChannels.forEach((courseId, channels) -> {
if (channels.remove(ctx.channel())) {
broadcastUserList(courseId);
sendSystemMessage(courseId, userInfo.getUsername() + " 离开了聊天室");
}
});
}
}
private void handleMentions(String courseId, ChatMessage chatMessage) {
// 处理@消息的逻辑
for (String mentionedUserId : chatMessage.getMentionedUsers()) {
// 查找被@用户的Channel
Set<Channel> channels = courseChannels.get(courseId);
if (channels != null) {
for (Channel channel : channels) {
UserInfo userInfo = channelUsers.get(channel.id().asLongText());
if (userInfo != null && userInfo.getUserId().equals(mentionedUserId)) {
// 向被@用户发送通知
WebSocketMessage notification = new WebSocketMessage();
notification.setType(MessageType.SYSTEM_MESSAGE);
notification.setData("您被 " + chatMessage.getSenderName() + " 提及:" + chatMessage.getContent());
channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(notification)));
break;
}
}
}
}
}
private void sendSystemMessage(String courseId, String message) {
WebSocketMessage wsMessage = new WebSocketMessage();
wsMessage.setType(MessageType.SYSTEM_MESSAGE);
ChatMessage chatMessage1 = new ChatMessage();
chatMessage1.setCourseId(UUIDUtil.uuid());
chatMessage1.setContent(message);
wsMessage.setData(chatMessage1);
String messageJson = JSON.toJSONString(wsMessage);
Set<Channel> channels = courseChannels.get(courseId);
if (channels != null) {
channels.forEach(channel -> {
if (channel.isActive()) {
channel.writeAndFlush(new TextWebSocketFrame(messageJson));
}
});
}
// ChatMessage chatMessage = new ChatMessages();
// chatMessage.setCourseId(courseId);
}
private void sendError(ChannelHandlerContext ctx, String errorMessage) {
WebSocketMessage wsMessage = new WebSocketMessage();
wsMessage.setType(MessageType.ERROR);
wsMessage.setData(errorMessage);
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(wsMessage)));
}
private void saveMessage(ChatMessage chatMessage) {
// 在实际项目中这里应该将消息保存到数据库
// 例如使用JPA或MyBatis等ORM框架
try {
// 设置消息ID和时间戳如果未设置
if (chatMessage.getId() == null) {
chatMessage.setId(UUIDUtil.uuid());
}
if (chatMessage.getTimestamp() == null) {
chatMessage.setTimestamp(System.currentTimeMillis());
}
ChatMessages chatMessages = new ChatMessages();
BeanUtils.copyProperties(chatMessage, chatMessages);
chatMessages.setInfoType("USER");
chatMessages.setType("CHAT");
chatMessages.setCreatedAt(new Date(chatMessage.getTimestamp()));
System.out.println("chatMessages = " + chatMessages);
// 这里应该调用消息服务或存储库来持久化消息
chatMessagesService.save(chatMessages);
log.info("保存消息: {}", chatMessage);
} catch (Exception e) {
log.error("保存消息失败", e);
}
}
}

View File

@ -0,0 +1,18 @@
package com.guwan.backend.netty.chat;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class UserInfo {
private String userId;
private String username;
private String avatar;
public UserInfo(String userId, String username) {
this.userId = userId;
this.username = username;
this.avatar = null; // 可以设置默认头像URL
}
}

View File

@ -0,0 +1,10 @@
package com.guwan.backend.netty.chat;
import com.guwan.backend.pojo.enums.MessageType;
import lombok.Data;
@Data
public class WebSocketMessage {
private MessageType type;
private Object data;
}

View File

@ -0,0 +1,53 @@
package com.guwan.backend.pojo.entity;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.util.Date;
import lombok.Data;
/**
*
* @TableName chat_messages
*/
@TableName(value ="chat_messages")
@Data
public class ChatMessages {
/**
*
*/
@TableId
private String id;
/**
*
*/
private String courseId;
/**
*
*/
private String senderId;
/**
*
*/
private String content;
/**
*
*/
private String imageUrl;
private String infoType;
/**
* 来源 直播还是聊天室
*/
private String type;
/**
*
*/
private Date createdAt;
}

View File

@ -96,6 +96,8 @@ public class Course implements Serializable {
*/
private Integer totalDuration;
private String liveUrl;
/**
* 创建时间
*/

View File

@ -0,0 +1,10 @@
package com.guwan.backend.pojo.enums;
public enum MessageType {
JOIN_COURSE,
LEAVE_COURSE,
CHAT_MESSAGE,
USER_LIST,
SYSTEM_MESSAGE,
ERROR
}

View File

@ -17,7 +17,7 @@ public class CourseCenterVO {
private Integer ratingCount;
private BigDecimal price;
private Integer studentCount;
private String liveUrl;
private String teacher;
}

View File

@ -0,0 +1,16 @@
package com.guwan.backend.pojo.response;
import lombok.Data;
@Data
public class HistotyMessageVO {
private String id;
private String senderName;
private String senderAvatar;
private Long timestamp;
private String content;
private String imageUrl;
private String type;
}

View File

@ -48,6 +48,9 @@ public class CourseDetailVO {
private Boolean enrolled;
private String coverImg;
private String liveUrl;
@JsonProperty("chapters")
private List<ChapterVO> chapterVOS;
@JsonProperty("reviews")

View File

@ -0,0 +1,13 @@
package com.guwan.backend.service;
import com.guwan.backend.pojo.entity.ChatMessages;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* @author 12455
* @description 针对表chat_messages的数据库操作Service
* @createDate 2025-05-13 17:04:56
*/
public interface ChatMessagesService extends IService<ChatMessages> {
}

View File

@ -1,6 +1,6 @@
package com.guwan.backend.service;
import com.guwan.backend.Handler.CoursesWebSocketHandler;
import com.guwan.backend.handler.CoursesWebSocketHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

View File

@ -0,0 +1,22 @@
package com.guwan.backend.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.guwan.backend.mapper.ChatMessagesMapper;
import com.guwan.backend.service.ChatMessagesService;
import com.guwan.backend.pojo.entity.ChatMessages;
import org.springframework.stereotype.Service;
/**
* @author 12455
* @description 针对表chat_messages的数据库操作Service实现
* @createDate 2025-05-13 17:04:56
*/
@Service
public class ChatMessagesServiceImpl extends ServiceImpl<ChatMessagesMapper, ChatMessages>
implements ChatMessagesService {
}