/*
 * yl-backend/src/main/java/com/guwan/backend/netty/chat/CourseChatWebSocketHandler.java
 *
 * Repository viewer metadata: 272 lines, 10 KiB, Java.
 *
 * Repository viewer notice: this file contains Unicode characters that might be confused
 * with other characters. If this is intentional, the warning can be safely ignored.
 */

package com.guwan.backend.netty.chat;
import cn.hutool.core.collection.ConcurrentHashSet;
import com.alibaba.fastjson.JSON;
import com.guwan.backend.pojo.entity.ChatMessages;
import com.guwan.backend.pojo.enums.MessageType;
import com.guwan.backend.service.ChatMessagesService;
import com.guwan.backend.service.impl.ChatMessagesServiceImpl;
import com.guwan.backend.util.UUIDUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.guwan.backend.pojo.enums.MessageType.*;
/**
 * WebSocket handler implementing per-course chat rooms.
 *
 * <p>Incoming text frames are parsed as {@link WebSocketMessage} and dispatched on their
 * {@link MessageType}: joining a course, leaving a course, or posting a chat message.
 * Membership is tracked in two static maps: course id to the channels currently in that
 * course, and channel id to the {@link UserInfo} supplied when the channel joined.
 *
 * <p>The handler is annotated {@code @ChannelHandler.Sharable}; all state lives in the
 * static concurrent maps below.
 */
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class CourseChatWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** Not referenced by any method of this class; kept so the existing declaration stays intact. */
    private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Course id -> channels currently joined to that course. */
    private static final Map<String, Set<Channel>> courseChannels = new ConcurrentHashMap<>();

    /** Channel id (long text form) -> user info captured when the channel joined a course. */
    private static final Map<String, UserInfo> channelUsers = new ConcurrentHashMap<>();

    private final ChatMessagesService chatMessagesService;

    /**
     * Parses an incoming text frame and dispatches it. Any failure is logged and reported
     * back to the sender as an {@code ERROR} message.
     *
     * @param ctx   context of the channel that sent the frame
     * @param frame text frame expected to contain a JSON-encoded {@link WebSocketMessage}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        try {
            WebSocketMessage message = JSON.parseObject(frame.text(), WebSocketMessage.class);
            handleMessage(ctx, message);
        } catch (Exception e) {
            log.error("Error handling message", e);
            sendError(ctx, "消息处理失败");
        }
    }

    /**
     * Routes a parsed message to the handler for its type.
     *
     * @param ctx     context of the sending channel
     * @param message parsed message; may be {@code null} when the frame held no JSON object
     */
    private void handleMessage(ChannelHandlerContext ctx, WebSocketMessage message) {
        if (message == null) {
            sendError(ctx, "消息处理失败");
            return;
        }
        MessageType type = message.getType();
        if (type == null) {
            // Switching on a null enum would throw; report it as an unknown type instead.
            sendError(ctx, "未知的消息类型");
            return;
        }
        switch (type) {
            case JOIN_COURSE:
                handleJoinCourse(ctx, message);
                break;
            case LEAVE_COURSE:
                handleLeaveCourse(ctx, message);
                break;
            case CHAT_MESSAGE:
                handleChatMessage(ctx, message);
                break;
            default:
                sendError(ctx, "未知的消息类型");
        }
    }

    /**
     * Registers the sending channel in a course room, then broadcasts the refreshed user
     * list and a "joined" system message to that room.
     *
     * <p>The payload must be a map containing {@code courseId} and {@code userId};
     * {@code username} is read as well. Requests missing a required field are rejected
     * before any state is modified.
     */
    private void handleJoinCourse(ChannelHandlerContext ctx, WebSocketMessage message) {
        Map<String, Object> data = payloadOf(message);
        String courseId = textField(data, "courseId");
        // TODO: userId is converted to String here
        String userId = textField(data, "userId");
        String username = textField(data, "username");
        if (courseId == null || userId == null) {
            log.warn("Rejecting JOIN_COURSE without courseId/userId");
            sendError(ctx, "消息处理失败");
            return;
        }

        Channel channel = ctx.channel();
        // Remember who is behind this channel.
        channelUsers.put(userKey(channel), new UserInfo(userId, username));
        // Add the channel to the course room, creating the room on first join.
        courseChannels.computeIfAbsent(courseId, id -> new ConcurrentHashSet<>()).add(channel);

        broadcastUserList(courseId);
        sendSystemMessage(courseId, username + " 加入了聊天室");
    }

    /**
     * Removes the sending channel from a course room and, if it actually was a member,
     * broadcasts the refreshed user list and a "left" system message to the remaining members.
     *
     * <p>The user's entry in {@code channelUsers} is intentionally kept: the same channel may
     * still be joined to other courses. It is dropped in {@link #handlerRemoved} when the
     * connection goes away.
     */
    private void handleLeaveCourse(ChannelHandlerContext ctx, WebSocketMessage message) {
        Map<String, Object> data = payloadOf(message);
        String courseId = textField(data, "courseId");
        if (courseId == null) {
            log.warn("Rejecting LEAVE_COURSE without courseId");
            sendError(ctx, "消息处理失败");
            return;
        }

        Channel channel = ctx.channel();
        if (!removeFromCourse(courseId, channel)) {
            // The sender was not in this course, so there is nothing to announce.
            return;
        }

        // Prefer the name recorded at join time; fall back to the one sent with the request.
        UserInfo known = channelUsers.get(userKey(channel));
        String username = known != null ? known.getUsername() : textField(data, "username");

        broadcastUserList(courseId);
        sendSystemMessage(courseId, username + " 离开了聊天室");
    }

    /**
     * Persists a chat message, broadcasts it to its course room, and notifies any mentioned users.
     * Messages without a payload or without a course id are rejected before anything is saved.
     */
    private void handleChatMessage(ChannelHandlerContext ctx, WebSocketMessage message) {
        ChatMessage chatMessage = JSON.parseObject(JSON.toJSONString(message.getData()), ChatMessage.class);
        if (chatMessage == null || chatMessage.getCourseId() == null) {
            log.warn("Rejecting CHAT_MESSAGE without payload or courseId");
            sendError(ctx, "消息处理失败");
            return;
        }
        String courseId = chatMessage.getCourseId();

        // Persist first, then fan out to everyone in the course.
        saveMessage(chatMessage);
        broadcastMessage(courseId, chatMessage);

        // Handle @-mentions.
        if (chatMessage.getMentionedUsers() != null && !chatMessage.getMentionedUsers().isEmpty()) {
            handleMentions(courseId, chatMessage);
        }
    }

    /** Sends {@code message} as a {@code CHAT_MESSAGE} to every active channel in the course. */
    private void broadcastMessage(String courseId, ChatMessage message) {
        WebSocketMessage envelope = new WebSocketMessage();
        envelope.setType(CHAT_MESSAGE);
        envelope.setData(message);
        broadcastToCourse(courseId, JSON.toJSONString(envelope));
    }

    /** Sends the current list of known users in the course to every active channel in it. */
    private void broadcastUserList(String courseId) {
        Set<Channel> members = courseChannels.get(courseId);
        if (members == null) {
            return;
        }
        List<UserInfo> users = members.stream()
                .map(member -> channelUsers.get(userKey(member)))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        WebSocketMessage envelope = new WebSocketMessage();
        envelope.setType(MessageType.USER_LIST);
        envelope.setData(users);
        writeToAll(members, JSON.toJSONString(envelope));
    }

    /**
     * Cleans up when the handler is removed from a channel's pipeline: forgets the user and
     * removes the channel from every course it had joined, announcing the departure to each.
     * Course rooms left empty are dropped so the map does not accumulate dead entries.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        UserInfo userInfo = channelUsers.remove(userKey(channel));
        if (userInfo == null) {
            return;
        }
        // The backing map is a ConcurrentHashMap, so removing entries while iterating is safe.
        for (String courseId : courseChannels.keySet()) {
            if (removeFromCourse(courseId, channel)) {
                broadcastUserList(courseId);
                sendSystemMessage(courseId, userInfo.getUsername() + " 离开了聊天室");
            }
        }
    }

    /**
     * Sends a {@code SYSTEM_MESSAGE} notification to each mentioned user found in the course.
     * For every mentioned user id, only the first matching channel is notified.
     */
    private void handleMentions(String courseId, ChatMessage chatMessage) {
        Set<Channel> members = courseChannels.get(courseId);
        if (members == null) {
            return;
        }

        // The notification text does not depend on the recipient, so serialize it once.
        WebSocketMessage notification = new WebSocketMessage();
        notification.setType(MessageType.SYSTEM_MESSAGE);
        notification.setData("您被 " + chatMessage.getSenderName() + " 提及:" + chatMessage.getContent());
        String notificationJson = JSON.toJSONString(notification);

        for (String mentionedUserId : chatMessage.getMentionedUsers()) {
            // Find the channel belonging to the mentioned user.
            for (Channel member : members) {
                UserInfo userInfo = channelUsers.get(userKey(member));
                if (userInfo != null && userInfo.getUserId().equals(mentionedUserId)) {
                    member.writeAndFlush(new TextWebSocketFrame(notificationJson));
                    break;
                }
            }
        }
    }

    /**
     * Broadcasts a system message to every active channel in the course.
     *
     * <p>The payload carries a freshly generated id and the id of the course it belongs to.
     *
     * @param courseId course whose members receive the message
     * @param message  text of the system message
     */
    private void sendSystemMessage(String courseId, String message) {
        ChatMessage systemMessage = new ChatMessage();
        // The generated UUID identifies the message; courseId must be the real course.
        systemMessage.setId(UUIDUtil.uuid());
        systemMessage.setCourseId(courseId);
        systemMessage.setContent(message);

        WebSocketMessage envelope = new WebSocketMessage();
        envelope.setType(MessageType.SYSTEM_MESSAGE);
        envelope.setData(systemMessage);
        broadcastToCourse(courseId, JSON.toJSONString(envelope));
    }

    /** Sends an {@code ERROR} message with the given text to the channel behind {@code ctx}. */
    private void sendError(ChannelHandlerContext ctx, String errorMessage) {
        WebSocketMessage envelope = new WebSocketMessage();
        envelope.setType(MessageType.ERROR);
        envelope.setData(errorMessage);
        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(envelope)));
    }

    /**
     * Persists a chat message through {@link ChatMessagesService}. An id and timestamp are
     * filled in when absent. Failures are logged and not propagated, so a storage problem
     * does not prevent the message from being broadcast.
     */
    private void saveMessage(ChatMessage chatMessage) {
        try {
            // Fill in the message id and timestamp if the client did not provide them.
            if (chatMessage.getId() == null) {
                chatMessage.setId(UUIDUtil.uuid());
            }
            if (chatMessage.getTimestamp() == null) {
                chatMessage.setTimestamp(System.currentTimeMillis());
            }

            ChatMessages chatMessages = new ChatMessages();
            BeanUtils.copyProperties(chatMessage, chatMessages);
            chatMessages.setInfoType("USER");
            chatMessages.setType("CHAT");
            chatMessages.setCreatedAt(new Date(chatMessage.getTimestamp()));
            log.debug("chatMessages = {}", chatMessages);

            chatMessagesService.save(chatMessages);
            log.info("保存消息: {}", chatMessage);
        } catch (Exception e) {
            log.error("保存消息失败", e);
        }
    }

    /** Returns the key under which a channel's user is stored in {@code channelUsers}. */
    private static String userKey(Channel channel) {
        return channel.id().asLongText();
    }

    /** Returns the message payload as a map, or an empty map when the payload is not a map. */
    @SuppressWarnings("unchecked") // guarded by the instanceof check; JSON object keys are strings
    private static Map<String, Object> payloadOf(WebSocketMessage message) {
        Object data = message.getData();
        if (data instanceof Map) {
            return (Map<String, Object>) data;
        }
        return Collections.emptyMap();
    }

    /** Returns the string form of {@code data[key]}, or {@code null} when the key is absent. */
    private static String textField(Map<String, Object> data, String key) {
        Object value = data.get(key);
        return value == null ? null : value.toString();
    }

    /**
     * Removes a channel from a course room and drops the room if it becomes empty.
     *
     * @return {@code true} if the channel was a member of the course and has been removed
     */
    private static boolean removeFromCourse(String courseId, Channel channel) {
        Set<Channel> members = courseChannels.get(courseId);
        if (members == null || !members.remove(channel)) {
            return false;
        }
        if (members.isEmpty()) {
            // Conditional remove: only drops the mapping if it still equals this (empty) set.
            courseChannels.remove(courseId, members);
        }
        return true;
    }

    /** Writes {@code json} to every active channel of the course, if the course has a room. */
    private static void broadcastToCourse(String courseId, String json) {
        Set<Channel> members = courseChannels.get(courseId);
        if (members != null) {
            writeToAll(members, json);
        }
    }

    /** Writes {@code json} as a text frame to each active channel; a new frame is created per write. */
    private static void writeToAll(Set<Channel> members, String json) {
        for (Channel member : members) {
            if (member.isActive()) {
                member.writeAndFlush(new TextWebSocketFrame(json));
            }
        }
    }
}