Appearance
消息帧结构
本文档引用的文件
目录
引言
本文档详细描述了基于WebSocket协议的消息帧结构设计与实现,重点分析了消息的二进制/文本帧格式、头部信息、负载数据结构和长度编码方式。文档基于``项目中的WebSocket相关组件,深入解析了websocket.constants.ts
中定义的帧类型常量及其在实际通信中的应用。
系统采用基于主题(topic)的发布-订阅模式,实现了客户端与服务端之间的实时双向通信。通过分析服务端如何解析和验证消息帧的完整性,包括校验和计算、长度检查和边界处理,本文档提供了完整的消息帧字节级布局图示,展示了各字段的位置和大小。
项目结构分析
项目采用分层架构设计,WebSocket相关功能主要分布在deno/lib/websocket/
目录下,包含常量定义、路由处理和数据访问对象(DAO)三个核心组件。前端Vue组件位于pc/src/compositions/websocket.ts
,负责客户端连接管理和消息收发。
图表来源
本节来源
核心数据结构与常量
全局状态映射
系统定义了三个核心的Map数据结构,用于维护WebSocket连接的状态:
typescript
// deno-lint-ignore no-explicit-any
export const callbacksMap = new Map<string, ((data: any) => Promise<void> | void)[]>();
export const socketMap = new Map<string, WebSocket[]>();
export const clientIdTopicsMap = new Map<string, string[]>();
字段说明:
- callbacksMap: 主题回调映射,键为topic字符串,值为该主题订阅的回调函数数组
- socketMap: 客户端连接映射,键为clientId字符串,值为该客户端的所有WebSocket连接数组
- clientIdTopicsMap: 客户端主题映射,键为clientId字符串,值为该客户端订阅的主题字符串数组
这些Map结构实现了O(1)时间复杂度的查找操作,确保了高并发场景下的消息分发效率。
本节来源
WebSocket消息协议设计
消息类型定义
系统支持三种主要的消息操作类型:
操作类型 | 描述 | 方向 |
---|---|---|
subscribe | 订阅主题 | 客户端 → 服务端 |
publish | 发布消息 | 客户端 ↔ 服务端 |
unSubscribe | 取消订阅 | 客户端 → 服务端 |
消息帧格式
消息帧采用JSON格式进行序列化,具有统一的结构:
json
{
"action": "subscribe",
"data": {
"topics": ["topic1", "topic2"]
}
}
或
json
{
"action": "publish",
"data": {
"topic": "user.update",
"payload": { "userId": "123", "name": "张三" }
}
}
字段说明:
- action: 消息操作类型,枚举值为"subscribe"、"publish"、"unSubscribe"
- data: 消息数据体,根据action类型具有不同的结构
心跳机制
系统实现了ping-pong心跳机制,用于保持连接活跃:
typescript
function socketPing() {
if (socket && socket.readyState === WebSocket.OPEN) {
socket.send("ping");
}
setTimeout(socketPing, 60000);
}
心跳间隔为60秒,服务端收到"ping"消息后立即回复"pong"。
本节来源
服务端消息处理流程
连接升级流程
服务端通过/api/websocket/upgrade
端点处理WebSocket连接升级请求:
图表来源
本节来源
消息分发流程
服务端收到消息后,根据action类型进行分发处理:
图表来源
本节来源
客户端连接与通信机制
连接管理
客户端实现了自动重连机制,确保网络不稳定时的连接可靠性:
typescript
async function reConnect() {
if (socket && (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING)) {
return socket;
}
if (topicCallbackMap.size === 0) {
return;
}
reConnectNum++;
let time = 200;
if (reConnectNum > 10) {
time = 5000;
} else {
time = reConnectNum * 200;
}
await new Promise((resolve) => setTimeout(resolve, time));
await connect();
}
重连策略:
- 初始重连间隔:200ms
- 递增策略:每次增加200ms
- 最大重连间隔:5000ms(当重连次数超过10次时)
消息订阅机制
客户端通过subscribe
函数实现主题订阅:
typescript
export async function subscribe<T>(
topic: string,
callback: ((data: T | undefined) => void),
) {
const socket = await connect();
if (!socket) {
return;
}
let callbacks = topicCallbackMap.get(topic);
if (!callbacks) {
callbacks = [ ];
topicCallbackMap.set(topic, callbacks);
}
callbacks.push(callback);
socket.send(JSON.stringify({
action: "subscribe",
data: {
topics: [ topic ],
},
}));
}
本节来源
消息帧结构详细分析
字节级布局
虽然系统采用文本帧(JSON格式),但其字节级布局遵循WebSocket协议规范:
字段说明:
- FIN: 1位,表示是否为消息的最后一个分片
- RSV1-3: 3位,保留位,必须为0
- Opcode: 4位,操作码,文本帧为1,二进制帧为2
- Mask: 1位,掩码标志,客户端发送必须为1
- Payload Length: 7/16/64位,负载长度
- Masking Key: 32位,掩码密钥
- JSON字符串: 应用层消息体,包含action和data字段
长度编码方式
系统采用WebSocket协议的变长整数编码方式:
- 负载长度 ≤ 125: 使用7位长度字段
- 126 ≤ 负载长度 ≤ 65535: 使用7+16位(值为126)
- 负载长度 > 65535: 使用7+64位(值为127)
校验和与完整性验证
系统通过以下机制确保消息完整性:
- JSON解析验证: 使用
JSON.parse()
进行语法验证 - 字段存在性检查: 验证action、data等关键字段
- 类型检查: 确保数据类型符合预期
- 连接状态检查: 发送前检查WebSocket.readyState
本节来源
异常处理与连接管理
错误处理策略
系统实现了全面的错误处理机制:
typescript
socket.onerror = function(err0) {
const err = err0 as ErrorEvent;
try {
if (socket.readyState === WebSocket.OPEN) {
socket.close(1000, err.message);
}
} catch (_err) {
// error(_err);
}
// 清理连接映射
for (const [ clientId2, sockets ] of socketMap) {
if (sockets.includes(socket)) {
socketMap.set(clientId2, sockets.filter((item) => item !== socket));
}
}
clientIdTopicsMap.delete(clientId);
};
错误处理步骤:
- 尝试关闭连接
- 从socketMap中移除失效连接
- 从clientIdTopicsMap中清除客户端订阅信息
- 记录错误日志
连接清理机制
系统在以下情况下自动清理连接:
- 连接关闭:
onclose
事件触发时 - 连接错误:
onerror
事件触发时 - 客户端重连: 新连接建立时关闭旧连接
- 空闲超时: 10分钟后无订阅自动关闭连接
图表来源
本节来源
性能优化与最佳实践
连接复用
系统支持单个客户端多个WebSocket连接,但通过onopen
函数确保连接有效性:
typescript
function onopen(socket: WebSocket, clientId: string) {
let socketOlds = socketMap.get(clientId);
if (!socketOlds) {
socketOlds = [ ];
socketMap.set(clientId, socketOlds);
}
socketOlds.push(socket);
for (const socket2 of socketOlds) {
if (socket2.readyState !== WebSocket.OPEN) {
socket2.close(1000, `websocket: clientId ${ clientId } reconnect`);
}
}
}
当新连接建立时,会关闭所有非OPEN状态的旧连接。
消息批处理
发布消息时,系统采用批处理方式提高效率:
typescript
export async function publish<T>(
data: {
topic: string;
payload: T;
},
) {
const topic = data.topic;
const callbacks = callbacksMap.get(topic);
if (callbacks && callbacks.length > 0) {
for (const callback of callbacks) {
await callback(data.payload);
}
}
const dataStr = JSON.stringify(data);
for (const [ clientId, topics ] of clientIdTopicsMap) {
if (!topics.includes(topic)) {
continue;
}
const sockets = socketMap.get(clientId);
if (!sockets || sockets.length === 0) {
continue;
}
for (const socket of sockets) {
if (socket.readyState !== WebSocket.OPEN) {
// 清理无效连接
continue;
}
socket.send(dataStr);
}
}
}
优化点:
- 本地回调并行执行
- 消息体只序列化一次
- 连接状态检查避免无效发送
内存管理
系统通过以下机制防止内存泄漏:
- 自动清理: 连接关闭时自动从Map中移除
- 空闲超时: 10分钟后无订阅自动关闭连接
- 重复订阅过滤: 避免同一主题多次订阅
本节来源
总结
本文档详细分析了WebSocket消息帧结构的设计与实现。系统采用基于JSON的文本帧格式,通过主题发布-订阅模式实现高效的消息分发。核心特点包括:
- 标准化消息协议: 统一的JSON消息格式,包含action和data字段
- 高效状态管理: 使用Map数据结构实现O(1)查找性能
- 可靠连接管理: 自动重连、心跳检测和连接清理机制
- 安全验证: 密码保护和客户端ID验证
- 性能优化: 消息批处理、连接复用和内存管理
该设计适用于高并发实时通信场景,具有良好的可扩展性和稳定性。