Skynet WebSocket 支持与实现

详解 Skynet 中 WebSocket 的实现原理、网关服务搭建、协议握手、消息编解码和客户端连接管理

WebSocket 是现代游戏服务器与客户端通信的主流协议,提供低延迟的双向通信能力。本教程将详细讲解如何在 Skynet 中实现 WebSocket 支持,从协议原理到完整的网关服务实现。

WebSocket 协议概述

WebSocket vs TCP vs HTTP

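| 特性 | WebSocket | TCP | HTTP |
| --- | --- | --- | --- |
| 通信方式 | 双向全双工 | 双向全双工 | 请求-响应 |
| 连接保持 | 长连接 | 长连接 | 短连接/长连接 |
| 数据帧 | 结构化帧 | 原始字节流 | 文本 |
| 穿透能力 | 高(基于 HTTP 升级) | — | — |
| 适用场景 | 实时通信 | 自定义协议 | Web API |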

WebSocket 握手过程

WebSocket 连接通过 HTTP Upgrade 建立:

客户端请求:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

服务器响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

数据帧格式

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|     Masking-key (if MASK=1, 4 bytes)                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                     Payload Data                              |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
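  • FIN:是否为消息的最后一帧(分片消息的后续帧使用 opcode 0x0)
  • opcode:帧类型(0x0=延续帧, 0x1=文本, 0x2=二进制, 0x8=关闭, 0x9=ping, 0xA=pong)
  • MASK:负载是否经过掩码;客户端发往服务器的帧必须置 1,服务器发出的帧不使用掩码
  • Payload len:负载长度(0-125 为实际长度,126/127 表示使用后续 16/64 位扩展长度)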

Skynet WebSocket 网关

基础网关实现

-- service/wsgate.lua
local skynet = require "skynet"
local socket = require "skynet.socket"
local crypt = require "skynet.crypt"
-- Skynet ships its URL helpers as "http.url" (lualib/http/url.lua); there is
-- no "skynet.url" module, so requiring that name aborts service startup.
local url = require "http.url"

-- WebSocket frame opcodes
local WS_TEXT, WS_BINARY = 0x1, 0x2
local WS_CLOSE, WS_PING, WS_PONG = 0x8, 0x9, 0xA

-- Connection bookkeeping
local connections = {}   -- fd -> {agent, addr, path, last_active}
local agents = {}        -- agent -> fd

-- Gateway settings
local config = {
    heartbeat_timeout = 60,  -- seconds
    maxclient = 4096,
    port = 8888,
    address = "0.0.0.0",
}

-- ===== WebSocket 协议实现 =====

-- Base64 alphabet (index 0 is the first character)
local base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

-- Encode a byte string as standard, '='-padded Base64.
-- Works on groups of three input bytes; a short final group is zero-filled
-- and the unused output positions are replaced by '='.
local function base64_encode(data)
    local pieces = {}
    local function symbol(index)
        return base64_chars:sub(index + 1, index + 1)
    end

    for pos = 1, #data, 3 do
        local b1, b2, b3 = data:byte(pos, pos + 2)
        local group = (b1 << 16) | ((b2 or 0) << 8) | (b3 or 0)

        pieces[#pieces + 1] = symbol((group >> 18) & 0x3F)
        pieces[#pieces + 1] = symbol((group >> 12) & 0x3F)
        if b2 then
            pieces[#pieces + 1] = symbol((group >> 6) & 0x3F)
        else
            pieces[#pieces + 1] = '='
        end
        if b3 then
            pieces[#pieces + 1] = symbol(group & 0x3F)
        else
            pieces[#pieces + 1] = '='
        end
    end

    return table.concat(pieces)
end

-- Validate an HTTP Upgrade request and answer it on `fd`.
--
-- @param fd   socket id the response is written to
-- @param data full request text (request line, headers, blank line)
-- @return true, path   on success, after "101 Switching Protocols" was sent
-- @return false        on failure, after an HTTP error status was sent
local function handle_handshake(fd, data)
    local function reject(status, extra_headers)
        socket.write(fd, "HTTP/1.1 " .. status .. "\r\n" .. (extra_headers or "") .. "\r\n")
        return false
    end

    if type(data) ~= "string" then
        return reject("400 Bad Request")
    end

    local method, path = string.match(data, "^(%u+)%s+(%S+)%s+HTTP")
    if method ~= "GET" then
        return reject("405 Method Not Allowed")
    end

    -- Parse header lines one at a time. The request line is skipped so a ':'
    -- inside the path cannot be mistaken for a header, and surrounding
    -- whitespace is stripped so it never leaks into the Accept hash.
    local headers = {}
    local is_request_line = true
    for line in string.gmatch(data, "[^\r\n]+") do
        if is_request_line then
            is_request_line = false
        else
            local name, value = string.match(line, "^([^:%s]+)%s*:%s*(.-)%s*$")
            if name then
                headers[name:lower()] = value
            end
        end
    end

    -- Must be a WebSocket upgrade
    local upgrade = headers["upgrade"]
    if not upgrade or upgrade:lower() ~= "websocket" then
        return reject("400 Bad Request")
    end

    -- Connection may be a token list such as "keep-alive, Upgrade"
    local connection = headers["connection"]
    if not connection or not connection:lower():find("upgrade", 1, true) then
        return reject("400 Bad Request")
    end

    local key = headers["sec-websocket-key"]
    if not key or key == "" then
        return reject("400 Bad Request")
    end

    -- Only protocol version 13 is implemented; advertise it on mismatch
    if headers["sec-websocket-version"] ~= "13" then
        return reject("426 Upgrade Required", "Sec-WebSocket-Version: 13\r\n")
    end

    -- Sec-WebSocket-Accept = base64(sha1(key .. fixed GUID))
    local magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
    local accept = crypt.base64encode(crypt.sha1(key .. magic))

    local response = string.format(
        "HTTP/1.1 101 Switching Protocols\r\n" ..
        "Upgrade: websocket\r\n" ..
        "Connection: Upgrade\r\n" ..
        "Sec-WebSocket-Accept: %s\r\n" ..
        "\r\n",
        accept
    )

    socket.write(fd, response)
    return true, path
end

-- Read one WebSocket frame from `fd`, blocking the calling coroutine.
--
-- @param fd          socket id
-- @param max_payload optional upper bound on the payload size in bytes
--                    (default 16 MiB); larger frames are refused so a peer
--                    cannot make the gateway buffer an arbitrary amount
-- @return {fin = boolean, opcode = integer, payload = string}, or nil when
--         the socket closed mid-frame or the declared length is not accepted
local function read_frame(fd, max_payload)
    max_payload = max_payload or 0x1000000

    -- First two bytes: FIN/opcode and MASK/7-bit length
    local header = socket.read(fd, 2)
    if not header or #header < 2 then
        return nil
    end

    local byte1, byte2 = string.byte(header, 1, 2)

    local fin = (byte1 & 0x80) ~= 0
    local opcode = byte1 & 0x0F
    local mask = (byte2 & 0x80) ~= 0
    local payload_len = byte2 & 0x7F

    -- Extended length (big-endian)
    if payload_len == 126 then
        local ext = socket.read(fd, 2)
        if not ext or #ext < 2 then return nil end
        payload_len = string.unpack(">I2", ext)
    elseif payload_len == 127 then
        local ext = socket.read(fd, 8)
        if not ext or #ext < 8 then return nil end
        -- Decode all 8 bytes: dropping the high half would make the reader
        -- consume too few bytes and desynchronise the stream. A value with
        -- the top bit set decodes as negative and is refused below.
        payload_len = string.unpack(">I8", ext)
    end

    if payload_len < 0 or payload_len > max_payload then
        return nil
    end

    -- Masking key
    local mask_key
    if mask then
        mask_key = socket.read(fd, 4)
        if not mask_key or #mask_key < 4 then return nil end
    end

    -- Payload
    local payload = ""
    if payload_len > 0 then
        payload = socket.read(fd, payload_len)
        if not payload or #payload < payload_len then return nil end
    end

    -- Unmask: byte i is XORed with key byte ((i - 1) mod 4)
    if mask_key then
        local key_bytes = { string.byte(mask_key, 1, 4) }
        local bytes = {}
        for i = 1, #payload do
            bytes[i] = string.char(string.byte(payload, i) ~ key_bytes[((i - 1) & 3) + 1])
        end
        payload = table.concat(bytes)
    end

    return {
        fin = fin,
        opcode = opcode,
        payload = payload,
    }
end

-- Write a single unmasked frame with FIN set.
-- The length is encoded as 7 bits, 126 + 16 bits, or 127 + 64 bits
-- (big-endian), depending on the size of `data`.
local function write_frame(fd, opcode, data)
    local size = #data
    local first_byte = 0x80 | opcode   -- FIN + opcode
    local head

    if size < 126 then
        head = string.pack("BB", first_byte, size)
    elseif size < 65536 then
        head = string.pack(">BBI2", first_byte, 126, size)
    else
        head = string.pack(">BBI8", first_byte, 127, size)
    end

    socket.write(fd, head .. data)
end

-- Send `text` to `fd` as one text frame
local function send_text(fd, text)
    local opcode = WS_TEXT
    write_frame(fd, opcode, text)
end

-- Send `data` to `fd` as one binary frame
local function send_binary(fd, data)
    local opcode = WS_BINARY
    write_frame(fd, opcode, data)
end

-- Send a close frame.
--
-- @param fd     socket id
-- @param code   optional status code; when nil the frame has no payload
-- @param reason optional text appended after the code
--
-- Control-frame payloads are limited to 125 bytes, two of which hold the
-- status code, so `reason` is cut to at most 123 bytes. The cut backs up to
-- a character boundary so a multi-byte UTF-8 sequence is never split.
local function send_close(fd, code, reason)
    local payload = ""
    if code then
        payload = string.char((code >> 8) & 0xFF, code & 0xFF)
        if reason then
            local max_reason = 123
            if #reason > max_reason then
                local cut = max_reason
                -- 10xxxxxx marks a UTF-8 continuation byte
                while cut > 0 and (reason:byte(cut + 1) & 0xC0) == 0x80 do
                    cut = cut - 1
                end
                reason = reason:sub(1, cut)
            end
            payload = payload .. reason
        end
    end
    write_frame(fd, WS_CLOSE, payload)
end

-- Send a pong frame; a missing payload is sent as the empty string
local function send_pong(fd, data)
    if not data then
        data = ""
    end
    write_frame(fd, WS_PONG, data)
end

-- ===== Connection management =====
--
-- Every accepted socket is served by its own coroutine (forked in CMD.open):
-- HTTP handshake, agent creation, then a blocking frame-read loop.
--
-- This service deliberately does NOT call skynet.dispatch("socket", ...).
-- The skynet.socket library installs its own handler for that protocol and
-- uses it to wake coroutines blocked in socket.read/readline and to deliver
-- accept events; replacing it would stall every socket call in this file.

local WS_CONTINUATION = 0x0
local MAX_HANDSHAKE_SIZE = 8192      -- bytes of request line + headers
local MAX_MESSAGE_SIZE = 0x1000000   -- bytes of one reassembled message
local CLOSE_GRACE = 100              -- skynet.timeout ticks before the socket is closed

local accepted = 0                   -- sockets currently being served (including handshakes)

-- Send a close frame once, then close the socket after a short grace period.
local function close_later(fd, conn, code, reason)
    if conn.closing then
        return
    end
    conn.closing = true
    send_close(fd, code, reason)
    skynet.timeout(CLOSE_GRACE, function()
        socket.close(fd)
    end)
end

-- Forget a connection and tell its agent that the client is gone.
local function on_disconnect(fd)
    local conn = connections[fd]
    if not conn then return end

    skynet.error(string.format("连接断开: fd=%d", fd))

    -- Notify the agent
    if conn.agent then
        pcall(skynet.send, conn.agent, "lua", "disconnect")
        agents[conn.agent] = nil
    end

    connections[fd] = nil
end

-- Read the HTTP request up to and including the blank line.
-- Returns the request text, or nil if the peer closed or sent too much.
local function read_handshake(fd)
    local lines, size = {}, 0
    while true do
        local line = socket.readline(fd, "\r\n")
        if not line then
            return nil
        end
        size = size + #line + 2
        if size > MAX_HANDSHAKE_SIZE then
            return nil
        end
        lines[#lines + 1] = line
        if line == "" then
            break
        end
    end
    return table.concat(lines, "\r\n") .. "\r\n"
end

-- Read frames until the connection ends, forwarding complete text/binary
-- messages to the agent and answering control frames.
local function serve_frames(fd, conn)
    local parts, parts_size   -- pending fragments of an unfinished message

    local function deliver(payload)
        if conn.agent then
            skynet.send(conn.agent, "lua", "message", fd, payload)
        end
    end

    while true do
        local frame = read_frame(fd)
        if not frame then
            return
        end
        conn.last_active = skynet.time()

        local opcode = frame.opcode
        if opcode == WS_CLOSE then
            -- Echo the close unless this side already sent one
            if not conn.closing then
                send_close(fd, 1000)
            end
            return
        elseif opcode == WS_PING then
            send_pong(fd, frame.payload)
        elseif opcode == WS_PONG then
            -- Heartbeat reply; last_active was refreshed above
        elseif opcode == WS_TEXT or opcode == WS_BINARY then
            if frame.fin then
                deliver(frame.payload)
            else
                parts, parts_size = { frame.payload }, #frame.payload
            end
        elseif opcode == WS_CONTINUATION then
            if not parts then
                send_close(fd, 1002, "unexpected continuation")
                return
            end
            parts_size = parts_size + #frame.payload
            if parts_size > MAX_MESSAGE_SIZE then
                send_close(fd, 1009, "message too big")
                return
            end
            parts[#parts + 1] = frame.payload
            if frame.fin then
                deliver(table.concat(parts))
                parts, parts_size = nil, nil
            end
        end
    end
end

-- Handshake, register the connection and serve it until it ends.
local function serve_client(fd, addr)
    if accepted > config.maxclient then
        socket.write(fd, "HTTP/1.1 503 Service Unavailable\r\n\r\n")
        return
    end

    local request = read_handshake(fd)
    if not request then
        return
    end

    local ok, path = handle_handshake(fd, request)
    if not ok then
        skynet.error("握手失败:", fd)
        return
    end

    -- One agent service per connection
    local agent = skynet.newservice("wsagent", fd, addr, path)

    local conn = {
        agent = agent,
        addr = addr,
        path = path,
        last_active = skynet.time(),
    }
    connections[fd] = conn
    agents[agent] = fd

    skynet.error(string.format("连接已建立: fd=%d, agent=:%08x",
        fd, agent))

    serve_frames(fd, conn)
end

-- Entry point for an accepted socket; always releases the fd on the way out.
local function on_connect(fd, addr)
    skynet.error(string.format("新连接: fd=%d, addr=%s", fd, addr))

    -- Start receiving data for this socket in the current service
    socket.start(fd)

    accepted = accepted + 1
    local ok, err = pcall(serve_client, fd, addr)
    accepted = accepted - 1
    if not ok then
        skynet.error(string.format("connection error: fd=%d, %s", fd, tostring(err)))
    end

    local conn = connections[fd]
    on_disconnect(fd)
    -- When a delayed close is pending, its timer closes the socket
    if not (conn and conn.closing) then
        socket.close(fd)
    end
end

-- ===== Commands =====

local CMD = {}

-- Configure the gateway and start listening
function CMD.open(conf)
    conf = conf or {}
    config.address = conf.address or config.address
    config.port = conf.port or config.port
    config.maxclient = conf.maxclient or config.maxclient

    -- Listen
    local listen_fd = socket.listen(config.address, config.port)
    skynet.error(string.format("WebSocket 网关启动: %s:%d",
        config.address, config.port))

    socket.start(listen_fd, function(fd, addr)
        skynet.fork(on_connect, fd, addr)
    end)
end

-- Send a text message to a client
function CMD.send(fd, data)
    local conn = connections[fd]
    if conn and not conn.closing then
        send_text(fd, data)
    end
end

-- Send a binary message to a client
function CMD.send_binary(fd, data)
    local conn = connections[fd]
    if conn and not conn.closing then
        send_binary(fd, data)
    end
end

-- Close a connection (status code defaults to 1000)
function CMD.close(fd, code, reason)
    local conn = connections[fd]
    if conn then
        close_later(fd, conn, code or 1000, reason)
    end
end

-- Number of established connections
function CMD.get_online()
    local count = 0
    for _ in pairs(connections) do
        count = count + 1
    end
    return count
end

-- Send a text message to every established connection
function CMD.broadcast(data)
    for fd, conn in pairs(connections) do
        if not conn.closing then
            send_text(fd, data)
        end
    end
end

skynet.start(function()
    -- Heartbeat watchdog
    skynet.fork(function()
        while true do
            skynet.sleep(100)  -- check once per second
            local now = skynet.time()

            for fd, conn in pairs(connections) do
                if not conn.closing
                    and now - conn.last_active > config.heartbeat_timeout then
                    skynet.error("心跳超时,断开连接:", fd)
                    close_later(fd, conn, 1001, "heartbeat timeout")
                end
            end
        end
    end)

    skynet.dispatch("lua", function(session, source, cmd, ...)
        local f = assert(CMD[cmd], "Unknown command: " .. cmd)
        if session ~= 0 then
            skynet.retpack(f(...))
        else
            f(...)
        end
    end)
end)

Agent 服务(处理客户端逻辑)

-- service/wsagent.lua
local skynet = require "skynet"
local cjson = require "cjson"

-- Launch arguments: socket id, peer address, request path.
-- They are unpacked first because tonumber(...) would receive the address as
-- its `base` argument and raise an error.
local fd, addr, path = ...
fd = tonumber(fd)

-- Set by a "login" message
local user_id = nil

local CMD = {}

-- Handle one client message forwarded by the gateway.
--
-- @param fd   socket id of the sender (shadows the file-level `fd`)
-- @param data JSON text; expected to decode to an object with `type` and `data`
--
-- Input comes straight from the client, so anything that is not a JSON object,
-- or that lacks the object payload a handler needs, is logged and dropped
-- instead of raising inside the dispatcher.
function CMD.message(fd, data)
    local ok, msg = pcall(cjson.decode, data)
    if not ok or type(msg) ~= "table" then
        skynet.error("消息解析失败:", data)
        return
    end

    local msg_type = msg.type
    local msg_data = msg.data

    skynet.error(string.format("收到消息: type=%s", msg_type))

    if msg_type == "chat" then
        -- Forward to the chat service
        skynet.send(".chat_service", "lua", "send", user_id, msg_data)
        return
    end

    local needs_object = msg_type == "login"
        or msg_type == "move"
        or msg_type == "action"
    if not needs_object then
        return
    end
    if type(msg_data) ~= "table" then
        skynet.error("消息解析失败:", data)
        return
    end

    if msg_type == "login" then
        user_id = msg_data.user_id
        skynet.error(string.format("用户 %s 登录", user_id))

        -- Confirm the login
        send_response("login", {success = true, user_id = user_id})
    elseif msg_type == "move" then
        handle_move(msg_data)
    else
        handle_action(msg_data)
    end
end

-- The client is gone: report the user offline (if one logged in) and exit
function CMD.disconnect()
    local who = user_id
    skynet.error(("用户 %s 断开连接"):format(who))

    if who then
        skynet.send(".user_service", "lua", "offline", who)
    end

    skynet.exit()
end

-- Wrap `data` in a {type, data, timestamp} envelope, JSON-encode it and ask
-- the ".wsgate" service to send it to this agent's socket.
-- Kept global: CMD.message refers to it before this definition.
function send_response(msg_type, data)
    local envelope = {
        type = msg_type,
        data = data,
        timestamp = os.time(),
    }
    local encoded = cjson.encode(envelope)

    skynet.send(".wsgate", "lua", "send", fd, encoded)
end

-- Forward a move to ".game_service", then acknowledge it to the client with
-- the x/y taken from the request.
function handle_move(data)
    skynet.send(".game_service", "lua", "player_move", user_id, data)

    local ack = { x = data.x, y = data.y }
    send_response("move_ack", ack)
end

-- Run an action through ".game_service" (blocking call) and return the first
-- result value to the client.
function handle_action(data)
    send_response("action_result",
        (skynet.call(".game_service", "lua", "player_action", user_id, data)))
end

skynet.start(function()
    skynet.error(("Agent 启动: fd=%d, addr=%s, path=%s"):format(fd, addr, path))

    -- Route "lua" messages to CMD; reply only when the sender expects one
    local function on_lua_message(session, _source, cmd, ...)
        local handler = assert(CMD[cmd], "Unknown command: " .. cmd)
        if session == 0 then
            handler(...)
        else
            skynet.retpack(handler(...))
        end
    end

    skynet.dispatch("lua", on_lua_message)
end)

多协议网关

一个网关同时支持 WebSocket 和 TCP:

-- service/unified_gate.lua
local skynet = require "skynet"
local socket = require "skynet.socket"

-- Protocol tags stored in each connection record
local PROTO_WS, PROTO_TCP = "ws", "tcp"

-- fd -> {proto, addr, ...}
local connections = {}

local CMD = {}

-- Start the listeners requested in `conf` (ws_port and/or tcp_port).
-- Each accepted socket is handed to its protocol handler in a new coroutine.
function CMD.open(conf)
    local ws_port = conf.ws_port
    if ws_port then
        -- WebSocket listener
        local listener = socket.listen(conf.address or "0.0.0.0", ws_port)
        socket.start(listener, function(client_fd, client_addr)
            skynet.fork(handle_ws_connect, client_fd, client_addr)
        end)
        skynet.error("WebSocket 端口:", conf.ws_port)
    end

    local tcp_port = conf.tcp_port
    if tcp_port then
        -- Plain TCP listener
        local listener = socket.listen(conf.address or "0.0.0.0", tcp_port)
        socket.start(listener, function(client_fd, client_addr)
            skynet.fork(handle_tcp_connect, client_fd, client_addr)
        end)
        skynet.error("TCP 端口:", conf.tcp_port)
    end
end

-- WebSocket connection handler (outline only)
function handle_ws_connect(fd, addr)
    -- ... WebSocket handshake logic (see the gateway earlier in this tutorial)
    connections[fd] = {proto = PROTO_WS, addr = addr}
    -- ... further processing
end

-- Largest packet body accepted from a TCP client; the length prefix is
-- client-controlled, so it must be bounded before buffering.
local MAX_TCP_PACKET = 0x1000000

-- Serve one TCP connection until it closes.
-- Wire format: 4-byte big-endian length followed by that many bytes of data.
-- Each packet is forwarded to conn.agent when one is attached.
function handle_tcp_connect(fd, addr)
    socket.start(fd)
    skynet.error("TCP 连接:", fd, addr)

    local conn = {proto = PROTO_TCP, addr = addr}
    connections[fd] = conn

    while true do
        local header = socket.read(fd, 4)
        if not header or #header < 4 then
            break
        end

        local len = string.unpack(">I4", header)
        if len > MAX_TCP_PACKET then
            skynet.error("TCP packet too large:", fd, len)
            break
        end

        -- An empty packet is valid; do not issue a zero-byte read for it,
        -- which would not return an empty string.
        local data = ""
        if len > 0 then
            data = socket.read(fd, len)
            if not data or #data < len then
                break
            end
        end

        -- Forward to the agent
        local agent = conn.agent
        if agent then
            skynet.send(agent, "lua", "message", fd, data)
        end
    end

    -- Connection ended
    socket.close(fd)
    connections[fd] = nil
end

-- Send `data` to `fd`, encoded for the connection's protocol.
-- Unknown fds are ignored. The WebSocket branch is a placeholder: its frame
-- write is commented out, so nothing is sent for PROTO_WS connections.
function CMD.send(fd, data)
    local conn = connections[fd]
    local proto = conn and conn.proto

    if proto == PROTO_TCP then
        -- TCP: 4-byte big-endian length prefix
        socket.write(fd, string.pack(">I4", #data) .. data)
    end
    -- PROTO_WS would be:
    -- write_frame(fd, WS_TEXT, data)
end

skynet.start(function()
    -- Route "lua" messages to CMD; reply only when the sender expects one
    local function on_lua_message(session, _source, cmd, ...)
        local handler = assert(CMD[cmd])
        if session == 0 then
            handler(...)
        else
            skynet.retpack(handler(...))
        end
    end

    skynet.dispatch("lua", on_lua_message)
end)

SSL/TLS 支持

Skynet 的 socket 模块支持 TLS,需要在编译时启用 OpenSSL:

-- Listen for TLS connections on port 443.
-- NOTE(review): `socket.tls` could not be matched to a function in the
-- skynet.socket API; Skynet's HTTPS example appears to set TLS up through
-- the `http.tlshelper` module instead. Confirm against the Skynet version in
-- use before relying on this snippet.
-- NOTE(review): `skynet`, `socket` and `handle_ws_connect` are not defined in
-- this snippet; presumably they come from the gateway code shown earlier.
local tls_fd = socket.listen("0.0.0.0", 443)
socket.start(tls_fd, function(fd, addr)
    -- Wrap the accepted socket in TLS using the server certificate and key
    local tls = socket.tls(fd, {
        certificate = "./cert/server.crt",
        key = "./cert/server.key",
    })
    
    -- From here on the connection is handled like a plain socket
    skynet.fork(handle_ws_connect, tls, addr)
end)

客户端 JavaScript 示例

// Example client for the WebSocket gateway
class GameClient {
    /**
     * @param {string} url WebSocket URL of the gateway
     */
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.handlers = {};
    }

    /** Open the socket and install the event handlers. */
    connect() {
        const ws = new WebSocket(this.url);
        this.ws = ws;

        ws.onopen = () => {
            console.log("连接已建立");
            this.login("player_001");
        };

        ws.onmessage = (event) => {
            // A frame that is not JSON must not throw out of the callback
            let msg;
            try {
                msg = JSON.parse(event.data);
            } catch (err) {
                console.error("invalid message:", err);
                return;
            }
            if (msg === null || typeof msg !== "object") return;

            // Own properties only, so a type such as "constructor" cannot
            // resolve to something inherited from Object.prototype
            if (!Object.prototype.hasOwnProperty.call(this.handlers, msg.type)) return;
            const handler = this.handlers[msg.type];
            if (typeof handler === "function") handler(msg.data);
        };

        ws.onclose = (event) => {
            console.log("连接关闭:", event.code, event.reason);
        };

        ws.onerror = (error) => {
            console.error("连接错误:", error);
        };
    }

    /**
     * Send a {type, data} message as JSON.
     * @param {string} type
     * @param {*} data
     * @returns {boolean} true if the message was handed to the socket,
     *   false if the client is not connected (including before connect())
     */
    send(type, data) {
        if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
            return false;
        }
        this.ws.send(JSON.stringify({type, data}));
        return true;
    }

    login(userId) {
        this.send("login", {user_id: userId});
    }

    move(x, y) {
        this.send("move", {x, y, timestamp: Date.now()});
    }

    chat(message) {
        this.send("chat", {message, timestamp: Date.now()});
    }

    /**
     * Register the handler for one message type (replaces any previous one).
     * @param {string} type
     * @param {(data: *) => void} handler
     */
    on(type, handler) {
        this.handlers[type] = handler;
    }
}

// Usage
const client = new GameClient("ws://localhost:8888/game");
client.connect();

// Handlers are looked up when a message arrives, so they can be registered
// right after connect() is called.
client.on("login", function (payload) {
    console.log("登录成功:", payload);
});

client.on("move_ack", function (payload) {
    console.log("移动确认:", payload);
});

总结

本教程涵盖了 Skynet WebSocket 支持的完整实现:

  1. 协议原理:WebSocket 握手、帧格式
  2. 网关实现:连接管理、协议解析
  3. Agent 模式:每个连接独立处理逻辑
  4. 多协议支持:同时支持 WS 和 TCP
  5. 安全性:TLS 加密

参考资料

  1. WebSocket 协议规范:RFC 6455
  2. Skynet socket 模块文档
  3. Skynet TLS 支持

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页