feat(netty-demo): netty-demo

netty-demo
This commit is contained in:
ovo 2024-12-08 19:39:55 +08:00
parent 43c7bf1810
commit c341087577
5 changed files with 254 additions and 1 deletions

View File

@ -0,0 +1,11 @@
package com.guwan.backend.netty.chat;
import lombok.Data;
@Data
public class ChatMessage {
private String type; // 消息类型: CONNECT, CHAT, DISCONNECT
private String from; // 发送者
private String content; // 消息内容
private Long timestamp; // 时间戳
}

View File

@ -0,0 +1,70 @@
package com.guwan.backend.netty.chat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Slf4j
@Component
@RequiredArgsConstructor
public class ChatServer {
@Value("${netty.chat.port}")
private int port;
private final ChatServerHandler chatServerHandler;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@PostConstruct
public void start() throws Exception {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new WebSocketServerProtocolHandler("/ws/chat"))
.addLast(chatServerHandler);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
log.info("聊天服务器启动成功WebSocket端口: {}", port);
} catch (Exception e) {
log.error("聊天服务器启动失败", e);
throw e;
}
}
@PreDestroy
public void stop() {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,92 @@
package com.guwan.backend.netty.chat;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ChannelHandler.Sharable
public class ChatServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Map<String, Channel> clients = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
// 解析消息
ChatMessage message = objectMapper.readValue(frame.text(), ChatMessage.class);
// 处理不同类型的消息
switch (message.getType()) {
case "CONNECT":
handleConnect(ctx, message);
break;
case "CHAT":
handleChat(message);
break;
case "DISCONNECT":
handleDisconnect(message);
break;
}
}
private void handleConnect(ChannelHandlerContext ctx, ChatMessage message) {
// 保存用户Channel
clients.put(message.getFrom(), ctx.channel());
// 广播用户上线消息
broadcastMessage(new ChatMessage() {{
setType("CHAT");
setFrom("System");
setContent(message.getFrom() + " joined the chat");
setTimestamp(System.currentTimeMillis());
}});
}
private void handleChat(ChatMessage message) {
// 广播聊天消息
broadcastMessage(message);
}
private void handleDisconnect(ChatMessage message) {
// 移除用户Channel
clients.remove(message.getFrom());
// 广播用户下线消息
broadcastMessage(new ChatMessage() {{
setType("CHAT");
setFrom("System");
setContent(message.getFrom() + " left the chat");
setTimestamp(System.currentTimeMillis());
}});
}
private void broadcastMessage(ChatMessage message) {
try {
String messageJson = objectMapper.writeValueAsString(message);
TextWebSocketFrame frame = new TextWebSocketFrame(messageJson);
// 广播给所有在线用户
clients.values().forEach(channel ->
channel.writeAndFlush(frame.retain())
);
} catch (Exception e) {
log.error("广播消息失败", e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("WebSocket error", cause);
ctx.close();
}
}

View File

@ -140,4 +140,16 @@ easy-es:
process-index-mode: manual
print-dsl: true
distributed: false
response-log: true
response-log: true
netty:
danmaku:
port: 8085
chat:
port: 8086
stream:
port: 8087
heartbeat:
interval: 30
cluster:
nodes: localhost:8088,localhost:8089

View File

@ -0,0 +1,68 @@
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Chat</title>
<style>
#messageArea { height: 300px; overflow-y: scroll; border: 1px solid #ccc; }
</style>
</head>
<body>
<div id="messageArea"></div>
<input type="text" id="username" placeholder="Your name">
<input type="text" id="message" placeholder="Type a message...">
<button onclick="sendMessage()">Send</button>
<script>
let ws;
let username;
function connect() {
username = document.getElementById('username').value;
if (!username) {
alert('Please enter your name');
return;
}
ws = new WebSocket('ws://localhost:8086/ws/chat');
ws.onopen = () => {
// 发送连接消息
ws.send(JSON.stringify({
type: 'CONNECT',
from: username,
timestamp: Date.now()
}));
};
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
const messageArea = document.getElementById('messageArea');
messageArea.innerHTML += `<p><strong>${message.from}:</strong> ${message.content}</p>`;
messageArea.scrollTop = messageArea.scrollHeight;
};
}
function sendMessage() {
const content = document.getElementById('message').value;
if (!content) return;
ws.send(JSON.stringify({
type: 'CHAT',
from: username,
content: content,
timestamp: Date.now()
}));
document.getElementById('message').value = '';
}
// 页面加载完成后连接
window.onload = () => {
document.getElementById('username').onchange = connect;
document.getElementById('message').onkeypress = (e) => {
if (e.key === 'Enter') sendMessage();
};
};
</script>
</body>
</html>