WebSocket 是现代游戏服务器与客户端通信的主流协议,提供低延迟的双向通信能力。本教程将详细讲解如何在 Skynet 中实现 WebSocket 支持,从协议原理到完整的网关服务实现。
WebSocket 协议概述
WebSocket vs TCP vs HTTP
| 特性 | WebSocket | TCP | HTTP |
|---|---|---|---|
| 通信方式 | 双向全双工 | 双向全双工 | 请求-响应 |
| 连接保持 | 长连接 | 长连接 | 短连接/长连接 |
| 数据帧 | 结构化帧 | 原始字节流 | 文本 |
| 穿透能力 | 高(基于 HTTP 升级) | 低 | 高 |
| 适用场景 | 实时通信 | 自定义协议 | Web API |
WebSocket 握手过程
WebSocket 连接通过 HTTP Upgrade 建立:
客户端请求:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
服务器响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
数据帧格式
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |            (16/64)            |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|               Masking-key (if MASK=1, 4 bytes)                |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         Payload Data                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- FIN:是否为最后一帧
- opcode:帧类型(0x1=文本, 0x2=二进制, 0x8=关闭, 0x9=ping, 0xA=pong)
- MASK:负载是否经过掩码处理;客户端发往服务器的帧必须掩码(MASK=1),服务器发往客户端的帧不得掩码
- Payload len:负载长度
Skynet WebSocket 网关
基础网关实现
-- service/wsgate.lua
-- WebSocket gateway service: accepts connections, performs the HTTP upgrade
-- handshake and exchanges WebSocket frames with clients.
local skynet = require "skynet"
local socket = require "skynet.socket"
local crypt = require "skynet.crypt"
-- NOTE(review): `url` is not referenced anywhere in this service, and skynet
-- appears to ship its URL helper as "http.url" rather than "skynet.url" —
-- confirm this module resolves, otherwise the service fails while loading.
local url = require "skynet.url"
-- WebSocket frame opcodes handled by this gateway
local WS_TEXT, WS_BINARY = 0x1, 0x2
local WS_CLOSE, WS_PING, WS_PONG = 0x8, 0x9, 0xA

-- Connection bookkeeping
local connections = {} -- fd -> {agent, addr, path, last_active}
local agents = {}      -- agent address -> fd

-- Default settings; CMD.open may override address, port and maxclient
local config = {}
config.address = "0.0.0.0"
config.port = 8888
config.maxclient = 4096
config.heartbeat_timeout = 60 -- seconds
-- ===== WebSocket 协议实现 =====
-- Base64 alphabet: the character at position n+1 encodes the 6-bit value n
local base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

--- Encode a byte string as standard, '='-padded Base64.
-- @tparam string data raw bytes to encode
-- @treturn string the Base64 text
local function base64_encode(data)
    local out = {}
    local total = #data
    for pos = 1, total, 3 do
        -- Pack up to three input bytes into one 24-bit group.
        local b1, b2, b3 = data:byte(pos, pos + 2)
        local group = (b1 << 16) | ((b2 or 0) << 8) | (b3 or 0)
        -- 1 input byte yields 2 sextets, 2 bytes yield 3, 3 bytes yield 4.
        local sextets = b3 and 4 or (b2 and 3 or 2)
        for k = 1, sextets do
            local idx = ((group >> (24 - 6 * k)) & 0x3F) + 1
            out[#out + 1] = base64_chars:sub(idx, idx)
        end
    end
    local padding = { '', '==', '=' }
    return table.concat(out) .. padding[total % 3 + 1]
end
--- Validate an HTTP upgrade request and answer it with the WebSocket handshake.
-- On success a "101 Switching Protocols" response is written to `fd`; on
-- failure an HTTP error status is written instead. The socket is never closed
-- here; that stays with the caller.
-- @tparam integer fd socket id the request was read from
-- @tparam string data raw request text: request line plus header lines
-- @treturn boolean true when the 101 response was sent, false otherwise
-- @treturn ?string request target taken from the request line (success only)
local function handle_handshake(fd, data)
    local method, path = string.match(data, "^(%u+)%s+(%S+)%s+HTTP")
    if method ~= "GET" then
        socket.write(fd, "HTTP/1.1 405 Method Not Allowed\r\n\r\n")
        return false
    end

    -- Collect header fields. Anchoring on the preceding line break skips the
    -- request line, so a ':' inside the request target is not mistaken for a
    -- header; names are lower-cased and surrounding whitespace is trimmed.
    local headers = {}
    for name, value in string.gmatch(data, "\n([^:\r\n]+):[ \t]*([^\r\n]*)") do
        name = name:match("^%s*(.-)%s*$"):lower()
        headers[name] = value:match("^(.-)%s*$")
    end

    -- Both Upgrade and Connection must announce the protocol switch
    -- (Connection may be a list such as "keep-alive, Upgrade").
    local upgrade = headers["upgrade"]
    local connection = headers["connection"]
    if not upgrade or upgrade:lower() ~= "websocket"
        or not connection
        or not connection:lower():find("upgrade", 1, true) then
        socket.write(fd, "HTTP/1.1 400 Bad Request\r\n\r\n")
        return false
    end

    local key = headers["sec-websocket-key"]
    if not key or key == "" then
        socket.write(fd, "HTTP/1.1 400 Bad Request\r\n\r\n")
        return false
    end

    -- Only protocol version 13 is implemented; advertise it when refusing.
    if headers["sec-websocket-version"] ~= "13" then
        socket.write(fd,
            "HTTP/1.1 426 Upgrade Required\r\n" ..
            "Sec-WebSocket-Version: 13\r\n" ..
            "\r\n")
        return false
    end

    -- Sec-WebSocket-Accept = base64(sha1(key .. fixed GUID))
    local magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
    local accept = crypt.base64encode(crypt.sha1(key .. magic))

    local response = string.format(
        "HTTP/1.1 101 Switching Protocols\r\n" ..
        "Upgrade: websocket\r\n" ..
        "Connection: Upgrade\r\n" ..
        "Sec-WebSocket-Accept: %s\r\n" ..
        "\r\n",
        accept
    )
    socket.write(fd, response)
    return true, path
end
--- Read one WebSocket frame from `fd`, blocking the calling coroutine until
-- the whole frame has arrived.
-- NOTE(review): no upper bound is placed on the announced payload length, so
-- a peer can make the gateway buffer an arbitrarily large frame; consider
-- adding a limit that suits the application.
-- @tparam integer fd socket id to read from
-- @treturn ?table {fin = boolean, opcode = integer, payload = string}, or nil
--   when a read fails / comes back short or the length field is invalid
local function read_frame(fd)
    -- Fixed 2-byte header: FIN + opcode, MASK + 7-bit length
    local header = socket.read(fd, 2)
    if not header or #header < 2 then
        return nil
    end
    local byte1, byte2 = string.byte(header, 1, 2)
    local fin = (byte1 & 0x80) ~= 0
    local opcode = byte1 & 0x0F
    local mask = (byte2 & 0x80) ~= 0
    local payload_len = byte2 & 0x7F

    -- Extended length: 126 -> 16-bit, 127 -> 64-bit, both big-endian
    if payload_len == 126 then
        local ext = socket.read(fd, 2)
        if not ext or #ext < 2 then return nil end
        payload_len = string.unpack(">I2", ext)
    elseif payload_len == 127 then
        local ext = socket.read(fd, 8)
        if not ext or #ext < 8 then return nil end
        -- Decode all 8 bytes. Dropping the upper 4 bytes would under-read the
        -- payload and leave the rest of the frame to be parsed as new frames.
        payload_len = string.unpack(">i8", ext)
        -- The most significant bit must be 0; a set bit shows up as negative.
        if payload_len < 0 then return nil end
    end

    -- Masking key (present when the MASK bit is set)
    local mask_key
    if mask then
        mask_key = socket.read(fd, 4)
        if not mask_key or #mask_key < 4 then return nil end
    end

    -- Payload
    local payload = ""
    if payload_len > 0 then
        payload = socket.read(fd, payload_len)
        if not payload or #payload < payload_len then return nil end
    end

    -- Unmask: byte i is XOR-ed with key byte ((i - 1) mod 4) + 1
    if mask_key then
        local keys = { string.byte(mask_key, 1, 4) }
        local bytes = {}
        for i = 1, #payload do
            bytes[i] = string.char(string.byte(payload, i) ~ keys[((i - 1) % 4) + 1])
        end
        payload = table.concat(bytes)
    end

    return {
        fin = fin,
        opcode = opcode,
        payload = payload,
    }
end
--- Write a single final (FIN=1), unmasked frame to `fd`.
-- @tparam integer fd socket id
-- @tparam integer opcode frame type (one of the WS_* opcodes)
-- @tparam string data payload bytes
local function write_frame(fd, opcode, data)
    local size = #data
    -- First byte: FIN bit plus opcode
    local first = 0x80 | opcode
    local head
    if size < 126 then
        -- Length fits in the 7-bit field (mask bit stays 0 for the server)
        head = string.char(first, size)
    elseif size < 65536 then
        -- Marker 126 followed by a 16-bit big-endian length
        head = string.char(first, 126, (size >> 8) & 0xFF, size & 0xFF)
    else
        -- Marker 127 followed by a 64-bit big-endian length
        local bytes = { first, 127 }
        for shift = 56, 0, -8 do
            bytes[#bytes + 1] = (size >> shift) & 0xFF
        end
        head = string.char(table.unpack(bytes))
    end
    socket.write(fd, head .. data)
end
--- Send `text` to `fd` as one text frame.
-- @tparam integer fd socket id
-- @tparam string text payload
local function send_text(fd, text)
    local opcode = WS_TEXT
    write_frame(fd, opcode, text)
end
--- Send `data` to `fd` as one binary frame.
-- @tparam integer fd socket id
-- @tparam string data payload bytes
local function send_binary(fd, data)
    local opcode = WS_BINARY
    write_frame(fd, opcode, data)
end
--- Send a close frame to `fd`.
-- Without a code the frame has an empty payload; with a code the payload is
-- the 2-byte big-endian status code followed by the optional reason text.
-- @tparam integer fd socket id
-- @tparam ?integer code close status code
-- @tparam ?string reason text appended after the code
local function send_close(fd, code, reason)
    if not code then
        write_frame(fd, WS_CLOSE, "")
        return
    end
    local body = string.char((code >> 8) & 0xFF, code & 0xFF)
    if reason then
        body = body .. reason
    end
    write_frame(fd, WS_CLOSE, body)
end
--- Send a pong frame to `fd`, using an empty payload when none is given.
-- @tparam integer fd socket id
-- @tparam ?string data payload to send back
local function send_pong(fd, data)
    local payload = data
    if not payload then
        payload = ""
    end
    write_frame(fd, WS_PONG, payload)
end
-- ===== 连接管理 =====
-- Upper bound on the size of the HTTP upgrade request (request line plus
-- headers); larger requests are dropped instead of being buffered.
local MAX_HANDSHAKE_BYTES = 16384

--- Count the connections that have completed the handshake.
-- @treturn integer number of entries in `connections`
local function count_connections()
    local total = 0
    for _ in pairs(connections) do
        total = total + 1
    end
    return total
end

--- Close the socket behind `conn` at most once, whichever coroutine gets
-- there first (the connection's reader or a close timer).
-- @tparam integer fd socket id
-- @tparam table conn the connection record for `fd`
local function shutdown_socket(fd, conn)
    if conn.socket_closed then
        return
    end
    conn.socket_closed = true
    socket.close(fd)
end

--- Forget a connection and notify its agent. Does nothing when `fd` is not
-- (or no longer) registered, so it is safe to call more than once.
-- @tparam integer fd socket id
local function on_disconnect(fd)
    local conn = connections[fd]
    if not conn then return end
    skynet.error(string.format("连接断开: fd=%d", fd))
    connections[fd] = nil
    if conn.agent then
        agents[conn.agent] = nil
        -- The agent may already be gone; a failed send must not abort cleanup.
        pcall(skynet.send, conn.agent, "lua", "disconnect")
    end
end

--- Begin a server-initiated close: send one close frame, then drop the
-- socket after a 1 second grace period. Repeated calls for the same
-- connection are ignored.
-- @tparam integer fd socket id
-- @tparam integer code close status code
-- @tparam ?string reason close reason text
local function close_connection(fd, code, reason)
    local conn = connections[fd]
    if not conn or conn.closing then
        return
    end
    conn.closing = true
    send_close(fd, code, reason)
    skynet.timeout(100, function()
        -- Closing the socket wakes the reader coroutine blocked in
        -- read_frame; that coroutine then unregisters the connection.
        shutdown_socket(fd, conn)
    end)
end

--- Read the HTTP upgrade request line by line up to the empty line.
-- @tparam integer fd socket id
-- @treturn ?string the request text with CRLF line endings, or nil when the
--   socket closed or the request exceeded MAX_HANDSHAKE_BYTES
local function read_handshake_request(fd)
    local lines, size = {}, 0
    while true do
        local line = socket.readline(fd, "\r\n")
        if not line then
            return nil
        end
        size = size + #line + 2
        if size > MAX_HANDSHAKE_BYTES then
            return nil
        end
        lines[#lines + 1] = line
        if line == "" then
            break
        end
    end
    return table.concat(lines, "\r\n") .. "\r\n"
end

--- Read and handle frames until the socket closes or a close frame arrives.
-- Fragmented messages are not reassembled: every text/binary frame is
-- forwarded to the agent on its own.
-- @tparam integer fd socket id
-- @tparam table conn the connection record for `fd`
local function serve_frames(fd, conn)
    while true do
        local frame = read_frame(fd)
        if not frame then
            return
        end
        conn.last_active = skynet.time()
        local opcode = frame.opcode
        if opcode == WS_CLOSE then
            -- Answer the peer's close once; if this side started the close
            -- handshake, the frame just read is the peer's reply.
            if not conn.closing then
                conn.closing = true
                send_close(fd, 1000)
            end
            return
        elseif opcode == WS_PING then
            send_pong(fd, frame.payload)
        elseif opcode == WS_TEXT or opcode == WS_BINARY then
            skynet.send(conn.agent, "lua", "message", fd, frame.payload)
        end
        -- WS_PONG and any other opcode only refresh last_active.
    end
end

--- Handle one accepted connection for its whole lifetime: handshake, agent
-- creation, frame loop and cleanup. Runs in its own coroutine (CMD.open forks
-- it per accepted fd) because the socket reads block the caller.
-- @tparam integer fd accepted socket id
-- @tparam string addr peer address as reported by the accept callback
local function on_connect(fd, addr)
    skynet.error(string.format("新连接: fd=%d, addr=%s", fd, addr))
    -- Start receiving data on the accepted socket
    socket.start(fd)

    local request_data = read_handshake_request(fd)
    if not request_data then
        socket.close(fd)
        return
    end

    -- Enforce the configured connection limit before upgrading
    if count_connections() >= config.maxclient then
        skynet.error("连接数已达上限,拒绝连接:", fd)
        socket.write(fd, "HTTP/1.1 503 Service Unavailable\r\n\r\n")
        socket.close(fd)
        return
    end

    local ok, path = handle_handshake(fd, request_data)
    if not ok then
        skynet.error("握手失败:", fd)
        socket.close(fd)
        return
    end

    -- One agent service per connection
    local agent = skynet.newservice("wsagent", fd, addr, path)
    local conn = {
        agent = agent,
        addr = addr,
        path = path,
        last_active = skynet.time(),
    }
    connections[fd] = conn
    agents[agent] = fd
    skynet.error(string.format("连接已建立: fd=%d, agent=:%08x",
        fd, agent))

    -- Frames are consumed here with the blocking socket API. pcall keeps the
    -- cleanup below running even if frame handling raises.
    local served, err = pcall(serve_frames, fd, conn)
    if not served then
        skynet.error("连接处理出错:", fd, err)
    end
    on_disconnect(fd)
    shutdown_socket(fd, conn)
end

-- ===== Command handlers =====
local CMD = {}

--- Apply configuration overrides and start listening.
-- @tparam ?table conf optional fields: address, port, maxclient,
--   heartbeat_timeout (seconds)
function CMD.open(conf)
    conf = conf or {}
    config.address = conf.address or config.address
    config.port = conf.port or config.port
    config.maxclient = conf.maxclient or config.maxclient
    config.heartbeat_timeout = conf.heartbeat_timeout or config.heartbeat_timeout

    local listen_fd = socket.listen(config.address, config.port)
    skynet.error(string.format("WebSocket 网关启动: %s:%d",
        config.address, config.port))
    socket.start(listen_fd, function(fd, addr)
        skynet.fork(on_connect, fd, addr)
    end)
end

--- Send a text message to a connected client; ignored for unknown fds and
-- for connections that are already closing.
function CMD.send(fd, data)
    local conn = connections[fd]
    if conn and not conn.closing then
        send_text(fd, data)
    end
end

--- Send a binary message to a connected client; ignored for unknown fds and
-- for connections that are already closing.
function CMD.send_binary(fd, data)
    local conn = connections[fd]
    if conn and not conn.closing then
        send_binary(fd, data)
    end
end

--- Close a client connection with a close frame (status 1000 by default).
function CMD.close(fd, code, reason)
    close_connection(fd, code or 1000, reason)
end

--- Return the number of connections that completed the handshake.
function CMD.get_online()
    return count_connections()
end

--- Send a text message to every connection that is not closing.
function CMD.broadcast(data)
    for fd, conn in pairs(connections) do
        if not conn.closing then
            send_text(fd, data)
        end
    end
end

skynet.start(function()
    -- Heartbeat watchdog: once per second, close connections that have not
    -- delivered a frame within heartbeat_timeout seconds.
    skynet.fork(function()
        while true do
            skynet.sleep(100)
            local now = skynet.time()
            for fd, conn in pairs(connections) do
                if not conn.closing
                    and now - conn.last_active > config.heartbeat_timeout then
                    skynet.error("心跳超时,断开连接:", fd)
                    close_connection(fd, 1001, "heartbeat timeout")
                end
            end
        end
    end)

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd], "Unknown command: " .. tostring(cmd))
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)

    -- No skynet.dispatch("socket", ...) here on purpose: the skynet.socket
    -- library registers its own handler for socket messages, and replacing it
    -- would stop socket.listen/start/read/readline from receiving events.
    -- Incoming data is consumed by on_connect through the blocking socket API.
end)
Agent 服务(处理客户端逻辑)
-- service/wsagent.lua
local skynet = require "skynet"
local cjson = require "cjson"
--- Split the service start arguments (fd, addr, path) and convert fd to a
-- number. The arguments are taken one by one: calling tonumber(...) with all
-- of them would hand `addr` to tonumber as its numeric base and raise.
-- @param raw_fd socket id as passed on start
-- @param raw_addr peer address
-- @param raw_path request path from the handshake
-- @treturn ?number fd
-- @return addr
-- @return path
local function parse_args(raw_fd, raw_addr, raw_path)
    return tonumber(raw_fd), raw_addr, raw_path
end

local fd, addr, path = parse_args(...)
-- Set once the client has logged in
local user_id = nil
local CMD = {}
--- Handle one client message forwarded by the gateway.
-- The payload is expected to be a JSON object of the form
-- {type = <string>, data = <value>}. Malformed payloads are logged and
-- dropped instead of raising inside the dispatcher.
-- @param fd socket id the message arrived on
-- @tparam string data raw message payload
function CMD.message(fd, data)
    local ok, msg = pcall(cjson.decode, data)
    -- Valid JSON can still be a number, string, ...; only objects are usable.
    if not ok or type(msg) ~= "table" then
        skynet.error("消息解析失败:", data)
        return
    end
    local msg_type = msg.type
    local msg_data = msg.data
    skynet.error(string.format("收到消息: type=%s", msg_type))

    if msg_type == "login" then
        if type(msg_data) ~= "table" or msg_data.user_id == nil then
            skynet.error("登录消息缺少 user_id")
            return
        end
        user_id = msg_data.user_id
        skynet.error(string.format("用户 %s 登录", user_id))
        -- Confirm the login to the client
        send_response("login", {success = true, user_id = user_id})
    elseif msg_type == "chat" or msg_type == "move" or msg_type == "action" then
        -- These messages act on behalf of a user; without a login there is no
        -- user_id to pass on to the other services.
        if user_id == nil then
            skynet.error("未登录,忽略消息:", msg_type)
            return
        end
        if msg_type == "chat" then
            -- Forward to the chat service
            skynet.send(".chat_service", "lua", "send", user_id, msg_data)
        elseif msg_type == "move" then
            -- handle_move reads data.x / data.y, so it needs a table
            if type(msg_data) ~= "table" then
                skynet.error("移动消息缺少 data")
                return
            end
            handle_move(msg_data)
        else
            handle_action(msg_data)
        end
    end
end
--- Handle the "disconnect" command: log it, report a logged-in user as
-- offline to the user service, then exit this agent.
function CMD.disconnect()
    local uid = user_id
    skynet.error(("用户 %s 断开连接"):format(uid))
    -- Only users that logged in need to be reported offline
    if uid then
        skynet.send(".user_service", "lua", "offline", uid)
    end
    skynet.exit()
end
--- Send a JSON response {type, data, timestamp} to this agent's client via
-- the gateway.
-- NOTE(review): assumes the gateway service is registered under the local
-- name ".wsgate" — confirm where that name is assigned.
-- @param msg_type value for the "type" field
-- @param data value for the "data" field
function send_response(msg_type, data)
    local envelope = {}
    envelope.type = msg_type
    envelope.data = data
    envelope.timestamp = os.time()
    local encoded = cjson.encode(envelope)
    skynet.send(".wsgate", "lua", "send", fd, encoded)
end
--- Handle a "move" message: pass it to the game service and acknowledge the
-- x / y values back to the client.
-- @tparam table data move payload; its x and y fields are echoed in the ack
function handle_move(data)
    -- Let the game service know about the move
    skynet.send(".game_service", "lua", "player_move", user_id, data)
    -- Acknowledge to the sender
    local ack = { x = data.x, y = data.y }
    send_response("move_ack", ack)
end
--- Handle an "action" message: call the game service and send its first
-- return value back to the client as "action_result".
-- @param data action payload, passed through unchanged
function handle_action(data)
    -- The parentheses keep only the first value returned by the call
    local outcome = (skynet.call(".game_service", "lua",
        "player_action", user_id, data))
    send_response("action_result", outcome)
end
-- Service entry point: log the start arguments and route "lua" messages to
-- the CMD table.
skynet.start(function()
    skynet.error(("Agent 启动: fd=%d, addr=%s, path=%s"):format(fd, addr, path))

    local function on_lua(session, source, cmd, ...)
        local handler = assert(CMD[cmd], "Unknown command: " .. cmd)
        if session == 0 then
            -- Sent without a session: no reply
            handler(...)
        else
            skynet.retpack(handler(...))
        end
    end

    skynet.dispatch("lua", on_lua)
end)
多协议网关
一个网关同时支持 WebSocket 和 TCP:
-- service/unified_gate.lua
local skynet = require "skynet"
local socket = require "skynet.socket"
-- fd -> connection record ({proto, addr}, plus an agent field when one is set)
local connections = {}
-- Protocol tags stored in each connection record
local PROTO_WS, PROTO_TCP = "ws", "tcp"
-- Command handlers reachable through the "lua" dispatcher
local CMD = {}
--- Open the listening ports named in `conf`.
-- @tparam table conf fields: address (defaults to "0.0.0.0"), ws_port and
--   tcp_port (each optional; a port is only opened when its field is set)
function CMD.open(conf)
    -- Listen on `port` and hand every accepted connection to `on_accept`
    local function listen(port, on_accept)
        local listen_fd = socket.listen(conf.address or "0.0.0.0", port)
        socket.start(listen_fd, on_accept)
    end

    -- WebSocket port
    if conf.ws_port then
        listen(conf.ws_port, function(fd, addr)
            skynet.fork(handle_ws_connect, fd, addr)
        end)
        skynet.error("WebSocket 端口:", conf.ws_port)
    end

    -- TCP port
    if conf.tcp_port then
        listen(conf.tcp_port, function(fd, addr)
            skynet.fork(handle_tcp_connect, fd, addr)
        end)
        skynet.error("TCP 端口:", conf.tcp_port)
    end
end
--- Handle a connection accepted on the WebSocket port (outline only).
-- @param fd accepted socket id
-- @param addr peer address
function handle_ws_connect(fd, addr)
    -- ... WebSocket handshake logic (see the gateway example earlier)
    connections[fd] = { addr = addr, proto = PROTO_WS }
    -- ... further processing
end
--- Handle a connection accepted on the TCP port for its whole lifetime.
-- Packets are framed as a 4-byte big-endian length followed by that many
-- bytes; each packet is forwarded to the connection's agent when one is set.
-- NOTE(review): the announced length is not capped, so a peer can make the
-- gateway buffer up to 4 GiB for one packet; consider adding a limit.
-- @param fd accepted socket id
-- @param addr peer address
function handle_tcp_connect(fd, addr)
    socket.start(fd)
    skynet.error("TCP 连接:", fd, addr)
    local conn = {proto = PROTO_TCP, addr = addr}
    connections[fd] = conn

    while true do
        local header = socket.read(fd, 4)
        if not header or #header < 4 then
            break
        end
        local len = string.unpack(">I4", header)

        -- An empty packet carries no body, so there is nothing to read for
        -- it. Issuing a zero-byte socket.read would wait for unrelated data
        -- instead of producing "" (presumably — confirm against the skynet
        -- version in use), which would be mistaken for a disconnect.
        local data = ""
        if len > 0 then
            data = socket.read(fd, len)
            if not data or #data < len then
                break
            end
        end

        -- Forward to the agent
        local agent = conn.agent
        if agent then
            skynet.send(agent, "lua", "message", fd, data)
        end
    end

    -- Drop the table entry first so CMD.send stops writing to this fd while
    -- the close is in progress.
    connections[fd] = nil
    socket.close(fd)
end
--- Send `data` to a client, encoded for the connection's protocol.
-- Unknown fds are ignored. For WebSocket connections nothing is written in
-- this example; the frame call is only sketched:
--   write_frame(fd, WS_TEXT, data)
-- @param fd socket id
-- @tparam string data payload
function CMD.send(fd, data)
    local conn = connections[fd]
    local proto = conn and conn.proto
    if proto == PROTO_TCP then
        -- TCP: 4-byte big-endian length prefix, then the payload
        socket.write(fd, string.pack(">I4", #data) .. data)
    end
end
-- Service entry point: route "lua" messages to the CMD table.
skynet.start(function()
    local function on_lua(session, source, cmd, ...)
        local handler = assert(CMD[cmd])
        if session == 0 then
            -- Sent without a session: no reply
            handler(...)
        else
            skynet.retpack(handler(...))
        end
    end

    skynet.dispatch("lua", on_lua)
end)
SSL/TLS 支持
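Skynet 的 TLS 支持是可选功能,需要在编译时启用并链接 OpenSSL(官方 Makefile 中通过 TLS_MODULE=ltls 打开)。下面的 socket.tls 调用仅用于示意流程,实际接口请以所用 Skynet 版本提供的 TLS 辅助模块(如 http.tlshelper)为准: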
-- Listen on port 443 and wrap every accepted connection in TLS.
-- NOTE(review): `socket.tls` does not appear to be part of skynet.socket;
-- skynet's bundled TLS support seems to be exposed through the optional ltls
-- module / "http.tlshelper". Treat this snippet as an outline and confirm the
-- real API against the skynet version in use.
local tls_fd = socket.listen("0.0.0.0", 443)
socket.start(tls_fd, function(fd, addr)
-- Enable TLS, passing the certificate and key file paths
local tls = socket.tls(fd, {
certificate = "./cert/server.crt",
key = "./cert/server.key",
})
-- Afterwards the connection is handed to the same handler as a plain socket.
-- NOTE(review): the accepted fd is not started (socket.start(fd)) before the
-- TLS call here — confirm the intended order.
skynet.fork(handle_ws_connect, tls, addr)
end)
客户端 JavaScript 示例
// 客户端连接示例
/**
 * Minimal WebSocket game client: JSON messages of the form {type, data},
 * dispatched to one handler per message type.
 */
class GameClient {
    /**
     * @param {string} url WebSocket URL of the gateway
     */
    constructor(url) {
        this.url = url;
        this.ws = null;
        // Prototype-less map so a message type such as "constructor" or
        // "toString" can never resolve to an inherited Object member.
        this.handlers = Object.create(null);
    }

    /** Open the connection and install the socket event handlers. */
    connect() {
        this.ws = new WebSocket(this.url);
        this.ws.onopen = () => {
            console.log("连接已建立");
            this.login("player_001");
        };
        this.ws.onmessage = (event) => {
            // A frame that is not valid JSON must not throw out of the
            // event handler; log it and drop the message.
            let msg;
            try {
                msg = JSON.parse(event.data);
            } catch (err) {
                console.error("消息解析失败:", err);
                return;
            }
            if (msg === null || typeof msg !== "object") {
                return;
            }
            const handler = this.handlers[msg.type];
            if (typeof handler === "function") handler(msg.data);
        };
        this.ws.onclose = (event) => {
            console.log("连接关闭:", event.code, event.reason);
        };
        this.ws.onerror = (error) => {
            console.error("连接错误:", error);
        };
    }

    /**
     * Send one {type, data} message if the socket is open.
     * @param {string} type message type
     * @param {*} data message payload
     * @returns {boolean} true when the message was handed to the socket,
     *   false when there is no open connection (including before connect())
     */
    send(type, data) {
        if (this.ws !== null && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify({type, data}));
            return true;
        }
        return false;
    }

    /** Send a login message for `userId`. */
    login(userId) {
        return this.send("login", {user_id: userId});
    }

    /** Send a move message with the current client timestamp. */
    move(x, y) {
        return this.send("move", {x, y, timestamp: Date.now()});
    }

    /** Send a chat message with the current client timestamp. */
    chat(message) {
        return this.send("chat", {message, timestamp: Date.now()});
    }

    /**
     * Register the handler for a message type, replacing any previous one.
     * @param {string} type message type
     * @param {function(*): void} handler called with the message's data
     */
    on(type, handler) {
        this.handlers[type] = handler;
    }
}
// Usage: connect to the gateway and log the responses the server sends back
const client = new GameClient("ws://localhost:8888/game");

// Build a handler that logs incoming data under the given label
const logWith = (label) => (data) => {
    console.log(label, data);
};

client.connect();
client.on("login", logWith("登录成功:"));
client.on("move_ack", logWith("移动确认:"));
总结
本教程涵盖了 Skynet WebSocket 支持的完整实现:
- 协议原理:WebSocket 握手、帧格式
- 网关实现:连接管理、协议解析
- Agent 模式:每个连接独立处理逻辑
- 多协议支持:同时支持 WS 和 TCP
- 安全性:TLS 加密
参考资料
- WebSocket 协议规范:RFC 6455
- Skynet socket 模块文档
- Skynet TLS 支持
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。