110 lines
3.7 KiB
TypeScript
110 lines
3.7 KiB
TypeScript
import { Client } from '@stomp/stompjs';
|
||
import type { IMessage, StompSubscription } from '@stomp/stompjs';
|
||
import SockJS from 'sockjs-client';
|
||
|
||
interface StompClientOptions {
|
||
endpoint: string; // WebSocket endpoint,例如:/ws
|
||
sendPath?: string; // 默认发送路径(可选)
|
||
host: string; // 服务器地址,例如:http://localhost:8080
|
||
useSockJS?: boolean; // 是否使用 SockJS(默认 false)
|
||
|
||
onConnect?: () => void; // 连接成功回调
|
||
onError?: (err: any) => void; // 错误回调
|
||
onMessage?: (msg: any, path: string) => void; // 全局消息处理回调(含路径)
|
||
}
|
||
|
||
export default class StompClient {
|
||
private options: StompClientOptions;
|
||
private stompClient: Client;
|
||
private subscriptions: Map<string, StompSubscription> = new Map();
|
||
private messageHandlers: Map<string, (msg: any) => void> = new Map();
|
||
|
||
constructor(options: StompClientOptions) {
|
||
this.options = options;
|
||
|
||
this.stompClient = new Client({
|
||
brokerURL: options.useSockJS
|
||
? undefined
|
||
: `ws://${options.host.replace(/^http(s)?:\/\//, '')}${options.endpoint}`,
|
||
webSocketFactory: options.useSockJS
|
||
? () => new SockJS(`${options.host}${options.endpoint}`)
|
||
: undefined,
|
||
reconnectDelay: 5000,
|
||
debug: () => {},
|
||
});
|
||
|
||
this.stompClient.onConnect = () => {
|
||
// 连接成功后,自动重新订阅所有路径
|
||
this.messageHandlers.forEach((handler, path) => {
|
||
const sub = this.stompClient.subscribe(path, (msg: IMessage) => {
|
||
const body = JSON.parse(msg.body);
|
||
handler(body);
|
||
this.options.onMessage?.(body, path);
|
||
});
|
||
this.subscriptions.set(path, sub);
|
||
});
|
||
|
||
this.options.onConnect?.();
|
||
};
|
||
|
||
this.stompClient.onStompError = (frame) => {
|
||
console.error('STOMP 错误:', frame);
|
||
this.options.onError?.(frame);
|
||
};
|
||
}
|
||
|
||
/** 建立连接 */
|
||
connect() {
|
||
this.stompClient.activate();
|
||
}
|
||
|
||
/** 发送消息,支持指定 destination,未指定则用默认 sendPath */
|
||
send(data: any, sendPath?: string) {
|
||
if (!this.stompClient.connected) {
|
||
console.warn('STOMP 未连接,无法发送消息');
|
||
return;
|
||
}
|
||
|
||
const destination = sendPath ?? this.options.sendPath;
|
||
if (!destination) {
|
||
console.warn('未提供发送路径');
|
||
return;
|
||
}
|
||
|
||
this.stompClient.publish({
|
||
destination,
|
||
body: JSON.stringify(data),
|
||
});
|
||
}
|
||
|
||
/** 订阅指定路径 */
|
||
subscribe(path: string, handler: (msg: any) => void) {
|
||
this.messageHandlers.set(path, handler);
|
||
|
||
if (this.stompClient.connected) {
|
||
const sub = this.stompClient.subscribe(path, (msg: IMessage) => {
|
||
const body = JSON.parse(msg.body);
|
||
handler(body);
|
||
this.options.onMessage?.(body, path);
|
||
});
|
||
this.subscriptions.set(path, sub);
|
||
}
|
||
}
|
||
|
||
/** 取消订阅指定路径 */
|
||
unsubscribe(path: string) {
|
||
this.subscriptions.get(path)?.unsubscribe();
|
||
this.subscriptions.delete(path);
|
||
this.messageHandlers.delete(path);
|
||
}
|
||
|
||
/** 断开连接并清理订阅 */
|
||
disconnect() {
|
||
this.subscriptions.forEach((sub) => sub.unsubscribe());
|
||
this.subscriptions.clear();
|
||
this.messageHandlers.clear();
|
||
this.stompClient.deactivate();
|
||
console.log('STOMP 已断开连接');
|
||
}
|
||
}
|